第12章
存入数据库
在之前的章节中,曾讨论过将爬取到的数据导出到文件的相关话题,但在某些时候,我们希望将爬取到的数据存储到数据库中,这一章来学习使用Item Pipeline实现Scrapy爬虫和几种常用数据库的接口。
以第8章toscrape_book项目作为环境展开本章内容的讲解。在toscrape_book项目中,我们爬取了网站http://books.toscrape.com中的书籍信息,其中每一本书的信息包括:
● 书名
● 价格
● 评价等级
● 产品编码
● 库存量
● 评价数量
下面我们来学习如何在爬取数据的过程中将书籍信息存储到各种数据库,这些数据库主要有:SQLite、MySQL、MongoDB、Redis。
12.1 SQLite
SQLite是一个文件型轻量级数据库,它的处理速度很快,在数据量不是很大的情况下,使用SQLite足够了。
首先,创建一个供Scrapy使用的SQLite数据库,取名为scrapy.db:
$ sqlite3 scrapy.db
...
sqlite>
接下来,在客户端中创建数据表(Table):
CREATE TABLE books (
upc CHAR(16) NOT NULL PRIMARY KEY,
name VARCHAR(256) NOT NULL,
price VARCHAR(16) NOT NULL,
review_rating INT,
review_num INT,
stock INT
);
在Python中访问SQLite数据库可使用Python标准库中的sqlite3模块。下面是使用sqlite3模块将数据写入SQLite数据库的简单示例:
import sqlite3
#连接数据库,得到Connection 对象
conn = sqlite3.connect('example.db')
#创建Curosr 对象,用来执行SQL语句
cur = conn.cursor()
#创建数据表
cur.execute('CREATE TABLE person (name VARCHAR(32), age INT, sex char(1))')
#插入一条数据
cur.execute('INSERT INTO person VALUES (?,?,?)', ('刘硕', 34, 'M'))
#保存变更, commit 后数据才被实际写入数据库
conn.commit()
#关闭连接
conn.close()
了解了在Python中如何操作SQLite数据库后,接下来编写一个能将爬取到的数据写入SQLite数据库的Item Pipeline。在pipelines.py中实现SQLitePipeline的代码如下:
import sqlite3
class SQLitePipeline(object):
def open_spider(self, spider):
db_name = spider.settings.get('SQLITE_DB_NAME', 'scrapy_defaut.db')
self.db_conn = sqlite3.connect(db_name)
self.db_cur = self.db_conn.cursor()
def close_spider(self, spider):
self.db_conn.commit()
self.db_conn.close()
def process_item(self, item, spider):
self.insert_db(item)
return item
def insert_db(self, item):
values = (
item['upc'],
item['name'],
item['price'],
item['review_rating'],
item['review_num'],
item['stock'],
)
sql = 'INSERT INTO books VALUES (?,?,?,?,?,?)'
self.db_cur.execute(sql, values)
# 每插入一条就commit一次会影响效率
# self.db_conn.commit()
解释上述代码如下:
● open_spider方法在开始爬取数据之前被调用,在该方法中通过spider.settings对象读取用户在配置文件中指定的数据库,然后建立与数据库的连接,将得到的Connection对象和Cursor对象分别赋值给self.db_conn和self.db_cur,以便之后使用。
● process_item方法处理爬取到的每一项数据,在该方法中调用insert_db方法,执行插入数据操作的SQL语句。但需要注意的是,在insert_db中并没有调用连接对象的commit方法,也就意味着此时数据并没有实际写入数据库。如果每插入一条数据都调用一次commit方法,会严重降低程序执行效率,并且我们对数据插入数据库的实时性并没有什么要求,因此可以在爬取完全部数据后再调用commit方法。
● close_spider方法在爬取完全部数据后被调用,在该方法中,调用连接对象的commit方法将之前所有的插入数据操作一次性提交给数据库,然后关闭连接对象。
在配置文件settings.py中指定我们所要使用的SQLite数据库,并启用SQLitePipeline:
SQLITE_DB_NAME = 'scrapy.db'
ITEM_PIPELINES = {
'toscrape_book.pipelines.SQLitePipeline': 400,
}
运行爬虫,并查看数据库:
$ scrapy crawl books
...
$ sqlite3 scrapy.db
SQLite version 3.8.2 2013-12-06 14:53:30
Enter ".help" for instructions
Enter SQL statements terminated with a ";"
sqlite> select count(*) from books;
1000
sqlite> select * from books;
a22124811bfa8350|It's Only the Himalayas|£45.17|2|0|19
feb7cc7701ecf901|Olio|£23.88|1|0|19
a34ba96d4081e6a4|Rip it Up and Start Again|£35.02|5|0|19
a18a4f574854aced|Libertarianism for Beginners|£51.33|2|0|19
ce6396b0f23f6ecc|Set Me Free|£17.46|5|0|19
fa9610a50a1bf149|Masks and Shadows|£56.40|2|0|16
3c346ab1e76ae1f6|Obsidian (Lux #1)|£14.86|2|0|16
09b6cc87e62c2c58|Danganronpa Volume 1|£51.99|4|0|16
...
结果表明,我们成功地将1000条数据存储到了SQLite数据库。
12.2 MySQL
MySQL是一个应用极其广泛的关系型数据库,它是开源免费的,可以支持大型数据库,在个人用户和中小企业中成为技术首选。
使用客户端登录MySQL,创建一个供Scrapy使用的数据库,取名为scrapy_db:
$ mysql -hlocalhost -uliushuo -p12345678
...
mysql> CREATE DATABASE scrapy_db CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
Query OK, 1 row affected (0.00 sec)
mysql> USE scrapy_db;
Database changed
接下来,创建存储书籍数据的表:
mysql> CREATE TABLE books (
-> upc CHAR(16) NOT NULL PRIMARY KEY,
-> name VARCHAR(256) NOT NULL,
-> price VARCHAR(16) NOT NULL,
-> review_rating INT,
-> review_num INT,
-> stock INT
-> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query OK, 0 rows affected (0.08 sec)
在Python 2中访问MySQL数据库可以使用第三方库MySQL-Python(即MySQLdb),但是MySQLdb不支持Python 3。在Python 3中,可以使用另一个第三方库mysqlclient作为替代,它是基于MySQL-Python开发的,提供了几乎完全相同的接口。因此,在两个Python版本下,可以使用相同的代码访问MySQL。
Python 2使用pip安装MySQL-python:
sudo pip install MySQL-python
Python 3使用pip安装mysqlclient:
sudo pip install mysqlclient
下面是使用MySQLdb将数据写入MySQL数据库的简单示例,与sqlite3的使用几乎完全相同:
import MySQLdb
#连接数据库, 得到Connection 对象
conn = MySQLdb.connect(host='localhost', db='scrapy_db',
user='liushuo', passwd='12345678', charset='utf8')
#创建Curosr 对象,用来执行SQL语句
cur = conn.cursor()
#创建数据表
cur.execute('CREATE TABLE person (name VARCHAR(32), age INT, sex char(1)) \
ENGINE=InnoDB DEFAULT CHARSET=utf8')
#插入一条数据
cur.execute('INSERT INTO person VALUES (%s,%s,%s)', ('刘硕', 34, 'M'))
#保存变更,commit 后数据才被实际写入数据库
conn.commit()
#关闭连接
conn.close()
仿照SQLitePipeline实现MySQLPipeline,代码如下:
import MySQLdb
class MySQLPipeline:
def open_spider(self, spider):
db = spider.settings.get('MYSQL_DB_NAME', 'scrapy_default')
host = spider.settings.get('MYSQL_HOST', 'localhost')
port = spider.settings.get('MYSQL_PORT', 3306)
user = spider.settings.get('MYSQL_USER', 'root')
passwd = spider.settings.get('MYSQL_PASSWORD', 'root')
self.db_conn = MySQLdb.connect(host=host, port=port, db=db,
user=user, passwd=passwd, charset='utf8')
self.db_cur = self.db_conn.cursor()
def close_spider(self, spider):
self.db_conn.commit()
self.db_conn.close()
def process_item(self, item, spider):
self.insert_db(item)
return item
def insert_db(self, item):
values = (
item['upc'],
item['name'],
item['price'],
item['review_rating'],
item['review_num'],
item['stock'],
)
sql = 'INSERT INTO books VALUES (%s,%s,%s,%s,%s,%s)'
self.db_cur.execute(sql, values)
上述代码结构与SQLitePipeline完全相同,不再赘述。
在配置文件settings.py中指定我们所要使用的MySQL数据库,并启用MySQLPipeline:
MYSQL_DB_NAME = 'scrapy_db'
MYSQL_HOST = 'localhost'
MYSQL_USER = 'liushuo'
MYSQL_PASSWORD = '12345678'
ITEM_PIPELINES = {
'toscrape_book.pipelines.MySQLPipeline': 401,
}
运行爬虫,并查看数据库:
结果表明,我们成功地将1000条数据存储到了MySQL数据库。
上述代码中,同样是先执行完全部的插入语句(INSERT INTO),最后一次性调用commit方法提交给数据库。或许在某些情况下,我们的确需要每执行一条插入语句,就立即调用commit方法更新数据库,如爬取过程很长,中途可能被迫中断,这样程序就不能执行到最后的commit。如果在上述代码的insert_db方法中直接添加self.db_conn.commit(),又会使程序执行慢得让人无法忍受。为解决以上难题,下面讲解另一种实现方法。
Scrapy框架自身是使用另一个Python框架Twisted编写的程序,Twisted是一个事件驱动型的异步网络框架,鼓励用户编写异步代码,Twisted中提供了以异步方式多线程访问数据库的模块adbapi,使用该模块可以显著提高程序访问数据库的效率。下面是使用adbapi中的连接池访问MySQL数据库的简单示例:
from twisted.internet import reactor, defer
from twisted.enterprise import adbapi
import threading
dbpool = adbapi.ConnectionPool('MySQLdb', host='localhost', database='scrapy_db',
user='liushuo', password='liushuo', charset='utf8')
def insert_db(tx, item):
print('In Thread:', threading.get_ident())
sql = 'INSERT INTO person VALUES (%s, %s, %s)'
tx.execute(sql, item)
for i in range(1000):
item = ('person%s' % i, 25, 'M')
dbpool.runInteraction(insert_db, item)
reactor.run()
上述代码解释如下:
● adbapi.ConnectionPool方法可以创建一个数据库连接池对象,其中包含多个连接对象,每个连接对象在独立的线程中工作。adbapi只是提供了异步访问数据库的编程框架,在其内部依然使用MySQLdb、sqlite3这样的库访问数据库。ConnectionPool方法的第一个参数就是用来指定使用哪个库访问数据库,其他参数在创建连接对象时使用。
● dbpool.runInteraction(insert_db, item)以异步方式调用instert_db函数,dbpool会选择连接池中的一个连接对象在独立线程中调用insert_db,其中参数item会被传给insert_db的第二个参数,传给insert_db的第一个参数是一个Transaction对象,其接口与Cursor对象类似,可以调用execute方法执行SQL语句,insert_db执行完后,连接对象会自动调用commit方法。
了解了adbapi的使用后,给出第二个版本的MySQLPipeline,代码如下:
from twisted.enterprise import adbapi
class MySQLAsyncPipeline:
def open_spider(self, spider):
db = spider.settings.get('MYSQL_DB_NAME', 'scrapy_default')
host = spider.settings.get('MYSQL_HOST', 'localhost')
port = spider.settings.get('MYSQL_PORT', 3306)
user = spider.settings.get('MYSQL_USER', 'root')
passwd = spider.settings.get('MYSQL_PASSWORD', 'root')
self.dbpool = adbapi.ConnectionPool('MySQLdb', host=host, db=db,
user=user, passwd=passwd, charset='utf8')
def close_spider(self, spider):
self.dbpool.close()
def process_item(self, item, spider):
self.dbpool.runInteraction(self.insert_db, item)
return item
def insert_db(self, tx, item):
values = (
item['upc'],
item['name'],
item['price'],
item['review_rating'],
item['review_num'],
item['stock'],
)
sql = 'INSERT INTO books VALUES (%s,%s,%s,%s,%s,%s)'
tx.execute(sql, values)
通过前面的讲述,相信大家可以轻松理解上述代码,不再过多解释,该版本比之前的版本在执行效率上有显著提高。
12.3 MongoDB
MongoDB是一个面向文档的非关系型数据库(NoSQL),它功能强大、灵活、易于拓展,近年来在多个领域得到广泛应用。
在Python中可以使用第三方库pymongo访问MongoDB数据库,使用pip安装pymongo:
$ sudo pip install pymongo
下面是使用pymongo将数据写入MongoDB数据库的简单示例:
from pymongo import MongoClient
# 连接MongoDB,得到一个客户端对象
client = MongoClient('mongodb://localhost:27017')
# 获取名为scrapy_db的数据库的对象
db = client.scrapy_db
# 获取名为person的集合的对象
collection = db.person
doc = {
'name': '刘硕',
'age': 34,
'sex': 'M',
}
# 将文档插入集合
collection.insert_one(doc)
# 关闭客户端
client.close()
仿照SQLitePipeline实现MongoDBPipeline,代码如下:
from pymongo import MongoClient
from scrapy import Item
class MongoDBPipeline:
def open_spider(self, spider):
db_uri = spider.settings.get('MONGODB_URI', 'mongodb://localhost:27017')
db_name = spider.settings.get('MONGODB_DB_NAME', 'scrapy_default')
self.db_client = MongoClient('mongodb://localhost:27017')
self.db = self.db_client[db_name]
def close_spider(self, spider):
self.db_client.close()
def process_item(self, item, spider):
self.insert_db(item)
return item
def insert_db(self, item):
if isinstance(item, Item):
item = dict(item)
self.db.books.insert_one(item)
解释上述代码如下:
● open_spider方法在开始爬取数据之前被调用,在该方法中通过spider.settings对象读取用户在配置文件中指定的数据库,然后建立与数据库的连接,将得到的MongoClient对象和Database对象分别赋值给self.db_client和self.db,以便之后使用。
● process_item方法处理爬取到的每一项数据,在该方法中调用insert_db方法,执行数据库的插入操作。在insert_db方法中,先将一项数据转换成字典,然后调用insert_one方法将其插入集合books。
● close_spider方法在爬取完全部数据后被调用,在该方法中关闭与数据库的连接。
在配置文件settings.py中指定我们所要使用的MongoDB数据库,并启用MongoDBPipeline:
MONGODB_URI = 'mongodb://localhost:27017'
MONGODB_DB_NAME = 'scrapy_db'
ITEM_PIPELINES = {
'toscrape_book.pipelines.MongoDBPipeline': 403,
}
运行爬虫,并查看数据库:
$ scrapy crawl books
...
$ mongo scrapy_db
MongoDB shell version: 2.4.9
connecting to: scrapy_db
> db.books.count()
1000
> db.books.find()
{ "_id" : ObjectId("58fb48859dcd1928b736ee4f"), "review_rating" : 3, "review_num" : "0", "stock" :
"22", "upc" : "a897fe39b1053632", "price" : "£51.77", "name" : "A Light in the Attic" }
{ "_id" : ObjectId("58fb48859dcd1928b736ee50"), "review_rating" : 1, "review_num" : "0", "stock" :
"19", "upc" : "feb7cc7701ecf901", "price" : "£23.88", "name" : "Olio" }
{ "_id" : ObjectId("58fb48859dcd1928b736ee51"), "review_rating" : 2, "review_num" : "0", "stock" :
"19", "upc" : "a18a4f574854aced", "price" : "£51.33", "name" : "Libertarianism for Beginners" }
{ "_id" : ObjectId("58fb48859dcd1928b736ee52"), "review_rating" : 1, "review_num" : "0", "stock" :
"19", "upc" : "e30f54cea9b38190", "price" : "£37.59", "name" : "Mesaerion: The Best Science Fiction
Stories 1800-1849" }
{ "_id" : ObjectId("58fb48859dcd1928b736ee53"), "review_rating" : 5, "review_num" : "0", "stock" :
"19", "upc" : "a34ba96d4081e6a4", "price" : "£35.02", "name" : "Rip it Up and Start Again" }
{ "_id" : ObjectId("58fb48859dcd1928b736ee54"), "review_rating" : 2, "review_num" : "0", "stock" :
"19", "upc" : "a22124811bfa8350", "price" : "£45.17", "name" : "It's Only the Himalayas" }
{ "_id" : ObjectId("58fb48859dcd1928b736ee55"), "review_rating" : 5, "review_num" : "0", "stock" :
"19", "upc" : "3b1c02bac2a429e6", "price" : "£52.29", "name" : "Scott Pilgrim's Precious Little Life (Scott
Pilgrim #1)" }
{ "_id" : ObjectId("58fb48859dcd1928b736ee56"), "review_rating" : 3, "review_num" : "0", "stock" :
"19", "upc" : "deda3e61b9514b83", "price" : "£57.25", "name" : "Our Band Could Be Your Life: Scenes
from the American Indie Underground, 1981-1991" }
{ "_id" : ObjectId("58fb48869dcd1928b736ee57"), "review_rating" : 5, "review_num" : "0", "stock" :
"19", "upc" : "ce6396b0f23f6ecc", "price" : "£17.46", "name" : "Set Me Free" }
{ "_id" : ObjectId("58fb48869dcd1928b736ee58"), "review_rating" : 4, "review_num" : "0", "stock" :
"19", "upc" : "30a7f60cd76ca58c", "price" : "£20.66", "name" : "Shakespeare's Sonnets" }
{ "_id" : ObjectId("58fb48869dcd1928b736ee59"), "review_rating" : 2, "review_num" : "0", "stock" :
"19", "upc" : "0312262ecafa5a40", "price" : "£13.99", "name" : "Starving Hearts (Triangular Trade Trilogy,
#1)" }
{ "_id" : ObjectId("58fb48869dcd1928b736ee5a"), "review_rating" : 1, "review_num" : "0", "stock" :
"19", "upc" : "1dfe412b8ac00530", "price" : "£52.15", "name" : "The Black Maria" }
{ "_id" : ObjectId("58fb48869dcd1928b736ee5b"), "review_rating" : 4, "review_num" : "0", "stock" :
"19", "upc" : "e10e1e165dc8be4a", "price" : "£22.60", "name" : "The Boys in the Boat: Nine Americans and
Their Epic Quest for Gold at the 1936 Berlin Olympics" }
{ "_id" : ObjectId("58fb48869dcd1928b736ee5c"), "review_rating" : 1, "review_num" : "0", "stock" :
"19", "upc" : "f77dbf2323deb740", "price" : "£22.65", "name" : "The Requiem Red" }
{ "_id" : ObjectId("58fb48869dcd1928b736ee5d"), "review_rating" : 4, "review_num" : "0", "stock" :
"19", "upc" : "2597b5a345f45e1b", "price" : "£33.34", "name" : "The Dirty Little Secrets of Getting Your
Dream Job" }
{ "_id" : ObjectId("58fb48869dcd1928b736ee5e"), "review_rating" : 3, "review_num" : "0", "stock" :
"19", "upc" : "e72a5dfc7e9267b2", "price" : "£17.93", "name" : "The Coming Woman: A Novel Based on
the Life of the Infamous Feminist, Victoria Woodhull" }
{ "_id" : ObjectId("58fb48869dcd1928b736ee5f"), "review_rating" : 5, "review_num" : "0", "stock" :
"20", "upc" : "4165285e1663650f", "price" : "£54.23", "name" : "Sapiens: A Brief History of Humankind" }
{ "_id" : ObjectId("58fb48869dcd1928b736ee60"), "review_rating" : 4, "review_num" : "0", "stock" :
"20", "upc" : "e00eb4fd7b871a48", "price" : "£47.82", "name" : "Sharp Objects" }
{ "_id" : ObjectId("58fb48869dcd1928b736ee61"), "review_rating" : 1, "review_num" : "0", "stock" :
"20", "upc" : "90fa61229261140a", "price" : "£53.74", "name" : "Tipping the Velvet" }
{ "_id" : ObjectId("58fb48869dcd1928b736ee62"), "review_rating" : 1, "review_num" : "0", "stock" :
"20", "upc" : "6957f44c3847a760", "price" : "£50.10", "name" : "Soumission" }
Type "it" for more
>
结果表明,我们成功地将1000条数据存储到了MongoDB数据库。
12.4 Redis
Redis是一个使用ANSI C编写的高性能Key-Value数据库,使用内存作为主存储,内存中的数据也可以被持久化到硬盘。
在Python中可以使用第三方库redis-py访问Redis数据库,使用pip安装redis-py:
$ sudo pip install redis
下面是使用redis-py将数据写入Redis数据库的简单示例:
import redis
# 连接数据库
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# 创建3 条数据
person1 = {
'name': '刘硕',
'age': 34,
'sex': 'M',
}
person2= {
'name': '李雷',
'age': 32,
'sex': 'M',
}
person3= {
'name': '韩梅梅',
'age': 31,
'sex': 'F',
}
# 将3 条数据以Hash 类型(哈希)保存到Redis中
r.hmset('person:1', person1)
r.hmset('person:2', person2)
r.hmset('person:3', person3)
# 关闭连接
r.connection_pool.disconnect()
Redis是Key-Value数据库,一项数据在数据库中就是一个键值对,存储多项同类别的数据时(如Book),通常以item:id这样的形式作为每项数据的键,其中的“:”并没有什么特殊,也可以换成“-”或“/”等,只是大家习惯这样使用。
仿照SQLitePipeline实现RedisPipeline,代码如下:
import redis
from scrapy import Item
class RedisPipeline:
def open_spider(self, spider):
db_host = spider.settings.get('REDIS_HOST', 'localhost')
db_port = spider.settings.get('REDIS_PORT', 6379)
db_index = spider.settings.get('REDIS_DB_INDEX', 0)
self.db_conn = redis.StrictRedis(host=db_host, port=db_port, db=db_index)
self.item_i = 0
def close_spider(self, spider):
self.db_conn.connection_pool.disconnect()
def process_item(self, item, spider):
self.insert_db(item)
return item
def insert_db(self, item):
if isinstance(item, Item):
item = dict(item)
self.item_i += 1
self.db_conn.hmset('book:%s' % self.item_i, item)
解释上述代码如下:
● open_spider方法在开始爬取数据之前被调用,在该方法中通过spider.settings对象读取用户在配置文件中指定的数据库,然后建立与数据库的连接,将得到的连接对象赋值给self.db_conn,以便之后使用,并初始化一个self.item_i作为每项数据的id。在插入一项数据时,使用self.item_i自加的结果构造数据在数据库中的键。
● process_item方法处理爬取到的每一项数据,在该方法中调用insert_db方法执行数据库的插入操作,在insert_db方法中先将一项数据转换成字典,然后调用hmset方法将数据以Hash类型存入Redis数据库。
● close_spider方法在爬取完全部数据后被调用,在该方法中关闭与数据库的连接。
在配置文件settings.py中指定我们所要使用的Redis数据库,并启用RedisPipeline:
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB_INDEX = 0
ITEM_PIPELINES = {
'toscrape_book.pipelines.RedisPipeline': 404,
}
运行爬虫,并查看数据库:
$ scrapy crawl books
$ redis-cli
127.0.0.1:6379> KEYS book:*
1) "book:470"
2) "book:300"
3) "book:801"
4) "book:476"
5) "book:914"
...省略中间输出...
995) "book:703"
996) "book:407"
997) "book:995"
998) "book:569"
999) "book:298"
1000) "book:110"
127.0.0.1:6379> HGETALL book:1
1) "price"
2) "\xc2\xa351.33"
3) "review_rating"
4) "2"
5) "review_num"
6) "0"
7) "name"
8) "Libertarianism for Beginners"
9) "stock"
10) "19"
11) "upc"
12) "a18a4f574854aced"
127.0.0.1:6379> HGETALL book:2
1) "price"
2) "\xc2\xa317.46"
3) "review_rating"
4) "5"
5) "review_num"
6) "0"
7) "name"
8) "Set Me Free"
9) "stock"
10) "19"
11) "upc"
12) "ce6396b0f23f6ecc"
结果表明,我们成功地将1000条数据存储到了Redis数据库。
12.5 本章小结
本章学习了如何将爬取到的数据存储到数据库的相关内容,在Scrapy中可以实现Item Pipeline完成数据库存储的任务,我们以SQLite、MySQL、MongoDB和Redis几种常用数据库为例,先讲解在Python中将数据写入各种数据库的方法,然后实现相应的Item Pipeline。