Proxy Pool is a self-maintaining proxy pool service that upgrades the traditional "fetch on demand" model to a server-side "autonomous maintenance + on-demand acquisition" architecture. Once started, the system continuously maintains the pool in the background — scheduled tasks fetch, validate, quality-check, and clean up proxies — and serves validated, best-scoring proxies through a RESTful API.
http://localhost:8888
# Option 1: run directly
cd server
python main.py
# Option 2: Docker deployment (recommended)
docker-compose up -d
# Option 3: systemd service
sudo systemctl start proxy-pool
# Check health
curl http://localhost:8888/health
# View statistics
curl http://localhost:8888/api/stats
# Open the web UI
open http://localhost:8888
curl http://localhost:8888/api/proxy/acquire
curl "http://localhost:8888/api/proxy/acquire?protocol=socks5"
curl "http://localhost:8888/api/proxy/acquire?residential=true&min_score=80"
curl "http://localhost:8888/api/proxy/acquire?site_id=chatgpt&min_score=85"
The system runs the following background tasks automatically:
| Task | Frequency | Description |
|---|---|---|
| Proxy fetching | Every 6 hours | Fetch proxies from all enabled sources |
| Proxy validation | Every 4 hours | Check proxy availability and response time |
| Quality check | Every 8 hours | Detect residential IPs and ChatGPT/Google accessibility |
| Cleanup | Daily at 02:00 | Remove dead proxies and old logs |
| Node collection | Every 6 hours | Collect free nodes |
# Refresh the proxy pool
curl -X POST http://localhost:8888/api/proxy/refresh
# Run a quality check
curl -X POST http://localhost:8888/api/proxy/quality-check
# Collect nodes
curl -X POST http://localhost:8888/api/aggregator/collect
Use the proxy pool in a crawler project to avoid IP bans:
import requests
# Acquire a proxy
proxy_resp = requests.get('http://localhost:8888/api/proxy/acquire?protocol=http&min_score=70')
proxy = proxy_resp.json()
# Crawl through the proxy
response = requests.get('https://example.com', proxies=proxy['proxies'])
print(response.text)
Access the ChatGPT API through a residential-IP proxy:
import requests
import openai
# Get a ChatGPT-capable proxy
proxy_resp = requests.get('http://localhost:8888/api/proxy/acquire', params={
    'site_id': 'chatgpt',
    'residential': True,
    'min_score': 85
})
proxy = proxy_resp.json()
# Route the OpenAI client through the proxy (openai-python 0.x style)
openai.proxy = proxy['proxies']['https']
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hello!"}]
)
Use proxies from different countries for batch account registration:
# Get 10 US proxies
us_proxies = requests.get('http://localhost:8888/api/proxy/batch', params={
    'count': 10,
    'country': 'US',
    'protocol': 'http',
    'min_score': 80
}).json()
# Register each account through a different proxy
for proxy_data in us_proxies['proxies']:
    proxies = proxy_data['proxies']
    # registration logic goes here
    register_account(proxies)
Fetch VMess/Trojan and other nodes for use in proxy clients:
# Get a VMess node
node = requests.get('http://localhost:8888/api/aggregator/acquire?protocol=vmess&min_score=80').json()
# The node's share URL
vmess_url = node['raw_url']
print(f"VMess node: {vmess_url}")
# Fetch several nodes at once
nodes = requests.get('http://localhost:8888/api/aggregator/batch?count=5&protocol=trojan').json()
for node in nodes['nodes']:
    print(f"{node['protocol']}://{node['server']}:{node['port']}")
Deploy a single proxy-pool service shared by multiple projects:
# Project A: Python crawler
proxy = requests.get('http://proxy-pool.internal:8888/api/proxy/acquire?protocol=http').json()
# Project B: Go service
resp, _ := http.Get("http://proxy-pool.internal:8888/api/proxy/acquire?protocol=socks5")
# Project C: Node.js app
const proxy = await fetch('http://proxy-pool.internal:8888/api/proxy/acquire?protocol=http').then(r => r.json());
Acquire the single best-scoring proxy.
| Parameter | Type | Required | Description |
|---|---|---|---|
| site_id | string | No | Target site (chatgpt/google/general) |
| protocol | string | No | Protocol (http/https/socks4/socks5) |
| country | string | No | Country code (US/CN/JP, etc.) |
| residential | boolean | No | Restrict to residential IPs |
| min_score | integer | No | Minimum score (0-100) |
{
"url": "http://1.2.3.4:8080",
"protocol": "http",
"host": "1.2.3.4",
"port": 8080,
"score": 95,
"proxies": {
"http": "http://1.2.3.4:8080",
"https": "http://1.2.3.4:8080"
},
"quality": {
"is_residential": true,
"chatgpt_accessible": true,
"google_accessible": true,
"country": "US",
"ip_address": "1.2.3.4",
"ip_type": "Residential",
"risk_level": "Low",
"risk_score": 0.15
}
}
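The `quality` block in the response can drive client-side selection. A minimal sketch (the helper name and thresholds are my own, not part of the API; field names follow the sample response above):

```python
def suitable_for_chatgpt(proxy_data, min_score=85):
    """Return True if an /api/proxy/acquire response looks usable for ChatGPT.

    Combines the top-level score with the quality flags shown in the
    sample response above.
    """
    quality = proxy_data.get("quality", {})
    return (
        proxy_data.get("score", 0) >= min_score
        and quality.get("is_residential", False)
        and quality.get("chatgpt_accessible", False)
        and quality.get("risk_level") == "Low"
    )
```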
import requests
# Acquire a proxy
response = requests.get('http://localhost:8888/api/proxy/acquire')
proxy_data = response.json()
# Use it
proxies = proxy_data['proxies']
response = requests.get('https://httpbin.org/ip', proxies=proxies)
print(response.json())
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
)
func main() {
// Acquire a proxy
resp, _ := http.Get("http://localhost:8888/api/proxy/acquire")
defer resp.Body.Close()
var proxyData map[string]interface{}
json.NewDecoder(resp.Body).Decode(&proxyData)
// Use the proxy
proxyURL, _ := url.Parse(proxyData["url"].(string))
client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyURL(proxyURL),
},
}
resp, _ = client.Get("https://httpbin.org/ip")
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
fmt.Println(string(body))
}
// Node.js with axios
const axios = require('axios');
const HttpsProxyAgent = require('https-proxy-agent');
async function main() {
// Acquire a proxy
const response = await axios.get('http://localhost:8888/api/proxy/acquire');
const proxyData = response.data;
// Use the proxy
const agent = new HttpsProxyAgent(proxyData.url);
const result = await axios.get('https://httpbin.org/ip', {
httpsAgent: agent
});
console.log(result.data);
}
main();
Acquire proxies in batch.
| Parameter | Type | Required | Description |
|---|---|---|---|
| count | integer | No | Number of proxies (default 10, max 100) |
| (others) | | | Same as /api/proxy/acquire |
{
"total": 10,
"proxies": [
{
"url": "http://1.2.3.4:8080",
"protocol": "http",
"host": "1.2.3.4",
"port": 8080,
"score": 95,
"proxies": {
"http": "http://1.2.3.4:8080",
"https": "http://1.2.3.4:8080"
}
}
]
}
List the currently available proxies.
| Parameter | Type | Required | Description |
|---|---|---|---|
| limit | integer | No | Page size (default 100) |
| offset | integer | No | Offset (default 0) |
| protocol | string | No | Filter by protocol |
| country | string | No | Filter by country |
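Paging through the full list is just a matter of stepping `offset` by `limit`. A small sketch of the parameter sequence (pure helper, no request is made here; pass each dict as `params` to the list endpoint):

```python
def page_params(total, limit=100, **filters):
    """Yield query-parameter dicts covering `total` items page by page."""
    for offset in range(0, total, limit):
        params = {"limit": limit, "offset": offset}
        params.update(filters)
        yield params
```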
Manually trigger a proxy-pool refresh.
curl -X POST http://localhost:8888/api/proxy/refresh
{
"status": "accepted",
"message": "Proxy refresh task started",
"task": "fetch_proxies"
}
Manually trigger a quality-check run.
curl -X POST http://localhost:8888/api/proxy/quality-check
Acquire the single best-scoring node.
| Parameter | Type | Required | Description |
|---|---|---|---|
| protocol | string | No | Protocol (vmess/vless/trojan/ss/hy2) |
| country | string | No | Country code |
| min_score | integer | No | Minimum score |
{
"protocol": "vmess",
"server": "example.com",
"port": 443,
"raw_url": "vmess://...",
"score": 90
}
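The `raw_url` field carries the shareable link. For `vmess://` links specifically, the de-facto share format is base64-encoded JSON after the scheme, so the server details can be recovered client-side. A sketch (this decodes the common VMess share format, not an API of this service):

```python
import base64
import json

def decode_vmess(raw_url):
    """Decode a vmess:// share link whose payload is base64-encoded JSON."""
    assert raw_url.startswith("vmess://")
    payload = raw_url[len("vmess://"):]
    # Re-pad in case the trailing '=' padding was stripped from the link
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.b64decode(payload))
```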
Manually trigger node collection.
curl -X POST http://localhost:8888/api/aggregator/collect
Get proxy-pool statistics.
{
"http_socks": {
"total": 8523,
"validated": 6234,
"by_protocol": {
"http": 3000,
"https": 2500,
"socks4": 1500,
"socks5": 1523
}
},
"nodes": {
"total": 23456,
"validated": 15678,
"by_protocol": {
"vmess": 10000,
"vless": 5000,
"trojan": 4000,
"ss": 3456,
"hy2": 1000
}
}
}
List subscription sources.
Add a subscription source.
{
"name": "My Subscription",
"url": "https://...",
"enabled": true
}
Get the current system configuration.
{
"server": {
"host": "0.0.0.0",
"port": 8888,
"workers": 4,
"debug": false
},
"cache": {
"memory_maxsize": 100,
"memory_ttl": 300,
"redis_enabled": true
},
"scheduler": {
"enabled": true,
"timezone": "Asia/Shanghai"
},
"tasks": {
"fetch_proxies": {
"enabled": true,
"interval_hours": 6
},
"validate_proxies": {
"enabled": true,
"interval_hours": 4
}
}
}
Update the system configuration (some settings take effect only after a service restart).
{
"scheduler": {
"jobs": {
"fetch_proxies": {
"enabled": true,
"cron": "0 */4 * * *"
}
}
}
}
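A partial body like the one above presumably merges into the existing configuration rather than replacing it wholesale — that merge semantics is my assumption, not documented behavior. A client-side sketch for previewing the result of such an update against the current /api/config output:

```python
def deep_merge(base, patch):
    """Recursively merge `patch` into a copy of `base` (nested dicts)."""
    merged = dict(base)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged
```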
List all proxy data sources.
{
"total": 28,
"sources": [
{
"id": "kookeey",
"name": "Kookeey API",
"type": "http_socks",
"category": "paid",
"enabled": true,
"total_fetched": 12345,
"last_fetch_time": "2024-03-19T10:00:00Z",
"last_fetch_count": 234,
"success_rate": 0.95
}
]
}
Manually trigger a specific background task.
| Parameter | Type | Description |
|---|---|---|
| task_name | string | Task name: fetch_proxies, validate_proxies, quality_check, cleanup, collect_nodes |
# Trigger proxy fetching
curl -X POST http://localhost:8888/api/tasks/trigger/fetch_proxies
# Trigger validation
curl -X POST http://localhost:8888/api/tasks/trigger/validate_proxies
# Trigger a quality check
curl -X POST http://localhost:8888/api/tasks/trigger/quality_check
# Trigger cleanup
curl -X POST http://localhost:8888/api/tasks/trigger/cleanup
# Trigger node collection
curl -X POST http://localhost:8888/api/tasks/trigger/collect_nodes
{
"success": true,
"message": "Task fetch_proxies started"
}
Import proxies from a JSON or TXT payload.
{
"proxies": [
{
"protocol": "http",
"host": "1.2.3.4",
"port": 8080
},
{
"protocol": "socks5",
"host": "5.6.7.8",
"port": 1080
}
],
"source": "manual_import"
}
{
"text": "http://1.2.3.4:8080\nsocks5://5.6.7.8:1080",
"source": "manual_import"
}
{
"success": true,
"imported": 2,
"duplicates": 0,
"message": "Imported 2 proxies"
}
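The TXT form accepts one `protocol://host:port` entry per line. A small helper that turns such lines into the JSON body shown above (field names follow that sample; the parsing itself is my own sketch):

```python
def build_import_body(lines, source="manual_import"):
    """Convert 'protocol://host:port' lines into the JSON import body."""
    proxies = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        protocol, rest = line.split("://", 1)
        host, port = rest.rsplit(":", 1)
        proxies.append({"protocol": protocol, "host": host, "port": int(port)})
    return {"proxies": proxies, "source": source}
```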
Export proxies in JSON or TXT format.
| Parameter | Type | Required | Description |
|---|---|---|---|
| format | string | No | Export format: json or txt (default json) |
| protocol | string | No | Filter by protocol |
| is_valid | boolean | No | Export only validated proxies |
# Export as JSON
curl "http://localhost:8888/api/proxy/export?format=json&is_valid=true"
# Export as TXT
curl "http://localhost:8888/api/proxy/export?format=txt&protocol=http" > proxies.txt
Check service health.
{
"status": "healthy",
"components": {
"database": {
"status": "healthy",
"proxies": 8523,
"nodes": 23456
}
}
}
If you frequently request proxies with the same filters, consider caching results on the client:
import time
import requests

# A minimal client-side cache
cache = {}
cache_ttl = 300  # 5 minutes

def get_cached_proxy(filters):
    cache_key = str(filters)
    # Serve from cache while fresh
    if cache_key in cache:
        cached_data, cached_time = cache[cache_key]
        if time.time() - cached_time < cache_ttl:
            return cached_data
    # Fetch a new proxy
    proxy = requests.get('http://localhost:8888/api/proxy/acquire', params=filters).json()
    cache[cache_key] = (proxy, time.time())
    return proxy
When you need several proxies, use the batch endpoint instead of calling the single-acquire endpoint repeatedly:
# ❌ Avoid: ten separate calls
proxies = []
for i in range(10):
    proxy = requests.get('http://localhost:8888/api/proxy/acquire').json()
    proxies.append(proxy)
# ✅ Prefer: one batch call
response = requests.get('http://localhost:8888/api/proxy/batch?count=10')
proxies = response.json()['proxies']
Retry with exponential backoff:
import time
import requests

def get_proxy_with_retry(max_retries=3):
    for attempt in range(max_retries):
        try:
            response = requests.get('http://localhost:8888/api/proxy/acquire', timeout=5)
            if response.status_code == 200:
                return response.json()
            elif response.status_code == 503:
                # No proxy available: back off and retry
                wait_time = 2 ** attempt  # exponential backoff: 1s, 2s, 4s
                print(f"No proxy available, retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                response.raise_for_status()
        except requests.RequestException as e:
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt
            print(f"Request failed: {e}, retrying in {wait_time}s...")
            time.sleep(wait_time)
    return None
Check service health periodically:
import requests

def check_service_health():
    try:
        response = requests.get('http://localhost:8888/health', timeout=5)
        health = response.json()
        if health['status'] == 'healthy':
            return True
        elif health['status'] == 'degraded':
            print(f"Service degraded: {health['components']}")
            return True
        else:
            print(f"Service unavailable: {health}")
            return False
    except Exception as e:
        print(f"Health check failed: {e}")
        return False

# Check at application startup
if not check_service_health():
    print("WARNING: proxy-pool service is unavailable")
Use filters to fetch the proxy best suited to the task, improving success rates:
# Pick by target site
chatgpt_proxy = requests.get('http://localhost:8888/api/proxy/acquire', params={
    'site_id': 'chatgpt',   # ChatGPT-capable
    'residential': True,    # residential IP
    'min_score': 85         # high quality
}).json()
# Pick by location
us_proxy = requests.get('http://localhost:8888/api/proxy/acquire', params={
    'country': 'US',        # US IP
    'protocol': 'http',     # HTTP protocol
    'min_score': 70         # medium quality
}).json()
# Pick by protocol
socks5_proxy = requests.get('http://localhost:8888/api/proxy/acquire', params={
    'protocol': 'socks5',   # SOCKS5
    'min_score': 60         # baseline quality
}).json()
Rotate proxies to avoid overusing any single one:
import requests

class ProxyRotator:
    def __init__(self, pool_size=10):
        self.pool_size = pool_size
        self.proxies = []
        self.current_index = 0
        self.refresh_proxies()

    def refresh_proxies(self):
        """Refill the local pool from the batch endpoint."""
        response = requests.get('http://localhost:8888/api/proxy/batch', params={
            'count': self.pool_size,
            'min_score': 70
        })
        self.proxies = response.json()['proxies']
        self.current_index = 0

    def get_next_proxy(self):
        """Return the next proxy in round-robin order."""
        if not self.proxies:
            self.refresh_proxies()
        proxy = self.proxies[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.proxies)
        # Refresh once every proxy has been used
        if self.current_index == 0:
            self.refresh_proxies()
        return proxy

# Usage
rotator = ProxyRotator(pool_size=10)
for i in range(100):
    proxy = rotator.get_next_proxy()
    # Issue the request through the proxy
    response = requests.get('https://example.com', proxies=proxy['proxies'])
Monitor proxy-pool status and raise alerts:
import time
import requests
import schedule  # third-party: pip install schedule

def monitor_proxy_pool():
    """Check proxy-pool status and alert on anomalies."""
    stats = requests.get('http://localhost:8888/api/stats').json()
    # Pool size
    total_proxies = stats['http_socks']['total']
    if total_proxies < 100:
        send_alert(f"WARNING: proxy count is low ({total_proxies})")
    # Validation pass rate
    validated = stats['http_socks']['validated']
    if validated < total_proxies * 0.5:
        send_alert(f"WARNING: validation pass rate is low ({validated}/{total_proxies})")
    # High-quality proxy count
    high_quality = stats.get('quality', {}).get('total_checked', 0)
    if high_quality < 50:
        send_alert(f"WARNING: not enough high-quality proxies ({high_quality})")

# send_alert() is your own alerting hook (email, webhook, etc.)
# Run the check every 10 minutes
schedule.every(10).minutes.do(monitor_proxy_pool)
while True:
    schedule.run_pending()
    time.sleep(1)
The API imposes no hard rate limits, but prefer /api/proxy/batch over repeated calls to /api/proxy/acquire.

Handle each class of error response explicitly:
import requests

def handle_api_response(response):
    """Map API status codes to results."""
    if response.status_code == 200:
        return response.json()
    elif response.status_code == 400:
        error = response.json()
        print(f"Bad request: {error['detail']}")
        return None
    elif response.status_code == 503:
        print("No proxy available; retry later or trigger a manual refresh")
        return None
    elif response.status_code == 500:
        print("Internal server error; check the server logs")
        return None
    else:
        print(f"Unexpected status: {response.status_code}")
        return None

# Usage
response = requests.get('http://localhost:8888/api/proxy/acquire')
proxy = handle_api_response(response)