您好,登錄後才能下訂單哦!
python中怎麼實現分布式抓取網頁,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
[python]
view plaincopy
# -*- coding: utf-8 -*-
"""
Created on 2010-9-15
@author: chenggong
"""
import urllib2
import re
import socket
# Log output mode: 0 re-encodes every message to GBK before printing
# (presumably for a GBK Windows console -- TODO confirm); any other value
# prints the message as-is.
DEBUG = 0


# Utility class
class Tools():
    """Static helpers shared by the crawler: logging, decoding, regex extraction."""

    @staticmethod
    def writelog(level, info, notify=False):
        """Print ``info`` prefixed with ``[level]``.

        :param level: tag such as 'debug', 'info', 'warn' or 'error'.
        :param info: message; a byte string (treated as UTF-8) or unicode.
        :param notify: unused (reserved for notifying an administrator).

        Encoding problems never propagate to the caller: characters that
        cannot be decoded or encoded are replaced instead.
        """
        if DEBUG != 0:
            print("[" + level + "]" + info)
            return
        if isinstance(info, bytes):
            info = info.decode('UTF-8', 'replace')
        print("[" + level + "]" + info.encode('GBK', 'replace'))

    @staticmethod
    def toUnicode(s, charset):
        """Decode byte string ``s`` with ``charset``.

        Returns ``s`` unchanged when ``charset`` is empty and ``""`` when the
        charset name is unusable.  Undecodable bytes become U+FFFD instead of
        discarding the whole string.
        """
        if charset == "":
            return s
        try:
            return s.decode(charset, 'replace')
        except (LookupError, TypeError, ValueError):
            # unknown / malformed charset name (it is scraped from the page)
            return ""

    @staticmethod
    def getFromPatten(patten, src, single=False):
        """Return the regex captures of ``patten`` in ``src``, space-joined.

        The pattern is compiled with ``re.S`` so ``.`` also matches newlines.

        :param single: use only the first match.
        :returns: the stripped result; ``""`` when nothing matched.
        """
        matches = re.compile(patten, re.S).findall(src)
        if single:
            matches = matches[:1]
        return " ".join(matches).strip()
# Web page downloader
class PageGripper():
    """Downloads a URL with retries and decodes it with the page's charset."""

    URL_OPEN_TIMEOUT = 10  # socket timeout in seconds
    MAX_RETRY = 3          # retries after a urllib2.URLError

    def __init__(self):
        # NOTE: socket.setdefaulttimeout is process-wide; it applies to every
        # socket created afterwards, not only to this instance's requests.
        socket.setdefaulttimeout(self.URL_OPEN_TIMEOUT)

    def getCharset(self, s):
        """Return the charset declared as ``charset=...\"`` in ``s`` ("" if none).

        "utf8" is normalised to "utf-8".
        """
        rst = Tools.getFromPatten(u'charset=(.*?)"', s, True)
        if rst == "utf8":
            rst = "utf-8"
        return rst

    def downloadUrl(self, url):
        """Fetch ``url`` and return its body decoded with the declared charset.

        The charset is taken from the first line that declares one; without a
        declaration the raw byte string is returned.

        :raises urllib2.HTTPError: the server answered with an error status.
        :raises urllib2.URLError: still failing after MAX_RETRY retries.
        """
        retry = 0
        while True:
            try:
                fp = urllib2.urlopen(url)
                break
            except urllib2.HTTPError as e:  # must precede URLError (subclass)
                Tools.writelog('error', 'HTTP狀態錯誤 code=' + "%d" % e.code)
                raise  # keep the original exception for the caller
            except urllib2.URLError:  # network error / timeout
                Tools.writelog('warn', '頁面訪問超時,重試..')
                retry += 1
                if retry > self.MAX_RETRY:
                    Tools.writelog('warn', '超過最大重試次數,放棄')
                    raise
        charset = ""
        lines = []
        try:
            while True:
                line = fp.readline()
                if not line:
                    break
                if charset == "":
                    charset = self.getCharset(line)
                lines.append(line)
        finally:
            fp.close()
        # Decode the body as a whole so that lines read before the charset
        # declaration are decoded too; mixing raw byte lines with unicode
        # lines fails as soon as the raw part contains non-ASCII bytes.
        return Tools.toUnicode(b"".join(lines), charset)

    def getPageInfo(self, url):
        """Download ``url`` via downloadUrl, logging start and success."""
        Tools.writelog("info", "開始抓取網頁,url= " + url)
        info = self.downloadUrl(url)
        Tools.writelog("debug", "網頁抓取成功")
        return info
