第5章
使用Item Pipeline处理数据

在之前的章节中,我们学习了提取数据以及封装数据的方法,这一章来学习如何对爬取到的数据进行处理。在Scrapy中,Item Pipeline是处理数据的组件,一个Item Pipeline就是一个包含特定接口的类,通常只负责一种功能的数据处理,在一个项目中可以同时启用多个Item Pipeline,它们按指定次序级联起来,形成一条数据处理流水线。

以下是Item Pipeline的几种典型应用:

● 清洗数据。

广告:个人专属 VPN,独立 IP,无限流量,多机房切换,还可以屏蔽广告和恶意软件,每月最低仅 5 美元

● 验证数据的有效性。

● 过滤掉重复的数据。

● 将数据存入数据库。

5.1 Item Pipeline

通过一个例子讲解Item Pipeline的使用,在第1章的example项目中,我们爬取到的书籍价格是以英镑为单位的:

     $ scrapy crawl books -o books.csv
     ...
     $ head -5 books.csv    # 查看文件开头的5 行
     name,price
     A Light in the Attic,£51.77
     Tipping the Velvet,£53.74
     Soumission,£50.10
     Sharp Objects,£47.82

如果我们期望爬取到的书价是人民币价格,就需要用英镑价格乘以汇率计算出人民币价格(处理数据),此时可以实现一个价格转换的Item Pipeline来完成这个工作。接下来在example项目中实现它。

5.1.1 实现Item Pipeline

在创建一个Scrapy项目时,会自动生成一个pipelines.py文件,它用来放置用户自定义的Item Pipeline,在example项目的pipelines.py中实现PriceConverterPipeline,代码如下:

     class PriceConverterPipeline(object):


        # 英镑兑换人民币汇率
        exchange_rate = 8.5309


        def process_item(self, item, spider):
            # 提取item的price 字段(如£53.74)
            # 去掉前面英镑符号£,转换为float 类型,乘以汇率
            price = float(item['price'][1:]) * self.exchange_rate


            # 保留2 位小数,赋值回item的price 字段
            item['price'] = '¥%.2f' % price


            return item

对上述代码解释如下:

● 一个Item Pipeline不需要继承特定基类,只需要实现某些特定方法,例如process_item、open_spider、close_spider。

● 一个Item Pipeline必须实现一个process_item(item, spider)方法,该方法用来处理每一项由Spider爬取到的数据,其中的两个参数:

阅读 ‧ 电子书库 Item 爬取到的一项数据(Item或字典)。

阅读 ‧ 电子书库 Spider 爬取此项数据的Spider对象。

上述代码中的process_item方法实现非常简单,将书籍的英镑价格转换为浮点数,乘以汇率并保留2位小数,然后赋值回item的price字段,最后返回被处理过的item。

可以看出,process_item方法是Item Pipeline的核心,对该方法还需再做两点补充说明:

● 如果process_item在处理某项item时返回了一项数据(Item或字典),返回的数据会递送给下一级Item Pipeline(如果有)继续处理。

● 如果process_item在处理某项item时抛出(raise)一个DropItem异常(scrapy.exceptions.DropItem),该项item便会被抛弃,不再递送给后面的Item Pipeline继续处理,也不会导出到文件。通常,我们在检测到无效数据或想要过滤数据时,抛出DropItem异常。

除了必须实现的process_item方法外,还有3个比较常用的方法,可根据需求选择实现:

● open_spider(self, spider)

Spider打开时(处理数据前)回调该方法,通常该方法用于在开始处理数据之前完成某些初始化工作,如连接数据库。

● close_spider(self, spider)

Spider关闭时(处理数据后)回调该方法,通常该方法用于在处理完所有数据之后完成某些清理工作,如关闭数据库。

● from_crawler(cls, crawler)

创建Item Pipeline对象时回调该类方法。通常,在该方法中通过crawler.settings读取配置,根据配置创建Item Pipeline对象。

在后面的例子中,我们展示了以上方法的应用场景。

5.1.2 启用Item Pipeline

在Scrapy中,Item Pipeline是可选的组件,想要启用某个(或某些)Item Pipeline,需要在配置文件settings.py中进行配置:

     ITEM_PIPELINES = {
       'example.pipelines.PriceConverterPipeline': 300,
     }

