Untitled
unknown
python
a year ago
2.2 kB
5
Indexable
Never
"""Collect public SOCKS5 proxies and keep those that can reach a test URL.

The script downloads the proxy lists named in ``urls``, probes every entry
through ``check_proxy`` and writes the working ``host:port`` pairs to
``./out.txt``, one per line.
"""
import asyncio

import aiohttp
from aiohttp_socks import ProxyConnector, ProxyType

# Created explicitly: asyncio.get_event_loop() without a running loop is
# deprecated on current Python versions.
loop = asyncio.new_event_loop()

# Shared time budget for every HTTP request made by this script.
timeout = aiohttp.ClientTimeout(total=15.0)

# Plain-text sources that are expected to list one "host:port" per line.
urls = (
    "https://api.proxyscrape.com?request=displayproxies&proxytype=socks5&timeout=5000",
    "https://raw.githubusercontent.com/ShiftyTR/Proxy-List/master/socks5.txt",
)

# Each check opens its own connection; without a cap a long proxy list can
# exhaust file descriptors, and those errors would be indistinguishable from
# a dead proxy because check_proxy treats every failure as "not passed".
MAX_CONCURRENT_CHECKS = 200


def _parse_proxy_lines(text):
    """Return the well-formed ``(host, port)`` string pairs found in *text*.

    Lines that are not exactly ``host:port`` with a decimal port in the range
    1-65535 are skipped, so callers can safely unpack every entry and call
    ``int(port)`` on it.
    """
    entries = []
    for line in text.splitlines():
        host, _, port = line.strip().partition(":")
        # A second ":" (credentials, IPv6, garbage) ends up in ``port`` and is
        # rejected by the isdecimal() check.
        if not host or not port.isdecimal():
            continue
        if not 1 <= int(port) <= 65535:
            continue
        entries.append((host, port))
    return entries


async def fetch_proxies():
    """Download every list in ``urls`` and return the proxies they contain.

    Returns:
        A list of unique ``(host, port)`` tuples of strings, in the order in
        which they were first seen. Sources that fail to download or answer
        with a status other than 200 are skipped.
    """
    found = []
    async with aiohttp.ClientSession(timeout=timeout) as session:
        for url in urls:
            try:
                async with session.get(url) as response:
                    if response.status != 200:
                        continue
                    body = await response.text()
            except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                # One unreachable source must not discard the other lists.
                print(f"Could not fetch proxy list from {url}: {exc!r}")
                continue
            found.extend(_parse_proxy_lines(body))
    # The sources may overlap; dict.fromkeys drops duplicates and keeps order.
    return list(dict.fromkeys(found))


async def check_proxy(host, port):
    """Try one request through the SOCKS5 proxy at *host*:*port*.

    Args:
        host: Proxy host name or address.
        port: Proxy port as an integer.

    Returns:
        A dict with ``passed`` set to ``True`` and ``proxy`` set to
        ``"host:port"`` when the request completed, otherwise ``passed`` set
        to ``False``. When the attempt raised, ``error`` holds the repr of the
        exception.
    """
    proxy = f"{host}:{port}"
    result = dict(passed=False)

    async def on_request_start(_, ctx, __):
        ctx.start = asyncio.get_running_loop().time()

    async def on_request_end(_, ctx, __):
        elapsed = asyncio.get_running_loop().time() - ctx.start
        print(f"[Proxy {proxy}] session passed ({elapsed:.2f}s)")
        result.update(dict(passed=True, proxy=proxy))

    trace_config = aiohttp.TraceConfig()
    trace_config.on_request_start.append(on_request_start)
    trace_config.on_request_end.append(on_request_end)

    connector = ProxyConnector(
        proxy_type=ProxyType.SOCKS5,
        host=host,
        port=port,
        rdns=True,
    )
    try:
        async with aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            trace_configs=[trace_config],
        ) as session:
            # "async with" releases the response even if something below raises.
            async with session.get("https://shrinke.me/SketchfabRipper") as resp:
                print(resp.status)
    except Exception as exc:
        # Deliberately broad: any failure just means this proxy is unusable and
        # must not abort the other checks. Keep the reason instead of hiding it.
        result["error"] = repr(exc)
    return result


async def _check_all(proxy_list):
    """Run ``check_proxy`` for every entry, at most MAX_CONCURRENT_CHECKS at once."""
    limiter = asyncio.Semaphore(MAX_CONCURRENT_CHECKS)

    async def guarded(host, port):
        async with limiter:
            return await check_proxy(host, port)

    return await asyncio.gather(
        *(guarded(host, int(port)) for host, port in proxy_list)
    )


async def main():
    """Fetch the proxy lists, check every proxy and write the working ones."""
    print("Fetching some proxies")
    proxy_list = await fetch_proxies()
    if not proxy_list:
        print("Found no proxy list")
        return

    print(f"Found list of {len(proxy_list)} proxies")
    results = await _check_all(proxy_list)
    working = [info["proxy"] for info in results if info["passed"]]
    with open("./out.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(working))


if __name__ == "__main__":
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()