123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- import asyncio
- import logging
- import threading
- import httpcore
- import httpx
- from httpx_socks import AsyncProxyTransport
- from python_socks import (
- parse_proxy_url,
- ProxyConnectionError,
- ProxyTimeoutError,
- ProxyError
- )
- from searx import logger
# uvloop is optional: install its faster event-loop policy when it is
# available, otherwise silently fall back to asyncio's default loop.
try:
    import uvloop
except ImportError:
    pass
else:
    uvloop.install()

logger = logger.getChild('searx.http.client')

# Shared asyncio event loop; created by init() inside a dedicated daemon
# thread and read back through get_loop().
LOOP = None
# Cache of ssl.SSLContext objects keyed by
# (proxy_url, cert, verify, trust_env, http2) — see get_sslcontexts().
SSLCONTEXTS = {}
# Keyword arguments common to every transport created in this module.
TRANSPORT_KWARGS = {
    'backend': 'asyncio',
    'trust_env': False,
}
async def close_connections_for_url(
    connection_pool: httpcore.AsyncConnectionPool,
    url: httpcore._utils.URL
):
    """Remove and close every pooled connection bound to *url*'s origin.

    Closing errors are logged and swallowed: cleanup here is best-effort,
    the caller is about to retry or re-raise anyway.

    NOTE(review): this reaches into httpcore private API
    (_connections_for_origin / _remove_from_pool) — tied to the httpcore
    version pinned by the project.
    """
    origin = httpcore._utils.url_to_origin(url)
    logger.debug('Drop connections for %r', origin)
    # Snapshot the matching connections first, then drop them one by one.
    doomed = connection_pool._connections_for_origin(origin)
    for doomed_connection in doomed:
        await connection_pool._remove_from_pool(doomed_connection)
        try:
            await doomed_connection.aclose()
        except httpcore.NetworkError as exc:
            logger.warning('Error closing an existing connection', exc_info=exc)
def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False):
    """Return a memoized SSL context for the given connection parameters.

    httpx.create_ssl_context is comparatively expensive, so contexts are
    cached in the module-level SSLCONTEXTS dict, keyed by every parameter
    that influences the resulting context.

    Note: ``proxy_url`` is part of the key only — one context per proxy —
    it is not passed to httpx.
    """
    # FIX: dropped the needless `global SSLCONTEXTS` — the dict is only
    # mutated in place, never rebound, so no global declaration is required.
    key = (proxy_url, cert, verify, trust_env, http2)
    if key not in SSLCONTEXTS:
        SSLCONTEXTS[key] = httpx.create_ssl_context(cert, verify, trust_env, http2)
    return SSLCONTEXTS[key]
class AsyncHTTPTransportNoHttp(httpcore.AsyncHTTPTransport):
    """Transport that rejects every request.

    Mounted on the plain ``http://`` scheme when unencrypted HTTP is
    disabled, so any such request fails fast instead of going on the wire.
    """

    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        # Unconditional: plain HTTP is disabled by configuration.
        raise httpcore.UnsupportedProtocol("HTTP protocol is disabled")
class AsyncProxyTransportFixed(AsyncProxyTransport):
    """Fix httpx_socks.AsyncProxyTransport

    Map python_socks exceptions to httpcore.ProxyError.
    Map socket.gaierror (an OSError subclass) to httpcore.NetworkError.

    Note: keepalive_expiry is ignored, AsyncProxyTransport should call:
    * self._keepalive_sweep()
    * self._response_closed(self, connection)
    Note: AsyncProxyTransport inherits from AsyncConnectionPool.
    Note: the API is going to change on httpx 0.18.0,
    see https://github.com/encode/httpx/pull/1522
    """

    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        """Send the request, retrying once after a RemoteProtocolError.

        Raises httpcore.ProxyError / httpcore.NetworkError for proxy and
        socket failures; other network/protocol errors drop the pooled
        connections for this origin and propagate.
        """
        retry = 2
        while retry > 0:
            retry -= 1
            try:
                return await super().arequest(method, url, headers, stream, ext)
            except (ProxyConnectionError, ProxyTimeoutError, ProxyError) as e:
                # normalize python_socks errors to the httpcore hierarchy
                raise httpcore.ProxyError(e)
            except OSError as e:
                # socket.gaierror and other low-level socket failures
                raise httpcore.NetworkError(e)
            except httpcore.RemoteProtocolError as e:
                # the server closed the connection mid-exchange: the pooled
                # connections for this origin are stale — drop them and retry
                await close_connections_for_url(self, url)
                if retry <= 0:
                    # BUG FIX: the original loop fell through after the last
                    # attempt and implicitly returned None; re-raise instead
                    raise e
                logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
            except (httpcore.NetworkError, httpcore.ProtocolError) as e:
                # unrecoverable: clean up the pool, then propagate
                await close_connections_for_url(self, url)
                raise e
