Distributed Web Scraping with Python (Part 3): The Code

Author: 课课家教育 | Source: http://www.kokojia.com | Published: 2016-01-08 14:04:31


This installment is mainly the code itself.

The program ran continuously for 24 hours and was then deployed across 10 machines in a distributed setup; long-running operation has been essentially trouble-free.

Going forward it will crawl roughly 100,000 pages per day.
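The controlling master program (主控) is not included in this article, but each worker exposes a small HTTP interface: GET /vinfoant/run on port 8079, taking the parameters listed in PARAMS in the scheduling code at the end of this post. Purely as an illustration, and with every host, credential and video id below being a made-up placeholder, a controller could fan tasks out to the ten workers roughly like this:

# Hypothetical controller-side sketch (not from the original project): fan crawl
# tasks out to the worker nodes via the /vinfoant/run endpoint defined later in
# this post. All hosts, credentials and ids are placeholders.
import urllib2

WORKERS = ['192.168.25.%d' % i for i in range(1, 11)]  # placeholder worker IPs
PORT = 8079  # the port the worker's HTTPServer listens on

def dispatch(worker, video_id):
    # The worker parses parameters with a simple regex and does not URL-decode
    # them, so the values are passed raw here.
    params = [
        ('callback', 'http://192.168.25.100:8080/callback'),  # placeholder controller callback
        ('sessionId', 'session-%s' % video_id),
        ('retry', '3'),
        ('retryInterval', '10'),
        ('dbhost', '192.168.25.200'),
        ('dbport', '3306'),
        ('db', 'videos'),
        ('dbuser', 'crawler'),
        ('dbpass', 'secret'),
        ('videoId', video_id),
    ]
    query = '&'.join('%s=%s' % (k, v) for k, v in params)
    url = 'http://%s:%d/vinfoant/run?%s' % (worker, PORT, query)
    return urllib2.urlopen(url).read()  # the worker replies "ok" and crawls in a background thread

for i, vid in enumerate(['3138', '4067', '3968']):  # placeholder video ids
    dispatch(WORKERS[i % len(WORKERS)], vid)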

The source code follows.

Content crawling and utilities (InfoGripper.py)

# -*- coding: utf-8 -*-
'''
Created on 2010-9-15

@author: chenggong
'''
import urllib2
import re
import socket

DEBUG = 0

'''
Utility class
'''
class Tools():

    # log helper
    @staticmethod
    def writelog(level, info, notify=False):
        if DEBUG == 0:
            try:
                print "[" + level + "]" + info.decode('UTF-8').encode('GBK')
            except:
                print "[" + level + "]" + info.encode('GBK')
        else:
            print "[" + level + "]" + info
        #if notify:
        #    print "[notify]报告管理员!!"

    # convert to unicode
    @staticmethod
    def toUnicode(s, charset):
        if charset == "":
            return s
        else:
            try:
                u = unicode(s, charset)
            except:
                u = ""
            return u

    # regex extraction
    # @param single  whether to capture only the first match
    @staticmethod
    def getFromPatten(patten, src, single=False):
        rst = ""
        p = re.compile(patten, re.S)
        all = p.findall(src)
        for matcher in all:
            rst += matcher + " "
            if single:
                break
        return rst.strip()

'''
Page content crawler
'''
class PageGripper():

    URL_OPEN_TIMEOUT = 10  # page fetch timeout (seconds)
    MAX_RETRY = 3          # maximum number of retries

    def __init__(self):
        socket.setdefaulttimeout(self.URL_OPEN_TIMEOUT)

    # detect the character set
    def getCharset(self, s):
        rst = Tools.getFromPatten(u'charset=(.*?)"', s, True)
        if rst != "":
            if rst == "utf8":
                rst = "utf-8"
        return rst

    # try to fetch the page
    def downloadUrl(self, url):
        charset = ""
        page = ""
        retry = 0
        while True:
            try:
                fp = urllib2.urlopen(url)
                break
            except urllib2.HTTPError, e:  # HTTP status error
                Tools.writelog('error', 'HTTP状态错误 code=' + str(e.code))
                raise
            except urllib2.URLError, e:  # network error / timeout
                Tools.writelog('warn', '页面访问超时,重试..')
                retry += 1
                if retry > self.MAX_RETRY:
                    Tools.writelog('warn', '超过最大重试次数,放弃')
                    raise
        while True:
            line = fp.readline()
            if charset == "":
                charset = self.getCharset(line)
            if not line:
                break
            page += Tools.toUnicode(line, charset)
        fp.close()
        return page

    # fetch a page
    def getPageInfo(self, url):
        Tools.writelog("info", "开始抓取网页,url= " + url)
        info = ""
        try:
            info = self.downloadUrl(url)
        except:
            raise
        Tools.writelog("debug", "网页抓取成功")
        return info

'''
Content extraction class
'''
class InfoGripper():

    pageGripper = PageGripper()
    MAX_RETRY = 3  # maximum number of retries for the ajax view-count request

    def __init__(self):
        Tools.writelog('debug', "爬虫启动")

    # extract the title
    def griptitle(self, data):
        title = Tools.getFromPatten(u'box2t sp">(.*?)', data, True)
        if title == "":
            title = Tools.getFromPatten(u'(.*?)[-<]', data, True)
        return title.strip()

    # extract the channel
    def gripchannel(self, data):
        zone = Tools.getFromPatten(u'频道:(.*?)', data, True)
        channel = Tools.getFromPatten(u'<a.*?>(.*?)', zone, True)
        return channel

    # extract the tags
    def griptag(self, data):
        zone = Tools.getFromPatten(u'标签:(.*?)', data, True)
        rst = Tools.getFromPatten(u'>(.*?)', zone, False)
        return rst

    # extract the view count
    def gripviews(self, data):
        rst = Tools.getFromPatten(u'已经有(.*?)次观看', data)
        return rst

    # extract the publish time
    def griptime(self, data):
        rst = Tools.getFromPatten(u'在(.*?)发布', data, True)
        return rst

    # extract the uploader
    def gripuser(self, data):
        rst = Tools.getFromPatten(u'title="点击进入(.*?)的用户空间"', data, True)
        return rst

    # get the page character set
    def getPageCharset(self, data):
        charset = Tools.getFromPatten(u'charset=(.*?)"', data, True)
        if charset == "utf8":
            charset = "utf-8"
        return charset

    # get the CC (bokecc) player data
    def getCCData(self, data):
        zone = Tools.getFromPatten(u'SWFObject(.*?)', data, True)
        # check whether the page uses the bokecc player
        isFromBokeCC = re.match('.*bokecc.com.*', zone)
        if not isFromBokeCC:
            return "", ""
        ccSiteId = Tools.getFromPatten(u'siteid=(.*?)[&,"]', zone, True)
        ccVid = Tools.getFromPatten(u'vid=(.*?)[&,"]', zone, True)
        return ccSiteId, ccVid

    # get the site-local vid
    def gripVideoId(self, data):
        vid = Tools.getFromPatten(u'var vid = "(.*?)"', data, True)
        return vid

    # fetch the view count via the ajax statistics interface
    def gripViewsAjax(self, vid, url, basedir):
        host = Tools.getFromPatten(u'http://(.*?)/', url, True)
        ajaxAddr = "http://" + host + basedir + "/index.php/ajax/video_statistic/" + vid
        '''
        try:
            content = self.pageGripper.getPageInfo(ajaxAddr)
        except Exception,e:
            print e
            Tools.writelog ("error", ajaxAddr+u"抓取失败")
            return "error"
        '''
        Tools.writelog('debug', u"开始获取点击量,url=" + ajaxAddr)
        retry = 0
        while True:
            try:
                fp = urllib2.urlopen(ajaxAddr)
                break
            except urllib2.HTTPError, e:  # HTTP status error
                Tools.writelog('error', 'HTTP状态错误 code=' + "%d" % e.code)
                return ""
            except urllib2.URLError, e:  # network error / timeout
                Tools.writelog('warn', '页面访问超时,重试..')
                retry += 1
                if retry > self.MAX_RETRY:
                    Tools.writelog('warn', '超过最大重试次数,放弃')
                    return ""
        content = fp.read()
        fp.close()
        views = Tools.getFromPatten(u'"viewcount":(.*?),', content, True)
        views = views.replace('"', '')
        return views

    # extract the view count from the page content
    def gripViewsFromData(self, data):
        views = Tools.getFromPatten(u'已经有<.*?>(.*?)<.*?>次观看', data, True)
        return views

    # extract the site base directory
    def gripBaseDir(self, data):
        dir = Tools.getFromPatten(u"base_dir = '(.*?)'", data, True)
        return dir

    # crawl all fields for one page
    def gripinfo(self, url):
        try:
            data = self.pageGripper.getPageInfo(url)
        except:
            Tools.writelog("error", url + " 抓取失败")
            raise
        Tools.writelog('info', '开始内容匹配')
        rst = {}
        rst['title'] = self.griptitle(data)
        rst['channel'] = self.gripchannel(data)
        rst['tag'] = self.griptag(data)
        rst['release'] = self.griptime(data)
        rst['user'] = self.gripuser(data)
        ccdata = self.getCCData(data)
        rst['ccsiteId'] = ccdata[0]
        rst['ccVid'] = ccdata[1]
        views = self.gripViewsFromData(data)
        if views == "" or not views:
            vid = self.gripVideoId(data)
            basedir = self.gripBaseDir(data)
            views = self.gripViewsAjax(vid, url, basedir)
            if views == "":
                views = "error"
        if views == "error":
            Tools.writelog("error", "获取观看次数失败")
        Tools.writelog("debug", "点击量:" + views)
        rst['views'] = views
        Tools.writelog('debug', 'title=%s,channel=%s,tag=%s' % (rst['title'], rst['channel'], rst['tag']))
        return rst

'''
Unit test
'''
if __name__ == '__main__':
    list = [
        'http://008yx.com/xbsp/index.php/video/index/3138',
        'http://vblog.xwhb.com/index.php/video/index/4067',
        'http://demo.ccvms.bokecc.com/index.php/video/index/3968',
        'http://vlog.cnhubei.com/wuhan/20100912_56145.html',
        'http://vlog.cnhubei.com/html/js/30271.html',
        'http://www.ddvtv.com/index.php/video/index/15',
        'http://boke.2500sz.com/index.php/video/index/60605',
        'http://video.zgkqw.com/index.php/video/index/334',
        'http://yule.hitmv.com/html/joke/27041.html',
        'http://www.ddvtv.com/index.php/video/index/11',
        'http://www.zgnyyy.com/index.php/video/index/700',
        'http://www.kdianshi.com/index.php/video/index/5330',
        'http://www.aoyatv.com/index.php/video/index/127',
        'http://v.ourracing.com/html/channel2/64.html',
        'http://v.zheye.net/index.php/video/index/93',
        'http://vblog.thmz.com/index.php/video/index/7616',
        'http://kdianshi.com/index.php/video/index/5330',
        'http://tv.seeyoueveryday.com/index.php/video/index/95146',
        'http://sp.zgyangzhi.com/html/ji/2.html',
        'http://www.xjapan.cc/index.php/video/index/146',
        'http://www.jojy.cn/vod/index.php/video/index/399',
        'http://v.cyzone.cn/index.php/video/index/99',
    ]

    list1 = ['http://192.168.25.7:8079/vinfoant/versionasdfdf']

    infoGripper = InfoGripper()
    for url in list:
        infoGripper.gripinfo(url)
    del infoGripper
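For reference, gripinfo() returns a plain dict whose keys are exactly the ones assigned above; the values shown below are placeholders, not real crawl output:

# Shape of the dict returned by InfoGripper.gripinfo(); all values are placeholders.
result = {
    'title':    u'...',  # page title
    'channel':  u'...',  # channel name
    'tag':      u'...',  # space-separated tag matches
    'release':  u'...',  # publish time as shown on the page
    'user':     u'...',  # uploader name
    'ccsiteId': u'...',  # bokecc site id, '' when the page does not use the bokecc player
    'ccVid':    u'...',  # bokecc video id, '' when the page does not use the bokecc player
    'views':    u'123',  # view count, or the string 'error' if it could not be obtained
}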

Web service and task scheduling

# -*- coding: utf-8 -*-
'''
Created on 2010-9-15

@author: chenggong
'''
import string, cgi, time
from os import curdir, sep
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from InfoGripper import *
import re
import MySQLdb
import time
import threading
import urllib
import urllib2

PORT = 8079
VERSION = 0.1
DBCHARSET = "utf8"

PARAMS = [
    'callback',
    'sessionId',
    'retry',
    'retryInterval',
    'dbhost',
    'dbport',
    'db',
    'dbuser',
    'dbpass',
    'videoId'
]

DBMAP = ['video_id',
         'ccsiteid',
         'ccvid',
         'desc_url',
         'site_id',
         'title',
         'post_time',
         'author',
         'elapse',
         'channel',
         'tags',
         'create_time',
         'check_time',
         'status']

'''
ERROR CODE definitions
'''
ERR_OK = 0
ERR_PARAM = 1
ERR_HTTP_TIMEOUT = 5
ERR_HTTP_STATUS = 6
ERR_DB_CONNECT_FAIL = 8
ERR_DB_SQL_FAIL = 9
ERR_GRIPVIEW = 11
ERR_UNKNOW = 12

'''
Database adapter
'''
class DBAdapter(object):

    def __init__(self):
        self.param = {'ip': '',
                      'port': 0,
                      'user': '',
                      'pw': '',
                      'db': ''}
        self.connect_once = False  # whether a database connection has been made before

    '''
    Create/refresh the database connection pool
    '''
    def connect(self, ip, port, user, pw, db):
        if (ip != self.param['ip'] or
            port != self.param['port'] or
            user != self.param['user'] or
            pw != self.param['pw'] or
            db != self.param['db']):
            Tools.writelog('info', '更换数据库连接池,ip=' + ip + ',port=' + port + ',user=' + user + ',pw=' + pw + ',db=' + db)
            try:
                if self.connect_once == True:  # release the previous connection
                    self.cur.close()
                    self.conn.close()
                self.conn = MySQLdb.connect(user=user, passwd=pw, db=db, host=ip, port=int(port))
                self.conn.set_character_set(DBCHARSET)
                self.connect_once = True
                self.cur = self.conn.cursor(MySQLdb.cursors.Cursor)
                self.param['ip'] = ip
                self.param['port'] = port
                self.param['user'] = user
                self.param['pw'] = pw
                self.param['db'] = db
            except:
                Tools.writelog('error', u'数据库连接失败', True)
                raise
            else:
                Tools.writelog('info', u'数据库连接成功')

    '''
    Execute a SQL statement
    '''
    def execute(self, sql):
        Tools.writelog('debug', u'执行SQL: ' + sql)
        try:
            self.cur.execute(sql)
        except:
            Tools.writelog('error', u'SQL执行错误:' + sql)
            raise

    '''
    Query the database
    '''
    def query(self, sql):
        row = {}
        self.execute(sql)
        row = self.cur.fetchall()
        return row

    '''
    Mark a video as errored
    '''
    def updateErr(self, videoId):
        nowtime = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time()))
        sql = "UPDATE videos SET "
        sql += "check_time='" + nowtime + "',"
        sql += "status=-1 "
        sql += "WHERE video_id=" + videoId
        self.execute(sql)
        self.conn.commit()

    '''
    Write the crawl results back to the database
    '''
    def update(self, obj, videoId, isUpdateTitle=True):
        Tools.writelog('debug', '开始更新数据库')
        try:
            # update the videos table
            sql = "UPDATE videos SET "
            if obj['ccsiteId'] != "":
                sql += "ccsiteid='" + obj['ccsiteId'] + "',"
            if obj['ccVid'] != "":
                sql += "ccvid='" + obj['ccVid'] + "',"
            if isUpdateTitle:
                sql += "title='" + obj['title'] + "',"
            sql += "post_time='" + obj['release'] + "',"
            sql += "author='" + obj['user'] + "',"
            sql += "channel='" + obj['channel'] + "',"
            sql += "tags='" + obj['tag'] + "',"
            nowtime = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time()))
            sql += "check_time='" + nowtime + "',"
            sql += "status=0 "
            sql += "WHERE video_id=" + videoId
            self.execute(sql)

            # update the counts table
            if obj['views'] != 'error':
                nowdate = time.strftime('%Y-%m-%d', time.localtime(time.time()))
                sql = "SELECT * FROM counts WHERE "
                sql += "date = '" + nowdate + "' and video_id=" + videoId
                rst = self.query(sql)
                if len(rst) > 0:  # a row for today already exists: update it
                    sql = "UPDATE counts SET count=" + obj['views']
                    sql += " WHERE video_id=" + videoId + " AND date='" + nowdate + "'"
                else:  # otherwise insert a new row
                    sql = "INSERT INTO counts VALUES"
                    sql += "(null," + videoId + ",'" + nowdate + "'," + obj['views'] + ")"
                self.execute(sql)
            self.conn.commit()
            Tools.writelog('debug', "db commit ok")
            return ERR_OK
        except Exception, e:
            print e
            return ERR_DB_SQL_FAIL

'''
Task thread class
'''
class TaskThread(threading.Thread):

    def setTaskTool(self, dbAdapter, gripper):
        self.dbAdapter = dbAdapter
        self.gripper = gripper

    def setParam(self, param):
        self.param = param
        self.videoId = param['videoId']
        assert self.videoId != ""

    def init(self):
        self.views = "0"
        self.errcode = ERR_OK

    def run(self):
        Tools.writelog('debug', '开始爬虫任务,sessionId=' + self.param['sessionId'])
        self.init()
        try:
            # refresh the database connection
            self.dbAdapter.connect(self.param['dbhost'], self.param['dbport'], self.param['dbuser'], self.param['dbpass'], self.param['db'])
        except:
            self.errcode = ERR_DB_CONNECT_FAIL  # database connection failed
            self.callback(self.errcode)
            return

        # look up the video with this vid
        sql = "SELECT "
        for column in DBMAP:
            sql += column
            if column != DBMAP[len(DBMAP) - 1]:
                sql += ","
        sql += " FROM videos"
        sql += " WHERE video_id=" + self.videoId
        video = self.dbAdapter.query(sql)
        assert not (len(video) > 1 or len(video) == 0)  # there must be exactly one matching row

        url = video[0][3]
        assert url != ""
        try:
            rst = self.gripper.gripinfo(url)
        except urllib2.HTTPError, e:
            self.errcode = ERR_HTTP_STATUS  # HTTP status error
            self.dbAdapter.updateErr(self.videoId)
        except urllib2.URLError, e:
            self.errcode = ERR_HTTP_TIMEOUT  # HTTP timeout
            self.dbAdapter.updateErr(self.videoId)
        except:
            self.errcode = ERR_UNKNOW  # unknown error
            self.dbAdapter.updateErr(self.videoId)
        else:
            self.views = rst['views']
            if self.views == "error":
                self.views = "-1"
                self.errcode = ERR_GRIPVIEW  # data was crawled, but the view count failed
            # update the database (special case: if the existing title contains "-", do not overwrite the title field)
            title = video[0][5]
            assert title != ""
            if re.match('.*-.*', title):
                self.errcode = self.dbAdapter.update(rst, self.videoId, False)
            else:
                self.errcode = self.dbAdapter.update(rst, self.videoId)

        self.callback(self.errcode)
        Tools.writelog('info', '任务结束,sessionId=' + self.param['sessionId'])
        return

    def callback(self, errcode):
        results = {'errorcode': errcode, 'count': int(self.views)}
        results = urllib.urlencode(results)
        results = results.replace('&', '%26')
        url = self.param['callback']
        url += "?"
        url += "sessionId=" + self.param['sessionId']
        url += "&results=" + results
        retry = 0
        while True:
            try:
                Tools.writelog('debug', "回调主控,url=" + url)
                urllib2.urlopen(url)
                Tools.writelog('debug', '回调成功')
                break
            except urllib2.URLError, e:  # timeout or error
                Tools.writelog('debug', '回调主控超时,%s秒后重试' % self.param['retryInterval'])
                retry += 1
                time.sleep(int(self.param['retryInterval']))
                if retry > int(self.param['retry']):
                    Tools.writelog('error', '回调主控失败')
                    return

'''
Web service class
'''
class MyHandler(BaseHTTPRequestHandler):

    dbAdapter = DBAdapter()
    gripper = InfoGripper()

    def pageSuccess(self):
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def pageFail(self):
        self.send_error(404, "not found")

    def getValue(self, param):
        src = self.path + '&'
        reg = param + '=' + '(.*?)&'
        value = Tools.getFromPatten(reg, src, True)
        return value

    def do_GET(self):
        isGetVersion = re.match('.*vinfoant/version.*', self.path)
        isTask = re.match('.*vinfoant/run.*', self.path)
        if isGetVersion:
            self.pageSuccess()
            self.wfile.write(str(VERSION))
        elif isTask:
            self.pageSuccess()
            param = {}
            for p in PARAMS:
                param[p] = self.getValue(p)  # read each query parameter
            taskThread = TaskThread()
            taskThread.setTaskTool(self.dbAdapter, self.gripper)
            taskThread.setParam(param)
            taskThread.start()  # start the task thread
            self.wfile.write("ok")
        else:
            self.pageFail()
        return


'''
Start the web service (global entry point)
'''
def startHttpd():
    try:
        Tools.writelog('debug', 'httpd start..listen on ' + str(PORT))
        httpd = HTTPServer(('', PORT), MyHandler)
        Tools.writelog('debug', 'success')
        httpd.serve_forever()
    except KeyboardInterrupt:
        Tools.writelog('debug', 'httpd close..')
        httpd.socket.close()


if __name__ == '__main__':
    startHttpd()
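Once a task finishes, the worker reports back by calling the controller's callback URL (see TaskThread.callback above): it URL-encodes {'errorcode': ..., 'count': ...}, escapes '&' as '%26', and appends the result as a single results query parameter next to sessionId. The controller itself is not part of this post; the following is only a sketch, under that assumption, of how such a callback request could be decoded:

# Hypothetical controller-side decoding of the worker callback (not part of the
# original project). The request produced by TaskThread.callback looks like
# callback?sessionId=...&results=count=N%26errorcode=M
from urlparse import urlparse, parse_qs

def parse_worker_callback(path):
    # e.g. path = '/callback?sessionId=abc123&results=count=97%26errorcode=0'
    outer = parse_qs(urlparse(path).query)
    session_id = outer['sessionId'][0]
    inner = parse_qs(outer['results'][0])  # '%26' has been unquoted back to '&'
    errorcode = int(inner['errorcode'][0])
    count = int(inner['count'][0])         # -1 when the worker reported views == "error"
    return session_id, errorcode, count

print parse_worker_callback('/callback?sessionId=abc123&results=count=97%26errorcode=0')
# -> ('abc123', 0, 97)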

Reposted from: http://blog.csdn.net/rcfalcon/article/details/5891781 (via 课课家)
