第14章
分布式爬取

由于受到计算能力和网络带宽的限制,单台计算机上运行的爬虫在爬取的数据量较大时,需要耗费很长的时间。分布式爬取的思想是“人多力量大”,在网络中的多台计算机上同时运行爬虫程序,共同完成一个大型爬取任务。这一章来学习使用Scrapy框架进行分布式爬取。

Scrapy本身并不是一个为分布式爬取而设计的框架,但第三方库scrapy-redis为其拓展了分布式爬取的功能,两者结合便是一个分布式Scrapy爬虫框架。在分布式爬虫框架中,需要使用某种通信机制协调各个爬虫的工作,让每一个爬虫明确自己的任务,其中包括:

(1)当前的爬取任务,即下载+提取数据(分配任务)。

广告:个人专属 VPN,独立 IP,无限流量,多机房切换,还可以屏蔽广告和恶意软件,每月最低仅 5 美元

(2)当前爬取任务是否已经被其他爬虫执行过(任务去重)。

(3)如何存储爬取到的数据(数据存储)。

scrapy-redis利用Redis数据库作为多个爬虫的数据共享实现以上功能,接下来我们学习如何使用scrapy-redis进行分布式爬取。

14.1 Redis的使用

首先来学习Redis数据库的使用。Redis是一个速度非常快的非关系型数据库,使用内存作为主存储,内存中的数据也可以被持久化到硬盘。Redis以键值形式(key-value)存储数据,其中的值可以分为以下5种类型:

● 字符串(string)

● 哈希(hash)

● 列表(list )

● 集合(set)

● 有序集合(zset)

14.1.1 安装Redis

接下来安装Redis,在Ubuntu下可以使用apt-get安装:

     sudo apt-get install redis-server

Redis数据库进程是一个网络服务器,可以使用以下命令开启/重启/停止Redis:

     $ sudo service redis-server start   # 开启Redis
     $ sudo service redis-server restart # 重启Redis
     $ sudo service redis-server start   # 停止Redis

默认情况下,Redis会在127.0.0.1:6379上开启服务,可以使用netstat命令进行查询:

     $ netstat -ntl
     Proto Recv-Q Send-Q Local Address    Foreign Address   State
     ...
     tcp        0       0 127.0.0.1:6379   *:*             LISTEN
     ...

此时,我们运行的Redis仅能被本机使用,因为它只接收来自本机(localhost)的请求。如果想让Redis能被其他计算机访问,需要在配置文件中修改Redis服务器的监听地址,在Redis配置文件/etc/redis/redis.conf中做以下修改:

     $ sudo vi /etc/redis/redis.conf
     ...
     # bind 127.0.0.1
     bind 0.0.0.0  # 接收来自任意IP的请求
     ...

重启Redis后,修改生效:

     $ sudo service redis-server restart
     Stopping redis-server: redis-server.
     Starting redis-server: redis-server.
     $ netstat -ntl
     Proto Recv-Q Send-Q Local Address     Foreign Address  State
     ...
     $ tcp        0       0 0.0.0.0:6379      0.0.0.0:*       LISTEN
     ...

现在,可以在任意机器上使用客户端连接Redis数据库:

     $ redis-cli -h 192.168.0.103  # 使用-h 参数指定主机IP
     192.168.1.102:6379> PING      # 测试链接数据库是否成功
     PONG

14.1.2 Redis基本命令

由于篇幅有限,这里仅介绍一些Redis基本使用命令,按照值的5种类型依次讲解。想学习Redis更多详细内容,请参考相关书籍或Redis官方网站https://redis.io。

字符串

Redis的字符串(String)可以存储字符串、整数、浮点数(数字也是字符串)。String命令及描述如表14-1所示。

表14-1 String命令及描述

阅读 ‧ 电子书库
192.168.1.102:6379> SET x abcde
OK
192.168.1.102:6379> GET x
"abcde"
192.168.1.102:6379> DEL x
(integer) 1
192.168.1.102:6379> GET x
(nil)

列表

Redis的列表(List)可以有序地存储多个字符串。List命令及描述如表14-2所示。

表14-2 List命令及描述

阅读 ‧ 电子书库
192.168.1.102:6379> RPUSH color orange           # 右边入队1 个值
(integer) 1
192.168.1.102:6379> RPUSH color red yellow blue  # 右边入队3 个值
(integer) 4
192.168.1.102:6379> LINDEX color 2               # 获取位置2的值
"yellow"
192.168.1.102:6379> LRANGE color 0 3             # 获取位置在0~3 范围的值
1) "orange"
2) "red"
3) "yellow"
4) "blue"
192.168.1.102:6379> LPOP color                   # 左边出队1 个值
"orange"
192.168.1.102:6379> LRANGE color 0 -1            # 获取全部值(负引索表示倒数第几个)
1) "red"
2) "yellow"
3) "blue"