# Content extractor
class InfoGripper():
    """Extracts video metadata from a video page with regular expressions."""

    # Downloader shared by all InfoGripper instances (class attribute).
    pageGripper = PageGripper()

    def __init__(self):
        Tools.writelog('debug', "爬蟲啟動")

    def griptitle(self, data):
        """Title from the ``box2t sp`` <h4>, else <title> up to the first '-' or '<'."""
        title = Tools.getFromPatten(u'box2t sp"><h4>(.*?)</h4>', data, True)
        if title == "":
            title = Tools.getFromPatten(u'<title>(.*?)[-<]', data, True)
        return title.strip()

    def gripchannel(self, data):
        """Text of the first link following the "頻道:" (channel) label."""
        zone = Tools.getFromPatten(u'頻道:(.*?)</span>', data, True)
        return Tools.getFromPatten(u'<a.*?>(.*?)</a>', zone, True)

    def griptag(self, data):
        """All link texts following the "標簽:" (tags) label, space-joined."""
        zone = Tools.getFromPatten(u'標簽:(.*?)</[^a].*>', data, True)
        return Tools.getFromPatten(u'>(.*?)</a>', zone, False)

    def gripviews(self, data):
        """Every ``id="viewcount"`` value, space-joined."""
        return Tools.getFromPatten(u'已經有<em class="hot" id="viewcount">(.*?)</em>次觀看', data)

    def griptime(self, data):
        """Publish time from the "在<em>...</em>發布" phrase."""
        return Tools.getFromPatten(u'在<em>(.*?)</em>發布', data, True)

    def gripuser(self, data):
        """Publisher name from the link to the user's space."""
        return Tools.getFromPatten(u'title="點擊進入(.*?)的用戶空間"', data, True)

    def getPageCharset(self, data):
        """Charset declared in ``data`` ("utf8" normalised to "utf-8")."""
        charset = Tools.getFromPatten(u'charset=(.*?)"', data, True)
        if charset == "utf8":
            charset = "utf-8"
        return charset

    def getCCData(self, data):
        """Return ``(siteid, vid)`` of an embedded bokecc player, else ``("", "")``."""
        zone = Tools.getFromPatten(u'SWFObject(.*?)</script>', data, True)
        # search() rather than match('.*...'): without re.S a leading '.*'
        # cannot cross newlines, so multi-line SWFObject calls were missed.
        if not re.search(r'bokecc\.com', zone):
            return "", ""
        ccSiteId = Tools.getFromPatten(u'siteid=(.*?)[&,"]', zone, True)
        ccVid = Tools.getFromPatten(u'vid=(.*?)[&,"]', zone, True)
        return ccSiteId, ccVid

    def gripVideoId(self, data):
        """Site-internal video id from ``var vid = "..."``."""
        return Tools.getFromPatten(u'var vid = "(.*?)"', data, True)

    def gripViewsAjax(self, vid, url, basedir):
        """Ask the site's ajax statistics endpoint for the view count.

        :returns: the count as a string, or ``""`` on any failure.
        """
        host = Tools.getFromPatten(u'http://(.*?)/', url, True)
        ajaxAddr = "http://" + host + basedir + "/index.php/ajax/video_statistic/" + vid
        Tools.writelog('debug', u"開始獲取點擊量,url=" + ajaxAddr)
        retry = 0
        while True:
            try:
                fp = urllib2.urlopen(ajaxAddr)
                break
            except urllib2.HTTPError as e:  # must precede URLError (subclass)
                Tools.writelog('error', 'HTTP狀態錯誤 code=' + "%d" % e.code)
                return ""
            except urllib2.URLError:  # network error / timeout
                Tools.writelog('warn', '頁面訪問超時,重試..')
                retry += 1
                # InfoGripper has no MAX_RETRY of its own; reuse the downloader's.
                if retry > self.pageGripper.MAX_RETRY:
                    Tools.writelog('warn', '超過最大重試次數,放棄')
                    return ""
        try:
            content = fp.read()
        except IOError:  # e.g. socket.timeout while reading the body
            Tools.writelog('warn', u'read failed, url=' + ajaxAddr)
            return ""
        finally:
            fp.close()
        views = Tools.getFromPatten(u'"viewcount":(.*?),', content, True)
        return views.replace('"', '')

    def gripViewsFromData(self, data):
        """View count embedded in the page text, or ``""``."""
        return Tools.getFromPatten(u'已經有<.*?>(.*?)<.*?>次觀看', data, True)

    def gripBaseDir(self, data):
        """Value of the page's JavaScript ``base_dir`` variable."""
        return Tools.getFromPatten(u"base_dir = '(.*?)'", data, True)

    def gripinfo(self, url):
        """Download ``url`` and extract every field.

        :returns: dict with keys title, channel, tag, release, user, ccsiteId,
            ccVid and views (``"error"`` when no view count was found).
        :raises: whatever PageGripper.getPageInfo raises, after logging it.
        """
        try:
            data = self.pageGripper.getPageInfo(url)
        except Exception:
            Tools.writelog("error", url + " 抓取失敗")
            raise
        Tools.writelog('info', '開始內容匹配')
        rst = {}
        rst['title'] = self.griptitle(data)
        rst['channel'] = self.gripchannel(data)
        rst['tag'] = self.griptag(data)
        rst['release'] = self.griptime(data)
        rst['user'] = self.gripuser(data)
        rst['ccsiteId'], rst['ccVid'] = self.getCCData(data)
        views = self.gripViewsFromData(data)
        if not views:
            # Not embedded in the page: fall back to the ajax endpoint.
            views = self.gripViewsAjax(self.gripVideoId(data), url,
                                       self.gripBaseDir(data))
            if views == "":
                views = "error"
        if views == "error":
            Tools.writelog("error", "獲取觀看次數失敗")
        # unicode literal: views may be unicode (decoded page text), and a
        # non-ASCII byte-string prefix would fail the implicit ASCII decode.
        Tools.writelog("debug", u"點擊量:" + views)
        rst['views'] = views
        Tools.writelog('debug', 'title=%s,channel=%s,tag=%s' % (rst['title'], rst['channel'], rst['tag']))
        return rst
