Scrapy之Extension实例——计算吞吐量及时延
2016-04-19 10:32
351 查看
Scrapy已经有一个extension是用于测量吞吐量的——Log Stats extension,我们就以它为起点。为了计算时延,我们关联了
这是一个extension的例子:
前方法非常重要,因为它们是很典型的。它们使用了一个
稍后在
任何一个曾经写过多线程代码的人都会很开心前面的代码中没有互斥量的出现。在这个例子中它们可能并不复杂,但是写单线程的代码仍然要比写多线程的代码更加容易,并且在复杂的环境中运行地更好。
我们可以把上面的一段代码添加到与
运行一下:
第一行的日志记录是由Log Stats extension输出的,接下来的一些日志就是来自我们的extension了。我们可以观察到每秒24个
request_scheduled、
response_received和
item_scaped信号。我们给每个信号发生时打上时间戳并相减以计算时延,再累积平均以计算平均值。通过观察这三个信号提供给回调函数的参数可以发现,
item_scaped只提供了
Response,
request_scheduled提供了
Request,只有
response_received信号提供了
Request和
Response参数。不过幸运的是,每个
Response都有一个成员变量是
Request,指向它原始的
Request,还有一个
meta dict,它和原始的
Request一样,不用考虑是否经过了重定向。我们可以把时间戳存放在这时。
AutoThrottle extension使用了相同的机制——使用了
request.meta.get('download_latency'),
download_latency是通过
scrapy/core/downloader/webclient.py下载器来计算出来的。要想提升你编写中间件的技术,最好的方法是熟悉Scrapy默认的中间件的代码。
这是一个extension的例子:
from time import time from scrapy.exceptions import NotConfigured from twisted.internet import task from scrapy import signals class Latencies(object): """ An extension that measures throughput and latencies. """ @classmethod def from_crawler(cls, crawler): return cls(crawler) def __init__(self, crawler): self.crawler = crawler self.interval = crawler.settings.getfloat('LATENCIES_INTERVAL') if not self.interval: raise NotConfigured cs = crawler.signals cs.connect(self._spider_opened, signal=signals.spider_opened) cs.connect(self._spider_closed, signal=signals.spider_closed) cs.connect(self._request_scheduled, signal=signals.request_scheduled) cs.connect(self._response_received, signal=signals.response_received) cs.connect(self._item_scraped, signal=signals.item_scraped) self.latency, self.proc_latency, self.items = 0, 0, 0 def _spider_opened(self, spider): self.task = task.LoopingCall(self._log, spider) self.task.start(self.interval) def _spider_closed(self, spider, reason): if self.task.running: self.task.stop() def _request_scheduled(self, request, spider): request.meta['schedule_time'] = time() def _response_received(self, response, request, spider): request.meta['received_time'] = time() def _item_scraped(self, item, response, spider): self.latency += time() - response.meta['schedule_time'] self.proc_latency += time() - response.meta['received_time'] self.items += 1 def _log(self, spider): irate = float(self.items) / self.interval latency = self.latency / self.items if self.items else 0 proc_latency = self.proc_latency / self.items if self.items else 0 spider.logger.info(("Scraped %d items at %.1f items/s, avg latency: " "%.2f s and avg time in pipelines: %.2f s") % (self.items, irate, latency, proc_latency)) self.latency, self.proc_latency, self.items = 0, 0, 0
前方法非常重要,因为它们是很典型的。它们使用了一个
Crawler对象来初始化中间件。你会在几乎所有重要的中间件中发现这样的代码。
from_crawler(cls, crawler)是一个抓取
Crawler对象的方式。然后我们还注意到,
__init__()方法中使用了
crawler.settings并且抛出了一个
NotConfigured异常,如果它没有设置的话。你会发现很多extension会检查相应的项有没有设置。如果没有设置或者为
False的话就抛出异常。这是一个简单地集成中间件的通用模式,只需在
settings.py文件中设置相应的项即可,不过默认情况下是禁用的,除非设置了相应的设置项。很多默认的Scrapy中间件(例如,
AutoThrottle和
HttpCache),都使用了这种模式。在我们的例子中,除非
LATENCIES_INTERVAL已经设置了,否则我们的extension就是禁用的。
稍后在
__init__()方法中,我们使用
crawler.signals.connect()方法注册了所有我们感兴趣的信号的回调函数,并且初始化了一些变量。类的剩余部分实现了一些信号处理器。在
_spider_opened()方法中,我们初始化了一个定时器,每隔
LATENCIES_INTERVAL秒就会调用我们的
_log()函数;在
_spider_closed()函数中,我们停止了那个定时器;在
_request_scheduled()和
_response_received()函数中,我们在
request.meta中存储了当前的时间戳;在
_item_scaped()函数中,我们把时延相加,并把抓取到的
Item数目加1.
_log()函数负责计算平均值并格式化地输出一些信息,并把累加的值重置以开始另一个抽样周期。
任何一个曾经写过多线程代码的人都会很开心前面的代码中没有互斥量的出现。在这个例子中它们可能并不复杂,但是写单线程的代码仍然要比写多线程的代码更加容易,并且在复杂的环境中运行地更好。
我们可以把上面的一段代码添加到与
settings.py文件同级目录的
latencies.py模块中。为了使用这个模块,需要在
settings.py文件中添加两行代码:
EXTENSIONS = { 'properties.latencies.Latencies': 500, } LATENCIES_INTERVAL = 5
运行一下:
$ pwd /root/book/ch08/properties $ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=1000 -s LOG_LEVEL=INFO ... INFO: Crawled 0 pages (at 0 pages/min), scraped 0 items (at 0 items/min) INFO: Scraped 0 items at 0.0 items/sec, average latency: 0.00 sec and average time in pipelines: 0.00 sec INFO: Scraped 115 items at 23.0 items/s, avg latency: 0.84 s and avg time in pipelines: 0.12 s INFO: Scraped 125 items at 25.0 items/s, avg latency: 0.78 s and avg time in pipelines: 0.12 s
第一行的日志记录是由Log Stats extension输出的,接下来的一些日志就是来自我们的extension了。我们可以观察到每秒24个
Item的吞吐量,平均0.78s的时延,并且在下载之后几乎没有处理时延。
相关文章推荐
- HDFS高可用
- NSAttributedString(富文本)用法
- jQuery的选择器中的通配符总结
- 2016.04.19 添加在上面在下面
- MySQL安装完可以使用,但是找不到对应的系统服务
- 执行delete、update语句时,出现 Error Code: 1175.解决方法
- 【LeetCode】LeetCode——第9题:Palindrome Number
- Howto - Install MT7610U on CentOS 6.6
- c++ container
- 104. Maximum Depth of Binary Tree
- 常见锁的区别及适用场景
- 你真的会用Fragment了么?-Fragment解析
- windows mongodb 安装
- asp.net发布到IIS中出现错误:处理程序“PageHandlerFactory-Integrated”在其模块列表中有一个错误模块“ManagedPipelineHandler”
- 页面向下滑动实现div显示和隐藏
- android 禁用点击事件的三个属性
- iOS线程通信
- Mat类详解(三)
- 273. Integer to English Words
- 对我有价值的文章记录