预计阅读本页时间:-
12.1 用Flask和PostgreSQL流化数据
前面一节讨论了掌握数据存储系统有多么重要。这里将展示如何用PostgreSQL的一个高级特性构造一个HTTP事件流系统。
这个小应用的目的是将消息存储在一个SQL表中并通过HTTP REST API提供对这些消息的访问。每个消息由一个整数类型的channel、一个字符串类型的source、一个字符串类型的content组成。创建这个表的代码非常简单,如示例12.1所示。
广告:个人专属 VPN,独立 IP,无限流量,多机房切换,还可以屏蔽广告和恶意软件,每月最低仅 5 美元
示例 12.1 创建message表
CREATE TABLE message (
id SERIAL PRIMARY KEY,
channel INTEGER NOT NULL,
source TEXT NOT NULL,
content TEXT NOT NULL
);
另外还需要做的是序列化这些消息,以便客户端能够实时对它们进行处理。这需要用到PostgreSQL的LISTEN(http://www.postgresql.org/docs/9.2/static/sql-listen.html)和NOTIFY(http://www.postgresql.org/docs/9.2/static/sql-notify.html)功能。这些功能可以监听来自函数的消息,这个函数由用户提供,由PostgreSQL执行,如示例12.2所示。
示例 12.2 notify_on_insert函数
CREATE OR REPLACE FUNCTION notify_on_insert() RETURNS trigger AS $$
BEGIN
PERFORM pg_notify('channel_' || NEW.channel,
CAST(row_to_json(NEW) AS TEXT));
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
这会创建一个用pl/pgsql编写的触发器函数,pl/pgsql语言只有PostgreSQL可以理解。需要注意的是,这个函数也可以用其他语言编写,如Python本身,因为PostgreSQL是通过嵌入Python解释器支持pl/python语言的。
函数会执行一个对pg_notify的调用。这是实际发送通知的函数。第一个参数是一个代表一个信道的字符串,第二个参数是携带实际净荷(palyload)的字符串。这里根据channel列在行内的的值来动态定义信道。在这个例子中,净荷是以JSON格式表示的整个行。没错,PostgreSQL原生地就知道如何将行转换为JSON。
我们希望对message表的每一次INSERT操作都发送通知消息,所以需要在这样的事件上出发这个函数,如示例12.3所示。
示例 12.3 notify_on_insert的触发器
CREATE TRIGGER notify_on_message_insert AFTER INSERT ON message
FOR EACH ROW EXECUTE PROCEDURE notify_on_insert();
搞定。这个函数已经插入并且在message表每一次INSERT操作成功后都会被 执行。
可以通过psql中的LISTEN操作检查它是否工作正常:
$ psql
psql (9.3rc1)
SSL connection (cipher: DHE-RSA-AES256-SHA, bits: 256)
Type "help" for help.
mydatabase=> LISTEN channel_1;
LISTEN
mydatabase=> INSERT INTO message(channel, source, content)
mydatabase-> VALUES(1, 'jd', 'hello world');
INSERT 0 1
Asynchronous notification "channel_1" with payload
"{"id":1,"channel":1,"source":"jd","content":"hello world"}"
received from server process with PID 26393.
一旦行被插入,通知就被发送,并且可以通过PostgreSQL客户端进行接收。现在需要做的就是构建Python应用对这个事件进行流化(stream),如示例12.4所示。
示例 12.4 在 Python 中接收通知
import psycopg2
import psycopg2.extensions
import select
conn = psycopg2.connect(database='mydatabase', user='myuser',
password='idkfa', host='localhost')
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()
curs.execute("LISTEN channel_1;")
while True:
select.select([conn], [], [])
conn.poll()
while conn.notifies:
notify = conn.notifies.pop()
print("Got NOTIFY:", notify.pid, notify.channel, notify.payload)
上面的代码利用库psycopg2连接PostgreSQL。也可以使用一个提供了抽象层的库,如SQLAlchemy,但是它们都无法提供对PostgreSQLLISTEN/NOTIFY功能的访问。通过访问底层数据库连接去执行代码也是可能的,但是在这个例子中没必要那么做,因为这里并不需要任何ORM库提供的其他功能。
这个程序会在channel_1上进行监听。一旦收到通知则将其打印到屏幕上。如果运行这个程序并向message表中插入一行,则会得到如下输出:
$ python3 listen.py
Got NOTIFY: 28797 channel_1
{"id":10,"channel":1,"source":"jd","content":"hello world"}
现在我们将使用Flask(http://flask.pocoo.org/),一个简单的HTTP微型框架,去构造应用程序。这里将使用由HTML52中定义的Server-Sent Events(http://www.w3.org/TR/2009/ WD-eventsource-20090423/)消息协议,如示例12.5所示。
示例 12.5 Flask 流化应用程序
import flask
import psycopg2
import psycopg2.extensions
import select
app = flask.Flask(__name__)
def stream_messages(channel):
conn = psycopg2.connect(database='mydatabase', user='mydatabase',
password='mydatabase', host='localhost')
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()
curs.execute("LISTEN channel_%d;" % int(channel))
while True:
select.select([conn], [], [])
conn.poll()
while conn.notifies:
notify = conn.notifies.pop()
yield "data: " + notify.payload + "\n\n"
@app.route("/message/<channel>", methods=['GET'])
def get_messages(channel):
return flask.Response(stream_messages(channel),
mimetype='text/event-stream')
if __name__ == "__main__":
app.run()
这个应用程序非常简单并且只是为这个例子支持了流化。我们使用Flask将请求路由到GET/message/<channel>,一旦代码被调用,它将以mimetype为text/event-stream的格式进行响应,发回一个生成器函数而非一个字符串。Flask接下来将调用这个函数并在每次生成器生成东西时发送结果。
生成器stream_messages重用了之前写的用来监听PostgreSQL通知的代码。它接收信道Id作为参数,监听这个信道,并生成其有效载荷。记住,我们在触发器函数中用的是PostgreSQL的JSON编码函数,所以从PostgreSQL收到的就是JSON格式的数据,因为发送JSON数据给HTTP客户端没有任何问题,所以无需转换编码。
注意
为简单起见,这个示例应用程序被写在了一个单独的文件中。在一本书中描述一个横跨多个模块的例子有点儿困难。如果这是一个真正的应用程序,那么最好是将存储处理的实现放到一个自己的Python模块中。
现在可以运行这个服务器了:
$ python listen+http.py
* Running on http://127.0.0.1:5000/
在另一个终端中,可以进行连接并在事件进入时对数据进行抽取。在连接时,不会接收收据并且连接保持开放状态。
$ curl -v http://127.0.0.1:5000/message/1
* About to connect() to 127.0.0.1 port 5000 (#0)
* Trying 127.0.0.1...
* Adding handle: conn: 0x1d46e90
* Adding handle: send: 0
* Adding handle: recv: 0
* Curl_addHandleToPipeline: length: 1
* - Conn 0 (0x1d46e90) send_pipe: 1, recv_pipe: 0
* Connected to 127.0.0.1 (127.0.0.1) port 5000 (#0)
> GET /message/1 HTTP/1.1
> User-Agent: curl/
> Host: 127.0.0.1:5000
> Accept: */*
>
但一旦插入一些行到message表中时:
mydatabase=> INSERT INTO message(channel, source, content)
mydatabase-> VALUES(1, 'jd', 'hello world');
INSERT 0 1
mydatabase=> INSERT INTO message(channel, source, content)
mydatabase-> VALUES(1, 'jd', 'it works');
INSERT 0 1
终端上curl运行的位置就会有数据输出:
data: {"id":71,"channel":1,"source":"jd","content":"hello world"}
data: {"id":72,"channel":1,"source":"jd","content":"it works"}
关于这个应用程序的一个朴素的且可以说更轻便的实现3不是通过一个SELECT语句一次次查询是否有新数据插入表内。不过,没有必要在这里展示这样一个推送系统(push system),尽管它比持续地轮询数据库要效率高。