class _TimeoutGarbageCollector(object):
    """Base class that periodically drops waiters which have already finished.

    Keeps the waiter queue from growing without bound in a loop such as::

        while True:
            yield condition.wait(short_timeout)
            print('looping....')
    """

    def __init__(self) -> None:
        self._waiters = collections.deque()  # type: Deque[Future]
        self._timeouts = 0

    def _garbage_collect(self) -> None:
        """Count one timeout; sweep finished waiters once the count passes 100."""
        self._timeouts += 1
        if self._timeouts <= 100:
            return
        self._timeouts = 0
        # Rebuild the queue, keeping only the waiters that are still pending.
        still_pending = [w for w in self._waiters if not w.done()]
        self._waiters = collections.deque(still_pending)


class Condition(_TimeoutGarbageCollector):
    """A condition on which callers can wait until `notify` is called."""

    def __init__(self) -> None:
        super().__init__()
        self.io_loop = ioloop.IOLoop.current()

    def __repr__(self) -> str:
        pieces = ["<%s" % (self.__class__.__name__,)]
        if self._waiters:
            pieces.append(" waiters[%s]" % len(self._waiters))
        pieces.append(">")
        return "".join(pieces)

    def wait(self, timeout: Union[float, datetime.timedelta] = None) -> Awaitable[bool]:
        """Wait for `.notify`.

        Returns a `.Future` that resolves ``True`` if the condition is
        notified, or ``False`` after a timeout.
        """
        fut = Future()  # type: Future[bool]
        self._waiters.append(fut)
        if not timeout:
            # No (or falsy) timeout: the waiter is resolved only by notify().
            return fut

        def expire() -> None:
            # Resolve to False unless a notify() got there first, then give
            # the collector a chance to prune finished waiters.
            if not fut.done():
                future_set_result_unless_cancelled(fut, False)
            self._garbage_collect()

        loop = ioloop.IOLoop.current()
        handle = loop.add_timeout(timeout, expire)

        def cancel_timeout(_) -> None:
            # Once the waiter is done the scheduled timeout is unnecessary.
            loop.remove_timeout(handle)

        fut.add_done_callback(cancel_timeout)
        return fut

    def notify(self, n: int = 1) -> None:
        """Wake ``n`` waiters."""
        to_wake = []  # Waiters we plan to run right now.
        while n and self._waiters:
            candidate = self._waiters.popleft()
            if candidate.done():
                # Already finished (might have timed out); does not count.
                continue
            n -= 1
            to_wake.append(candidate)

        for fut in to_wake:
            future_set_result_unless_cancelled(fut, True)

    def notify_all(self) -> None:
        """Wake all waiters."""
        self.notify(len(self._waiters))
def get_messages_since(self, cursor):
    """Return the cached messages that are newer than ``cursor``.

    ``cursor`` should be the ``id`` of the last message received. The cache
    is scanned from its newest entry backwards until that id is found; if it
    is never found, every cached message is returned. The result is ordered
    the same way as the cache.
    """
    newest_first = []
    for entry in reversed(self.cache):
        if entry["id"] == cursor:
            # Reached the message the caller already has; stop scanning.
            break
        newest_first.append(entry)
    # Flip back into cache order.
    return newest_first[::-1]
class MessageNewHandler(tornado.web.RequestHandler):
    """Accept a new chat message and add it to the global message buffer."""

    def post(self):
        """Build the message from the ``body`` argument, respond, and store it.

        Redirects to the ``next`` argument when one is supplied; otherwise
        writes the message itself as the response.
        """
        message_id = str(uuid.uuid4())
        body = self.get_argument("body")
        message = {"id": message_id, "body": body}

        # render_string() returns a byte string, which is not supported
        # in json, so we must convert it to a character string.
        rendered = self.render_string("message.html", message=message)
        message["html"] = tornado.escape.to_unicode(rendered)

        next_url = self.get_argument("next", None)
        if next_url:
            self.redirect(next_url)
        else:
            self.write(message)

        global_message_buffer.add_message(message)
class MessageUpdatesHandler(tornado.web.RequestHandler):
    """Long-polling request for new messages.

    Waits until new messages are available before returning anything.
    """

    # Future for the in-progress ``cond.wait()``. Defaults to None so that
    # on_connection_close() is safe even when post() never had to wait.
    wait_future = None

    async def post(self):
        """Respond with the messages newer than the ``cursor`` argument.

        If none are available yet, wait on the message buffer's condition
        until some arrive. Returns without writing anything if the wait is
        cancelled or the connection's stream has been closed.
        """
        cursor = self.get_argument("cursor", None)
        messages = global_message_buffer.get_messages_since(cursor)
        while not messages:
            # Save the Future returned here so we can cancel it in
            # on_connection_close.
            self.wait_future = global_message_buffer.cond.wait()
            try:
                await self.wait_future
            except asyncio.CancelledError:
                return
            messages = global_message_buffer.get_messages_since(cursor)
        if self.request.connection.stream.closed():
            return
        self.write({"messages": messages})

    def on_connection_close(self):
        """Cancel the pending wait, if any, when the connection closes."""
        # wait_future is only assigned once post() starts waiting; without
        # this guard a close with no wait started raises AttributeError.
        if self.wait_future is not None:
            self.wait_future.cancel()