4、构建处理数据的流水线Pipeline
在scrapy中,Item Pipeline是处理数据的组件,也是一个类,一个Item Pipeline只负责一种功能的数据处理(典型功能是①清洗数据 ②验证数据有效性 ③过滤重复数据 ④将数据存入数据库)。在一个项目中可以同时启用多个Item Pipeline,它们按指定次序级联起来,形成一条数据处理流水线。
实现Item Pipeline
运行爬虫,查看爬取的信息(前五行)1
2$Scrapy crawl books -o books.csv
$head -5 books.csv #查看文件开头的5行
如果想把书价改为人民币价格(英镑价格×汇率),在pipelines.py中实现PriceConverterPipeline1
2
3
4
5
6
7
8
9
10class PriceConverterPipeline(objedct): # 不需要继承特定基类,只需要实现某些特定方法
exchange_rate = 8.5309
# 一个Item Pipeline必须实现一个process_item方法,它用来处理每项由Spider爬取到的数据
# 2个参数:①Item:爬取到的一项数据(Item或字典) ②Spider:爬取此项数据的Spider对象
def process_item(self,item,spider):
# ①提取item的price字段 ②去掉前面英镑符号,转换为float类型,乘以汇率
price = float(item['price'][1:])*self.exchange_rate
item['price']='¥%.2f'%price #保留2位小数,赋值回item的price字段
return item #返回被处理过的item
【补充1】关于process_item方法返回的item:①返回的数据会递送给下一级Item Pipeline继续处理;②如果process_item在处理某项item时抛出一个DropItem异常(一般在检测到无效数据/想过滤的数据时会抛出它),该项Item便会被抛弃,不再递送给后面的Item Pipeline继续处理,也不会导出到文件。
【补充2】除了必须实现的process_item方法外,还有3个比较常用的方法(可根据需求选择实现):①open_spider(self,spider)→Spider打开时(处理数据前)回调该方法,通常该方法用于在开始处理数据之前完成某些初始化工作,如连接数据库;②close_spider(self,spider)→Spider关闭时(处理数据后)回调该方法,通常该方法用于在处理完所有数据之后完成某些清理工作,如关闭数据库;③from_crawler(cls,crawler)→创建Item Pipeline对象时回调该类方法。一般在该方法中通过crawler.settings读取配置,根据配置创建Item Pipeline对象。
启用Item Pipeline
因为Item Pipeline是可选组件,所以如果想启用某个/些Item Pipeline,需要在settings.py中进行设置。1
ITEM_PIPELINES={'bookspider.pipelines.PriceConverterPipeline':300,}
ITEM_PIPELINES是一个字典,将想启用的Item Pipeline添加到这个字典中,key是每个Item Pipeline类的导入路径,value是一个数值(0~1000),当同时启用多个Item Pipeline时,scrapy会根据这些数值决定每个Item Pipeline处理数据的先后顺序(数值小的先启用)。启用PriceConverterPipeline后,重新运行爬虫,并观察结果
更多例子
【例1】过滤重复数据
为了确保爬取到的书籍信息中没有重复项,可以实现一个去重Item Pipeline。这里,我们就以书名作为主键进行去重,实现DuplicatesPipeline1
2
3
4
5
6
7
8
9
10from scrapy.exceptions import DropItem
class DuplicatesPipeline(object):
def __init__(self): #增加构造器方法,初始化用于对书名去重的集合
self.book_set=set()
def process_item(self,item,spider):
name=item['name'] #先取出item的name字段
if name in self.book_set: #检查书名是否已在集合book_set中
raise DropItem("Duplicate book found:%s"%item) #如果存在,就是重复数据,抛出DropItem异常,将item抛弃
self.book_set.add(name) #否则,将item的name字段存入集合
return item
接下来测试DuplicatesPipeline。先在不启用它的情况下运行爬虫,查看结果(此时有1000本书);然后在设置文件中启用它,运行爬虫发现有999本,说明有两本书是同名的,翻越爬虫的log信息可以找到重复值。
【例2】将数据存入MongoDB1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21form scrapy.item import Item
imort pymongo
class MongoDBPipeline(object):
DB_URI = 'mongodb://localhost:27017/' #在类属性中定义2个变量,这个是数据库URI地址
DB_NAME = 'scrapy_data' #数据库名称
#在Spider整个爬取过程中,数据库的连接和关闭操作只需要进行一次(处理数据之前连接,处理完后关闭)
#下面2个方法,在Spider打开和关闭时被调用
def open_spider(self,spider):
self.client = pymongo.MongoClient(self.DB_URI)
self.db = self.client[self.DB_NAME]
def close_spider(self,spider):
self.client.close()
#在process_item中实现MongoDB数据库的写入操作
def process_item(self,item,spider):
collection = self.db[spider.name] #得到一个集合
post = dict(item) if isinstance(item,Item) else item
collection.insert_one(post) #将数据插入该集合,集合对象的insert_one方法需传入一个字典对象,不能传入Item对象,所以在调用前先对item的类型进行判断:如果item是Item对象就转换为字典
return item
接下来测试MongoDBPipeline,在设置文件中启用它,运行爬虫并查看数据库中的结果1
2$scrapy crawl books
$mongo
如果想通过设置文件得到数据库的URI地址、名字,需要在MongoDBPipeline中添加下面代码,并删除在类属性中定义的DB_URI和DB_NAME1
2
3
4
5@classmethod
def from_crawler(cls,crawler): #cls是MongoDBPipeline类对象,crawler的settings属性可访问设置文件
cls.DB_URI = crawler.settings.get('MONGO_DB_URI','mongodb://localhost:27917/')
cls.DB_NAME = crawler.settings.get('MONGO_DB_NAME','scrapy_data') #将读取的信息赋给cls属性
return cls()
然后在设置文件setings.py中,对所要使用的数据库进行设置1
2MONGO_DB_URI = 'mongodb://192.168.1.105:27017/'
MONGO_DB_NAME = 'liushuo_scrapy_data'