Tornado 源码分析(一)

简介: Tornao 源码分析一,回调管理

tornado.gen.engine 是 tornado v3.0.0之前用于异步方法的。 具体点说如果一个方法内部使用yield 方法返回一个异步调用的结果, 那么这个方法必须使用tornado.gen.engine装饰。 tornado v3.0.0之后被tornado.gen.coroutines取代。

如果没有engine, 异步也是可行的,只不过需要大量callback, 就像Javascript那样。
一个最经典的异步handler如下:

class SyncDemo(web.RequestHandler):                                                                            

  @asynchronous
  def get(self):
      http_client = httpclient.AsyncHTTPClient()
      http_client.fetch("http://www.baidu.com", self.on_fetch)

  def on_fetch(self, response):
      return self.finish(response)

@asynchronous 用来说明这个get方法是异步的, tornado 不在负责finish request, 由handler自己处理。
tornado.httpclient.AsyncHTTPClient.fetch 是进行异步网路请求的方法。其函数声明为:
def fetch(self, request, callback, **kwargs)
可见他有一个callback参数, 当网络请求完成后, 将调用callback方法, 这一切是有ioloop实现的。

使用engine后,代码会更接近顺序代码,没有回调如下:

class AsyncDemo(web.RequestHandler):                                                                           
    @asynchronous
    @engine
    def get(self):
        http_client = httpclient.AsyncHTTPClient()
        response = yield Task(httpclient.fetch, 'http://www.baidu.com')
        self.finish(response.body)

但是在AsyncDemo 的例子中, tornado.httpclient.AsyncHTTPClient.fetch 放到了Task中, 当异步方法完成时, response被直接赋值给了response 。 callback还是存在的, 也是执行的,只不过Task和engine 这两个东西,让代码接近顺序代码了。

原理是这样的:
函数内部如果有yield, 那么调用这个函数讲产生types.Generator实例, 这个函数的调用将由这个生成器控制。

words = ['a', 'b', 'c']

def async_func():
    for i in range(len(words)):
        w = yield i
        print "in_func : ", w

runner = async_func()
yield_value = runner.send(None)
print "out_func : ", yield_value                                                                                 
while 1:
    try:
        yield_value = runner.send(words[yield_value])
        print "out_func : ", yield_value
    except StopIteration:
        break

运行结果如下:

out_func :  0
in_func :  a
out_func :  1
in_func :  b
out_func :  2
in_func :  c

tornado.gen.Runner是一个管理生成器和生成器内部异步函数运行结果的类。被engine装饰的函数, 每当调用都会产生一个生成器,同时tornado都会为他生成一个Runner实例,Runner实例负责在合适的时候调用这个生成器的send方法,把函数执行完。

