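Python 中怎么实现分布式抓取网页?很多新手对此不是很清楚。为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习一下,希望你能有所收获。下文包含两份 Python 2 代码:第一份是网页下载与内容提取模块(第二份通过 `from InfoGripper import *` 引用它,因此应保存为 InfoGripper.py),第二份是 WEB 服务及任务调度。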
[python]
view plaincopy
# -*- coding: utf-8 -*-
'''
Created on 2010-9-15
@author: chenggong
'''
import urllib2
import re
import socket
# 0: log text is re-encoded to GBK before printing; any other value prints it as-is.
DEBUG = 0


class Tools():
    """Static helpers shared by the crawler classes: logging, decoding, regex capture."""

    @staticmethod
    def writelog(level, info, notify=False):
        """Print one log line formatted as ``[level]info``.

        level  -- short severity tag, e.g. 'debug', 'info', 'warn', 'error'.
        info   -- message; either a UTF-8 byte string or a unicode string.
        notify -- accepted for interface compatibility; currently unused.

        When DEBUG == 0 the message is converted to GBK.  Bytes that are not
        valid UTF-8 and characters GBK cannot represent are replaced rather
        than raising, so that logging can never abort the caller.
        """
        if DEBUG == 0:
            if isinstance(info, bytes):
                # Byte strings are assumed to be UTF-8 (the source encoding).
                info = info.decode('UTF-8', 'replace')
            print("[" + level + "]" + info.encode('GBK', 'replace'))
        else:
            print("[" + level + "]" + info)

    @staticmethod
    def toUnicode(s, charset):
        """Decode byte string *s* using *charset*.

        Returns *s* untouched when *charset* is empty, and an empty string
        when decoding is impossible (bad bytes, unknown charset name, or *s*
        is not a byte string).
        """
        if charset == "":
            return s
        try:
            return unicode(s, charset)
        except (ValueError, LookupError, TypeError):
            # Best effort: an undecodable fragment is dropped, not fatal.
            return ""

    @staticmethod
    def getFromPatten(patten, src, single=False):
        """Return the text captured by regex *patten* in *src*.

        The pattern is compiled with re.S so '.' also matches newlines.
        All captures are joined with a single space and the result is
        stripped; with single=True only the first capture is used.
        Returns an empty string when nothing matches.
        """
        matches = re.compile(patten, re.S).findall(src)
        if single:
            matches = matches[:1]
        return " ".join(matches).strip()
'''''
网页内容爬虫
'''
class PageGripper():
    """Download a web page and decode it using the charset the page declares."""

    URL_OPEN_TIMEOUT = 10  # socket timeout in seconds
    MAX_RETRY = 3          # number of retries after a network error

    def __init__(self):
        # NOTE: this changes the default timeout for every socket created
        # afterwards in the process, not only the ones used by this class.
        socket.setdefaulttimeout(self.URL_OPEN_TIMEOUT)

    def getCharset(self, s):
        """Return the charset named in *s* ('utf8' is normalised to 'utf-8').

        Returns an empty string when *s* contains no ``charset=..."`` tag.
        """
        rst = Tools.getFromPatten(u'charset=(.*?)"', s, True)
        if rst == "utf8":
            rst = "utf-8"
        return rst

    def downloadUrl(self, url):
        """Fetch *url* and return its content.

        Network errors are retried up to MAX_RETRY times; HTTP status errors
        are not retried.  The original urllib2.HTTPError / urllib2.URLError
        is re-raised so callers can tell the two apart.

        The content is returned as unicode when a charset declaration is
        found in the page, otherwise as the raw byte string.
        """
        retry = 0
        while True:
            try:
                fp = urllib2.urlopen(url)
                break
            except urllib2.HTTPError as e:  # bad HTTP status; must precede URLError
                Tools.writelog('error', 'HTTP状态错误 code=' + str(e.code))
                raise
            except urllib2.URLError:  # network error / timeout
                Tools.writelog('warn', '页面访问超时,重试..')
                retry += 1
                if retry > self.MAX_RETRY:
                    Tools.writelog('warn', '超过最大重试次数,放弃')
                    raise

        charset = ""
        lines = []
        try:
            while True:
                line = fp.readline()
                if not line:
                    break
                if charset == "":
                    charset = self.getCharset(line)
                lines.append(line)
        finally:
            fp.close()

        if charset == "":
            return "".join(lines)
        # Decode every line, including those read before the charset tag was
        # seen, so byte strings and unicode are never mixed in the result.
        return "".join([Tools.toUnicode(line, charset) for line in lines])

    def getPageInfo(self, url):
        """Download *url* with logging; any download exception propagates."""
        Tools.writelog("info", "开始抓取网页,url= " + url)
        info = self.downloadUrl(url)
        Tools.writelog("debug", "网页抓取成功")
        return info
'''''
内容提取类
'''
class InfoGripper():
    """Extract video metadata (title, channel, tags, views, ...) from a page."""

    # One downloader instance shared by every InfoGripper.
    pageGripper = PageGripper()

    def __init__(self):
        Tools.writelog('debug', "爬虫启动")

    def griptitle(self, data):
        """Return the video title, falling back to the <title> tag up to the first '-' or '<'."""
        title = Tools.getFromPatten(u'box2t sp"><h4>(.*?)</h4>', data, True)
        if title == "":
            title = Tools.getFromPatten(u'<title>(.*?)[-<]', data, True)
        return title.strip()

    def gripchannel(self, data):
        """Return the text of the first link inside the channel span."""
        zone = Tools.getFromPatten(u'频道:(.*?)</span>', data, True)
        return Tools.getFromPatten(u'<a.*?>(.*?)</a>', zone, True)

    def griptag(self, data):
        """Return all tag link texts, separated by spaces."""
        zone = Tools.getFromPatten(u'标签:(.*?)</[^a].*>', data, True)
        return Tools.getFromPatten(u'>(.*?)</a>', zone, False)

    def gripviews(self, data):
        """Return the view count(s) found in the 'viewcount' element."""
        return Tools.getFromPatten(u'已经有<em class="hot" id="viewcount">(.*?)</em>次观看', data)

    def griptime(self, data):
        """Return the publication time text."""
        return Tools.getFromPatten(u'在<em>(.*?)</em>发布', data, True)

    def gripuser(self, data):
        """Return the publisher name taken from the user-space link title."""
        return Tools.getFromPatten(u'title="点击进入(.*?)的用户空间"', data, True)

    def getPageCharset(self, data):
        """Return the charset declared in *data* ('utf8' normalised to 'utf-8')."""
        return self.pageGripper.getCharset(data)

    def getCCData(self, data):
        """Return (siteid, vid) of the bokecc player, or ("", "") if another player is used."""
        zone = Tools.getFromPatten(u'SWFObject(.*?)</script>', data, True)
        # The script zone may span several lines, so use a plain substring
        # test instead of a line-bound regular expression.
        if 'bokecc.com' not in zone:
            return "", ""
        ccSiteId = Tools.getFromPatten(u'siteid=(.*?)[&,"]', zone, True)
        ccVid = Tools.getFromPatten(u'vid=(.*?)[&,"]', zone, True)
        return ccSiteId, ccVid

    def gripVideoId(self, data):
        """Return the site-local video id assigned to the JS variable 'vid'."""
        return Tools.getFromPatten(u'var vid = "(.*?)"', data, True)

    def gripViewsAjax(self, vid, url, basedir):
        """Return the view count from the site's video_statistic endpoint.

        vid     -- site-local video id.
        url     -- page URL; only its host part is used.
        basedir -- site base directory prepended to the endpoint path.

        Returns an empty string on an HTTP status error or when network
        errors persist beyond the downloader's MAX_RETRY.
        """
        host = Tools.getFromPatten(u'http://(.*?)/', url, True)
        ajaxAddr = "http://" + host + basedir + "/index.php/ajax/video_statistic/" + vid
        Tools.writelog('debug', u"开始获取点击量,url=" + ajaxAddr)
        retry = 0
        while True:
            try:
                fp = urllib2.urlopen(ajaxAddr)
                break
            except urllib2.HTTPError as e:  # bad HTTP status; must precede URLError
                Tools.writelog('error', 'HTTP状态错误 code=' + "%d" % e.code)
                return ""
            except urllib2.URLError:  # network error / timeout
                Tools.writelog('warn', '页面访问超时,重试..')
                retry += 1
                if retry > self.pageGripper.MAX_RETRY:
                    Tools.writelog('warn', '超过最大重试次数,放弃')
                    return ""
        try:
            content = fp.read()
        finally:
            fp.close()
        views = Tools.getFromPatten(u'"viewcount":(.*?),', content, True)
        return views.replace('"', '')

    def gripViewsFromData(self, data):
        """Return the view count embedded in the page itself, or "" if absent."""
        return Tools.getFromPatten(u'已经有<.*?>(.*?)<.*?>次观看', data, True)

    def gripBaseDir(self, data):
        """Return the value assigned to 'base_dir' in the page."""
        return Tools.getFromPatten(u"base_dir = '(.*?)'", data, True)

    def gripinfo(self, url):
        """Download *url* and return a dict describing the video.

        Keys: 'title', 'channel', 'tag', 'release', 'user', 'ccsiteId',
        'ccVid', 'views'.  'views' is the string "error" when the count
        could be obtained neither from the page nor from the ajax endpoint.
        Download exceptions are logged and re-raised.
        """
        try:
            data = self.pageGripper.getPageInfo(url)
        except Exception:
            Tools.writelog("error", url + " 抓取失败")
            raise
        Tools.writelog('info', '开始内容匹配')
        rst = {}
        rst['title'] = self.griptitle(data)
        rst['channel'] = self.gripchannel(data)
        rst['tag'] = self.griptag(data)
        rst['release'] = self.griptime(data)
        rst['user'] = self.gripuser(data)
        rst['ccsiteId'], rst['ccVid'] = self.getCCData(data)
        views = self.gripViewsFromData(data)
        if not views:
            # Not present in the page: ask the statistics endpoint instead.
            vid = self.gripVideoId(data)
            basedir = self.gripBaseDir(data)
            views = self.gripViewsAjax(vid, url, basedir)
            if views == "":
                views = "error"
            if views == "error":
                Tools.writelog("error", "获取观看次数失败")
        # Unicode literal: 'views' is unicode when the page was decoded, and
        # a non-ASCII byte string cannot be concatenated with unicode.
        Tools.writelog("debug", u"点击量:" + views)
        rst['views'] = views
        Tools.writelog('debug', 'title=%s,channel=%s,tag=%s' % (rst['title'], rst['channel'], rst['tag']))
        return rst
'''''
单元测试
'''
# Manual smoke test: crawl a fixed set of sample video pages.  The run stops
# at the first page whose download fails, because gripinfo re-raises.
if __name__ == '__main__':
    urls = [
        'http://008yx.com/xbsp/index.php/video/index/3138',
        'http://vblog.xwhb.com/index.php/video/index/4067',
        'http://demo.ccvms.bokecc.com/index.php/video/index/3968',
        'http://vlog.cnhubei.com/wuhan/20100912_56145.html',
        'http://vlog.cnhubei.com/html/js/30271.html',
        'http://www.ddvtv.com/index.php/video/index/15',
        'http://boke.2500sz.com/index.php/video/index/60605',
        'http://video.zgkqw.com/index.php/video/index/334',
        'http://yule.hitmv.com/html/joke/27041.html',
        'http://www.ddvtv.com/index.php/video/index/11',
        'http://www.zgnyyy.com/index.php/video/index/700',
        'http://www.kdianshi.com/index.php/video/index/5330',
        'http://www.aoyatv.com/index.php/video/index/127',
        'http://v.ourracing.com/html/channel2/64.html',
        'http://v.zheye.net/index.php/video/index/93',
        'http://vblog.thmz.com/index.php/video/index/7616',
        'http://kdianshi.com/index.php/video/index/5330',
        'http://tv.seeyoueveryday.com/index.php/video/index/95146',
        'http://sp.zgyangzhi.com/html/ji/2.html',
        'http://www.xjapan.cc/index.php/video/index/146',
        'http://www.jojy.cn/vod/index.php/video/index/399',
        'http://v.cyzone.cn/index.php/video/index/99',
    ]
    infoGripper = InfoGripper()
    for url in urls:
        infoGripper.gripinfo(url)
WEB 服务及任务调度(依赖上文的 InfoGripper 模块:收到 HTTP 请求后启动任务线程抓取页面、更新 MySQL 数据库,并回调主控)
[python]
view plaincopy
# -*- coding: utf-8 -*-
'''
Created on 2010-9-15
@author: chenggong
'''
import string,cgi,time
from os import curdir,sep
from BaseHTTPServer import BaseHTTPRequestHandler,HTTPServer
from InfoGripper import *
import re
import MySQLdb
import time
import threading
import urllib
import urllib2
# TCP port the HTTP server listens on (passed to HTTPServer in startHttpd).
PORT = 8079
# Value written to the response body for 'vinfoant/version' requests.
VERSION = 0.1
# Character set applied to each new MySQL connection.
DBCHARSET = "utf8"
# Names of the request parameters that do_GET extracts from the request path
# and hands to the task thread.
PARAMS = [
'callback',
'sessionId',
'retry',
'retryInterval',
'dbhost',
'dbport',
'db',
'dbuser',
'dbpass',
'videoId'
]
# Columns of the 'videos' table, in the order the task thread selects them.
# The task thread reads result index 3 (desc_url) as the page URL and
# index 5 (title) as the stored title, so the order matters.
DBMAP = ['video_id',
'ccsiteid',
'ccvid',
'desc_url',
'site_id',
'title',
'post_time',
'author',
'elapse',
'channel',
'tags',
'create_time',
'check_time',
'status']
# Error codes reported to the callback URL as 'errorcode'.
'''''
ERROR CODE定义
'''
# Success; also returned by DBAdapter.update when the commit succeeds.
ERR_OK = 0
# Presumably an invalid request parameter -- TODO confirm intended usage.
ERR_PARAM = 1
# The page fetch raised urllib2.URLError (network error / timeout).
ERR_HTTP_TIMEOUT = 5
# The page fetch raised urllib2.HTTPError (bad HTTP status).
ERR_HTTP_STATUS = 6
# Connecting to the database failed.
ERR_DB_CONNECT_FAIL = 8
# Returned by DBAdapter.update when a statement or the commit fails.
ERR_DB_SQL_FAIL = 9
# The page was fetched but its view count could not be obtained.
ERR_GRIPVIEW = 11
# Any other exception raised while fetching/parsing the page.
ERR_UNKNOW = 12
'''''
数据库适配器
'''
class DBAdapter(object):
    """Thin wrapper around one MySQL connection and cursor.

    The connection is (re)created lazily by connect() whenever the requested
    connection parameters differ from the ones currently in use.

    NOTE(review): a single instance is shared by all task threads and no
    locking is done here -- confirm that concurrent use is acceptable.
    """

    def __init__(self):
        self.param = {}
        self._forget_param()
        self.connect_once = False  # True while self.conn / self.cur are usable

    def _forget_param(self):
        """Reset the cached connection parameters so the next connect() reconnects."""
        self.param.update({'ip': '', 'port': 0, 'user': '', 'pw': '', 'db': ''})

    def _release(self):
        """Close the current cursor and connection, ignoring close errors."""
        for handle in (self.cur, self.conn):
            try:
                handle.close()
            except MySQLdb.Error:
                # The old connection may already be dead; we are discarding
                # it anyway, so a failure to close it is not an error.
                pass

    def connect(self, ip, port, user, pw, db):
        """Connect to the database unless already connected with these parameters.

        Raises whatever MySQLdb raises when the connection cannot be made;
        in that case the cached parameters are cleared so that a later call
        with the same arguments tries again.
        """
        wanted = {'ip': ip, 'port': port, 'user': user, 'pw': pw, 'db': db}
        if wanted == self.param:
            return
        # The password is deliberately kept out of the log.
        Tools.writelog('info', '更换数据库连接池,ip=' + ip + ',port=' + str(port) + ',user=' + user + ',db=' + db)
        try:
            if self.connect_once:  # release the previous connection
                self.connect_once = False
                self._release()
            self.conn = MySQLdb.connect(user=user, passwd=pw, db=db, host=ip, port=int(port))
            self.conn.set_character_set(DBCHARSET)
            self.cur = self.conn.cursor(MySQLdb.cursors.Cursor)
            self.connect_once = True
            self.param.update(wanted)
        except Exception:
            self._forget_param()
            Tools.writelog('error', u'数据库连接失败', True)
            raise
        else:
            Tools.writelog('info', u'数据库连接成功')

    def execute(self, sql, params=None):
        """Execute *sql*; *params* (optional) fills its %s placeholders.

        Passing values through *params* lets the driver quote them.  The
        statement is logged and any driver exception is re-raised.
        """
        Tools.writelog('debug', u'执行SQL: ' + sql)
        try:
            if params is None:
                # No substitution at all, so a literal '%' in sql is safe.
                self.cur.execute(sql)
            else:
                self.cur.execute(sql, params)
        except Exception:
            Tools.writelog('error', u'SQL执行错误:' + sql)
            raise

    def query(self, sql, params=None):
        """Execute *sql* and return all result rows."""
        self.execute(sql, params)
        return self.cur.fetchall()

    def updateErr(self, videoId):
        """Mark video *videoId* as failed (status -1) and stamp check_time."""
        nowtime = time.strftime('%Y-%m-%d-%H-%M-%S')
        self.execute("UPDATE videos SET check_time=%s,status=-1 WHERE video_id=%s",
                     (nowtime, videoId))
        self.conn.commit()

    def update(self, obj, videoId, isUpdateTitle=True):
        """Store a crawl result for video *videoId*.

        obj           -- dict as returned by InfoGripper.gripinfo.
        isUpdateTitle -- when False the stored title is left untouched.

        Updates the 'videos' row and, unless obj['views'] is 'error',
        inserts or updates today's row in 'counts'.  Returns ERR_OK on
        success; on any failure the transaction is rolled back and
        ERR_DB_SQL_FAIL is returned.
        """
        Tools.writelog('debug', '开始更新数据库')
        try:
            # --- videos table ---
            columns = []
            values = []
            if obj['ccsiteId'] != "":
                columns.append("ccsiteid=%s")
                values.append(obj['ccsiteId'])
            if obj['ccVid'] != "":
                columns.append("ccvid=%s")
                values.append(obj['ccVid'])
            if isUpdateTitle:
                columns.append("title=%s")
                values.append(obj['title'])
            for column, key in (('post_time', 'release'), ('author', 'user'),
                                ('channel', 'channel'), ('tags', 'tag')):
                columns.append(column + "=%s")
                values.append(obj[key])
            columns.append("check_time=%s")
            values.append(time.strftime('%Y-%m-%d-%H-%M-%S'))
            columns.append("status=0")
            values.append(videoId)
            self.execute("UPDATE videos SET " + ",".join(columns) + " WHERE video_id=%s",
                         tuple(values))

            # --- counts table ---
            if obj['views'] != 'error':
                nowdate = time.strftime('%Y-%m-%d')
                rst = self.query("SELECT * FROM counts WHERE date=%s and video_id=%s",
                                 (nowdate, videoId))
                if len(rst) > 0:  # a row for today exists: update it
                    self.execute("UPDATE counts SET count=%s WHERE video_id=%s AND date=%s",
                                 (obj['views'], videoId, nowdate))
                else:  # otherwise insert a new one
                    self.execute("INSERT INTO counts VALUES(null,%s,%s,%s)",
                                 (videoId, nowdate, obj['views']))
            self.conn.commit()
            Tools.writelog('debug', "db commit ok")
            return ERR_OK
        except Exception as e:
            print(e)
            try:
                # Do not leave a half-applied update in the open transaction.
                self.conn.rollback()
            except Exception as rollback_error:
                print(rollback_error)
            return ERR_DB_SQL_FAIL
'''''
任务线程类
'''
class TaskThread(threading.Thread):
    """Worker thread: crawl one video page, store the result, notify the caller.

    Usage: setTaskTool(), setParam(), then start().
    """

    def setTaskTool(self, dbAdapter, gripper):
        """Inject the database adapter and the page parser to use."""
        self.dbAdapter = dbAdapter
        self.gripper = gripper

    def setParam(self, param):
        """Store the request parameters; 'videoId' must be non-empty."""
        self.param = param
        self.videoId = param['videoId']
        assert self.videoId != ""

    def init(self):
        """Reset the per-run result fields."""
        self.views = "0"
        self.errcode = ERR_OK

    def run(self):
        """Crawl the video's page, update the database and call back.

        The outcome is reported to the callback URL as an error code plus
        the view count (-1 when the count could not be obtained).
        """
        Tools.writelog('debug', '开始爬虫任务,sessionId=' + self.param['sessionId'])
        self.init()
        # videoId comes straight from the HTTP request and is concatenated
        # into SQL below, so accept nothing but a plain number.
        if not self.videoId.isdigit():
            self.errcode = ERR_PARAM
            self.callback(self.errcode)
            return
        try:
            # (re)connect to the requested database
            self.dbAdapter.connect(self.param['dbhost'], self.param['dbport'],
                                   self.param['dbuser'], self.param['dbpass'],
                                   self.param['db'])
        except Exception:
            self.errcode = ERR_DB_CONNECT_FAIL
            self.callback(self.errcode)
            return
        # look up the video row
        sql = "SELECT " + ",".join(DBMAP) + " FROM videos" + " WHERE video_id=" + self.videoId
        video = self.dbAdapter.query(sql)
        assert len(video) == 1  # exactly one row expected
        row = video[0]
        url = row[DBMAP.index('desc_url')]
        assert url != ""
        try:
            rst = self.gripper.gripinfo(url)
        except urllib2.HTTPError:  # must precede URLError (its base class)
            self.errcode = ERR_HTTP_STATUS
            self.dbAdapter.updateErr(self.videoId)
        except urllib2.URLError:
            self.errcode = ERR_HTTP_TIMEOUT
            self.dbAdapter.updateErr(self.videoId)
        except Exception:
            self.errcode = ERR_UNKNOW
            self.dbAdapter.updateErr(self.videoId)
        else:
            self.views = rst['views']
            if self.views == "error":
                self.views = "-1"
                self.errcode = ERR_GRIPVIEW  # page fetched, view count missing
            # Special case: a stored title containing "-" is kept, because
            # the scraped title is cut at the first "-".
            title = row[DBMAP.index('title')]
            assert title != ""
            dbcode = self.dbAdapter.update(rst, self.videoId, '-' not in title)
            if dbcode != ERR_OK:
                # A database failure outranks a missing view count; a
                # successful update must not hide ERR_GRIPVIEW.
                self.errcode = dbcode
        self.callback(self.errcode)
        Tools.writelog('info', '任务结束,sessionId=' + self.param['sessionId'])
        return

    def callback(self, errcode):
        """Report *errcode* and the view count to the caller's callback URL.

        The request is retried on urllib2.URLError, sleeping
        param['retryInterval'] seconds between attempts, until more than
        param['retry'] attempts have failed.
        """
        results = {'errorcode': errcode, 'count': int(self.views)}
        results = urllib.urlencode(results)
        # The encoded pairs travel inside a single 'results' parameter, so
        # their '&' separators must themselves be escaped.
        results = results.replace('&', '%26')
        url = self.param['callback']
        url += "?"
        url += "sessionId=" + self.param['sessionId']
        url += "&results=" + results
        retry = 0
        while True:
            try:
                Tools.writelog('debug', "回调主控,url=" + url)
                urllib2.urlopen(url)
                Tools.writelog('debug', '回调成功')
                break
            except urllib2.URLError:  # timeout or error
                Tools.writelog('debug', '回调主控超时,%s秒后重试' % self.param['retryInterval'])
                retry += 1
                time.sleep(int(self.param['retryInterval']))
                if retry > int(self.param['retry']):
                    Tools.writelog('error', '回调主控失败')
                    return
'''''
WEB服务类
'''
class MyHandler(BaseHTTPRequestHandler):
    """HTTP front end: reports the version and accepts crawl tasks.

    GET .../vinfoant/version    -> responds with VERSION
    GET .../vinfoant/run?...    -> starts a TaskThread, responds with "ok"
    anything else               -> 404

    NOTE(review): dbAdapter and gripper are class attributes, i.e. shared by
    every request and every task thread -- confirm this is intended.
    """

    dbAdapter = DBAdapter()
    gripper = InfoGripper()

    def pageSuccess(self):
        """Send a 200 response header block."""
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def pageFail(self):
        """Send a 404 error response."""
        self.send_error(404, "not found")

    def getValue(self, param):
        """Return the raw value of request parameter *param*, or "" if absent.

        NOTE(review): the value is taken verbatim from the request path and
        is not URL-decoded.
        """
        # A trailing '&' lets the last parameter be matched like the others.
        src = self.path + '&'
        reg = param + '=' + '(.*?)&'
        return Tools.getFromPatten(reg, src, True)

    def do_GET(self):
        """Dispatch a GET request according to its path."""
        if 'vinfoant/version' in self.path:
            self.pageSuccess()
            self.wfile.write(str(VERSION))
        elif 'vinfoant/run' in self.path:
            param = dict((p, self.getValue(p)) for p in PARAMS)
            if param['videoId'] == "":
                # Reject before any header is sent; a task cannot run
                # without a video id.
                self.send_error(400, "missing videoId")
                return
            self.pageSuccess()
            taskThread = TaskThread()
            taskThread.setTaskTool(self.dbAdapter, self.gripper)
            taskThread.setParam(param)
            taskThread.start()  # the crawl runs in the background
            self.wfile.write("ok")
        else:
            self.pageFail()
        return
'''''
启动WEB服务,全局入口
'''
def startHttpd():
    """Run the HTTP server on PORT until interrupted with Ctrl-C.

    The listening socket is closed on every exit path.
    """
    Tools.writelog('debug', 'httpd start..listen on ' + str(PORT))
    # Created outside the try block so the cleanup below never refers to a
    # server that was not constructed.
    httpd = HTTPServer(('', PORT), MyHandler)
    Tools.writelog('debug', 'success')
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        Tools.writelog('debug', 'httpd close..')
    finally:
        httpd.server_close()


if __name__ == '__main__':
    startHttpd()
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。