# Manual smoke test: scrape a list of sample video pages.
if __name__ == '__main__':
    urls = [
        'http://008yx.com/xbsp/index.php/video/index/3138',
        'http://vblog.xwhb.com/index.php/video/index/4067',
        'http://demo.ccvms.bokecc.com/index.php/video/index/3968',
        'http://vlog.cnhubei.com/wuhan/20100912_56145.html',
        'http://vlog.cnhubei.com/html/js/30271.html',
        'http://www.ddvtv.com/index.php/video/index/15',
        'http://boke.2500sz.com/index.php/video/index/60605',
        'http://video.zgkqw.com/index.php/video/index/334',
        'http://yule.hitmv.com/html/joke/27041.html',
        'http://www.ddvtv.com/index.php/video/index/11',
        'http://www.zgnyyy.com/index.php/video/index/700',
        'http://www.kdianshi.com/index.php/video/index/5330',
        'http://www.aoyatv.com/index.php/video/index/127',
        'http://v.ourracing.com/html/channel2/64.html',
        'http://v.zheye.net/index.php/video/index/93',
        'http://vblog.thmz.com/index.php/video/index/7616',
        'http://kdianshi.com/index.php/video/index/5330',
        'http://tv.seeyoueveryday.com/index.php/video/index/95146',
        'http://sp.zgyangzhi.com/html/ji/2.html',
        'http://www.xjapan.cc/index.php/video/index/146',
        'http://www.jojy.cn/vod/index.php/video/index/399',
        'http://v.cyzone.cn/index.php/video/index/99',
    ]
    infoGripper = InfoGripper()
    for url in urls:
        try:
            infoGripper.gripinfo(url)
        except Exception as e:
            # Keep going: one unreachable sample site should not stop the run.
            Tools.writelog('error', url + ' ' + repr(e))
WEB服務及任務調度
[python]
view plaincopy
# -*- coding: utf-8 -*-
"""
Created on 2010-9-15
@author: chenggong
"""
import string,cgi,time
from os import curdir,sep
from BaseHTTPServer import BaseHTTPRequestHandler,HTTPServer
from InfoGripper import *
import re
import MySQLdb
import time
import threading
import urllib
import urllib2
# Port the HTTP task service listens on.
PORT = 8079
# Returned by GET .../vinfoant/version.
VERSION = 0.1
# Character set applied to every MySQL connection.
DBCHARSET = "utf8"

