您的位置:首页 > 其它

早期写的一个抓取程序留个纪念

2014-04-12 22:08 211 查看
#!/usr/bin/env python
#-*- coding:utf-8 -*-
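# Crawl bus-line pages from bus.mapbar.com and store the parsed lines and the discovered URLs in MySQL
"""
1. Start URL: http://bus.mapbar.com/beijing/poi/5af90Q5CN8BH
2. Model classes that match and extract page content
3. Storage subclasses that persist the data
4. Control class that drives the crawl
"""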

import abc
from Queue import Queue
from threading import Thread, Lock, stack_size
from BeautifulSoup import BeautifulSoup
import urllib2, cookielib
from gzip import GzipFile
from StringIO import StringIO
import re, logging
import MySQLdb
import time
import random

class DataModel(object):
    """Abstract base class for data models; defines the basic interface.

    Subclasses implement ``analytics`` to turn raw page data into parsed
    records.
    """

    # Python 2 metaclass hook: with ABCMeta, a subclass that does not
    # override the abstract method below cannot be instantiated.
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def analytics(self, data):
        """Analyse ``data``; the base implementation returns None."""
        return

class UrlDataModel(DataModel):
    """Data model that extracts bus lines and their stops from a page.

    ``analytics`` walks the elements with ids ``divId0``, ``divId1``, ... and
    builds one ``UrlData`` per element whose markup matches all three
    patterns below.
    """

    # Patterns are compiled once for the class instead of on every call.
    # Raw strings keep the pattern text identical to the non-raw originals.
    _RE_OPP_LINE_NAME = re.compile(r'<input[\s]+type="hidden"[\s]+id="h_oppLineName[0-9]+"[\s]+value="(.*?)"[^/]*/>')
    _RE_POINT = re.compile(r'<a[\s]+class="thisSite[0-9]*"[\s]+href="(.*?)"[\s]+target=".*?"><b>[0-9]+?</b>(.*?)</a')
    _RE_POINT_TITLE = re.compile(r'<div.+?title="(.*?)"><a')
    _RE_LINE_NAME = re.compile(r"<strong>([0-9]+)</strong>路</a>")

    def __init__(self, host, city):
        """Remember the site ``host`` (prefix for relative links) and ``city``."""
        self.host = host
        self.city = city

    def analytics(self, data):
        """Parse page markup into a dict of ``UrlData`` objects.

        :param data: page markup handed to ``BeautifulSoup``.
        :returns: dict mapping each line-name key to its ``UrlData``; empty
            when the page has no ``divId0`` element.
        """
        self.data = data
        log = logging.getLogger('BusUrl.UrlDataModel')
        soup = BeautifulSoup(self.data)
        # A page without <title> has soup.title == None; resolve the title
        # once so that a warning can never raise AttributeError itself.
        title = soup.title.string if soup.title is not None else None

        UrlData_dict = dict()
        start_index = 0
        while True:
            div = soup.findAll(id="divId" + str(start_index))
            # Advance before any `continue` so a skipped element cannot
            # cause an endless loop.
            start_index += 1
            if not div:
                break
            div = str(div[0])

            m = self._RE_LINE_NAME.search(div)
            if m is None:
                log.warning("%s regex_lineName search is empty", title)
                continue
            # Kept as the 1-tuple returned by groups(): UrlStorage.saveList
            # reads lineName[0], and the tuple is also the dict key.
            lineName = m.groups(0)

            m = self._RE_OPP_LINE_NAME.search(div)
            if m is None:
                log.warning("%s regex_oppLineName search is empty", title)
                continue
            oppLineName = m.groups(0)[0]

            matches = self._RE_POINT.findall(div)
            if not matches:
                log.warning("%s regex_point search is empty", title)
                continue
            titles = self._RE_POINT_TITLE.findall(div)

            points = self._build_points(matches, titles)
            UrlData_dict[lineName] = UrlData(lineName, oppLineName, points, self.city)

        return UrlData_dict

    def _build_points(self, matches, titles):
        """Turn ``(href, label)`` regex matches into ``(url, name)`` tuples."""
        points = []
        for i, (url, name) in enumerate(matches):
            # When the label still contains markup ("span"), use the title
            # found at the same position. `i < len(titles)` replaces the
            # original `len(m2) >= i`, which let titles[i] raise IndexError
            # when i == len(titles).
            if name.find("span") != -1 and i < len(titles):
                name = titles[i]
            if not url.startswith("http"):
                # Relative link: prefix the host given to the constructor
                # (previously a hard-coded "http://bus.mapbar.com").
                url = self.host + url
            points.append((url, name))
        return points

class UrlData(object):
    """Plain record describing one parsed bus line.

    Attributes are stored exactly as passed in:
      lineName    -- line name (UrlDataModel passes the regex groups tuple)
      oppLineName -- name of the line in the opposite direction
      points      -- sequence of (url, stop name) tuples
      city        -- city the line belongs to
    """

    def __init__(self, lineName, oppLineName, points, city):
        self.lineName = lineName
        self.oppLineName = oppLineName
        self.points = points
        self.city = city

class Db(object):
    """Thin wrapper around one MySQLdb connection.

    Every method logs a failure on the ``Storage.Db`` logger and re-raises
    it, so callers see the driver's exception.
    """

    def __init__(self, host, user, passwd, db, port, socket, charset):
        """Open the connection; logs and re-raises if it cannot be opened."""
        self.log = logging.getLogger('Storage.Db')
        try:
            self.conn = MySQLdb.connect(host=host, user=user, passwd=passwd, db=db, port=port, unix_socket=socket, charset=charset)
        except Exception as e:
            self.log.error("Database conn failed, error is %s", e)
            raise

    def query(self, sql):
        """Run ``sql`` and return the stored result object.

        The original fetched the result and discarded it; returning it is
        backward compatible (callers that ignored None are unaffected).
        """
        try:
            self.conn.query(sql)
            return self.conn.store_result()
        except Exception as e:
            self.log.error("Database query failed, error is %s", e)
            raise

    def fetchAll(self, sql):
        """Run ``sql`` and return every row.

        Rows are fetched with ``how=2``; Fetcher.load reads them as dicts
        keyed ``table.column`` (for example ``line_url.url``). Returns an
        empty tuple when the statement produced no result set.
        """
        try:
            self.conn.query(sql)
            result = self.conn.store_result()
            if result is None:
                return ()
            return result.fetch_row(result.num_rows(), 2)
        except Exception as e:
            self.log.error("Database fetchALl failed, error is %s", e)
            raise

    def execute(self, sql, val):
        """Execute one parameterized statement and commit it."""
        return self._run("execute", "execute", sql, val)

    def executemany(self, sql, args):
        """Execute ``sql`` once per parameter row, commit, return the count.

        The cursor's return value is now passed back; UrlStorage.saveList
        already returned this method's result, which used to be None.
        """
        return self._run("executemany", "executemany", sql, args)

    def update(self, sql, args):
        """Execute one parameterized UPDATE and commit it."""
        return self._run("update", "execute", sql, args)

    def escape(self, string):
        """Escape ``string`` for literal inclusion in hand-built SQL.

        Not needed for values passed as parameters to execute/executemany.
        """
        return self.conn.escape_string(string)

    def _run(self, label, method, sql, args):
        """Shared body of execute/executemany/update.

        ``label`` names the operation in the log message, ``method`` is the
        cursor method to call. The cursor is always closed, and a failed
        statement is rolled back so that a half-applied batch is not
        committed by the next successful call.
        """
        cur = None
        try:
            cur = self.conn.cursor()
            rows = getattr(cur, method)(sql, args)
            self.conn.commit()
            return rows
        except Exception as e:
            self.log.error("Database %s failed, error is %s", label, e)
            self._rollback_quietly()
            raise
        finally:
            if cur is not None:
                cur.close()

    def _rollback_quietly(self):
        """Roll back without masking the error that triggered the rollback.

        Kept in its own method so its inner try/except cannot disturb the
        bare ``raise`` in the caller under Python 2.
        """
        try:
            self.conn.rollback()
        except Exception as e:
            self.log.error("Database rollback failed, error is %s", e)

class Storage(object):
    """Abstract base class for data storage back ends."""

    # Python 2 metaclass hook: with ABCMeta, a subclass that does not
    # override save() cannot be instantiated.
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def save(self, data):
        """Persist ``data``; the base implementation returns None."""
        return

class UrlStorage(object):
    """Persist parsed bus lines (table ``point``) and URLs (table ``line_url``).

    Values are handed to the driver as statement parameters and are NOT
    escaped by hand: the driver escapes parameters itself, so the previous
    manual ``escape()`` calls stored doubly-escaped text.
    """

    def __init__(self, host='localhost', user='root', passwd='', db='bj_busonline', port=3306, socket='/tmp/mysql.sock', charset="utf8"):
        """Open the database connection.

        The defaults are the values that used to be hard-coded, so
        ``UrlStorage()`` behaves as before.
        """
        self.log = logging.getLogger('Storage.UrlStorage')

        self.db = Db(host=host, user=user, passwd=passwd, db=db, port=port, socket=socket, charset=charset)

    @staticmethod
    def _line_row(line):
        """Return the (city, lineName, oppLineName, points) row for a line.

        ``lineName`` is accepted either as a plain string or as the 1-tuple
        UrlDataModel produces. ``points`` becomes the stop names joined
        with "-".
        """
        line_name = line.lineName
        if isinstance(line_name, (tuple, list)):
            line_name = line_name[0]
        stops = "-".join(point[1] for point in line.points)
        return (line.city, line_name, line.oppLineName, stops)

    def save(self, UrlData):
        """Insert a single line record.

        The original passed one value (a nonexistent ``UrlData.City``) to a
        statement with four placeholders and could never succeed.
        """
        sql = "insert into point values(NULL, %s, %s, %s, %s)"
        self.db.execute(sql, self._line_row(UrlData))

    def saveList(self, UrlData_dict):
        """Insert every line in ``UrlData_dict``; returns db.executemany's result."""
        sql = "insert into point(id, city, lineName, oppLineName, points) values (NULL, %s, %s, %s, %s)"
        args = [self._line_row(line) for line in UrlData_dict.values()]
        return self.db.executemany(sql, args)

    def saveUrl(self, urls):
        """Insert ``(url_id, url)`` pairs as not-yet-fetched rows (isget=0)."""
        sql = "insert into line_url(id, url, isget, url_id) values (NULL, %s, %s, %s)"
        args = [(url, 0, url_id) for url_id, url in urls]
        self.db.executemany(sql, args)

    def loadUrl(self):
        """Return every row of ``line_url`` as produced by ``Db.fetchAll``."""
        sql = "SELECT * FROM line_url"
        result = self.db.fetchAll(sql)
        self.log.info("loadUrl 'SELECT * FROM line_url' return %d", len(result))
        return result

    def loadFromFile(self, filename):
        """Load pickled data from ``filename`` into ``self.data``.

        WARNING: unpickling can execute arbitrary code; only use this on
        files this program wrote itself.
        """
        import pickle
        try:
            # `with` closes the file even when unpickling fails.
            with open(filename, "rb") as f:
                self.data = pickle.load(f)
        except Exception as e:
            self.log.error('UrlStorage Exception with message %s', e)
            raise

    def updateUrl(self, url):
        """Mark ``url`` as fetched (isget=1).

        The value is wrapped in a 1-tuple: a bare string is not a parameter
        sequence, and drivers that iterate the arguments would treat each
        character as a separate parameter.
        """
        sql = "Update line_url Set isget=1 WHERE url=%s"
        args = url if isinstance(url, (tuple, list)) else (url,)
        self.db.update(sql, args)

class ContentEncodingProcessor(urllib2.BaseHandler):
    """A handler that decodes gzip / deflate response bodies for urllib2.

    It only decodes; it does not add an Accept-Encoding request header.
    The replacement response reuses the original headers, so it still
    carries the original Content-Encoding header.
    """

    def http_response(self, req, resp):
        """Return ``resp`` with its body decompressed when it is encoded.

        Responses with any other (or no) Content-Encoding are returned
        untouched.
        """
        old_resp = resp
        encoding = resp.headers.get("content-encoding")
        if encoding == "gzip":
            body = GzipFile(
                fileobj=StringIO(resp.read()),
                mode="r"
            )
        elif encoding == "deflate":
            body = StringIO(deflate(resp.read()))
        else:
            return resp

        # addinfourl wraps the decoded stream with the original headers,
        # URL and status code so callers see a normal response object.
        resp = urllib2.addinfourl(body, old_resp.headers, old_resp.url, old_resp.code)
        resp.msg = old_resp.msg
        return resp

    # urllib2 looks handlers up as <protocol>_response; without this alias
    # HTTPS responses were never decoded.
    https_response = http_response

# deflate support
import zlib
def deflate(data):
    """Decompress an HTTP ``deflate`` body and return the raw bytes.

    zlib only provides the zlib compress format, not the raw deflate
    format, and servers send either one. Raw deflate is tried first
    (negative window bits mean "no zlib header"); on zlib.error the data is
    decoded as a zlib-wrapped stream instead.

    Raises zlib.error when the data is valid in neither form.
    """
    try:
        return zlib.decompress(data, -zlib.MAX_WBITS)
    except zlib.error:
        return zlib.decompress(data)

class Fetcher(object):
    """Crawl controller: downloads pages, parses them and stores the result.

    Worker threads take URLs from ``q_task``. One lock serializes the whole
    fetch/parse/store cycle (including the pause after each fetch), so the
    pool as a whole issues one request at a time and the shared Storage and
    the ``urls`` / ``task_done`` state are never touched concurrently.
    """

    def __init__(self, url, DataModel, Storage, ThreadNum, StackSize=32768*16, headers=None):
        """Set up the opener, load stored URLs and start the workers.

        :param url: start URL; always queued.
        :param DataModel: object whose ``analytics(data)`` returns a dict of
            records that have a ``points`` list of (url, name) tuples.
        :param Storage: object providing loadUrl / saveList / saveUrl /
            updateUrl.
        :param ThreadNum: number of worker threads to start.
        :param StackSize: stack size in bytes for the worker threads.
        :param headers: optional iterable of (name, value) request headers.
            Defaults to None rather than a shared mutable ``[]``.
        """
        self.BaseUrl = url
        self.DataModel = DataModel
        self.Storage = Storage
        self.q_task = Queue()
        self.StackSize = StackSize
        self.ThreadNum = ThreadNum

        cookie_support = urllib2.HTTPCookieProcessor(cookielib.CookieJar())
        encoding_support = ContentEncodingProcessor()
        self.opener = urllib2.build_opener(cookie_support, urllib2.HTTPHandler, encoding_support)

        # Extra request headers, added to every request.
        self.headers = list(headers) if headers else []

        self.q_task.put(url)
        self.task_done = []
        self.lock = Lock()
        self.urls = dict()

        self.log = logging.getLogger("BusUrl.Fetcher")

        self.load()

        # stack_size() only applies to threads created afterwards, so it
        # must run before the pool is built. The original called it from
        # inside the already-running workers, where it had no effect on them.
        stack_size(self.StackSize)

        self.thread_pool = []
        for i in range(ThreadNum):
            t = Thread(target=self.threadget)
            t.start()
            self.thread_pool.append(t)

    def load(self):
        """Load known URLs from Storage and queue those not fetched yet.

        Rows are read with the keys ``line_url.url_id``, ``line_url.url``
        and ``line_url.isget``.
        """
        for row in self.Storage.loadUrl():
            self.urls[row['line_url.url_id']] = row['line_url.url']
            if row['line_url.isget'] == 0:
                self.q_task.put(row['line_url.url'])

    def __del__(self):
        """Wait for the task queue to drain before the object goes away."""
        time.sleep(0.5)
        self.q_task.join()

    def threadget(self):
        """Worker loop: process queued URLs until the queue is empty.

        All producers are workers holding the lock, so an empty queue seen
        while holding the lock means no more work can arrive and the worker
        returns. The original looped forever, so wait() never returned.
        """
        while True:
            with self.lock:
                if self.q_task.empty():
                    return
                url = self.q_task.get()
                fetched = False
                try:
                    if url in self.task_done:
                        # Queued twice (e.g. start URL also loaded from
                        # storage): already fetched in this run, so only
                        # record that fact instead of downloading again.
                        self.Storage.updateUrl(url)
                    else:
                        fetched = True
                        self._process(url)
                except Exception as e:
                    # Log `url`, not the request object: the request may
                    # not exist yet when the failure happens.
                    self.log.warning("threadget get req %s failed, except %s", url, e)
                finally:
                    # Balance the get() so that q_task.join() can return.
                    self.q_task.task_done()
                    if fetched:
                        # Pause while still holding the lock, as before, so
                        # the pool as a whole is rate limited.
                        time.sleep(random.randint(30, 40))

    def _process(self, url):
        """Download ``url``, store its lines and queue newly found URLs."""
        req = urllib2.Request(url)
        for k, v in self.headers:
            req.add_header(k, v)
        req.add_header("Referer", url)

        data = self.get(req)
        if not data:
            # get() returns '' when every attempt failed. Do not mark the
            # URL as fetched, so a later run retries it.
            self.log.warning("threadget got no data for %s, leaving it unfetched", url)
            return

        info_list = self.DataModel.analytics(data)
        self.Storage.saveList(info_list)

        self.log.info("Complete url %s fetch", url)
        self.Storage.updateUrl(url)
        self.task_done.append(url)

        # Progress is saved as it happens so that the next run can resume
        # from the URLs that are still unfetched.
        new_urls = self._register_new_urls(info_list)
        for _, link in new_urls:
            self.q_task.put(link)
        if new_urls:
            self.Storage.saveUrl(new_urls)

    def _register_new_urls(self, info_list):
        """Add unseen stop URLs to ``self.urls``; return them as (id, url).

        Numbering is unchanged from the original: the dict key is the size
        of ``self.urls`` before the insert and the stored id is the size
        after it.
        """
        known = set(self.urls.values())  # one O(1) lookup per link
        fresh = []
        for line in info_list.values():
            for point in line.points:
                link = point[0]
                if link in known:
                    continue
                known.add(link)
                self.urls[len(self.urls)] = link
                fresh.append((len(self.urls), link))
        return fresh

    def get(self, req, repeat=3):
        """HTTP GET with retries.

        Makes one attempt plus up to ``repeat`` retries. Returns the
        response body, or '' when every attempt failed; failures are
        logged instead of printed.
        """
        for _ in range(max(repeat, 0) + 1):
            try:
                response = self.opener.open(req)
                try:
                    return response.read()
                finally:
                    response.close()
            except Exception as what:
                self.log.warning("GET attempt failed req %s, except %s", req, what)
        self.log.warning("GET Failed req %s", req)
        return ''

    def wait(self):
        """Block until every worker thread has finished."""
        for t in self.thread_pool:
            t.join()

def parse_header_lines(lines):
    """Parse ``name | value`` lines into a list of (name, value) tuples.

    Only the first "|" separates name from value, so a value may itself
    contain "|" (the original ``split("|", 2)`` dropped everything after a
    second "|"). Blank lines and lines without "|" are skipped instead of
    raising IndexError.
    """
    headers = []
    for line in lines:
        name, sep, value = line.strip().partition("|")
        name = name.strip()
        if not sep or not name:
            continue
        headers.append((name, value.strip()))
    return headers


if __name__ == "__main__":
    import platform
    # Log everything to a per-platform file (append mode).
    if platform.system() == "Windows":
        logging.basicConfig(filename = "D:/www/busonline/tools/log.txt", filemode="a", level=logging.NOTSET)
    else:
        logging.basicConfig(filename = "/media/D/www/busonline/tools/log.txt", filemode="a", level=logging.NOTSET)

    # Extra request headers, one "name | value" pair per line.
    filename = "header.txt"
    with open(filename, "r") as f:
        headers = parse_header_lines(f)

    UrlDataModel_obj = UrlDataModel("http://bus.mapbar.com", "北京")
    UrlStorage_obj = UrlStorage()
    url = "http://bus.mapbar.com/beijing/poi/5af90Q5CN8BH"
    fetcher = Fetcher(url, UrlDataModel_obj, UrlStorage_obj, 4, headers=headers)
    fetcher.wait()


代码就这些,量不大,基本解决了从网页分析到入库的问题;保存 url 时存在数据重复的 bug,尚未处理。

数据库表结构

-- URL table used by the crawler: one row per discovered URL.
--   url    : page address; UrlStorage.updateUrl matches rows on this column
--   isget  : 0 when inserted by UrlStorage.saveUrl, set to 1 by updateUrl after the page is fetched
--   url_id : the number Fetcher assigned to the URL when it was discovered
-- NOTE(review): there is no UNIQUE index on `url`, so the same URL can be inserted
-- more than once; this may be related to the duplicate-row bug the author mentions — confirm.
CREATE TABLE `line_url` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`url` varchar(255) NOT NULL,
`isget` tinyint(1) NOT NULL DEFAULT '0',
`url_id` varchar(45) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6865 DEFAULT CHARSET=utf8


-- Parsed bus lines, written by UrlStorage.saveList: one row per line found on a page.
--   city        : city value given to UrlDataModel
--   lineName    : line name captured from the page
--   oppLineName : name of the line in the opposite direction
--   points      : the line's stop names joined with "-"
CREATE TABLE `point` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`city` varchar(45) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
`lineName` varchar(45) DEFAULT NULL,
`oppLineName` varchar(45) DEFAULT NULL,
`points` varchar(10000) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10416 DEFAULT CHARSET=utf8
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: