下载安卓APP箭头
箭头给我发消息

客服QQ:3315713922

如何运用Python创建代理IP池

作者:课课家教育     来源: http://www.kokojia.com点击数:1089发布时间: 2019-08-09 10:02:50

标签: pythonpython项目21天学通python

大神带你学编程,欢迎选课

Python在设计上坚持了清晰划一的风格,这使得Python成为一门易读、易维护,并且被大量用户所欢迎的、用途广泛的语言。

设计者开发时总的指导思想是,对于一个特定的问题,只要有一种最好的方法来解决就好了。这在由Tim Peters写的Python格言(称为The Zen of Python)里面表述为:There should be one-- and preferably only one --obvious way to do it. 这正好和Perl语言(另一种功能类似的高级动态语言)的中心思想TMTOWTDI(There's More Than One Way To Do It)完全相反。

基本原理
代理实际上指的就是代理服务器,它的功能是代理网络用户去取得网络信息 。也可以说它是网络信息的中转站 。

在我们正常请求一个网站时, 是将请求发送给 web 服务器,Web 服务器把响应传回给我们 。 如果设置了代理服务器 , 实际上就是在本机和服务器之间搭建了一个桥, 此时本机不是直接 向 Web 服务器发起请求,而是向代理服务器发出请求,请求会发送给代理服务器,然后由代理服务器再发送给 Web 服务器,接着由代理服务器再把 Web 服务器返回的响应转发给本机。 这样我们同样可以正常访问网页,但这个过程中 Web 服务器识别出的真实 IP 就不再是我们本机的 IP 了,就成功实现了 IP 伪装,解决爬虫中封IP的难题。

了解代理服务器的基本原理后,我们不禁会想到几个问题,代理IP从何而来?如何保证代理可用性?代理如何存储?如何使用这些代理?

获取代理IP: 爬取网站的免费代理。比如西刺、快代理之类有免费代理的网站, 但是这些免费代理大多数情况下都是不好用的,所以比较靠谱的方法是购买付费代理。当然,如果你有更好的代理接口也可以自己接入。
检测IP代理可用性: 因为免费代理大部分是不可用的,所以采集回来的代理IP不能直接使用,可以写检测程序不断的去用这些代理访问一个稳定的网站,看是否可以正常使用。
存储代理IP: 存储的代理IP首先要保证代理不重复 , 要检测代理的可用情况,还要动态实时处理每个代理,本文利用来MongoDB存储,当然也可用其他方式存储。
使用代理:最简单的办法就是用 API 来提供对外服务的接口 。
 

IP代理池设计
我们了解了代理池的四大问题,所以我们可以根据这四个问题去分析设计一个代理池框架,我们可以分成四个模块。分别是获取模块、检测模块、存储模块、接口模块 。这样不仅有利于我们的维护,也使得可以更高效的完成我们的需求。

架构图


代码模块
在这里只是简单的写出了代码模块的实现,并不完整不具有逻辑性。

 

获取模块
import requests
import chardet
import traceback
from lXML import etree

class Downloader(object):
def __init__(self):
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36'
}

def download(self, url):
print('正在下载页面:{}'.format(url))
try:
resp = requests.get(url, headers=self.headers)
resp.encoding = chardet.detect(resp.content)['encoding']

if resp.status_code == 200:
return self.xpath_parse(resp.text)

else:
raise ConnectionError
except Exception:
print('下载页面出错:{}'.format(url))
traceback.print_exc()

def xpath_parse(self, resp):
try:
page = etree.HTML(resp)
trs = page.xpath('//div[@id="list"]/table/tbody/tr')
proxy_list = []
for tr in trs:
ip = tr.xpath('./td[1]/text()')[0]
port = tr.xpath('./td[2]/text()')[0]
proxy = {
'proxy': ip + ':' + port
}
proxy_list.append(proxy)
return proxy_list
except Exception:
print('解析IP地址出错')
traceback.print_exc()

if __name__ == '__main__':
print(Downloader().download
存储模块
import pymongo
from pymongo.errors import DuplicateKeyError


class MongoDB(object):
def __init__(self):
self.client = pymongo.MongoClient()
self.db = self.client['proxypool3']
self.proxies = self.db['proxies']
self.proxies.ensure_index('proxy', unique=True)
self.proxies.create_index()
# createIndex()
def insert(self, proxy):
try:
self.proxies.insert(proxy)
print('插入成功:{}'.format(proxy))
except DuplicateKeyError:
pass

def delete(self, conditions):
self.proxies.remove(conditions)
print('删除成功:{}'.format(conditions))

def update(self, conditions, values):
self.proxies.update(conditions, {"$set": values})
print('更新成功:{},{}'.format(conditions,values))

def get(self, count, conditions=None):
conditions = conditions if conditions else {}
count = int(count)
items = self.proxies.find(conditions, limit=count).sort('delay', pymongo.ASCENDING)
items = list(items)
return items

def get_count(self):
return self.proxies.count({})


if __name__ == '__main__':
m = MongoDB()
print(m.get(3))
检测模块
import requests
import time
import traceback
from requests.exceptions import ProxyError, ConnectionError
from db.mongo_db import MongoDB
from multiprocessing.pool import ThreadPool


def valid_many(proxy_list, method):
pool = ThreadPool(16)

for proxy in proxy_list:
pool.apply_async(valid_one, args=(proxy, method))
pool.close()
pool.join()


def valid_one(proxy, method, url='https://www.baidu.com'):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36'
}
proxies = {
'http': 'http://' + proxy['proxy'],
'https': 'http://' + proxy['proxy']
}

try:
start_time = time.time()
# requests.packages.urllib3.disable_warnings()
resp = requests.get(url, headers=headers, proxies=proxies, timeout=5, verify=False)
delay = round(time.time() - start_time, 2)

if resp.status_code == 200:
proxy['delay'] = delay
if method == 'insert':

MongoDB().insert(proxy)
elif method == 'check':
MongoDB().update({'proxy': proxy['proxy']}, {'delay': proxy['delay']})
else:
if method == 'check':
MongoDB().delete({'proxy': proxy['proxy']})
except (ProxyError, ConnectionError):
if method == 'check':
MongoDB().delete({'proxy': proxy['proxy']})
except Exception:
traceback.print_exc()
API接口模块
import flask
import json
from db.mongo_db import MongoDB


app = flask.Flask(__name__)


@app.route('/one')
def get_one():
proxies = MongoDB().get(1)
result = [proxy['proxy'] for proxy in proxies]
return json.dumps(result)


@app.route('/many')
def get_many():
args = flask.request.args
proxies = MongoDB().get(args['count'])
result = [proxy['proxy'] for proxy in proxies]
return json.dumps(result)

def run():
app.run()

我们了解了代理池的四大问题,所以我们可以根据这四个问题去分析设计一个代理池框架,我们可以分成四个模块。分别是获取模块、检测模块、存储模块、接口模块 。这样不仅有利于我们的维护,也使得可以更高效的完成我们的需求。

赞(0)
踩(0)
分享到:
华为认证网络工程师 HCIE直播课视频教程