哈希

Redis的哈希(Hash)可以存储多个键值对,其中的键和值都是字符串。Hash命令及描述如表14-3所示。

表14-3 Hash命令与描述

阅读 ‧ 电子书库
192.168.1.102:6379> HSET point x 34  # 赋值x、y、z 三个字段
(integer) 1
192.168.1.102:6379> HSET point y 55
(integer) 1
192.168.1.102:6379> HSET point z 47
(integer) 1
192.168.1.102:6379> HGET point x      # 获取字段x的值
"34"
192.168.1.102:6379> HGETALL point      # 获取所有字段和值
1) "x"
2) "34"
3) "y"
4) "55"
5) "z"
6) "47"
192.168.1.102:6379> HDEL point y      # 删除字段y
(integer) 1
192.168.1.102:6379> HGETALL point
1) "x"
2) "34"
3) "z"
4) "47"

集合

Redis中的集合(Set)可以存储多个唯一的字符串。Set命令及描述如表14-4所示。

表14-4 Set命令及描述

阅读 ‧ 电子书库
192.168.1.102:6379> SADD color_set white black red blue # 添加4 个成员
(integer) 4
192.168.1.102:6379> SMEMBERS color_set                   # 获取全部成员
1) "blue"
2) "red"
3) "black"
4) "white"
192.168.1.102:6379> SADD color_set white                 # 添加已存在的成员会失败
(integer) 0
192.168.1.102:6379> SCARD color_set                      # 查询成员个数
(integer) 4
192.168.1.102:6379> SREM color_set red                   # 删除1 个成员
(integer) 1
192.168.1.102:6379> SMEMBERS color_set
1) "black"
2) "white"
3) "blue"
192.168.1.102:6379> SISMEMBER color_set blue             # 判断某成员是否存在
(integer) 1
192.168.1.102:6379> SISMEMBER color_set red
(integer) 0

有序集合

Redis中的有序集合(ZSet)与集合(Set)类似,可以存储多个唯一的字符串,但在有序集合中,每个成员都有一个分数,所有成员按给定分数在集合中有序排列。Zset命令及描述如表14-5所示。

表14-5 ZSet命令及描述

阅读 ‧ 电子书库
192.168.1.102:6379> ZADD language 3 python 6 perl 9 lisp # 添加3 个成员
(integer) 3
192.168.1.102:6379> ZADD language 1 C 4 java 5 C++       # 再添加3 个成员
(integer) 3
192.168.1.102:6379> ZREM language C++                    # 删除1 个成员
(integer) 1
192.168.1.102:6379> ZRANGE language 2 4                  # 获取位置在2~4 范围的成员
1) "java"
2) "perl"
3) "lisp"
192.168.1.102:6379> ZRANGEBYSCORE language 3 6           # 获取分值在3~6 范围的成员
1) "python"
2) "java"
3) "perl"

14.1.3 Python访问Redis

Redis支持多种语言API,在Python中可以使用第三方库redis-py访问Redis数据库。

使用pip安装redis-py:

     $ sudo pip install redis

redis-py的使用非常简单,只需先连接Redis获得连接对象,然后调用该对象上与每一条Redis命令相对应的方法即可。请看下面的简单示例:

     >>> import redis
     >>> r = redis.StrictRedis(host='localhost', port=6379)  # 连接Redis,返回连接对象
     >>> r.set('s', 'hello world')                           # 字符串
     True
     >>> r.get('s')
     b'hello world'
     >>> r.rpush('queue', 1, 2, 3)  # 列表
     3
     >>> r.lpop('queue')
     b'1'
     >>> r.llen('queue')
     2
     >>> r.lrange('queue', 0, -1)
     [b'2', b'3']

Redis简介部分到此结束了,希望这部分内容能够帮助大家读懂scrapy-redis的源码。

14.2 scrapy-redis源码分析

在使用scrapy-redis前,我们先来分析scrapy-redis的源码,了解其实现原理。

使用git从github网站下载scrapy-redis源码:

阅读 ‧ 电子书库

scrapy-redis的源码并不多,因为它仅是利用Redis数据库重新实现了Scrapy中的某些组件。

对于一个分布式爬虫框架,需要解决以下两个最基本的问题。