class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
    """Fix httpx.AsyncHTTPTransport

    Map socket.gaierror (an OSError subclass) to httpcore.ConnectError and
    retry once after CloseError / RemoteProtocolError, dropping the stale
    pooled connections first.
    """

    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        """Send the request, retrying once after Close/RemoteProtocolError.

        Raises httpcore.ConnectError for socket-level failures; other
        network/protocol errors drop the pooled connections for this origin
        and propagate.
        """
        retry = 2
        while retry > 0:
            retry -= 1
            try:
                return await super().arequest(method, url, headers, stream, ext)
            except OSError as e:
                # socket.gaierror and other low-level socket failures
                raise httpcore.ConnectError(e)
            except httpcore.CloseError as e:
                # a pooled keep-alive connection was closed under us:
                # drop the stale connections for this origin and retry
                await close_connections_for_url(self._pool, url)
                if retry <= 0:
                    # BUG FIX: the original loop fell through after the last
                    # attempt and implicitly returned None; re-raise instead
                    raise e
                logger.warning('httpcore.CloseError: retry', exc_info=e)
            except httpcore.RemoteProtocolError as e:
                # server closed the connection mid-exchange: same recovery
                await close_connections_for_url(self._pool, url)
                if retry <= 0:
                    # BUG FIX: see above — never return None on exhaustion
                    raise e
                logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
            except (httpcore.ProtocolError, httpcore.NetworkError) as e:
                # unrecoverable: clean up the pool, then propagate
                await close_connections_for_url(self._pool, url)
                raise e
def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
    """Build an AsyncProxyTransportFixed for a socks4/socks5/socks5h proxy URL.

    python_socks only knows socks4/socks5: the ``socks5h://`` scheme is
    translated to ``socks5://`` with rdns=True so hostname resolution
    happens on the proxy side (how curl/Tor expect it).

    When ``verify`` is True a cached SSL context is substituted (one per
    proxy URL); any other value is passed through unchanged.
    """
    # FIX: dropped the needless `global TRANSPORT_KWARGS` — the dict is
    # only read here, so no global declaration is required.
    rdns = False
    socks5h = 'socks5h://'
    if proxy_url.startswith(socks5h):
        proxy_url = 'socks5://' + proxy_url[len(socks5h):]
        rdns = True

    proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
    verify = get_sslcontexts(proxy_url, None, True, False, http2) if verify is True else verify
    return AsyncProxyTransportFixed(
        proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
        username=proxy_username, password=proxy_password,
        rdns=rdns,
        loop=get_loop(),
        verify=verify,
        http2=http2,
        local_address=local_address,
        max_connections=limit.max_connections,
        max_keepalive_connections=limit.max_keepalive_connections,
        keepalive_expiry=limit.keepalive_expiry,
        retries=retries,
        **TRANSPORT_KWARGS
    )
def get_transport(verify, http2, local_address, proxy_url, limit, retries):
    """Build an AsyncHTTPTransportFixed, optionally behind an HTTP proxy.

    When ``verify`` is True a cached SSL context is substituted; any other
    value (False, path to a CA bundle, ...) is passed through unchanged.
    ``proxy_url`` may be None for a direct connection.
    """
    # FIX: dropped the needless `global TRANSPORT_KWARGS` — read-only here.
    verify = get_sslcontexts(None, None, True, False, http2) if verify is True else verify
    return AsyncHTTPTransportFixed(
        verify=verify,
        http2=http2,
        local_address=local_address,
        proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
        limits=limit,
        retries=retries,
        **TRANSPORT_KWARGS
    )
def iter_proxies(proxies):
    """Yield ``(mount_pattern, proxy_url)`` pairs from a proxies setting.

    Accepts either a single URL string (applied to every scheme via the
    ``all://`` pattern), a mapping of pattern -> URL, or None/anything else
    (yields nothing).
    """
    if isinstance(proxies, str):
        # one URL for everything
        yield 'all://', proxies
    elif isinstance(proxies, dict):
        yield from proxies.items()
def new_client(
        enable_http, verify, enable_http2,
        max_connections, max_keepalive_connections, keepalive_expiry,
        proxies, local_address, retries, max_redirects
):
    """Create a configured httpx.AsyncClient.

    Per-pattern proxy mounts are built from ``proxies``: socks URLs get the
    python_socks-backed transport, everything else the plain httpx one.
    When ``enable_http`` is False, plain-HTTP proxy mounts are skipped and
    the ``http://`` scheme is mounted on a transport that always raises.
    """
    limit = httpx.Limits(
        max_connections=max_connections,
        max_keepalive_connections=max_keepalive_connections,
        keepalive_expiry=keepalive_expiry
    )

    mounts = {}
    for pattern, proxy_url in iter_proxies(proxies):
        # skip plain-HTTP mounts entirely when HTTP is disabled
        if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
            continue
        if proxy_url.startswith(('socks4://', 'socks5://', 'socks5h://')):
            transport_for_pattern = get_transport_for_socks_proxy(
                verify, enable_http2, local_address, proxy_url, limit, retries
            )
        else:
            transport_for_pattern = get_transport(
                verify, enable_http2, local_address, proxy_url, limit, retries
            )
        mounts[pattern] = transport_for_pattern

    if not enable_http:
        mounts['http://'] = AsyncHTTPTransportNoHttp()

    # default (non-mounted) transport: direct connection, no proxy
    transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
    return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)
def get_loop():
    """Return the shared asyncio event loop started by init().

    Returns None until init() has run (the loop is created in a daemon
    thread — see init()).
    """
    # FIX: dropped the needless `global LOOP` — the global statement is
    # only required for assignment, not for reading a module-level name.
    return LOOP
def init():
    """Module start-up: quiet noisy loggers, start the shared event loop.

    The loop runs forever in a daemon thread so callers can submit
    coroutines to it from synchronous code; it is published through the
    module-level LOOP global (read via get_loop()).
    """
    # hpack logs every header at DEBUG — far too chatty
    for noisy_logger in ('hpack.hpack', 'hpack.table'):
        logging.getLogger(noisy_logger).setLevel(logging.WARNING)

    def run_loop():
        global LOOP
        LOOP = asyncio.new_event_loop()
        LOOP.run_forever()

    threading.Thread(
        target=run_loop,
        name='asyncio_loop',
        daemon=True,
    ).start()


init()
|