scrapy分布式去重组件源码及其实现过程

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: scrapy_redis在继承scrapy去重组件的基础上覆盖了某些方法,原scrapy去重是基于单机情况下的内部去重,但是分布式是多机条件下的多爬虫协同去重,因此需要让不同及其上的同一个爬虫能够在同一个地方进行去重,这就是Redis的集合。

scrapy_redis在继承scrapy去重组件的基础上覆盖了某些方法,原scrapy去重是基于单机情况下的内部去重,但是分布式是多机条件下的多爬虫协同去重,因此需要让不同及其上的同一个爬虫能够在同一个地方进行去重,这就是Redis的集合。

先看看scrapy_redis 去重组件dupefilter的源码:

import logging
import time

from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint

from .connection import get_redis_from_settings


DEFAULT_DUPEFILTER_KEY = "dupefilter:%(timestamp)s"

logger = logging.getLogger(__name__)


TODO: Rename class to RedisDupeFilter.
class RFPDupeFilter(BaseDupeFilter):
    """Redis-based request duplicates filter.
    This class can also be used with default Scrapy's scheduler.
    """


    logger = logger

    def __init__(self, server, key, debug=False):
        """Initialize the duplicates filter.
        Parameters
        ----------
        server : redis.StrictRedis
            The redis server instance.
        key : str
            Redis key Where to store fingerprints.
        debug : bool, optional
            Whether to log filtered requests.
        """

        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True

    @classmethod
    def from_settings(cls, settings):
        """Returns an instance from given settings.
        This uses by default the key ``dupefilter:<timestamp>``. When using the
        ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
        it needs to pass the spider name in the key.
        Parameters
        ----------
        settings : scrapy.settings.Settings
        Returns
        -------
        RFPDupeFilter
            A RFPDupeFilter instance.
        """

        server = get_redis_from_settings(settings)
        XXX: This creates one-time key. needed to support to use this
        # class as standalone dupefilter with scrapy's default scheduler
        # if scrapy passes spider on open() method this wouldn't be needed
        TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
        key = DEFAULT_DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    @classmethod
    def from_crawler(cls, crawler):
        """Returns instance from crawler.
        Parameters
        ----------
        crawler : scrapy.crawler.Crawler
        Returns
        -------
        RFPDupeFilter
            Instance of RFPDupeFilter.
        """

        return cls.from_settings(crawler.settings)

    def request_seen(self, request):
        """Returns True if request was already seen.
        Parameters
        ----------
        request : scrapy.http.Request
        Returns
        -------
        bool
        """

        fp = self.request_fingerprint(request)
        # This returns the number of values added, zero if already exists.
        added = self.server.sadd(self.key, fp)
        return added == 0

    def request_fingerprint(self, request):
        """Returns a fingerprint for a given request.
        Parameters
        ----------
        request : scrapy.http.Request
        Returns
        -------
        str
        """

        return request_fingerprint(request)

    def close(self, reason=''):
        """Delete data on close. Called by Scrapy's scheduler.
        Parameters
        ----------
        reason : str, optional
        """

        self.clear()

    def clear(self):
        """Clears fingerprints data."""
        self.server.delete(self.key)

    def log(self, request, spider):
        """Logs given request.
        Parameters
        ----------
        request : scrapy.http.Request
        spider : scrapy.spiders.Spider
        """

        if self.debug:
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request %(request)s"
                   " - no more duplicates will be shown"
                   " (see DUPEFILTER_DEBUG to show all duplicates)")
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False


from_settings、from_crawler方法不用解释,就是读取配置文件连接Redis设置key,关键在request_seen、request_fingerprint这两个方法;request_seen调用self.request_fingerprint进而调用from scrapy.utils.request import request_fingerprint生成request的指纹。

再看scrapy.utils.request 中的request_fingerprint源码:

"""
This module provides some useful functions for working with
scrapy.http.Request objects
"""


from __future__ import print_function
import hashlib
import weakref
from six.moves.urllib.parse import urlunparse

from w3lib.http import basic_auth_header
from scrapy.utils.python import to_bytes, to_native_str

from w3lib.url import canonicalize_url
from scrapy.utils.httpobj import urlparse_cached


_fingerprint_cache = weakref.WeakKeyDictionary()
def request_fingerprint(request, include_headers=None):
    """
    Return the request fingerprint.
    The request fingerprint is a hash that uniquely identifies the resource the
    request points to. For example, take the following two urls:
    http://www.example.com/query?id=111&cat=222
    http://www.example.com/query?cat=222&id=111
    Even though those are two different URLs both point to the same resource
    and are equivalent (ie. they should return the same response).
    Another example are cookies used to store session ids. Suppose the
    following page is only accesible to authenticated users:
    http://www.example.com/members/offers.html
    Lot of sites use a cookie to store the session id, which adds a random
    component to the HTTP Request and thus should be ignored when calculating
    the fingerprint.
    For this reason, request headers are ignored by default when calculating
    the fingeprint. If you want to include specific headers use the
    include_headers argument, which is a list of Request headers to include.
    """

    if include_headers:
        include_headers = tuple(to_bytes(h.lower())
                                 for h in sorted(include_headers))
    cache = _fingerprint_cache.setdefault(request, {})
    if include_headers not in cache:
        fp = hashlib.sha1()
        fp.update(to_bytes(request.method))
        fp.update(to_bytes(canonicalize_url(request.url)))
        fp.update(request.body or b'')
        if include_headers:
            for hdr in include_headers:
                if hdr in request.headers:
                    fp.update(hdr)
                    for v in request.headers.getlist(hdr):
                        fp.update(v)
        cache[include_headers] = fp.hexdigest()
    return cache[include_headers]