ITEM_PIPELINES是一个字典,我们把想要启用的Item Pipeline添加到这个字典中,其中每一项的键是每一个Item Pipeline类的导入路径,值是一个0~1000的数字,同时启用多个Item Pipeline时,Scrapy根据这些数值决定各Item Pipeline处理数据的先后次序,数值小的在前。

启用PriceConverterPipeline后,重新运行爬虫,并观察结果:

     $ scrapy crawl books -o books.csv
     ...
     $ head -5 books.csv # 查看文件开头的5 行
     name,price
     A Light in the Attic,¥441.64
     Tipping the Velvet,¥458.45
     Soumission,¥427.40
     Sharp Objects,¥407.95

使用PriceConverterPipeline对数据进行处理后,books.csv中的书价转换成了人民币价格。

5.2 更多例子

我们通过一个例子学习了如何使用Item Pipeline对数据进行处理,下面再看两个实际例子。

5.2.1 过滤重复数据

为了确保爬取到的书籍信息中没有重复项,可以实现一个去重Item Pipeline。这里,我们就以书名作为主键(实际应以ISBN编号为主键,但是仅爬取了书名和价格)进行去重,实现DuplicatesPipeline代码如下:

     from scrapy.exceptions import DropItem


     class DuplicatesPipeline(object):


        def __init__(self):
        self.book_set = set()


     def process_item(self, item, spider):
        name = item['name']
        if name in self.book_set:
            raise DropItem("Duplicate book found: %s" % item)


        self.book_set.add(name)
        return item

对上述代码解释如下:

● 增加构造器方法,在其中初始化用于对书名去重的集合。

● 在process_item方法中,先取出item的name字段,检查书名是否已在集合book_set中,如果存在,就是重复数据,抛出DropItem异常,将item抛弃;否则,将item的name字段存入集合,返回item。

