您的位置:首页 > 其它

Scrapy之Extension实例——计算吞吐量及时延

2016-04-19 10:32 351 查看
Scrapy已经有一个extension是用于测量吞吐量的——Log Stats extension,我们就以它为起点。为了计算时延,我们关联了
request_scheduled
response_received
item_scaped
信号。我们给每个信号发生时打上时间戳并相减以计算时延,再累积平均以计算平均值。通过观察这三个信号提供给回调函数的参数可以发现,
item_scaped
只提供了
Response
request_scheduled
提供了
Request
,只有
response_received
信号提供了
Request
Response
参数。不过幸运的是,每个
Response
都有一个成员变量是
Request
,指向它原始的
Request
,还有一个
meta dict
,它和原始的
Request
一样,不用考虑是否经过了重定向。我们可以把时间戳存放在这时。

AutoThrottle extension
使用了相同的机制——使用了
request.meta.get('download_latency')
download_latency
是通过
scrapy/core/downloader/webclient.py
下载器来计算出来的。要想提升你编写中间件的技术,最好的方法是熟悉Scrapy默认的中间件的代码。

这是一个extension的例子:

from time import time

from scrapy.exceptions import NotConfigured
from twisted.internet import task
from scrapy import signals

class Latencies(object):
"""
An extension that measures throughput and latencies.
"""
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)

def __init__(self, crawler):
self.crawler = crawler
self.interval = crawler.settings.getfloat('LATENCIES_INTERVAL')

if not self.interval:
raise NotConfigured

cs = crawler.signals
cs.connect(self._spider_opened, signal=signals.spider_opened)
cs.connect(self._spider_closed, signal=signals.spider_closed)
cs.connect(self._request_scheduled, signal=signals.request_scheduled)
cs.connect(self._response_received, signal=signals.response_received)
cs.connect(self._item_scraped, signal=signals.item_scraped)

self.latency, self.proc_latency, self.items = 0, 0, 0

def _spider_opened(self, spider):
self.task = task.LoopingCall(self._log, spider)
self.task.start(self.interval)

def _spider_closed(self, spider, reason):
if self.task.running:
self.task.stop()

def _request_scheduled(self, request, spider):
request.meta['schedule_time'] = time()

def _response_received(self, response, request, spider):
request.meta['received_time'] = time()

def _item_scraped(self, item, response, spider):
self.latency += time() - response.meta['schedule_time']
self.proc_latency += time() - response.meta['received_time']
self.items += 1

def _log(self, spider):
irate = float(self.items) / self.interval
latency = self.latency / self.items if self.items else 0
proc_latency = self.proc_latency / self.items if self.items else 0

spider.logger.info(("Scraped %d items at %.1f items/s, avg latency: "
"%.2f s and avg time in pipelines: %.2f s") %
(self.items, irate, latency, proc_latency))

self.latency, self.proc_latency, self.items = 0, 0, 0


前方法非常重要,因为它们是很典型的。它们使用了一个
Crawler
对象来初始化中间件。你会在几乎所有重要的中间件中发现这样的代码。
from_crawler(cls, crawler)
是一个抓取
Crawler
对象的方式。然后我们还注意到,
__init__()
方法中使用了
crawler.settings
并且抛出了一个
NotConfigured
异常,如果它没有设置的话。你会发现很多extension会检查相应的项有没有设置。如果没有设置或者为
False
的话就抛出异常。这是一个简单地集成中间件的通用模式,只需在
settings.py
文件中设置相应的项即可,不过默认情况下是禁用的,除非设置了相应的设置项。很多默认的Scrapy中间件(例如,
AutoThrottle
HttpCache
),都使用了这种模式。在我们的例子中,除非
LATENCIES_INTERVAL
已经设置了,否则我们的extension就是禁用的。

稍后在
__init__()
方法中,我们使用
crawler.signals.connect()
方法注册了所有我们感兴趣的信号的回调函数,并且初始化了一些变量。类的剩余部分实现了一些信号处理器。在
_spider_opened()
方法中,我们初始化了一个定时器,每隔
LATENCIES_INTERVAL
秒就会调用我们的
_log()
函数;在
_spider_closed()
函数中,我们停止了那个定时器;在
_request_scheduled()
_response_received()
函数中,我们在
request.meta
中存储了当前的时间戳;在
_item_scaped()
函数中,我们把时延相加,并把抓取到的
Item
数目加1.
_log()
函数负责计算平均值并格式化地输出一些信息,并把累加的值重置以开始另一个抽样周期。

任何一个曾经写过多线程代码的人都会很开心前面的代码中没有互斥量的出现。在这个例子中它们可能并不复杂,但是写单线程的代码仍然要比写多线程的代码更加容易,并且在复杂的环境中运行地更好。

我们可以把上面的一段代码添加到与
settings.py
文件同级目录的
latencies.py
模块中。为了使用这个模块,需要在
settings.py
文件中添加两行代码:

EXTENSIONS = { 'properties.latencies.Latencies': 500, }
LATENCIES_INTERVAL = 5


运行一下:

$ pwd
/root/book/ch08/properties
$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=1000 -s
LOG_LEVEL=INFO
...
INFO: Crawled 0 pages (at 0 pages/min), scraped 0 items (at 0
items/min)
INFO: Scraped 0 items at 0.0 items/sec, average latency: 0.00
sec and average time in pipelines: 0.00 sec
INFO: Scraped 115 items at 23.0 items/s, avg latency: 0.84 s
and avg time in pipelines: 0.12 s
INFO: Scraped 125 items at 25.0 items/s, avg latency: 0.78 s
and avg time in pipelines: 0.12 s


第一行的日志记录是由Log Stats extension输出的,接下来的一些日志就是来自我们的extension了。我们可以观察到每秒24个
Item
的吞吐量,平均0.78s的时延,并且在下载之后几乎没有处理时延。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: