An early scraper program I wrote, kept as a memento
2014-04-12 22:08
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Crawl bus line URL information and persist it.
"""
1. Seed URL: http://bus.mapbar.com/beijing/poi/5af90Q5CN8BH
2. A model class that matches the page content
3. A storage subclass that saves the data
4. A controller class that drives the crawl
"""
import abc
import cookielib
import logging
import random
import re
import time
import urllib2
import zlib
from gzip import GzipFile
from Queue import Queue
from StringIO import StringIO
from threading import Thread, Lock, stack_size

import MySQLdb
from BeautifulSoup import BeautifulSoup


class DataModel:
    """Base data model; defines the basic interface."""
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def analytics(self, data):
        return


class UrlDataModel(DataModel):
    """Data model that parses bus line and stop URLs out of a page."""

    def __init__(self, host, city):
        self.host = host
        self.city = city

    def analytics(self, data):
        self.data = data
        log = logging.getLogger('BusUrl.UrlDataModel')
        regex_oppLineName = re.compile(
            r'<input[\s]+type="hidden"[\s]+id="h_oppLineName[0-9]+"[\s]+value="(.*?)"[^/]*/>')
        regex_point = re.compile(
            r'<a[\s]+class="thisSite[0-9]*"[\s]+href="(.*?)"[\s]+target=".*?"><b>[0-9]+?</b>(.*?)</a')
        regex_point2 = re.compile(r'<div.+?title="(.*?)"><a')
        regex_lineName = re.compile(r"<strong>([0-9]+)</strong>路</a>")
        start_index = 0
        soup = BeautifulSoup(self.data)
        UrlData_dict = dict()
        while True:
            div = soup.findAll(id="divId" + str(start_index))
            start_index += 1
            if not div:
                break
            div = str(div[0])
            m = regex_lineName.search(div)
            if m is None:
                log.warn("%s regex_lineName search is empty", soup.title.string)
                continue
            lineName = m.group(1)  # bus line name
            m = regex_oppLineName.search(div)
            if m is None:
                log.warn("%s regex_oppLineName search is empty", soup.title.string)
                continue
            oppLineName = m.group(1)  # opposite-direction line name
            m = regex_point.findall(div)    # (url, stop name) pairs
            m2 = regex_point2.findall(div)
            if len(m) == 0:
                log.warn("%s regex_point search is empty", soup.title.string)
                continue
            for i in range(len(m)):
                url, name = m[i]
                # fall back to the div title when the anchor text contains markup
                if name.find("span") != -1 and i < len(m2):
                    name = m2[i]
                if not url.startswith("http"):
                    url = "http://bus.mapbar.com" + url
                m[i] = (url, name)
            UrlData_dict[lineName] = UrlData(lineName, oppLineName, m, self.city)
        return UrlData_dict


class UrlData:
    """Plain value object for one bus line."""

    def __init__(self, lineName, oppLineName, points, city):
        self.lineName = lineName
        self.oppLineName = oppLineName
        self.points = points
        self.city = city


class Db:
    """MySQL database wrapper."""

    def __init__(self, host, user, passwd, db, port, socket, charset):
        self.log = logging.getLogger('Storage.Db')
        try:
            self.conn = MySQLdb.connect(host=host, user=user, passwd=passwd,
                                        db=db, port=port, unix_socket=socket,
                                        charset=charset)
        except Exception, e:
            self.log.error("Database conn failed, error is %s", e)
            raise

    def query(self, sql):
        try:
            self.conn.query(sql)
            return self.conn.store_result()
        except Exception, e:
            self.log.error("Database query failed, error is %s", e)
            raise

    def fetchAll(self, sql):
        try:
            self.conn.query(sql)
            result = self.conn.store_result()
            return result.fetch_row(result.num_rows(), 2)
        except Exception, e:
            self.log.error("Database fetchAll failed, error is %s", e)
            raise

    def execute(self, sql, val):
        try:
            cur = self.conn.cursor()
            cur.execute(sql, val)
            self.conn.commit()
        except Exception, e:
            self.log.error("Database execute failed, error is %s", e)
            raise

    def executemany(self, sql, args):
        try:
            cur = self.conn.cursor()
            rows = cur.executemany(sql, args)
            self.conn.commit()
            return rows
        except Exception, e:
            self.log.error("Database executemany failed, error is %s", e)
            raise

    def update(self, sql, args):
        try:
            cur = self.conn.cursor()
            cur.execute(sql, args)
            self.conn.commit()
        except Exception, e:
            self.log.error("Database update failed, error is %s", e)
            raise

    def escape(self, string):
        return self.conn.escape_string(string)


class Storage:
    """Base class for data storage."""
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def save(self, data):
        return


class UrlStorage(Storage):
    """Saves URL data."""

    def __init__(self):
        self.log = logging.getLogger('Storage.UrlStorage')
        self.db = Db(host='localhost', user='root', passwd='', db='bj_busonline',
                     port=3306, socket='/tmp/mysql.sock', charset="utf8")

    def save(self, UrlData):
        sql = ("insert into point(id, city, lineName, oppLineName, points) "
               "values (NULL, %s, %s, %s, %s)")
        points = "-".join(val[1] for val in UrlData.points)
        self.db.execute(sql, [UrlData.city, UrlData.lineName,
                              UrlData.oppLineName, points])

    def saveList(self, UrlData_dict):
        # parameterized queries already escape values, so no manual
        # escape_string() calls are needed here
        sql = ("insert into point(id, city, lineName, oppLineName, points) "
               "values (NULL, %s, %s, %s, %s)")
        args = list()
        for k, v in UrlData_dict.iteritems():
            points = "-".join(val[1] for val in v.points)
            args.append((v.city, v.lineName, v.oppLineName, points))
        return self.db.executemany(sql, args)

    def saveUrl(self, urls):
        sql = "insert into line_url(id, url, isget, url_id) values (NULL, %s, %s, %s)"
        args = [(url, 0, id) for id, url in urls]
        self.db.executemany(sql, args)

    def loadUrl(self):
        sql = "SELECT * FROM line_url"
        result = self.db.fetchAll(sql)
        self.log.info("loadUrl 'SELECT * FROM line_url' return %d", len(result))
        return result

    def loadFromFile(self, filename):
        import pickle
        try:
            f = open(filename, "rb")
            self.data = pickle.load(f)
            f.close()
        except Exception, e:
            self.log.error('UrlStorage Exception with message %s', e)
            raise

    def updateUrl(self, url):
        sql = "UPDATE line_url SET isget=1 WHERE url=%s"
        self.db.update(sql, (url,))


class ContentEncodingProcessor(urllib2.BaseHandler):
    """A handler to add gzip/deflate capabilities to urllib2 requests."""

    def http_response(self, req, resp):
        old_resp = resp
        # gzip
        if resp.headers.get("content-encoding") == "gzip":
            gz = GzipFile(fileobj=StringIO(resp.read()), mode="r")
            resp = urllib2.addinfourl(gz, old_resp.headers, old_resp.url, old_resp.code)
            resp.msg = old_resp.msg
        # deflate
        if resp.headers.get("content-encoding") == "deflate":
            gz = StringIO(deflate(resp.read()))
            resp = urllib2.addinfourl(gz, old_resp.headers, old_resp.url, old_resp.code)
            resp.msg = old_resp.msg
        return resp


def deflate(data):
    # zlib only provides the zlib format, not the raw deflate format,
    # so try raw deflate first and fall back to the zlib wrapper
    try:
        return zlib.decompress(data, -zlib.MAX_WBITS)
    except zlib.error:
        return zlib.decompress(data)


class Fetcher:
    """Fetches pages and feeds them through the model to storage."""

    def __init__(self, url, DataModel, Storage, ThreadNum,
                 StackSize=32768 * 16, headers=[]):
        self.BaseUrl = url
        self.DataModel = DataModel
        self.Storage = Storage
        self.q_task = Queue()
        self.StackSize = StackSize
        self.ThreadNum = ThreadNum
        cookie_support = urllib2.HTTPCookieProcessor(cookielib.CookieJar())
        encoding_support = ContentEncodingProcessor()
        self.opener = urllib2.build_opener(cookie_support, urllib2.HTTPHandler,
                                           encoding_support)
        # if headers are given, they are added to every request
        self.headers = headers
        self.q_task.put(url)
        self.task_done = []
        self.lock = Lock()
        self.urls = dict()
        self.log = logging.getLogger("BusUrl.Fetcher")
        self.load()
        self.thread_pool = []
        for i in range(ThreadNum):
            t = Thread(target=self.threadget)
            t.start()
            self.thread_pool.append(t)

    def load(self):
        """Reload known URLs and re-queue the ones not fetched yet."""
        urls = self.Storage.loadUrl()
        for url in urls:
            self.urls[url['line_url.url_id']] = url['line_url.url']
            if url['line_url.isget'] == 0:
                self.q_task.put(url['line_url.url'])

    def __del__(self):
        """Save the task queue and any in-flight data."""
        time.sleep(0.5)
        self.q_task.join()

    def threadget(self):
        while True:
            try:
                stack_size(self.StackSize)
                self.lock.acquire(True)
                url = self.q_task.get()
                req = urllib2.Request(url)
                if self.headers is not None:
                    for k, v in self.headers:
                        req.add_header(k, v)
                req.add_header("Referer", url)
                data = self.get(req)
                info_list = self.DataModel.analytics(data)
                self.Storage.saveList(info_list)
                self.log.info("Complete url %s fetch", url)
                self.Storage.updateUrl(url)
                if url not in self.task_done:
                    self.task_done.append(url)
                # Task state is saved as we go, so the next run can resume
                # from whatever was left unfinished.
                for k, v in info_list.iteritems():
                    tmp = []
                    for val in v.points:
                        if val[0] not in self.urls.values():
                            self.urls[len(self.urls)] = val[0]
                            tmp.append((len(self.urls), val[0]))
                            self.q_task.put(val[0])
                    self.Storage.saveUrl(tmp)
            except Exception, e:
                self.log.warn("threadget get req %s failed, except %s", url, e)
            finally:
                time.sleep(random.randint(30, 40))
                self.q_task.task_done()  # keep q_task.join() from blocking forever
                self.lock.release()

    def get(self, req, repeat=3):
        """HTTP GET, retried up to three times; failures are logged."""
        try:
            response = self.opener.open(req)
            data = response.read()
        except Exception, what:
            print what, req
            if repeat > 0:
                return self.get(req, repeat - 1)
            else:
                self.log.warn("GET Failed req %s", req)
                return ''
        return data

    def wait(self):
        for i in range(self.ThreadNum):
            self.thread_pool[i].join()


if __name__ == "__main__":
    import platform
    if platform.system() == "Windows":
        logging.basicConfig(filename="D:/www/busonline/tools/log.txt",
                            filemode="a", level=logging.NOTSET)
    else:
        logging.basicConfig(filename="/media/D/www/busonline/tools/log.txt",
                            filemode="a", level=logging.NOTSET)
    filename = "header.txt"
    headers = []
    f = open(filename, "r")
    for header in f.readlines():
        v = header.strip().split("|", 2)
        headers.append((v[0].strip(), v[1].strip()))
    f.close()
    UrlDataModel_obj = UrlDataModel("http://bus.mapbar.com", "北京")
    UrlStorage_obj = UrlStorage()
    url = "http://bus.mapbar.com/beijing/poi/5af90Q5CN8BH"
    fetcher = Fetcher(url, UrlDataModel_obj, UrlStorage_obj, 4, headers=headers)
    fetcher.wait()
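The script expects a header.txt file next to it, with one request header per line and the name separated from the value by "|". The file itself is not included in the post; judging from the parsing code, it would look something like this (the header values here are made up):

User-Agent | Mozilla/5.0 (Windows NT 6.1; rv:28.0) Gecko/20100101 Firefox/28.0
Accept | text/html,application/xhtml+xml
Accept-Language | zh-CN,zh;q=0.8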
That is all the code; it is not much, but it basically covers everything from page parsing through to database insertion. One known bug was never handled: duplicate rows can be inserted when saving URLs (a possible fix is sketched after the table definitions below).
Database table structure
CREATE TABLE `line_url` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `url` varchar(255) NOT NULL,
  `isget` tinyint(1) NOT NULL DEFAULT '0',
  `url_id` varchar(45) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6865 DEFAULT CHARSET=utf8

CREATE TABLE `point` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `city` varchar(45) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
  `lineName` varchar(45) DEFAULT NULL,
  `oppLineName` varchar(45) DEFAULT NULL,
  `points` varchar(10000) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10416 DEFAULT CHARSET=utf8
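As for the duplicate-URL bug: the line_url table has no unique constraint on url, so the same URL can be inserted many times. A minimal sketch of one possible fix, assuming the schema above and the same MySQLdb setup as in the Db class (this is not part of the original program):

# Sketch only: deduplicate line_url via a unique key plus INSERT IGNORE.
import MySQLdb

conn = MySQLdb.connect(host='localhost', user='root', passwd='',
                       db='bj_busonline', charset='utf8')
cur = conn.cursor()

# One-time schema change; it fails if duplicate urls already exist,
# so those rows would have to be cleaned up first.
cur.execute("ALTER TABLE line_url ADD UNIQUE KEY uniq_url (url)")

# INSERT IGNORE silently skips rows that would violate the unique key;
# saveUrl() could use this statement instead of a plain INSERT.
sql = "INSERT IGNORE INTO line_url(id, url, isget, url_id) VALUES (NULL, %s, %s, %s)"
cur.executemany(sql, [("http://bus.mapbar.com/beijing/poi/example", 0, "1")])  # hypothetical row
conn.commit()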