# Query-string parameters read from a .../vinfoant/run request.
PARAMS = ('callback sessionId retry retryInterval '
          'dbhost dbport db dbuser dbpass videoId').split()

# Columns of the `videos` table in SELECT order; result rows are read
# positionally (index 3 = desc_url, index 5 = title).
DBMAP = ('video_id ccsiteid ccvid desc_url site_id title post_time '
         'author elapse channel tags create_time check_time status').split()

# Error codes reported to the controller
ERR_OK = 0                # success
ERR_PARAM = 1             # parameter error
ERR_HTTP_TIMEOUT = 5      # page unreachable / timed out
ERR_HTTP_STATUS = 6       # page answered with an HTTP error status
ERR_DB_CONNECT_FAIL = 8   # could not connect to the database
ERR_DB_SQL_FAIL = 9       # an SQL statement failed
ERR_GRIPVIEW = 11         # page scraped, but the view count was not found
ERR_UNKNOW = 12           # any other failure
# Database adapter
class DBAdapter(object):
    """Owns one MySQL connection/cursor and performs the service's SQL.

    NOTE(review): MyHandler keeps a single instance shared by every
    TaskThread, while MySQLdb documents connections as not shareable between
    threads (threadsafety = 1).  Concurrent tasks may interleave on the same
    cursor or reconnect underneath each other -- consider one adapter per
    task or a lock held for the whole task.
    """

    def __init__(self):
        # Parameters of the current connection; connect() compares against
        # them to decide whether it must reconnect.
        self.param = {'ip': '',
                      'port': 0,
                      'user': '',
                      'pw': '',
                      'db': ''}
        self.connect_once = False  # True once self.conn / self.cur exist

    def connect(self, ip, port, user, pw, db):
        """Create or replace the connection if any parameter changed.

        :param port: any value accepted by int().
        :raises: any error from MySQLdb, after logging it.
        """
        if (ip == self.param['ip'] and port == self.param['port'] and
                user == self.param['user'] and pw == self.param['pw'] and
                db == self.param['db']):
            return  # already connected with these parameters
        # The password is masked so it never ends up in the log.
        Tools.writelog('info', '更換數據庫連接池,ip=%s,port=%s,user=%s,pw=***,db=%s'
                       % (ip, port, user, db))
        try:
            if self.connect_once:
                # Clear the flag first so that a failed reconnect below does
                # not make the next call close these objects a second time.
                self.connect_once = False
                self.cur.close()
                self.conn.close()
            self.conn = MySQLdb.connect(user=user, passwd=pw, db=db,
                                        host=ip, port=int(port))
            self.conn.set_character_set(DBCHARSET)
            self.connect_once = True
            self.cur = self.conn.cursor(MySQLdb.cursors.Cursor)
            self.param.update(ip=ip, port=port, user=user, pw=pw, db=db)
        except Exception:
            Tools.writelog('error', u'數據庫連接失敗', True)
            raise
        Tools.writelog('info', u'數據庫連接成功')

    def execute(self, sql, params=None):
        """Execute ``sql`` on the shared cursor; log and re-raise on failure.

        :param params: optional sequence of values for ``%s`` placeholders in
            ``sql``, escaped by MySQLdb.  When given, a literal ``%`` in
            ``sql`` must be written as ``%%``.
        """
        Tools.writelog('debug', u'執行SQL: ' + sql)
        try:
            self.cur.execute(sql, params)
        except Exception:
            Tools.writelog('error', u'SQL執行錯誤:' + sql)
            raise

    def query(self, sql, params=None):
        """Execute a SELECT and return the cursor's ``fetchall()`` result."""
        self.execute(sql, params)
        return self.cur.fetchall()

    def updateErr(self, videoId):
        """Mark ``videoId`` as failed (status=-1, check_time=now) and commit."""
        nowtime = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime())
        self.execute("UPDATE videos SET check_time=%s, status=-1 WHERE video_id=%s",
                     (nowtime, videoId))
        self.conn.commit()

    def update(self, obj, videoId, isUpdateTitle=True):
        """Store scraped data for ``videoId`` and commit.

        :param obj: dict with keys ccsiteId, ccVid, title, release, user,
            channel, tag and views (``'error'`` = no view count available).
        :param isUpdateTitle: also overwrite the stored title.
        :returns: ERR_OK, or ERR_DB_SQL_FAIL after rolling back.

        All scraped values are passed as query parameters: they come from
        arbitrary web pages and may contain quotes.
        """
        Tools.writelog('debug', '開始更新數據庫')
        try:
            columns = []
            if obj['ccsiteId'] != "":
                columns.append(('ccsiteid', obj['ccsiteId']))
            if obj['ccVid'] != "":
                columns.append(('ccvid', obj['ccVid']))
            if isUpdateTitle:
                columns.append(('title', obj['title']))
            columns += [('post_time', obj['release']),
                        ('author', obj['user']),
                        ('channel', obj['channel']),
                        ('tags', obj['tag']),
                        ('check_time', time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime()))]
            sql = ("UPDATE videos SET " +
                   ", ".join(name + "=%s" for name, _ in columns) +
                   ", status=0 WHERE video_id=%s")
            self.execute(sql, [value for _, value in columns] + [videoId])
            if obj['views'] != 'error':
                nowdate = time.strftime('%Y-%m-%d', time.localtime())
                rows = self.query("SELECT * FROM counts WHERE date = %s and video_id=%s",
                                  (nowdate, videoId))
                if rows:  # today's row exists: update it
                    self.execute("UPDATE counts SET count=%s WHERE video_id=%s AND date=%s",
                                 (obj['views'], videoId, nowdate))
                else:  # otherwise insert one
                    self.execute("INSERT INTO counts VALUES (null, %s, %s, %s)",
                                 (videoId, nowdate, obj['views']))
            self.conn.commit()
            Tools.writelog('debug', "db commit ok")
            return ERR_OK
        except Exception as e:
            Tools.writelog('error', 'db update failed: ' + repr(e))
            try:
                # Don't leave half-applied statements for the next commit.
                self.conn.rollback()
            except Exception:
                pass  # connection already broken; the failure is logged above
            return ERR_DB_SQL_FAIL
