
scrapy_redis related: migrating spider progress saved by jobdir to Redis

2018-07-11 19:07

0. References

Scrapy hidden bug: the saved request count read from requests.queue after force-closing a spider may be wrong

1. Overview

With JOBDIR set in Scrapy, stopping the spider leaves the following directory structure on disk:

crawl/apps/
├── requests.queue
│   ├── active.json
│   ├── p0
│   └── p1
├── requests.seen
└── spider.state
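
For reference, a layout like this is produced by running the spider with JOBDIR enabled and then stopping it; the spider name and path here are placeholders:

scrapy crawl myspider -s JOBDIR=crawl/apps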

The file requests.queue/p0 holds the unscheduled requests with priority=0. A p-1 file corresponds to high-priority requests whose actual priority is 1; when transferring to the Redis sorted set, smaller scores sort earlier, so the score is taken as -1. By the same rule, p1 corresponds to low-priority requests with actual priority=-1, which get score 1.
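
These p files use queuelib's LifoDiskQueue layout: the file starts with a 4-byte big-endian record count, and each pushed request is appended as its pickled payload followed by a 4-byte big-endian length, which is why the script below reads records backwards from the end of the file. A minimal sketch of appending one record in that layout (illustrative only, not the actual queuelib code):

import struct

SIZE_FORMAT = '>L'  # 4-byte big-endian unsigned int, as in queuelib

def push_record(f, data):
    # LifoDiskQueue appends the payload first, then its length,
    # so the newest record can be popped off the end of the file.
    f.write(data)
    f.write(struct.pack(SIZE_FORMAT, len(data)))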

requests.seen holds the fingerprint hashes recorded by the request dupefilter for every request that has been enqueued, one hash per line.
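
Each line is the hex digest that Scrapy's RFPDupeFilter produces via scrapy.utils.request.request_fingerprint; a small sketch of reproducing one such fingerprint (assuming a Scrapy version of that era, where this helper is available):

from scrapy import Request
from scrapy.utils.request import request_fingerprint

# Prints a 40-character SHA1 hex digest; requests.seen holds one per line.
print(request_fingerprint(Request('https://example.com/')))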

spider.state holds the persisted custom attributes of the spider and is outside the scope of this article.

2. Implementation

import os
from os.path import join
import re
import struct

import redis


def sadd_dupefilter(jobdir, redis_server, name):
    """Load requests.seen fingerprints into the <name>:dupefilter set.

    See python/lib/site-packages/scrapy/dupefilters.py
    """
    file = join(jobdir, 'requests.seen')
    key = '%s:dupefilter' % name
    print('Processing %s, it may take minutes...' % file)
    with open(file) as f:
        for x in f:
            # One hex fingerprint per line; strip the trailing newline.
            redis_server.sadd(key, x.rstrip())
    print('Result: {} {}'.format(key, redis_server.scard(key)))


def zadd_requests(jobdir, redis_server, name):
    """Load the pending requests into the <name>:requests sorted set.

    See python/lib/site-packages/queuelib/queue.py (LifoDiskQueue).
    """
    SIZE_FORMAT = ">L"
    SIZE_SIZE = struct.calcsize(SIZE_FORMAT)

    key = '%s:requests' % name
    queue_dir = join(jobdir, 'requests.queue')
    file_list = os.listdir(queue_dir)
    # Map each queue file (p0, p1, p-1, ...) to its sorted-set score.
    file_score_dict = dict([(f, int(f[1:])) for f in file_list
                            if re.match(r'^p-?\d+$', f)])
    for (file, score) in file_score_dict.items():
        print('Processing %s, it may take minutes...' % file)
        with open(join(queue_dir, file), 'rb') as f:
            # The file starts with a 4-byte big-endian record count.
            qsize = f.read(SIZE_SIZE)
            total_size, = struct.unpack(SIZE_FORMAT, qsize)
            f.seek(0, os.SEEK_END)

            # Records are stored as [data][4-byte size], so walk backwards
            # from the end of the file until only the header remains.
            actual_size = 0
            while True:
                if f.tell() == SIZE_SIZE:
                    break
                f.seek(-SIZE_SIZE, os.SEEK_CUR)
                size, = struct.unpack(SIZE_FORMAT, f.read(SIZE_SIZE))
                f.seek(-size - SIZE_SIZE, os.SEEK_CUR)
                data = f.read(size)
                # execute_command sidesteps redis-py zadd() signature
                # differences across versions when passing binary members.
                redis_server.execute_command('ZADD', key, score, data)
                f.seek(-size, os.SEEK_CUR)
                actual_size += 1
        print('total_size {}, actual_size {}, score {}'.format(
            total_size, actual_size, score))
    print('Result: {} {}'.format(key, redis_server.zlexcount(key, '-', '+')))


if __name__ == '__main__':
    name = 'test'
    jobdir = '/home/yourproject/crawl/apps'
    database_num = 0
    # apps/
    # ├── requests.queue
    # │   ├── active.json
    # │   ├── p0
    # │   └── p1
    # ├── requests.seen
    # └── spider.state

    password = 'password'
    host = '127.0.0.1'
    port = '6379'
    redis_server = redis.StrictRedis.from_url(
        'redis://:{password}@{host}:{port}/{database_num}'.format(
            password=password, host=host,
            port=port, database_num=database_num))

    sadd_dupefilter(jobdir, redis_server, name)
    zadd_requests(jobdir, redis_server, name)

3. Results
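
To sanity-check the migrated data, the key sizes can be compared against the source files; a minimal sketch reusing the redis_server connection and the name = 'test' settings from the script above:

# Should match the number of lines in requests.seen.
print(redis_server.scard('test:dupefilter'))
# Should match the summed record counts of the p* queue files.
print(redis_server.zcard('test:requests'))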