接下来测试DuplicatesPipeline。首先在不启用DuplicatesPipeline的情况下,运行爬虫,查看结果:

     $ scrapy crawl books -o book1.csv
     ...
     $ cat -n book1.csv
         1 price,name
         2 ¥441.64,A Light in the Attic
         3 ¥458.45,Tipping the Velvet
         4 ¥427.40,Soumission
         5 ¥407.95,Sharp Objects
         6 ¥462.63,Sapiens: A Brief History of Humankind
         7 ¥193.22,The Requiem Red
         8 ¥284.42,The Dirty Little Secrets of Getting Your Dream Job
        ...
       993 ¥317.86,Bounty (Colorado Mountain #7)
       994 ¥173.18,Blood Defense (Samantha Brinkman #1)
       995 ¥295.60,"Bleach, Vol. 1: Strawberry and the Soul Reapers (Bleach #1)"
       996 ¥370.07,Beyond Good and Evil
       997 ¥473.72,Alice in Wonderland (Alice's Adventures in Wonderland #1)
       998 ¥486.77,"Ajin: Demi-Human, Volume 1 (Ajin: Demi-Human #1)"
       999 ¥144.77,A Spy's Devotion (The Regency Spies of London #1)
      1000 ¥460.50,1st to Die (Women's Murder Club #1)
      1001 ¥222.49,"1,000 Places to See Before You Die"

此时有1000本书。

然后在配置文件settings.py中启用DuplicatesPipeline:

     ITEM_PIPELINES = {
        'example.pipelines.PriceConverterPipeline': 300,
        'example.pipelines.DuplicatesPipeline': 350,
     }

运行爬虫,对比结果:

     $ scrapy crawl books -o book2.csv
     ...
     $ cat -n book2.csv
         1   name,price
         2   A Light in the Attic,¥441.64
         3   Tipping the Velvet,¥458.45
         4   Soumission,¥427.40
         5   Sharp Objects,¥407.95
         6   Sapiens: A Brief History of Humankind,¥462.63
         7   The Requiem Red,¥193.22
         8   The Dirty Little Secrets of Getting Your Dream Job,¥284.42
        ...
       993   Blood Defense (Samantha Brinkman #1),¥173.18
       994   "Bleach, Vol. 1: Strawberry and the Soul Reapers (Bleach #1)",¥295.60
       995   Beyond Good and Evil,¥370.07
       996   Alice in Wonderland (Alice's Adventures in Wonderland #1),¥473.72
       997   "Ajin: Demi-Human, Volume 1 (Ajin: Demi-Human #1)",¥486.77
       998   A Spy's Devotion (The Regency Spies of London #1),¥144.77
       999   1st to Die (Women's Murder Club #1),¥460.50
      1000   "1,000 Places to See Before You Die",¥222.49

只有999本了,比之前少了1本,说明有两本书是同名的,翻阅爬虫的log信息可以找到重复项:

     [scrapy.core.scraper] WARNING: Dropped: Duplicate book found:
     {'name': 'The Star-Touched Queen', 'price': '¥275.55'}

5.2.2 将数据存入MongoDB

有时,我们想把爬取到的数据存入某种数据库中,可以实现Item Pipeline完成此类任务。下面实现一个能将数据存入MongoDB数据库的Item Pipeline,代码如下:

     from scrapy.item import Item
     import pymongo


     class MongoDBPipeline(object):


        DB_URI = 'mongodb://localhost:27017/'
        DB_NAME = 'scrapy_data'


        def open_spider(self, spider):
           self.client = pymongo.MongoClient(self.DB_URI)
           self.db = self.client[self.DB_NAME]


        def close_spider(self, spider):
           self.client.close()


        def process_item(self, item, spider):
           collection = self.db[spider.name]
           post = dict(item) if isinstance(item, Item) else item
           collection.insert_one(post)
           return item

对上述代码解释如下。

● 在类属性中定义两个常量:

阅读 ‧ 电子书库 DB_URI 数据库的URI地址。

阅读 ‧ 电子书库 DB_NAME 数据库的名字。

● 在Spider整个爬取过程中,数据库的连接和关闭操作只需要进行一次,应在开始处理数据之前连接数据库,并在处理完所有数据之后关闭数据库。因此实现以下两个方法(在Spider打开和关闭时被调用):

阅读 ‧ 电子书库 open_spider(spider)

阅读 ‧ 电子书库 close_spider(spider)

分别在open_spider和close_spider方法中实现数据库的连接与关闭。

● 在process_item中实现MongoDB数据库的写入操作,使用self.db和spider.name获取一个集合(collection),然后将数据插入该集合,集合对象的insert_one方法需传入一个字典对象(不能传入Item对象),因此在调用前先对item的类型进行判断,如果item是Item对象,就将其转换为字典。

接下来测试MongoDBPipeline,在配置文件settings.py中启用MongoDBPipeline:

     ITEM_PIPELINES = {
        'example.pipelines.PriceConverterPipeline': 300,
        'example.pipelines.MongoDBPipeline': 400,
     }

运行爬虫,并查看数据库中的结果:

          $ scrapy crawl books
          ...
          $ mongo
          MongoDB shell version: 2.4.9
          connecting to: test
          > use scrapy_data
          switched to db scrapy_data
          > db.books.count()
          1000
          > db.books.find()
          { "_id" : ObjectId("58ae39a89dcd191973cc588f"), "price" : "¥441.64", "name" : "A Light in the
     Attic" }
          { "_id" : ObjectId("58ae39a89dcd191973cc5890"), "price" : "¥458.45", "name" : "Tipping the
     Velvet" }
          { "_id" : ObjectId("58ae39a89dcd191973cc5891"), "price" : "¥427.40", "name" : "Soumission" }
          { "_id" : ObjectId("58ae39a89dcd191973cc5892"), "price" : "¥407.95", "name" : "Sharp Objects" }
          { "_id" : ObjectId("58ae39a89dcd191973cc5893"), "price" : "¥462.63", "name" : "Sapiens: A Brief
     History of Humankind" }
          { "_id" : ObjectId("58ae39a89dcd191973cc5894"), "price" : "¥193.22", "name" : "The Requiem
     Red" }
          { "_id" : ObjectId("58ae39a89dcd191973cc5895"), "price" : "¥284.42", "name" : "The Dirty Little
     Secrets of Getting Your Dream Job" }
          { "_id" : ObjectId("58ae39a89dcd191973cc5896"), "price" : "¥152.96", "name" : "The Coming
     Woman: A Novel Based on the Life of the Infamous Feminist, Victoria Woodhull" }
          { "_id" : ObjectId("58ae39a89dcd191973cc5897"), "price" : "¥192.80", "name" : "The Boys in the
     Boat: Nine Americans and Their Epic Quest for Gold at the 1936 Berlin Olympics" }
          { "_id" : ObjectId("58ae39a89dcd191973cc5898"), "price" : "¥444.89", "name" : "The Black Maria" }
          { "_id" : ObjectId("58ae39a89dcd191973cc5899"), "price" : "¥119.35", "name" : "Starving Hearts
     (Triangular Trade Trilogy, #1)" }
          { "_id" : ObjectId("58ae39a89dcd191973cc589a"), "price" : "¥176.25", "name" : "Shakespeare's
     Sonnets" }
          { "_id" : ObjectId("58ae39a89dcd191973cc589b"), "price" : "¥148.95", "name" : "Set Me Free" }
          { "_id" : ObjectId("58ae39a89dcd191973cc589c"), "price" : "¥446.08", "name" : "Scott Pilgrim's
     Precious Little Life (Scott Pilgrim #1)" }
          { "_id" : ObjectId("58ae39a89dcd191973cc589d"), "price" : "¥298.75", "name" : "Rip it Up and Start
     Again" }
          { "_id" : ObjectId("58ae39a89dcd191973cc589e"), "price" : "¥488.39", "name" : "Our Band Could Be
     Your Life: Scenes from the American Indie Underground, 1981-1991" }
          { "_id" : ObjectId("58ae39a89dcd191973cc589f"), "price" : "¥203.72", "name" : "Olio" }
          { "_id" : ObjectId("58ae39a89dcd191973cc58a0"), "price" : "¥320.68", "name" : "Mesaerion: The
     Best Science Fiction Stories 1800-1849" }
          { "_id" : ObjectId("58ae39a89dcd191973cc58a1"), "price" : "¥437.89", "name" : "Libertarianism for
     Beginners" }
          { "_id" : ObjectId("58ae39a89dcd191973cc58a2"), "price" : "¥385.34", "name" : "It's Only the
     Himalayas" }
          Type "it" for more

在上述实现中,数据库的URI地址和数据库的名字硬编码在代码中,如果希望通过配置文件设置它们,只需稍作改动,代码如下:

     from scrapy.item import Item
     import pymongo


     class MongoDBPipeline(object):


        @classmethod
        def from_crawler(cls, crawler):
           cls.DB_URI = crawler.settings.get('MONGO_DB_URI',
                                        'mongodb://localhost:27017/')
           cls.DB_NAME = crawler.settings.get('MONGO_DB_NAME', 'scrapy_data')


           return cls()


        def open_spider(self, spider):
           self.client = pymongo.MongoClient(self.DB_URI)
           self.db = self.client[self.DB_NAME]
     def close_spider(self, spider):
        self.client.close()


     def process_item(self, item, spider):
        collection = self.db[spider.name]
        post = dict(item) if isinstance(item, Item) else item
        collection.insert_one(post)


        return item

对上述改动解释如下:

● 增加类方法from_crawler(cls, crawler),替代在类属性中定义DB_URI和DB_NAME。

● 如果一个Item Pipeline定义了from_crawler方法,Scrapy就会调用该方法来创建Item Pipeline对象。该方法有两个参数:

阅读 ‧ 电子书库 cls Item Pipeline类的对象(这里为MongoDBPipeline类对象)。

阅读 ‧ 电子书库 crawler Crawler是Scrapy中的一个核心对象,可以通过crawler的settings属性访问配置文件。

● 在from_crawler方法中,读取配置文件中的MONGO_DB_URI和MONGO_DB_NAME(不存在使用默认值),赋给cls的属性,即MongoDBPipeline类属性。

● 其他代码并没有任何改变,因为这里只是改变了设置MongoDBPipeline类属性的方式。

现在,我们可在配置文件settings.py中对所要使用的数据库进行设置:

     MONGO_DB_URI = 'mongodb://192.168.1.105:27017/'
     MONGO_DB_NAME = 'liushuo_scrapy_data'

5.3 本章小结

本章学习了如何使用Item Pipeline对爬取到的数据进行处理,先以一个简单的例子讲解了Item Pipeline的应用场景以及具体使用,然后展示了Item Pipeline实际应用的两个例子。