tornado聊天室demo.py源码分析

在tornado的源码之中有facebook的工程师写的demo,结构十分的优雅,可以用来学习.这篇文章主要是来分析chatdemo.py,tornado使用长轮询的方式建立聊天室的方式.不足之处,欢迎留言讨论.


chatdemo.py的文件结构如下

在tornado的源码之中有facebook的工程师写的demo,结构十分的优雅,可以用来学习.这篇文章主要是来分析chatdemo.py,tornado使用长轮询的方式建立聊天室的方式.不足之处,欢迎留言讨论.


chatdemo.py的文件结构如下:

1
2
3
4
D:.  
├─chatdemo.py
├─static
└─templates

首先来看chatdemo.py中的内容.

1
2
3
4
5
6
7

class MessageBuffer(object):
def \_\_init\_\_(self):
\# cond is notified whenever the message cache is updated
self.cond = tornado.locks.Condition()
self.cache = \[\]
self.cache\_size = 200

首先是创建了一个messagebuffer,一个用来缓存聊天记录的类,在析构函数中有一个self.cond,它是一个tornado.locks.Condition(),具体看一下locks.condition的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class \_TimeoutGarbageCollector(object):  
"""Base class for objects that periodically clean up timed-out waiters.

Avoids memory leak in a common pattern like:

while True:
yield condition.wait(short\_timeout)
print('looping....')
"""

def \_\_init\_\_(self) -> None:
self.\_waiters = collections.deque() \# type: Deque\[Future\]
self.\_timeouts = 0

def \_garbage\_collect(self) -> None:
\# Occasionally clear timed-out waiters.
self.\_timeouts += 1
if self.\_timeouts > 100:
self.\_timeouts = 0
self.\_waiters = collections.deque(w for w in self.\_waiters if not w.done())
class Condition(\_TimeoutGarbageCollector):
def \_\_init\_\_(self) -> None:
super(Condition, self).\_\_init\_\_()
self.io\_loop = ioloop.IOLoop.current()

def \_\_repr\_\_(self) -> str:
result = "<%s" % (self.\_\_class\_\_.\_\_name\_\_,)
if self.\_waiters:
result += " waiters\[%s\]" % len(self.\_waiters)
return result + ">"

def wait(self, timeout: Union\[float, datetime.timedelta\] = None) -> Awaitable\[bool\]:
"""Wait for \`.notify\`.

Returns a \`.Future\` that resolves \`\`True\`\` if the condition is notified,
or \`\`False\`\` after a timeout.
"""
waiter = Future() \# type: Future\[bool\]
self.\_waiters.append(waiter)
if timeout:

def on\_timeout() -> None:
if not waiter.done():
future\_set\_result\_unless\_cancelled(waiter, False)
self.\_garbage\_collect()

io\_loop = ioloop.IOLoop.current()
timeout\_handle = io\_loop.add\_timeout(timeout, on\_timeout)
waiter.add\_done\_callback(lambda \_: io\_loop.remove\_timeout(timeout\_handle))
return waiter

def notify(self, n: int = 1) -> None:
"""Wake \`\`n\`\` waiters."""
waiters = \[\] \# Waiters we plan to run right now.
while n and self.\_waiters:
waiter = self.\_waiters.popleft()
if not waiter.done(): \# Might have timed out.
n -= 1
waiters.append(waiter)

for waiter in waiters:
future\_set\_result\_unless\_cancelled(waiter, True)

def notify\_all(self) -> None:
"""Wake all waiters."""
self.notify(len(self.\_waiters))

condition使用了_TimeoutGarbageCollector基类,在_TimeoutGarbageCollector中有一个用来存放future对象的self._waiters = collections.deque() # type: Deque[Future]队列,deque是一个双向队列,可以当队列来用,也可以当栈来使用.在conditoin中有一个wait函数,它的作用是等待一个通知,如果condition类被通知,则wait将会返回一个future对象,返回true.当condition.wait()被调用时,将会在condition类中_witer中存放进这个waiter,他是一个future对象.如果超时,IOLoop线程执行add_timeout,添加一个超时回调,请注意,从其他线程调用add_timeout是不安全的. 相反,您必须使用add_callback将控制权转移到IOLoop的线程,然后从那里调用add_timeout . waiter在完成时,可以执行一些回调,在这里执行一个lambda表达式,删除超时任务.on_timeout是一个闭包,当超时时间达到之后,如果waiter还没有set_result,则被添加进入_timeout中.

notify用来进行通知,self._waiters.popleft()从左侧出队,

再来看messagebuffer中的get_message_since:

1
2
3
4
5
6
7
8
9
10
11
12
13

def get\_messages\_since(self, cursor):
"""Returns a list of messages newer than the given cursor.

\`\`cursor\`\` should be the \`\`id\`\` of the last message received.
"""
results = \[\]
for msg in reversed(self.cache):
if msg\["id"\] == cursor:
break
results.append(msg)
results.reverse()
return results

返回一个消息列表.而add_message,在cache中补充新消息,同时进行notify_all.即把消息通知各所有的waiter

1
2
3
4
5
def add\_message(self, message):  
self.cache.append(message)
if len(self.cache) > self.cache\_size:
self.cache = self.cache\[-self.cache\_size :\]
self.cond.notify\_all()

在发送一条新消息的处理函数中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class MessageNewHandler(tornado.web.RequestHandler):  
"""Post a new message to the chat room."""

def post(self):
message = {"id": str(uuid.uuid4()), "body": self.get\_argument("body")}
\# render\_string() returns a byte string, which is not supported
\# in json, so we must convert it to a character string.
message\["html"\] = tornado.escape.to\_unicode(
self.render\_string("message.html", message=message)
)
if self.get\_argument("next", None):
self.redirect(self.get\_argument("next"))
else:
self.write(message)
global\_message\_buffer.add\_message(message)

global_message_buffer.add_message(message)即将消息进行notify_all.

前端长轮询,tornado对应的处理函数如下,是整个demo的关键:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class MessageUpdatesHandler(tornado.web.RequestHandler):  
"""Long-polling request for new messages.

Waits until new messages are available before returning anything.
"""

async def post(self):
cursor = self.get\_argument("cursor", None)
messages = global\_message\_buffer.get\_messages\_since(cursor)
while not messages:
\# Save the Future returned here so we can cancel it in
\# on\_connection\_close.
self.wait\_future = global\_message\_buffer.cond.wait()
try:
await self.wait\_future
except asyncio.CancelledError:
return
messages = global\_message\_buffer.get\_messages\_since(cursor)
if self.request.connection.stream.closed():
return
self.write(dict(messages=messages))

def on\_connection\_close(self):
self.wait\_future.cancel()

当请求到达之后,先查看cache中是否有未读的消息,如果有的话,只要连接还没有断开,就发送出去.
如果没有,就hold住该请求,进入while.await self.wait_future,到这里异步处理完成,该请求挂在这里,等待self.wait_future产生结果,而不影响其他连接的生产,即不会发生阻塞.
只有当有新消息达到之后,调用MessageNewHandler,执行了global_message_buffer.add_message(message),waiter才会产生set_result,产生返回.