# Task thread
class TaskThread(threading.Thread):
    """Runs one scrape task in the background and reports to the controller.

    Usage (as in MyHandler.do_GET): setTaskTool(), setParam(), start().
    """

    def setTaskTool(self, dbAdapter, gripper):
        """Inject the shared DBAdapter and InfoGripper."""
        self.dbAdapter = dbAdapter
        self.gripper = gripper

    def setParam(self, param):
        """Store the request parameters (a dict keyed by PARAMS).

        :raises ValueError: ``videoId`` is missing or not a decimal number.
            It is embedded in SQL, so anything else is rejected.
        """
        videoId = param.get('videoId', "")
        if not videoId.isdigit():
            raise ValueError("invalid videoId: %r" % (videoId,))
        self.param = param
        self.videoId = videoId

    def init(self):
        """Reset the per-run results."""
        self.views = "0"
        self.errcode = ERR_OK

    def run(self):
        """Execute the task; the controller callback is always attempted."""
        Tools.writelog('debug', '開始爬蟲任務,sessionId=' + self.param['sessionId'])
        self.init()
        try:
            self._runTask()
        except Exception as e:
            # Without this, any unexpected error (DB failure inside
            # updateErr/query, ...) would end the thread with no callback.
            Tools.writelog('error', 'task failed: ' + repr(e))
            if self.errcode == ERR_OK:
                self.errcode = ERR_UNKNOW
        self.callback(self.errcode)
        Tools.writelog('info', '任務結束,sessionId=' + self.param['sessionId'])

    def _runTask(self):
        """Connect, load the video row, scrape its page and store the result."""
        try:
            self.dbAdapter.connect(self.param['dbhost'], self.param['dbport'],
                                   self.param['dbuser'], self.param['dbpass'],
                                   self.param['db'])
        except Exception:
            self.errcode = ERR_DB_CONNECT_FAIL
            return
        # videoId was validated as digits in setParam.
        sql = "SELECT " + ",".join(DBMAP) + " FROM videos WHERE video_id=" + self.videoId
        video = self.dbAdapter.query(sql)
        if len(video) != 1:  # exactly one row expected
            Tools.writelog('error', 'video row not found, videoId=' + self.videoId)
            self.errcode = ERR_PARAM
            return
        row = video[0]
        url = row[DBMAP.index('desc_url')]
        if not url:
            self.errcode = ERR_PARAM
            return
        try:
            rst = self.gripper.gripinfo(url)
        except urllib2.HTTPError:  # must precede URLError (subclass)
            self.errcode = ERR_HTTP_STATUS
            self.dbAdapter.updateErr(self.videoId)
            return
        except urllib2.URLError:
            self.errcode = ERR_HTTP_TIMEOUT
            self.dbAdapter.updateErr(self.videoId)
            return
        except Exception:
            self.errcode = ERR_UNKNOW
            self.dbAdapter.updateErr(self.videoId)
            return
        self.views = rst['views']
        if self.views == "error":
            # page data scraped, only the view count failed
            self.views = "-1"
            self.errcode = ERR_GRIPVIEW
        # Keep a stored title that contains "-": the <title> fallback in
        # InfoGripper.griptitle stops at the first '-', so re-scraping would
        # truncate it.
        title = row[DBMAP.index('title')] or ""
        dberr = self.dbAdapter.update(rst, self.videoId, '-' not in title)
        if dberr != ERR_OK:
            self.errcode = dberr

    def _viewsAsInt(self):
        """Return ``self.views`` as an int, or -1 when it is not a number."""
        try:
            return int(self.views)
        except (TypeError, ValueError):
            return -1

    def callback(self, errcode):
        """Report ``errcode`` and the view count to ``param['callback']``.

        The form-encoded results travel as a single ``results`` parameter,
        hence the escaping of their '&'.  A failed attempt is retried every
        ``retryInterval`` seconds, at most ``retry`` times.
        """
        results = urllib.urlencode({'errorcode': errcode,
                                    'count': self._viewsAsInt()})
        results = results.replace('&', '%26')
        url = (self.param['callback'] + "?sessionId=" + self.param['sessionId'] +
               "&results=" + results)
        retry = 0
        while True:
            try:
                Tools.writelog('debug', "回調主控,url=" + url)
                urllib2.urlopen(url).close()
                Tools.writelog('debug', '回調成功')
                return
            except IOError:
                # URLError/HTTPError and socket errors (timeouts) are all
                # IOError subclasses.
                retry += 1
                if retry > int(self.param['retry']):
                    Tools.writelog('error', '回調主控失敗')
                    return
                Tools.writelog('debug', '回調主控超時,%s秒后重試' % self.param['retryInterval'])
                time.sleep(int(self.param['retryInterval']))
