# SPDX-License-Identifier: AGPL-3.0-or-later
"""Build httpx clients/transports and run the shared background asyncio loop."""

import asyncio
import logging
import threading

import httpcore
import httpx
from httpx_socks import AsyncProxyTransport
from python_socks import parse_proxy_url
import python_socks._errors

from searx import logger

# Optional uvloop (support Python 3.6)
try:
    import uvloop
except ImportError:
    pass
else:
    uvloop.install()


logger = logger.getChild('searx.http.client')

# Event loop run forever by the "asyncio_loop" daemon thread; assigned by init().
LOOP = None
# SSL contexts cached by get_sslcontexts(), keyed by its arguments.
SSLCONTEXTS = {}
# Keyword arguments passed to every transport created in this module.
TRANSPORT_KWARGS = {
    'backend': 'asyncio',
    'trust_env': False,
}


async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
    """Remove from ``connection_pool`` and close every connection to the origin of ``url``.

    Uses private httpcore helpers (``url_to_origin``, ``_connections_for_origin``,
    ``_remove_from_pool``).  A ``httpcore.NetworkError`` raised while closing one
    connection is logged and does not stop the loop.
    """
    origin = httpcore._utils.url_to_origin(url)
    logger.debug('Drop connections for %r', origin)
    connections_to_close = connection_pool._connections_for_origin(origin)
    for connection in connections_to_close:
        await connection_pool._remove_from_pool(connection)
        try:
            await connection.aclose()
        except httpcore.NetworkError as e:
            logger.warning('Error closing an existing connection', exc_info=e)


def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False):
    """Return an SSL context built by ``httpx.create_ssl_context``, cached per argument set.

    ``proxy_url`` is not passed to ``create_ssl_context``; it only takes part in the
    cache key, so each proxy gets its own context object.
    """
    # Item assignment on the module-level dict: no ``global`` statement is needed.
    key = (proxy_url, cert, verify, trust_env, http2)
    if key not in SSLCONTEXTS:
        SSLCONTEXTS[key] = httpx.create_ssl_context(cert, verify, trust_env, http2)
    return SSLCONTEXTS[key]


class AsyncHTTPTransportNoHttp(httpcore.AsyncHTTPTransport):
    """Block HTTP request: every request raises ``httpcore.UnsupportedProtocol``.

    ``new_client`` mounts it on ``http://`` when plain HTTP is disabled.
    """

    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        raise httpcore.UnsupportedProtocol("HTTP protocol is disabled")


class AsyncProxyTransportFixed(AsyncProxyTransport):
    """Fix httpx_socks.AsyncProxyTransport

    Map python_socks exceptions to httpcore.ProxyError

    Map socket.gaierror (any OSError) to httpcore.ConnectError

    Retry once after httpcore.RemoteProtocolError; if the last attempt fails
    the error is raised (the request never silently returns None).

    Note: keepalive_expiry is ignored, AsyncProxyTransport should call:
    * self._keepalive_sweep()
    * self._response_closed(self, connection)

    Note: AsyncProxyTransport inherit from AsyncConnectionPool

    Note: the API is going to change on httpx 0.18.0
    see https://github.com/encode/httpx/pull/1522
    """

    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        retry = 2
        while retry > 0:
            retry -= 1
            try:
                return await super().arequest(method, url, headers, stream, ext)
            except (python_socks._errors.ProxyConnectionError,
                    python_socks._errors.ProxyTimeoutError,
                    python_socks._errors.ProxyError) as e:
                raise httpcore.ProxyError(e) from e
            except OSError as e:
                # socket.gaierror when DNS resolution fails.
                # ConnectError subclasses NetworkError, so handlers catching NetworkError still match.
                raise httpcore.ConnectError(e) from e
            except httpcore.RemoteProtocolError as e:
                # in case of httpcore.RemoteProtocolError: Server disconnected
                await close_connections_for_url(self, url)
                if retry == 0:
                    # No attempt left: surface the error instead of falling out of the loop.
                    raise e
                logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
                # retry
            except (httpcore.NetworkError, httpcore.ProtocolError) as e:
                # httpcore.WriteError on HTTP/2 connection leaves a new opened stream
                # then each new request creates a new stream and raise the same WriteError
                await close_connections_for_url(self, url)
                raise e


class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
    """Fix httpx.AsyncHTTPTransport

    Map socket.gaierror (any OSError) to httpcore.ConnectError

    Retry once after httpcore.CloseError or httpcore.RemoteProtocolError; if the
    last attempt fails the error is raised (the request never silently returns None).
    """

    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        retry = 2
        while retry > 0:
            retry -= 1
            try:
                return await super().arequest(method, url, headers, stream, ext)
            except OSError as e:
                # socket.gaierror when DNS resolution fails
                raise httpcore.ConnectError(e) from e
            except httpcore.CloseError as e:
                # httpcore.CloseError: [Errno 104] Connection reset by peer
                # raised by _keepalive_sweep()
                #   from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198  # noqa
                await close_connections_for_url(self._pool, url)
                if retry == 0:
                    # No attempt left: surface the error instead of falling out of the loop.
                    raise e
                logger.warning('httpcore.CloseError: retry', exc_info=e)
                # retry
            except httpcore.RemoteProtocolError as e:
                # in case of httpcore.RemoteProtocolError: Server disconnected
                await close_connections_for_url(self._pool, url)
                if retry == 0:
                    raise e
                logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
                # retry
            except (httpcore.ProtocolError, httpcore.NetworkError) as e:
                await close_connections_for_url(self._pool, url)
                raise e


