scrapy中scrapy_redis分布式内置pipeline源码及其工作原理

  1. 云栖社区>
  2. 博客>
  3. 正文

scrapy中scrapy_redis分布式内置pipeline源码及其工作原理

python之战 2019-03-14 09:22:13 浏览853 评论0

摘要: scrapy_redis分布式实现了一套自己的组件,其中也提供了Redis数据存储的数据管道,位于scrapy_redis.pipelines,这篇文章主要分析器源码及其工作流程,源码如下: from scrapy.

scrapy_redis分布式实现了一套自己的组件,其中也提供了Redis数据存储的数据管道,位于scrapy_redis.pipelines,这篇文章主要分析器源码及其工作流程,源码如下:

from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread

from . import connection, defaults


default_serialize = ScrapyJSONEncoder().encode


class RedisPipeline(object):
    """Pushes serialized item into a redis list/queue

    Settings
    --------
    REDIS_ITEMS_KEY : str
        Redis key where to store items.
    REDIS_ITEMS_SERIALIZER : str
        Object path to serializer function.

    """


    def __init__(self, server,
                 key=defaults.PIPELINE_KEY,
                 serialize_func=default_serialize)
:

        """Initialize pipeline.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        key : str
            Redis key where to store items.
        serialize_func : callable
            Items serializer function.

        """

        self.server = server
        self.key = key
        self.serialize = serialize_func

    @classmethod
    def from_settings(cls, settings):
        params = {
            'server': connection.from_settings(settings),
        }
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
                settings['REDIS_ITEMS_SERIALIZER']
            )

        return cls(**params)

    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)

    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        """

        return self.key % {'spider': spider.name}


关于scrapy自定义数据管道在此前文章已经说过详见《scrapy中数据处理的两个模块:Item Pipeline与Exporter》,本篇文章阐述RedisPipeline的实现。

Redis类初始化参数,server, key=defaults.PIPELINE_KEY, serialize_func=default_serialize,其中第一个参数sever是Redis客户端的实例、key是scrapy_defaults默认的配置格式如下:

import redis


# For standalone use.
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'

PIPELINE_KEY = '%(spider)s:items'

REDIS_CLS = redis.StrictRedis
REDIS_ENCODING = 'utf-8'
# Sane connection defaults.
REDIS_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': REDIS_ENCODING,
}

SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'

START_URLS_KEY = '%(name)s:start_urls'
START_URLS_AS_SET = False

serialize_func是序列化Item的函数默认是json.dumps。

那我们对其工作原理就有了大致的认识,pipeline初始化传入一个Redis客户端、一个key、一个序列化函数;继续向下看,from_settings、from_crawler都是读取setting.py文件,完成初始化,核心函数process_item在下。

process_item源码为:

 def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        "
""
        return self.key % {'spider': spider.name}


process_主要实现了一个deferToThread方法,该方法作用是返回一个deferred对象,不过回调函数在另一个线程处理,主要用于数据库/文件读取操作。继续看deferToThread

def deferToThread(f, *args, **kwargs):
    """
    Run a function in a thread and return the result as a Deferred.

    @param f: The function to call.
    @param *args: positional arguments to pass to f.
    @param **kwargs: keyword arguments to pass to f.

    @return: A Deferred which fires a callback with the result of f,
    or an errback with a L{twisted.python.failure.Failure} if f throws
    an exception.
    """

    from twisted.internet import reactor
    return deferToThreadPool(reactor, reactor.getThreadPool(),
                             f, *args, **kwargs)


主要使用twisted.internet的 reactor模式,反应堆(reactor)模式,这种模式在单线程环境中调度多个事件源产生的事件到它们各自的事件处理例程中去,在这里实现一个线程池的效果,达到最后异步写入的效果。

2019-03-12-22_18_10.png

【云栖快讯】阿里巴巴小程序繁星计划,20亿补贴第一弹云应用免费申请,限量从速!  详情请点击

网友评论