def request_authenticate(request, username, password):
    """Autenticate the given request (in place) using the HTTP basic access
    authentication mechanism (RFC 2617) and the given username and password
    """

    request.headers['Authorization'] = basic_auth_header(username, password)


def request_httprepr(request):
    """Return the raw HTTP representation (as bytes) of the given request.
    This is provided only for reference since it's not the actual stream of
    bytes that will be send when performing the request (that's controlled
    by Twisted).
    """

    parsed = urlparse_cached(request)
    path = urlunparse(('''', parsed.path or '/', parsed.params, parsed.query, ''))
    s = to_bytes(request.method) + b" " + to_bytes(path) + b" HTTP/1.1\r\n"
    s += b"Host: " + to_bytes(parsed.hostname or b'') + b"\r\n"
    if request.headers:
        s += request.headers.to_string() + b"\r\n"
    s += b"\r\n"
    s += request.body
    return s


def referer_str(request):
    """ Return Referer HTTP header suitable for logging. """
    referrer = request.headers.get('Referer')
    if referrer is None:
        return referrer
    return to_native_str(referrer, errors='replace')


request_fingerprint用于返回唯一指纹,并且对于携带参数顺序不同的URL返回相同的指纹,通过哈希计算实现,参与哈希计算的有request.method、request.url、request.body,同时提供了一个备选列表参数,该列表存放需要加入计算的request.headers中的字段信息,也就是cookie也可以参与指纹计算,针对同一个URL但是header中的字段不同那么也可以视为不同的请求,从而不被去重。

当然获得一个请求的指纹之后,便可与Redis中的去重列表比对,如果存在,那么抛弃,如果不存在那么将指纹加入去重集合,同时通知调度器该request可以进入队列,这便是去重的实现过程。

2019-03-15-21_10_17.png


相关文章
|
1月前
|
消息中间件 存储 监控
消息队列:分布式系统中的重要组件
消息队列:分布式系统中的重要组件
|
3月前
|
负载均衡 Java 开发者
【分布式】Spring Cloud 组件综述
【1月更文挑战第25天】【分布式】Spring Cloud 组件综述
|
25天前
|
设计模式 安全 Java
【分布式技术专题】「Tomcat技术专题」 探索Tomcat技术架构设计模式的奥秘(Server和Service组件原理分析)
【分布式技术专题】「Tomcat技术专题」 探索Tomcat技术架构设计模式的奥秘(Server和Service组件原理分析)
29 0
|
6月前
|
前端开发 JavaScript 数据挖掘
医院LIS管理系统源码,DEV报表、前端js封装、分布式文件存储
云LIS系统还支持质控管理,为实验室提供科学的质量控制机制。用户可以根据需要调整质控参数,并可自动生成质控图表和统计分析报告。提供了数据分析功能,可以通过数据挖掘和分析来发现潜在关联性,为实验室提供更多的参考和决策支持
|
25天前
|
NoSQL Java Redis
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的分布式锁的功能组件(二)
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的分布式锁的功能组件
15 0
|
4月前
|
监控 Java Nacos
【分布式流控组件 Sentinel 快速入门】——图文详解操作流程(上)
【分布式流控组件 Sentinel 快速入门】——图文详解操作流程
136 0
【分布式流控组件 Sentinel 快速入门】——图文详解操作流程(上)
|
1月前
|
SpringCloudAlibaba Java 持续交付
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(一)基础知识+各个组件介绍+聚合父工程创建
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(一)基础知识+各个组件介绍+聚合父工程创建
83 1
|
4月前
|
监控 Dubbo Linux
【分布式流控组件 Sentinel 快速入门】——图文详解操作流程(下)
【分布式流控组件 Sentinel 快速入门】——图文详解操作流程(下)
103 0
|
4月前
|
算法 API 网络架构
【分布式流控组件 Sentinel 快速入门】——图文详解操作流程(中)
【分布式流控组件 Sentinel 快速入门】——图文详解操作流程(中)
|
5月前
|
数据采集 存储 中间件
【 ⑭】Scrapy架构(组件介绍、架构组成和工作原理)
【 ⑭】Scrapy架构(组件介绍、架构组成和工作原理)
144 0