def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
    """Return an ``AsyncProxyTransportFixed`` for a ``socks4://``, ``socks5://`` or ``socks5h://`` URL.

    ``verify=True`` is replaced by a cached SSL context; any other value is passed through.
    ``limit`` is read for ``max_connections``, ``max_keepalive_connections`` and
    ``keepalive_expiry``.
    """
    # support socks5h (requests compatibility):
    # https://requests.readthedocs.io/en/master/user/advanced/#socks
    # socks5://   hostname is resolved on client side
    # socks5h://  hostname is resolved on proxy side
    rdns = False
    socks5h = 'socks5h://'
    if proxy_url.startswith(socks5h):
        proxy_url = 'socks5://' + proxy_url[len(socks5h):]
        rdns = True

    proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
    verify = get_sslcontexts(proxy_url, None, True, False, http2) if verify is True else verify
    return AsyncProxyTransportFixed(proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
                                    username=proxy_username, password=proxy_password,
                                    rdns=rdns,
                                    loop=get_loop(),
                                    verify=verify,
                                    http2=http2,
                                    local_address=local_address,
                                    max_connections=limit.max_connections,
                                    max_keepalive_connections=limit.max_keepalive_connections,
                                    keepalive_expiry=limit.keepalive_expiry,
                                    retries=retries,
                                    **TRANSPORT_KWARGS)


def get_transport(verify, http2, local_address, proxy_url, limit, retries):
    """Return an ``AsyncHTTPTransportFixed``, optionally routed through an HTTP(S) proxy.

    ``verify=True`` is replaced by a cached SSL context; a falsy ``proxy_url`` means no proxy.
    """
    verify = get_sslcontexts(None, None, True, False, http2) if verify is True else verify
    return AsyncHTTPTransportFixed(verify=verify,
                                   http2=http2,
                                   local_address=local_address,
                                   proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
                                   limits=limit,
                                   retries=retries,
                                   **TRANSPORT_KWARGS)


def iter_proxies(proxies):
    """Yield ``(pattern, proxy_url)`` pairs from a proxy setting.

    A string is routed for every request (``'all://'``); a dict is yielded item by item.
    Any other value (e.g. ``None``) yields nothing.
    """
    # https://www.python-httpx.org/compatibility/#proxy-keys
    if isinstance(proxies, str):
        yield 'all://', proxies
    elif isinstance(proxies, dict):
        for pattern, proxy_url in proxies.items():
            yield pattern, proxy_url


def new_client(enable_http, verify, enable_http2,
               max_connections, max_keepalive_connections, keepalive_expiry,
               proxies, local_address, retries, max_redirects):
    """Create an ``httpx.AsyncClient`` with one transport mounted per proxy pattern.

    SOCKS proxy URLs get a SOCKS transport, other proxy URLs an HTTP proxy transport.
    When ``enable_http`` is false, proxies for plain-HTTP patterns are skipped and
    ``http://`` is mounted on a transport that rejects every request.
    """
    limit = httpx.Limits(max_connections=max_connections,
                         max_keepalive_connections=max_keepalive_connections,
                         keepalive_expiry=keepalive_expiry)
    # See https://www.python-httpx.org/advanced/#routing
    mounts = {}
    for pattern, proxy_url in iter_proxies(proxies):
        if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
            continue
        if proxy_url.startswith(('socks4://', 'socks5://', 'socks5h://')):
            mounts[pattern] = get_transport_for_socks_proxy(verify, enable_http2, local_address, proxy_url, limit,
                                                            retries)
        else:
            mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries)

    if not enable_http:
        mounts['http://'] = AsyncHTTPTransportNoHttp()

    transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
    return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)


def get_loop():
    """Return the background event loop created by ``init()`` (``None`` before ``init()``)."""
    return LOOP


def init():
    """Silence hpack debug logging and start the daemon thread running ``LOOP``.

    Blocks until ``LOOP`` has been assigned, so ``get_loop()`` never returns ``None``
    once ``init()`` has returned.
    """
    # log
    for logger_name in ('hpack.hpack', 'hpack.table'):
        logging.getLogger(logger_name).setLevel(logging.WARNING)

    # loop
    loop_ready = threading.Event()

    def loop_thread():
        global LOOP
        LOOP = asyncio.new_event_loop()
        # Signal init() before blocking in run_forever().
        loop_ready.set()
        LOOP.run_forever()

    th = threading.Thread(
        target=loop_thread,
        name='asyncio_loop',
        daemon=True,
    )
    th.start()
    # Without this wait, callers of get_loop() right after import could observe None.
    loop_ready.wait()


init()