预计阅读本页时间:-
第5章
使用Item Pipeline处理数据
在之前的章节中,我们学习了提取数据以及封装数据的方法,这一章来学习如何对爬取到的数据进行处理。在Scrapy中,Item Pipeline是处理数据的组件,一个Item Pipeline就是一个包含特定接口的类,通常只负责一种功能的数据处理,在一个项目中可以同时启用多个Item Pipeline,它们按指定次序级联起来,形成一条数据处理流水线。
以下是Item Pipeline的几种典型应用:
● 清洗数据。
广告:个人专属 VPN,独立 IP,无限流量,多机房切换,还可以屏蔽广告和恶意软件,每月最低仅 5 美元
● 验证数据的有效性。
● 过滤掉重复的数据。
● 将数据存入数据库。
5.1 Item Pipeline
通过一个例子讲解Item Pipeline的使用,在第1章的example项目中,我们爬取到的书籍价格是以英镑为单位的:
$ scrapy crawl books -o books.csv ... $ head -5 books.csv # 查看文件开头的5 行 name,price A Light in the Attic,£51.77 Tipping the Velvet,£53.74 Soumission,£50.10 Sharp Objects,£47.82
如果我们期望爬取到的书价是人民币价格,就需要用英镑价格乘以汇率计算出人民币价格(处理数据),此时可以实现一个价格转换的Item Pipeline来完成这个工作。接下来在example项目中实现它。
5.1.1 实现Item Pipeline
在创建一个Scrapy项目时,会自动生成一个pipelines.py文件,它用来放置用户自定义的Item Pipeline,在example项目的pipelines.py中实现PriceConverterPipeline,代码如下:
class PriceConverterPipeline(object): # 英镑兑换人民币汇率 exchange_rate = 8.5309 def process_item(self, item, spider): # 提取item的price 字段(如£53.74) # 去掉前面英镑符号£,转换为float 类型,乘以汇率 price = float(item['price'][1:]) * self.exchange_rate # 保留2 位小数,赋值回item的price 字段 item['price'] = '¥%.2f' % price return item
对上述代码解释如下:
● 一个Item Pipeline不需要继承特定基类,只需要实现某些特定方法,例如process_item、open_spider、close_spider。
● 一个Item Pipeline必须实现一个process_item(item, spider)方法,该方法用来处理每一项由Spider爬取到的数据,其中的两个参数:
Item 爬取到的一项数据(Item或字典)。
Spider 爬取此项数据的Spider对象。
上述代码中的process_item方法实现非常简单,将书籍的英镑价格转换为浮点数,乘以汇率并保留2位小数,然后赋值回item的price字段,最后返回被处理过的item。
可以看出,process_item方法是Item Pipeline的核心,对该方法还需再做两点补充说明:
● 如果process_item在处理某项item时返回了一项数据(Item或字典),返回的数据会递送给下一级Item Pipeline(如果有)继续处理。
● 如果process_item在处理某项item时抛出(raise)一个DropItem异常(scrapy.exceptions.DropItem),该项item便会被抛弃,不再递送给后面的Item Pipeline继续处理,也不会导出到文件。通常,我们在检测到无效数据或想要过滤数据时,抛出DropItem异常。
除了必须实现的process_item方法外,还有3个比较常用的方法,可根据需求选择实现:
● open_spider(self, spider)
Spider打开时(处理数据前)回调该方法,通常该方法用于在开始处理数据之前完成某些初始化工作,如连接数据库。
● close_spider(self, spider)
Spider关闭时(处理数据后)回调该方法,通常该方法用于在处理完所有数据之后完成某些清理工作,如关闭数据库。
● from_crawler(cls, crawler)
创建Item Pipeline对象时回调该类方法。通常,在该方法中通过crawler.settings读取配置,根据配置创建Item Pipeline对象。
在后面的例子中,我们展示了以上方法的应用场景。
5.1.2 启用Item Pipeline
在Scrapy中,Item Pipeline是可选的组件,想要启用某个(或某些)Item Pipeline,需要在配置文件settings.py中进行配置:
ITEM_PIPELINES = { 'example.pipelines.PriceConverterPipeline': 300, }
ITEM_PIPELINES是一个字典,我们把想要启用的Item Pipeline添加到这个字典中,其中每一项的键是每一个Item Pipeline类的导入路径,值是一个0~1000的数字,同时启用多个Item Pipeline时,Scrapy根据这些数值决定各Item Pipeline处理数据的先后次序,数值小的在前。
启用PriceConverterPipeline后,重新运行爬虫,并观察结果:
$ scrapy crawl books -o books.csv ... $ head -5 books.csv # 查看文件开头的5 行 name,price A Light in the Attic,¥441.64 Tipping the Velvet,¥458.45 Soumission,¥427.40 Sharp Objects,¥407.95
使用PriceConverterPipeline对数据进行处理后,books.csv中的书价转换成了人民币价格。
5.2 更多例子
我们通过一个例子学习了如何使用Item Pipeline对数据进行处理,下面再看两个实际例子。
5.2.1 过滤重复数据
为了确保爬取到的书籍信息中没有重复项,可以实现一个去重Item Pipeline。这里,我们就以书名作为主键(实际应以ISBN编号为主键,但是仅爬取了书名和价格)进行去重,实现DuplicatesPipeline代码如下:
from scrapy.exceptions import DropItem class DuplicatesPipeline(object): def __init__(self): self.book_set = set() def process_item(self, item, spider): name = item['name'] if name in self.book_set: raise DropItem("Duplicate book found: %s" % item) self.book_set.add(name) return item
对上述代码解释如下:
● 增加构造器方法,在其中初始化用于对书名去重的集合。
● 在process_item方法中,先取出item的name字段,检查书名是否已在集合book_set中,如果存在,就是重复数据,抛出DropItem异常,将item抛弃;否则,将item的name字段存入集合,返回item。
接下来测试DuplicatesPipeline。首先在不启用DuplicatesPipeline的情况下,运行爬虫,查看结果:
$ scrapy crawl books -o book1.csv ... $ cat -n book1.csv 1 price,name 2 ¥441.64,A Light in the Attic 3 ¥458.45,Tipping the Velvet 4 ¥427.40,Soumission 5 ¥407.95,Sharp Objects 6 ¥462.63,Sapiens: A Brief History of Humankind 7 ¥193.22,The Requiem Red 8 ¥284.42,The Dirty Little Secrets of Getting Your Dream Job ... 993 ¥317.86,Bounty (Colorado Mountain #7) 994 ¥173.18,Blood Defense (Samantha Brinkman #1) 995 ¥295.60,"Bleach, Vol. 1: Strawberry and the Soul Reapers (Bleach #1)" 996 ¥370.07,Beyond Good and Evil 997 ¥473.72,Alice in Wonderland (Alice's Adventures in Wonderland #1) 998 ¥486.77,"Ajin: Demi-Human, Volume 1 (Ajin: Demi-Human #1)" 999 ¥144.77,A Spy's Devotion (The Regency Spies of London #1) 1000 ¥460.50,1st to Die (Women's Murder Club #1) 1001 ¥222.49,"1,000 Places to See Before You Die"
此时有1000本书。
然后在配置文件settings.py中启用DuplicatesPipeline:
ITEM_PIPELINES = { 'example.pipelines.PriceConverterPipeline': 300, 'example.pipelines.DuplicatesPipeline': 350, }
运行爬虫,对比结果:
$ scrapy crawl books -o book2.csv ... $ cat -n book2.csv 1 name,price 2 A Light in the Attic,¥441.64 3 Tipping the Velvet,¥458.45 4 Soumission,¥427.40 5 Sharp Objects,¥407.95 6 Sapiens: A Brief History of Humankind,¥462.63 7 The Requiem Red,¥193.22 8 The Dirty Little Secrets of Getting Your Dream Job,¥284.42 ... 993 Blood Defense (Samantha Brinkman #1),¥173.18 994 "Bleach, Vol. 1: Strawberry and the Soul Reapers (Bleach #1)",¥295.60 995 Beyond Good and Evil,¥370.07 996 Alice in Wonderland (Alice's Adventures in Wonderland #1),¥473.72 997 "Ajin: Demi-Human, Volume 1 (Ajin: Demi-Human #1)",¥486.77 998 A Spy's Devotion (The Regency Spies of London #1),¥144.77 999 1st to Die (Women's Murder Club #1),¥460.50 1000 "1,000 Places to See Before You Die",¥222.49
只有999本了,比之前少了1本,说明有两本书是同名的,翻阅爬虫的log信息可以找到重复项:
[scrapy.core.scraper] WARNING: Dropped: Duplicate book found: {'name': 'The Star-Touched Queen', 'price': '¥275.55'}
5.2.2 将数据存入MongoDB
有时,我们想把爬取到的数据存入某种数据库中,可以实现Item Pipeline完成此类任务。下面实现一个能将数据存入MongoDB数据库的Item Pipeline,代码如下:
from scrapy.item import Item import pymongo class MongoDBPipeline(object): DB_URI = 'mongodb://localhost:27017/' DB_NAME = 'scrapy_data' def open_spider(self, spider): self.client = pymongo.MongoClient(self.DB_URI) self.db = self.client[self.DB_NAME] def close_spider(self, spider): self.client.close() def process_item(self, item, spider): collection = self.db[spider.name] post = dict(item) if isinstance(item, Item) else item collection.insert_one(post) return item
对上述代码解释如下。
● 在类属性中定义两个常量:
DB_URI 数据库的URI地址。
DB_NAME 数据库的名字。
● 在Spider整个爬取过程中,数据库的连接和关闭操作只需要进行一次,应在开始处理数据之前连接数据库,并在处理完所有数据之后关闭数据库。因此实现以下两个方法(在Spider打开和关闭时被调用):
open_spider(spider)
close_spider(spider)
分别在open_spider和close_spider方法中实现数据库的连接与关闭。
● 在process_item中实现MongoDB数据库的写入操作,使用self.db和spider.name获取一个集合(collection),然后将数据插入该集合,集合对象的insert_one方法需传入一个字典对象(不能传入Item对象),因此在调用前先对item的类型进行判断,如果item是Item对象,就将其转换为字典。
接下来测试MongoDBPipeline,在配置文件settings.py中启用MongoDBPipeline:
ITEM_PIPELINES = { 'example.pipelines.PriceConverterPipeline': 300, 'example.pipelines.MongoDBPipeline': 400, }
运行爬虫,并查看数据库中的结果:
$ scrapy crawl books ... $ mongo MongoDB shell version: 2.4.9 connecting to: test > use scrapy_data switched to db scrapy_data > db.books.count() 1000 > db.books.find() { "_id" : ObjectId("58ae39a89dcd191973cc588f"), "price" : "¥441.64", "name" : "A Light in the Attic" } { "_id" : ObjectId("58ae39a89dcd191973cc5890"), "price" : "¥458.45", "name" : "Tipping the Velvet" } { "_id" : ObjectId("58ae39a89dcd191973cc5891"), "price" : "¥427.40", "name" : "Soumission" } { "_id" : ObjectId("58ae39a89dcd191973cc5892"), "price" : "¥407.95", "name" : "Sharp Objects" } { "_id" : ObjectId("58ae39a89dcd191973cc5893"), "price" : "¥462.63", "name" : "Sapiens: A Brief History of Humankind" } { "_id" : ObjectId("58ae39a89dcd191973cc5894"), "price" : "¥193.22", "name" : "The Requiem Red" } { "_id" : ObjectId("58ae39a89dcd191973cc5895"), "price" : "¥284.42", "name" : "The Dirty Little Secrets of Getting Your Dream Job" } { "_id" : ObjectId("58ae39a89dcd191973cc5896"), "price" : "¥152.96", "name" : "The Coming Woman: A Novel Based on the Life of the Infamous Feminist, Victoria Woodhull" } { "_id" : ObjectId("58ae39a89dcd191973cc5897"), "price" : "¥192.80", "name" : "The Boys in the Boat: Nine Americans and Their Epic Quest for Gold at the 1936 Berlin Olympics" } { "_id" : ObjectId("58ae39a89dcd191973cc5898"), "price" : "¥444.89", "name" : "The Black Maria" } { "_id" : ObjectId("58ae39a89dcd191973cc5899"), "price" : "¥119.35", "name" : "Starving Hearts (Triangular Trade Trilogy, #1)" } { "_id" : ObjectId("58ae39a89dcd191973cc589a"), "price" : "¥176.25", "name" : "Shakespeare's Sonnets" } { "_id" : ObjectId("58ae39a89dcd191973cc589b"), "price" : "¥148.95", "name" : "Set Me Free" } { "_id" : ObjectId("58ae39a89dcd191973cc589c"), "price" : "¥446.08", "name" : "Scott Pilgrim's Precious Little Life (Scott Pilgrim #1)" } { "_id" : ObjectId("58ae39a89dcd191973cc589d"), "price" : "¥298.75", "name" : "Rip it Up and Start Again" } { "_id" : ObjectId("58ae39a89dcd191973cc589e"), "price" : "¥488.39", "name" : "Our Band Could Be Your Life: Scenes from the American Indie Underground, 1981-1991" } { "_id" : ObjectId("58ae39a89dcd191973cc589f"), "price" : "¥203.72", "name" : "Olio" } { "_id" : ObjectId("58ae39a89dcd191973cc58a0"), "price" : "¥320.68", "name" : "Mesaerion: The Best Science Fiction Stories 1800-1849" } { "_id" : ObjectId("58ae39a89dcd191973cc58a1"), "price" : "¥437.89", "name" : "Libertarianism for Beginners" } { "_id" : ObjectId("58ae39a89dcd191973cc58a2"), "price" : "¥385.34", "name" : "It's Only the Himalayas" } Type "it" for more
在上述实现中,数据库的URI地址和数据库的名字硬编码在代码中,如果希望通过配置文件设置它们,只需稍作改动,代码如下:
from scrapy.item import Item import pymongo class MongoDBPipeline(object): @classmethod def from_crawler(cls, crawler): cls.DB_URI = crawler.settings.get('MONGO_DB_URI', 'mongodb://localhost:27017/') cls.DB_NAME = crawler.settings.get('MONGO_DB_NAME', 'scrapy_data') return cls() def open_spider(self, spider): self.client = pymongo.MongoClient(self.DB_URI) self.db = self.client[self.DB_NAME] def close_spider(self, spider): self.client.close() def process_item(self, item, spider): collection = self.db[spider.name] post = dict(item) if isinstance(item, Item) else item collection.insert_one(post) return item
对上述改动解释如下:
● 增加类方法from_crawler(cls, crawler),替代在类属性中定义DB_URI和DB_NAME。
● 如果一个Item Pipeline定义了from_crawler方法,Scrapy就会调用该方法来创建Item Pipeline对象。该方法有两个参数:
cls Item Pipeline类的对象(这里为MongoDBPipeline类对象)。
crawler Crawler是Scrapy中的一个核心对象,可以通过crawler的settings属性访问配置文件。
● 在from_crawler方法中,读取配置文件中的MONGO_DB_URI和MONGO_DB_NAME(不存在使用默认值),赋给cls的属性,即MongoDBPipeline类属性。
● 其他代码并没有任何改变,因为这里只是改变了设置MongoDBPipeline类属性的方式。
现在,我们可在配置文件settings.py中对所要使用的数据库进行设置:
MONGO_DB_URI = 'mongodb://192.168.1.105:27017/' MONGO_DB_NAME = 'liushuo_scrapy_data'
5.3 本章小结
本章学习了如何使用Item Pipeline对爬取到的数据进行处理,先以一个简单的例子讲解了Item Pipeline的应用场景以及具体使用,然后展示了Item Pipeline实际应用的两个例子。