● 分配爬取任务:为每个爬虫分配不重复的爬取任务。

● 汇总爬取数据:将所有爬虫爬取到的数据汇总到一处。

接下来我们看scrapy-redis是如何解决的。

14.2.1 分配爬取任务部分

scrapy-redis为多个爬虫分配爬取任务的方式是:让所有爬虫共享一个存在于Redis数据库中的请求队列(替代各爬虫独立的请求队列),每个爬虫从请求队列中获取请求,下载并解析页面后,将解析出的新请求再添加到请求队列中,因此每个爬虫既是下载任务的生产者又是消费者。

为实现多个爬虫的任务分配,scrapy-redis重新实现了以下组件:

● 基于Redis的请求队列(优先队列、FIFO、LIFO)。

● 基于Redis的请求去重过滤器(过滤掉重复的请求)。

● 基于以上两个组件的调度器。

1.调度器的实现

首先来看调度器Scheduler的实现,它位于scheduler.py中,Scheduler的核心代码如下:

     import importlib
     import six


     from scrapy.utils.misc import load_object
     from . import connection, defaults


     class Scheduler(object):
        ...
        def open(self, spider):
           ...
           # 初始化请求队列
           try:
              self.queue = load_object(self.queue_cls)(
                 server=self.server,
                 spider=spider,
                 key=self.queue_key % {'spider': spider.name},
               serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                           self.queue_cls, e)


        # 初始化去重过滤器
        try:
            self.df = load_object(self.dupefilter_cls)(
               server=self.server,
               key=self.dupefilter_key % {'spider': spider.name},
               debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                           self.dupefilter_cls, e)
        ...


     ...
     def enqueue_request(self, request):
        # 调用去重过滤器的request_seen 方法, 判断该request 对应的页面是否已经爬取过
        # 如果页面已经爬取过,且用户没有强制忽略过滤,就直接返回False
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        # 将request 入队到请求队列
        self.queue.push(request)
        return True


     def next_request(self):
        block_pop_timeout = self.idle_before_close
        # 从请求队列出队一个request
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request
     ...

分析上述代码如下:

● 调度器中最核心的两个方法是enqueue_request和next_request,它们分别对应请求的入队和出队操作。Spider提交的Request对象最终由Scrapy引擎调用enqueue_request添加到请求队列中,Scrapy引擎同时也调用next_request从请求队列中取出请求,送给下载器下载。

● self.queue和self.df分别是请求队列和去重过滤器对象。在enqueue_request方法中,使用去重过滤器的request_seen方法判断request是否重复,即request对应的页面是否已经爬取过,如果用户没有强制忽略过滤,并且request是重复的,就抛弃该request,并直接返回False,否则调用self.queue的push方法将request入队,返回True。在next_request方法中,调用self.queue的pop方法出队一个request并返回。