# Web service handler
class MyHandler(BaseHTTPRequestHandler):
    """HTTP entry point.

    GET .../vinfoant/version      -> VERSION
    GET .../vinfoant/run?<PARAMS> -> starts a TaskThread, answers "ok"
    anything else                 -> 404
    """

    # Shared by every request and handed to every TaskThread.
    dbAdapter = DBAdapter()
    gripper = InfoGripper()

    def pageSuccess(self):
        """Send a 200 text/html header."""
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def pageFail(self):
        """Send a 404 error page."""
        self.send_error(404, "not found")

    def getValue(self, param):
        """Return the raw (not URL-decoded) value of query parameter ``param``."""
        # Anchored on '?' or '&' so a name cannot match inside a longer one.
        return Tools.getFromPatten('[?&]' + re.escape(param) + '=([^&]*)',
                                   self.path, True)

    def do_GET(self):
        if re.match('.*vinfoant/version.*', self.path):
            self.pageSuccess()
            self.wfile.write(str(VERSION))  # write() needs a string; VERSION is a float
        elif re.match('.*vinfoant/run.*', self.path):
            param = {}
            for p in PARAMS:
                param[p] = self.getValue(p)
            taskThread = TaskThread()
            taskThread.setTaskTool(self.dbAdapter, self.gripper)
            try:
                taskThread.setParam(param)
            except (AssertionError, ValueError):
                # Reject before any status line is sent, instead of dropping
                # the connection after a 200.
                self.send_error(400, "bad videoId")
                return
            self.pageSuccess()
            taskThread.start()
            self.wfile.write("ok")
        else:
            self.pageFail()
# Start the web service (global entry point)
def startHttpd():
    """Serve MyHandler on PORT until interrupted with Ctrl-C."""
    Tools.writelog('debug', 'httpd start..listen on ' + str(PORT))
    # Created outside the try: if construction fails there is nothing to close.
    httpd = HTTPServer(('', PORT), MyHandler)
    Tools.writelog('debug', 'success')
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        Tools.writelog('debug', 'httpd close..')
    finally:
        httpd.server_close()  # close the listening socket on every exit path


if __name__ == '__main__':
    startHttpd()
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯繫站長郵箱:is@yisu.com進行舉報,並提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。