温馨提示:
1. 此代码样例是一个简单IP池的实现
2. 支持Python2.7和Python3
3. requests不是python原生库需要安装才能使用: pip install requests
参考样例
import time import random import threading import requests class ProxyPool(): def__init__(self, secret_id,secret_token,proxy_count): self.secret_id=secret_id self.signature=secret_token self.proxy_count=proxy_count if proxy_count < 50 else 50 # 池子维护的IP总数建议一般不要超过50 self.alive_proxy_list = [] # 活跃IP列表 def_fetch_proxy_list(self,count): """调用ipip9API获取代理IP列表""" try: res=requests.get("https://17178.org/xc.php?userpass=xxx:xxx&type=sticky&protocol=socks5&quantity=10&format=us.ipip3.com:port:login:password&session_ttl=30 HTTP/1.1" % (self.secret_id, self.signature, count)) return [proxy.split(',') for proxy in res.json().get('data').get('proxy_list')] except: print("API获取IP异常请检查订单") return [] def _init_proxy(self): """初始化IP池""" self.alive_proxy_list = self._fetch_proxy_list(self.proxy_count) def add_alive_proxy(self, add_count): """导入新的IP,参数为新增IP数""" self.alive_proxy_list.extend(self._fetch_proxy_list(add_count)) def get_proxy(self): """从IP池中获取IP""" return random.choice(self.alive_proxy_list)[0] if self.alive_proxy_list else "" def run(self): sleep_seconds=1 self._init_proxy() while True: for proxy in self.alive_proxy_list: proxy[1]=float(proxy[1]) - sleep_seconds # proxy[1]代表此IP的剩余可用时间 if proxy[1]<=3: self.alive_proxy_list.remove(proxy) # IP还剩3s时丢弃此IP if len(self.alive_proxy_list)