再来看一下创建请求队列和去重过滤器对象的相关代码:

     import importlib
     import six


     from scrapy.utils.misc import load_object
     from . import connection, defaults


     class Scheduler(object):
        def __init__(self, server, ...,
                    queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                    dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS):
            ...
            self.server = server
            self.queue_cls = queue_cls
            self.dupefilter_cls = dupefilter_cls
            ...


        @classmethod
        def from_settings(cls, settings):
            ...
            # 从配置中读取请求队列和去重过滤器的类名
            optional = {
                ...
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            ...
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
               kwargs[name] = val
        ...
        server = connection.from_settings(settings)
        server.ping()


        return cls(server=server, **kwargs)
     ...


     def open(self, spider):
        ...
        # 初始化请求队列
        try:
            self.queue = load_object(self.queue_cls)(
               server=self.server,
               spider=spider,
               key=self.queue_key % {'spider': spider.name},
               serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                           self.queue_cls, e)


        # 初始化去重过滤器
        try:
            self.df = load_object(self.dupefilter_cls)(
               server=self.server,
               key=self.dupefilter_key % {'spider': spider.name},
               debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s",
             self.dupefilter_cls, e)
        ...
     ...

self.queue和self.df的创建是在open方法中调用load_object方法完成的,load_object方法的参数是类的导入路径(如scrapy_redis.queue.PriorityQueue),这种实现的好处是用户可以使用字符串在配置文件中灵活指定想要使用的队列类和过滤器类。self.queue_cls和self.dupefilter_cls便是从配置文件中读取的导入路径(或默认值)。

2.请求队列的实现

接下来看基于Redis的请求队列的实现。在queque.py中,包含以下3种请求队列:

● PriorityQueue优先级队列(默认)

● FifoQueue先进先出队列

● LifoQueue后进先出队列

我们分析其中代码简短且容易理解的FifoQueue,代码如下:

     class FifoQueue(Base):
        """Per-spider FIFO queue"""


        def __len__(self):
            """Return the length of the queue"""
            return self.server.llen(self.key)


        def push(self, request):
            """Push a request"""
            self.server.lpush(self.key, self._encode_request(request))


        def pop(self, timeout=0):
            """Pop a request"""
            if timeout > 0:
                # brpop: 阻塞的rpop,可以设置超时
                data = self.server.brpop(self.key, timeout)
                if isinstance(data, tuple):
                   data = data[1]
            else:
                data = self.server.rpop(self.key)
            if data:
                return self._decode_request(data)

● self.server是Redis数据库的连接对象(可理解为self.server = redis.StrictRedis(...)),该连接对象是在Scheduler的from_settings方法中创建的,在创建请求队列对象时,被传递给请求队列类的构造器。

● 观察在self.server上调用的方法可知,FifoQueue使用Redis中的一个列表实现队列,该列表在数据库中的键为self.key的值,可以通过配置文件设置(SCHEDULER_QUEUE_KEY),默认为<spider_name>:requests。

● push方法对应请求的入队操作,先调用基类的_encode_request方法对request进行编码,然后调用Redis的lpush命令将其插入数据库中列表的最左端(入队)。

● pop方法对应请求的出队操作,调用Redis的rpop或brpop命令从数据库中列表的最右端弹出一个经过编码的request(出队),再调用基类的_decode_request方法对其进行解码,然后返回。

● __len__方法调用Redis的llen命令获取数据库中列表的长度,即请求队列的长度。

下面是FifoQueue、LifoQueue、PriorityQueue共同基类Base的部分代码:

     from . import picklecompat


     class Base(object):
        """Per-spider base queue class"""


        def __init__(self, server, spider, key, serializer=None):
            """Initialize per-spider redis queue.


            Parameters
            ----------
            server : StrictRedis
                Redis client instance.
            spider : Spider
                Scrapy spider instance.
            key: str
                Redis key where to put and get messages.
            serializer : object
                Serializer object with ``loads`` and ``dumps`` methods.
            """


            if serializer is None:
                serializer = picklecompat
            ...
        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer


     def _encode_request(self, request):
        """Encode a request object"""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)


     def _decode_request(self, encoded_request):
        """Decode an request previously encoded"""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)
     ...

可以看到,在对request进行编、解码时,调用的是self.serializer的dumps和loads方法。self.serializer同样可以通过配置文件指定(SCHEDULER_SERIALIZER),默认为Python标准库中的pickle模块。

3.去重过滤器的实现

最后来看基于Redis的去重过滤器RFPDupeFilter的实现,它位于dupefilter.py中,部分代码如下:

     ...
     from scrapy.dupefilters import BaseDupeFilter
     from scrapy.utils.request import request_fingerprint
     ...
     class RFPDupeFilter(BaseDupeFilter):
        ...
        def __init__(self, server, key, debug=False):
            """Initialize the duplicates filter.


            Parameters
            ----------
            server : redis.StrictRedis
                The redis server instance.
            key : str
            Redis key Where to store fingerprints.
        debug : bool, optional
            Whether to log filtered requests.


        """
        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True


     ...
     def request_seen(self, request):
        fp = self.request_fingerprint(request)
        # This returns the number of values added, zero if already exists.
        added = self.server.sadd(self.key, fp)
        return added == 0


     def request_fingerprint(self, request):
        return request_fingerprint(request)
     ...

● self.server是Redis数据库的连接对象,与FifoQueue中相同。

● 观察在self.server上调用的方法可知,RFPDupeFilter使用Redis中的一个集合对请求进行去重,该集合在数据库中的键为self.key的值,可以通过配置文件设置(SCHEDULER_DUPEFILTER_KEY),默认为<spider_name>:dupefilter。

● request_fingerprint方法用来获取一个请求的指纹,即请求的唯一标识,请求的指纹是使用Python标准库hashlib中的sha1算法计算得到的(详见scrapy.utils.request中的request_fingerprint函数)。

● request_seen方法用来判断一个请求是否是重复的,先调用request_fingerprint方法计算request的指纹,然后调用Redis的sadd命令尝试将指纹添加到数据库中的集合中,根据sadd返回值判断请求是否重复,返回相应的布尔值结果,即重复返回True,否则返回False。