# tornado v2.4.1 tornado/gen.py
def engine(func):
"""Decorator for asynchronous generators.
.....
"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        runner = None
        def handle_exception(typ, value, tb):
            if runner is not None:
                return runner.handle_exception(typ, value, tb)
            return False
        with ExceptionStackContext(handle_exception) as deactivate:
            gen = func(*args, **kwargs)
            if isinstance(gen, types.GeneratorType):
                runner = Runner(gen, deactivate)
                runner.run()
                return
            assert gen is None, gen
            deactivate()
    return wrapper

Runner在什么时候调用生成器的send方法呢? 这是由回调函数决定的。当回调函数执行时, 我们有了异步函数的执行结果response, 同时有了函数运行体Runner, 由Runner负责send(response)。 这样我们的函数就能向下运行了。 所以需要将Runner和callback关联。 tornao.gen.Task完成了这项任务。

# torando v2.4.1 torando/gen.py
class Task(YieldPoint):
    """Runs a single asynchronous operation.
    ...
    """
    def __init__(self, func, *args, **kwargs):
        assert "callback" not in kwargs
        self.args = args
        self.kwargs = kwargs
        self.func = func

    def start(self, runner):
        self.runner = runner
        self.key = object()
        runner.register_callback(self.key)
        self.kwargs["callback"] = runner.result_callback(self.key)
        self.func(*self.args, **self.kwargs)

    def is_ready(self):
        return self.runner.is_ready(self.key)

    def get_result(self):
        return self.runner.pop_result(self.key)

注意start方法, 首先向Runner注册了一个key, 在Runner内部, func的回调结果会和这个key关联。
然后构造了一个callback函数, Runner.result_callback源码如下:

  # tornado v2.4.1 tornado.gen.Runner.result_callback
    def result_callback(self, key):
        def inner(*args, **kwargs):
            if kwargs or len(args) > 1:
                result = Arguments(args, kwargs)
            elif args:
                result = args[0]
            else:
                result = None
            self.set_result(key, result)
        return inner

    # tornado v2.4.1 tornado.gen.Runner.set_result
    def set_result(self, key, result):
        """Sets the result for ``key`` and attempts to resume the generator."""
        self.results[key] = result
        self.run()

可见,当回调函数运行时, Runner记录了异步函数的回调结果,并和key关联,然后触发了Runner继续往后运行。

Runner.run 方法也就很好理解了

# torando v2.4.1 torndo.gen.Runner.run
def run(self):
    """Starts or resumes the generator, running until it reaches a
    yield point that is not ready.
    """
    if self.running or self.finished:
        return
    try:
        self.running = True
        while True:
            if self.exc_info is None:
                try:
                    if not self.yield_point.is_ready():
                        return
                    next = self.yield_point.get_result()
                except Exception:
                    self.exc_info = sys.exc_info()
            try:
                if self.exc_info is not None:
                    self.had_exception = True
                    exc_info = self.exc_info
                    self.exc_info = None
                    yielded = self.gen.throw(*exc_info)
                else:
                    yielded = self.gen.send(next)
            except StopIteration:
                self.finished = True
                if self.pending_callbacks and not self.had_exception:
                    # If we ran cleanly without waiting on all callbacks
                    # raise an error (really more of a warning).  If we
                    # had an exception then some callbacks may have been
                    # orphaned, so skip the check in that case.
                    raise LeakedCallbackError(
                        "finished without waiting for callbacks %r" %
                        self.pending_callbacks)
                self.deactivate_stack_context()
                self.deactivate_stack_context = None
                return
            except Exception:
                self.finished = True
                raise
            if isinstance(yielded, list):
                yielded = Multi(yielded)
            if isinstance(yielded, YieldPoint):
                self.yield_point = yielded
                try:
                    self.yield_point.start(self)
                except Exception:
                    self.exc_info = sys.exc_info()
            else:
                self.exc_info = (BadYieldError("yielded unknown object %r" % yielded),)
    finally:
        self.running = False

原文出处: http://shaolianbo.github.io/web/tornado/2015/03/01/tornado1

目录
相关文章
|
5月前
|
JSON 前端开发 数据库
17 Tornado - Tornado异步
17 Tornado - Tornado异步
51 1
|
5月前
|
Unix Linux Python
03 Tornado - 入门程序
03 Tornado - 入门程序
33 0
|
5月前
|
调度 Python
16 Tornado - 认识异步
16 Tornado - 认识异步
32 1
|
5月前
|
Ubuntu 应用服务中间件 开发工具
19 Tornado - 部署Tornado
19 Tornado - 部署Tornado
30 0
|
5月前
|
应用服务中间件 数据库 nginx
01 Tornado - 介绍
01 Tornado - 介绍
25 0
|
9月前
|
JSON 网络协议 中间件
Sanic源码剖析
Sanic源码剖析
|
设计模式 API 开发者
为什么选择学习 Sanic 框架
Sanic 称自己既是一个网络框架,也是一个网络服务器。这是什么意思?更重要的是,为什么这很重要?
|
Java Android开发
关于Rxjava的简单使用
本篇只是讲一下Rxjava的简单入门使用,想要详解的请移步其他博主文章,关于RxJava详解的文章网上一大堆,本片文章内容适合小白学习。
130 1
|
消息中间件 存储 前端开发
celery 源码阅读 - 1
Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 是一款消息队列工具,可用于处理实时数据以及任务调度。
204 0
|
存储 算法 前端开发
werkzeug源码阅读-下
Werkzeug是一个全面的WSGI Web应用程序库。它最初是WSGI实用程序各种工具的简单集合,现已成为最高级的WSGI实用程序库之一,是Flask背后的项目。
303 0
werkzeug源码阅读-下