14.2.2 汇总爬取数据部分

1.RedisPipeline的实现

在分布式爬虫框架中,各个主机爬取到的数据最终要汇总到一处,通常是某种数据库。scrapy-redis提供了一个Item Pipeline(RedisPipeline),用于将各个爬虫爬取到的数据存入同一个Redis数据库中。

RedisPipeline位于pipeline.py中,代码如下:

     from scrapy.utils.misc import load_object
     from scrapy.utils.serialize import ScrapyJSONEncoder
     from twisted.internet.threads import deferToThread


     from . import connection, defaults


     default_serialize = ScrapyJSONEncoder().encode


     class RedisPipeline(object):
        """Pushes serialized item into a redis list/queue


        Settings
        --------
        REDIS_ITEMS_KEY : str
            Redis key where to store items.
        REDIS_ITEMS_SERIALIZER : str
            Object path to serializer function.


        """


        def __init__(self, server,
                    key=defaults.PIPELINE_KEY,
                    serialize_func=default_serialize):
            """Initialize pipeline.


            Parameters
            ----------
            server : StrictRedis
                Redis client instance.
            key : str
                Redis key where to store items.
            serialize_func : callable
                Items serializer function.


            """
            self.server = server
        self.key = key
        self.serialize = serialize_func


     @classmethod
     def from_settings(cls, settings):
        params = {
            'server': connection.from_settings(settings),
        }
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
               settings['REDIS_ITEMS_SERIALIZER']
            )


        return cls(**params)


     @classmethod
     def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)


     def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)


     def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item


     def item_key(self, item, spider):
        """Returns redis key based on given spider.


        Override this function to use a different key depending on the item
        and/or spider.


        """
        return self.key % {'spider': spider.name}

● self.server是Redis数据库的连接对象,与FifoQueue中相同。

● 观察在self.server上调用的方法可知,RedisPipeline使用Redis中的一个列表存储所有爬虫爬取到的数据,该列表在数据库中的键为调用item_key方法的结果,即self.key %{'spider': spider.name}。self.key可以通过配置文件设置(REDIS_ITEMS_KEY),默认情况下列表的键为<spider_name>:items。

● Redis的列表只能存储字符串,而Spider爬取到的数据item的类型是Item或Python字典,所以先要将item串行化成字符串,再存入Redis列表。串行化函数也可以通过配置文件指定(REDIS_ITEMS_SERIALIZER),默认情况下item被串行化成json串。

● process_item方法处理爬取到的每一项数据,因为写入数据库为I/O操作,速度较慢,所以可以在线程中执行,调用twisted中的deferToThread方法,启动线程执行_process_item方法。

● _process_item方法实际处理爬取到的每一项数据,先使用self.serial函数将item串行化成字符串,再调用Redis的rpush命令将其写入数据库中的列表。

14.3 使用scrapy-redis进行分布式爬取

了解了scrapy-redis的原理后,我们学习使用scrapy + scrapy-redis进行分布式爬取。

14.3.1 搭建环境

首先搭建scrapy-redis分布式爬虫环境,当前我们有3台Linux主机。

云服务器(A):116.29.35.201 (Redis Server)

云服务器(B):123.59.45.155

本机(C):1.13.41.127

在3台主机上安装scrapy和scrapy-redis:

     $ pip install scrapy
     $ pip install scrapy-redis

选择其中一台云服务器搭建供所有爬虫使用的Redis数据库,步骤如下:

步骤 01 在云服务器上安装redis-server。

步骤 02 在Redis配置文件中修改服务器的绑定地址(确保数据库可被所有爬虫访问)。

步骤 03 启动/重启Redis服务器。

登录云服务器(A),在bash中完成上述步骤:

     116.29.35.201$ sudo apt-get install redis-server
     116.29.35.201$ sudo vi /etc/redis/redis.conf
     ...
     # bind 127.0.0.1
     bind 0.0.0.0
     ...
     116.29.35.201$ sudo service redis-server restart

最后,在3台主机上测试能否访问云服务器(A)上的Redis数据库:

     $ redis-cli -h 116.29.35.201 ping
     PONG

到此,Scrapy分布式爬虫环境搭建完毕。

14.3.2 项目实战

本章的核心知识点是分布式爬取,因此本项目不再对分析页面、编写Spider等大家熟知的技术进行展示。我们可以任意挑选一个在之前章节中做过的项目,将其改为分布式爬取的版本,这里以第8章的toscrape_book项目(爬取books.toscrape.com中的书籍信息)为例进行讲解。

复制toscrape_book项目,得到新项目toscrape_book_distributed:

     $ cp -r toscrape_book toscrape_book_distributed
     $ cd toscrape_book_distributed

在配置文件settings.py中添加scrapy-redis的相关配置:

     # 必选项
     # ==================================================================
     # 指定爬虫所使用的Redis数据库(在云服务器116.29.35.201 上)
     REDIS_URL = 'redis://116.29.35.201:6379'


     # 使用scrapy_redis的调度器替代Scrapy 原版调度器
     SCHEDULER = "scrapy_redis.scheduler.Scheduler"


     # 使用scrapy_redis的RFPDupeFilter作为去重过滤器
     DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
     # 启用scrapy_redis的RedisPipeline将爬取到的数据汇总到Redis数据库
     ITEM_PIPELINES = {
        'scrapy_redis.pipelines.RedisPipeline': 300,
     }
     # 可选项
     # ==================================================================
     #爬虫停止后,保留/清理Redis中的请求队列以及去重集合
     # True:保留,False:清理,默认为False
     SCHEDULER_PERSIST = True

将单机版本的Spider改为分布式版本的Spider,只需做如下简单改动:

     from scrapy_redis.spiders import RedisSpider


     # 1.更改基类
     # class BooksSpider(spider.Spider):
     class BooksSpider(RedisSpider):
        ...
        # 2.注释start_urls
        #start_urls = ['http://books.toscrape.com/']
        ...

上述改动针对“如何为多个爬虫设置起始爬取点”这个问题,解释如下:

● 在分布式爬取时,所有主机上的代码是相同的,如果使用之前单机版本的Spider代码,那么每一台主机上的Spider都通过start_urls属性定义了起始爬取点,在构造起始爬取点的Request对象时,dont_filter参数设置为了True,即忽略去重过滤器的过滤。因此多个(数量等于爬虫数量)重复请求将强行进入Redis中的请求队列,这可能导致爬取到重复数据。

● 为了解决上述问题,scrapy-redis提供了一个新的Spider基类RedisSpider,RedisSpider重写了start_requests方法,它尝试从Redis数据库的某个特定列表中获取起始爬取点,并构造Request对象(dont_filter=False),该列表的键可通过配置文件设置(REDIS_START_URLS_KEY),默认为<spider_name>:start_urls。在分布式爬取时,用户运行所有爬虫后,需手动使用Redis命令向该列表添加起始爬取点,之后只有其中一个爬虫能获取到起始爬取点,因此对应的请求也就只有一个,从而避免了重复。

到此,分布式版本的项目代码已经完成了,分发到各个主机:

     $ scp -r toscrape_book_distributed liushuo@116.29.35.201:~/scrapy_book
     $ scp -r toscrape_book_distributed liushuo@123.59.45.155:~/scrapy_book

分别在3台主机使用相同命令运行爬虫:

         $ scrapy crawl books
         2017-05-14 17:56:42 [scrapy.utils.log] INFO: Scrapy 1.3.3 started (bot: toscrape_book)
         2017-05-14 17:56:42 [scrapy.utils.log] INFO: Overridden settings: {'DUPEFILTER_CLASS':
     'scrapy_redis.dupefilter.RFPDupeFilter', 'FEED_EXPORT_FIELDS': ['upc', 'name', 'price', 'stock',
     'review_rating', 'review_num'], 'SCHEDULER': 'scrapy_redis.scheduler.Scheduler', 'BOT_NAME':
     'toscrape_book', 'ROBOTSTXT_OBEY': True, 'NEWSPIDER_MODULE': 'toscrape_book.spiders',
     'SPIDER_MODULES': ['toscrape_book.spiders']}
         2017-05-14 17:56:42 [scrapy.middleware] INFO: Enabled extensions:
         ['scrapy.extensions.logstats.LogStats',
          'scrapy.extensions.telnet.TelnetConsole',
          'scrapy.extensions.corestats.CoreStats']
         2017-05-14 17:56:42 [books] INFO: Reading start URLs from redis key 'books:start_urls' (batch size:
     16, encoding: utf-8
         2017-05-14 17:56:42 [scrapy.middleware] INFO: Enabled downloader middlewares:
         ['scrapy.downloadermiddlewares.robotstxt.RobotsTxtMiddleware',
          'scrapy.downloadermiddlewares.httpauth.HttpAuthMiddleware',
          'scrapy.downloadermiddlewares.downloadtimeout.DownloadTimeoutMiddleware',
          'scrapy.downloadermiddlewares.defaultheaders.DefaultHeadersMiddleware',
          'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware',
          'scrapy.downloadermiddlewares.retry.RetryMiddleware',
          'scrapy.downloadermiddlewares.redirect.MetaRefreshMiddleware',
          'scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware',
          'scrapy.downloadermiddlewares.redirect.RedirectMiddleware',
          'scrapy.downloadermiddlewares.cookies.CookiesMiddleware',
          'scrapy.downloadermiddlewares.stats.DownloaderStats']
         2017-05-14 17:56:42 [scrapy.middleware] INFO: Enabled spider middlewares:
         ['scrapy.spidermiddlewares.httperror.HttpErrorMiddleware',
          'scrapy.spidermiddlewares.offsite.OffsiteMiddleware',
          'scrapy.spidermiddlewares.referer.RefererMiddleware',
          'scrapy.spidermiddlewares.urllength.UrlLengthMiddleware',
          'scrapy.spidermiddlewares.depth.DepthMiddleware']
         2017-05-14 17:56:42 [scrapy.middleware] INFO: Enabled item pipelines:
          ['scrapy_redis.pipelines.RedisPipeline']
          2017-05-14 17:56:42 [scrapy.core.engine] INFO: Spider opened
          2017-05-14 17:56:42 [scrapy.extensions.logstats] INFO: Crawled 0 pages (at 0 pages/min), scraped 0
     items (at 0 items/min)
          2017-05-14 17:56:42 [scrapy.extensions.telnet] DEBUG: Telnet console listening on 127.0.0.1:6023
          ...阻塞在此处...

运行后,由于Redis中的起始爬取点列表和请求队列都是空的,3个爬虫都进入了暂停等待的状态,因此在任意主机上使用Redis客户端设置起始爬取点:

     $ redis-cli -h 116.29.35.201
     116.29.35.201:6379> lpush books:start_urls 'http://books.toscrape.com/'
     (integer) 1

随后,其中一个爬虫(本实验中是云服务器A)从起始爬取点列表中获取到了url,在其log中观察到如下信息:

     2017-05-14 17:57:18 [books] DEBUG: Read 1 requests from 'books:start_urls'

该爬虫用起始爬取点url构造的Request对象最终被添加到Redis中的请求队列之后。各个爬虫相继开始工作了,可在各爬虫的log中观察到类似于如下的信息:

          2017-05-14 18:00:42 [scrapy.core.scraper] DEBUG: Scraped from <200
     http://books.toscrape.com/catalogue/arena_587/index.html>
          {'name': 'Arena',
           'price': '£21.36',
           'review_num': '0',
           'review_rating': 'Four',
           'stock': '11',
           'upc': '2c34f9432069b52b'}
          2017-05-14 18:00:42 [scrapy.core.engine] DEBUG: Crawled (200)  (referer:
     http://books.toscrape.com/catalogue/page-21.html)
          2017-05-14 18:00:42 [scrapy.core.scraper] DEBUG: Scraped from <200
     http://books.toscrape.com/catalogue/adultery_586/index.html>
          {'name': 'Adultery',
           'price': '£20.88',
           'review_num': '0',
           'review_rating': 'Five',
           'stock': '11',
           'upc': 'bb967277222e689c'}
          2017-05-14 18:00:42 [scrapy.core.engine] DEBUG: Crawled (200)  (referer:
     http://books.toscrape.com/catalogue/page-21.html)
          2017-05-14 18:00:42 [scrapy.core.scraper] DEBUG: Scraped from <200
     http://books.toscrape.com/catalogue/a-mothers-reckoning-living-in-the-aftermath-of-tragedy_585/index.htm
     l>
          {'name': "A Mother's Reckoning: Living in the Aftermath of Tragedy",
           'price': '£19.53',
           'review_num': '0',
           'review_rating': 'Three',
           'stock': '11',
           'upc': '2b69dec0193511d9'}
          2017-05-14 18:00:43 [scrapy.core.scraper] DEBUG: Scraped from <200
     http://books.toscrape.com/catalogue/112263_583/index.html>
          {'name': '11/22/63',
           'price': '£48.48',
           'review_num': '0',
           'review_rating': 'Three',
           'stock': '11',
           'upc': 'a9d7b75461084a26'}
          2017-05-14 18:00:43 [scrapy.core.engine] DEBUG: Crawled (200)  (referer:
     http://books.toscrape.com/catalogue/page-21.html)
          2017-05-14 18:00:43 [scrapy.core.scraper] DEBUG: Scraped from <200
     http://books.toscrape.com/catalogue/10-happier-how-i-tamed-the-voice-in-my-head-reduced-stress-without-
     losing-my-edge-and-found-self-help-that-actually-works_582/index.html>
          {'name': '10% Happier: How I Tamed the Voice in My Head, Reduced Stress '
                  'Without Losing My Edge, and Found Self-Help That Actually Works',
           'price': '£24.57',
           'review_num': '0',
           'review_rating': 'Two',
           'stock': '10',
           'upc': '34669b2e9d407d3a'}

等待全部爬取完成后,在Redis中查看爬取到的数据:

     116.29.35.201:6379> keys *
     1) "books:items"
     2) "books:dupefilter"
     116.29.35.201:6379> llen books:items
     (integer) 1000
     116.29.35.201:6379> LRANGE books:items 0 4
          1) "{\"stock\": \"22\", \"review_num\": \"0\", \"upc\": \"a897fe39b1053632\", \"name\": \"A Light in
     the Attic\", \"review_rating\": \"Three\", \"price\": \"\\u00a351.77\"}"
          2) "{\"stock\": \"20\", \"review_num\": \"0\", \"upc\": \"e00eb4fd7b871a48\", \"name\": \"Sharp
     Objects\", \"review_rating\": \"Four\", \"price\": \"\\u00a347.82\"}"
          3) "{\"stock\": \"20\", \"review_num\": \"0\", \"upc\": \"90fa61229261140a\", \"name\": \"Tipping the
     Velvet\", \"review_rating\": \"One\", \"price\": \"\\u00a353.74\"}"
          4) "{\"stock\": \"20\", \"review_num\": \"0\", \"upc\": \"6957f44c3847a760\", \"name\":
     \"Soumission\", \"review_rating\": \"One\", \"price\": \"\\u00a350.10\"}"
          5) "{\"stock\": \"19\", \"review_num\": \"0\", \"upc\": \"2597b5a345f45e1b\", \"name\": \"The Dirty
     Little Secrets of Getting Your Dream Job\", \"review_rating\": \"Four\", \"price\": \"\\u00a333.34\"}"
          116.29.35.201:6379> LRANGE books:items -5 -1
          1) "{\"name\": \"Shameless\", \"price\": \"\\u00a358.35\", \"review_rating\": \"Three\", \"upc\":
     \"c068c013d6921fea\", \"review_num\": \"0\", \"stock\": \"1\"}"
          2) "{\"stock\": \"1\", \"review_num\": \"0\", \"upc\": \"19fec36a1dfb4c16\", \"name\": \"A Spy's
     Devotion (The Regency Spies of London #1)\", \"review_rating\": \"Five\", \"price\": \"\\u00a316.97\"}"
          3) "{\"stock\": \"1\", \"review_num\": \"0\", \"upc\": \"f684a82adc49f011\", \"name\": \"1st to Die
     (Women's Murder Club #1)\", \"review_rating\": \"One\", \"price\": \"\\u00a353.98\"}"
          4) "{\"stock\": \"1\", \"review_num\": \"0\", \"upc\": \"228ba5e7577e1d49\", \"name\": \"1,000 Places
     to See Before You Die\", \"review_rating\": \"Five\", \"price\": \"\\u00a326.08\"}"
          5) "{\"name\": \"Girl in the Blue Coat\", \"price\": \"\\u00a346.83\", \"review_rating\": \"Two\",
     \"upc\": \"41fc5dce044f16f5\", \"review_num\": \"0\", \"stock\": \"3\"}"

如上所示,我们成功地爬取到了1000项数据(由各爬虫最后的log信息得知,爬虫A:514项,爬虫B:123项,爬虫C:363项)。每一项数据以json形式存储在Redis的列表中,需要使用这些数据时,可以编写Python程序将它们从Redis中读出,代码框架如下:

     import redis
     import json


     ITEM_KEY = 'books:items'


     def process_item(item):
        # 添加处理数据的代码
        ...


     def main():
        r = redis.StrictRedis(host='116.29.35.201', port=6379)
        for _ in range(r.llen(ITEM_KEY)):
            data = r.lpop(ITEM_KEY)
            item = json.loads(data.decode('utf8'))
            process_item(item)


     if __name__ == '__main__':
        main()

到此,我们完成了分布式爬取的项目。

14.4 本章小结

本章我们学习了如何利用scrapy-redis构建分布式Scrapy爬虫,首先介绍了一些Redis数据库的基础知识,然后对scrapy-redis源码进行了分析,最后通过案例展示了一个分布式爬虫的开发流程。