client.py

# SPDX-License-Identifier: AGPL-3.0-or-later
import asyncio
import logging
import threading

import httpcore
import httpx
from httpx_socks import AsyncProxyTransport
from python_socks import parse_proxy_url
import python_socks._errors

from searx import logger

# Optional uvloop (support Python 3.6)
try:
    import uvloop
except ImportError:
    pass
else:
    uvloop.install()


logger = logger.getChild('searx.http.client')

LOOP = None
TRANSPORT_KWARGS = {
    'backend': 'asyncio',
    'trust_env': False,
}


async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
    origin = httpcore._utils.url_to_origin(url)
    logger.debug('Drop connections for %r', origin)
    connections_to_close = connection_pool._connections_for_origin(origin)
    for connection in connections_to_close:
        await connection_pool._remove_from_pool(connection)
        try:
            await connection.aclose()
        except httpcore.NetworkError as e:
            logger.warning('Error closing an existing connection', exc_info=e)


class AsyncHTTPTransportNoHttp(httpcore.AsyncHTTPTransport):
    """Block HTTP requests"""

    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        raise httpcore.UnsupportedProtocol("HTTP protocol is disabled")


class AsyncProxyTransportFixed(AsyncProxyTransport):
    """Fix httpx_socks.AsyncProxyTransport

    Map python_socks exceptions to httpcore.ProxyError

    Map socket.gaierror to httpcore.ConnectError

    Note: keepalive_expiry is ignored, AsyncProxyTransport should call:
    * self._keepalive_sweep()
    * self._response_closed(self, connection)

    Note: AsyncProxyTransport inherits from AsyncConnectionPool

    Note: the API is going to change on httpx 0.18.0
    see https://github.com/encode/httpx/pull/1522
    """

    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        retry = 2
        while retry > 0:
            retry -= 1
            try:
                return await super().arequest(method, url, headers, stream, ext)
            except (python_socks._errors.ProxyConnectionError,
                    python_socks._errors.ProxyTimeoutError,
                    python_socks._errors.ProxyError) as e:
                raise httpcore.ProxyError(e)
            except OSError as e:
                # socket.gaierror when DNS resolution fails
                raise httpcore.NetworkError(e)
            except httpcore.RemoteProtocolError as e:
                # in case of httpcore.RemoteProtocolError: Server disconnected
                await close_connections_for_url(self, url)
                logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
                # retry
            except (httpcore.NetworkError, httpcore.ProtocolError) as e:
                # httpcore.WriteError on an HTTP/2 connection leaves a newly opened stream,
                # then each new request creates a new stream and raises the same WriteError
                await close_connections_for_url(self, url)
                raise e


class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
    """Fix httpx.AsyncHTTPTransport"""

    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        retry = 2
        while retry > 0:
            retry -= 1
            try:
                return await super().arequest(method, url, headers, stream, ext)
            except OSError as e:
                # socket.gaierror when DNS resolution fails
                raise httpcore.ConnectError(e)
            except httpcore.CloseError as e:
                # httpcore.CloseError: [Errno 104] Connection reset by peer
                # raised by _keepalive_sweep()
                # from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198  # noqa
                await close_connections_for_url(self._pool, url)
                logger.warning('httpcore.CloseError: retry', exc_info=e)
                # retry
            except httpcore.RemoteProtocolError as e:
                # in case of httpcore.RemoteProtocolError: Server disconnected
                await close_connections_for_url(self._pool, url)
                logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
                # retry
            except (httpcore.ProtocolError, httpcore.NetworkError) as e:
                await close_connections_for_url(self._pool, url)
                raise e


def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
    global LOOP, TRANSPORT_KWARGS

    # support socks5h (requests compatibility):
    # https://requests.readthedocs.io/en/master/user/advanced/#socks
    # socks5://  hostname is resolved on client side
    # socks5h:// hostname is resolved on proxy side
    rdns = False
    socks5h = 'socks5h://'
    if proxy_url.startswith(socks5h):
        proxy_url = 'socks5://' + proxy_url[len(socks5h):]
        rdns = True

    proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)

    return AsyncProxyTransportFixed(proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
                                    username=proxy_username, password=proxy_password,
                                    rdns=rdns,
                                    loop=LOOP,
                                    verify=verify,
                                    http2=http2,
                                    local_address=local_address,
                                    max_connections=limit.max_connections,
                                    max_keepalive_connections=limit.max_keepalive_connections,
                                    keepalive_expiry=limit.keepalive_expiry,
                                    retries=retries,
                                    **TRANSPORT_KWARGS)
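# Illustration (not in the original module): for a setting such as
# 'socks5h://user:pass@127.0.0.1:9050', the branch above rewrites the scheme to
# 'socks5://' and sets rdns=True so the hostname of each request is resolved by
# the proxy rather than locally; parse_proxy_url() then returns a tuple along
# the lines of (ProxyType.SOCKS5, '127.0.0.1', 9050, 'user', 'pass'), which is
# passed straight into the transport constructor.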
def get_transport(verify, http2, local_address, proxy_url, limit, retries):
    return AsyncHTTPTransportFixed(verify=verify,
                                   http2=http2,
                                   local_address=local_address,
                                   proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
                                   limits=limit,
                                   retries=retries,
                                   **TRANSPORT_KWARGS)


def iter_proxies(proxies):
    # https://www.python-httpx.org/compatibility/#proxy-keys
    if isinstance(proxies, str):
        yield 'all://', proxies
    elif isinstance(proxies, dict):
        for pattern, proxy_url in proxies.items():
            yield pattern, proxy_url
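# Illustration (not in the original module): iter_proxies() normalizes both
# accepted shapes of the "proxies" setting into (pattern, proxy_url) pairs, e.g.
#
#     list(iter_proxies('socks5h://127.0.0.1:9050'))
#     # -> [('all://', 'socks5h://127.0.0.1:9050')]
#     list(iter_proxies({'https://': 'http://127.0.0.1:8080'}))
#     # -> [('https://', 'http://127.0.0.1:8080')]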
def new_client(enable_http, verify, enable_http2,
               max_connections, max_keepalive_connections, keepalive_expiry,
               proxies, local_address, retries, max_redirects):
    limit = httpx.Limits(max_connections=max_connections,
                         max_keepalive_connections=max_keepalive_connections,
                         keepalive_expiry=keepalive_expiry)
    # See https://www.python-httpx.org/advanced/#routing
    mounts = {}
    for pattern, proxy_url in iter_proxies(proxies):
        if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
            continue
        if proxy_url.startswith('socks4://') \
           or proxy_url.startswith('socks5://') \
           or proxy_url.startswith('socks5h://'):
            mounts[pattern] = get_transport_for_socks_proxy(verify, enable_http2, local_address, proxy_url, limit,
                                                            retries)
        else:
            mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries)

    if not enable_http:
        mounts['http://'] = AsyncHTTPTransportNoHttp()

    transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
    return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)
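# Illustration (not in the original module, names and values are hypothetical):
# a minimal sketch of how a caller might build a client from settings.  The
# keyword values below are examples, not searx defaults.
def _example_new_client():
    return new_client(enable_http=False,          # refuse plain http:// requests
                      verify=True,
                      enable_http2=True,
                      max_connections=100,
                      max_keepalive_connections=10,
                      keepalive_expiry=5.0,
                      proxies={'all://': 'socks5h://127.0.0.1:9050'},
                      local_address=None,
                      retries=0,
                      max_redirects=30)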
def init():
    # log
    for logger_name in ('hpack.hpack', 'hpack.table'):
        logging.getLogger(logger_name).setLevel(logging.WARNING)

    # loop
    def loop_thread():
        global LOOP
        LOOP = asyncio.new_event_loop()
        LOOP.run_forever()

    th = threading.Thread(
        target=loop_thread,
        name='asyncio_loop',
        daemon=True,
    )
    th.start()


init()
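# Illustration (not in the original module): init() runs the event loop in a
# daemon thread, so synchronous callers are expected to submit coroutines to it
# with asyncio.run_coroutine_threadsafe() and block on the returned future.
# A hypothetical sketch, assuming some coroutine send_request(...):
#
#     future = asyncio.run_coroutine_threadsafe(send_request(method, url), LOOP)
#     response = future.result(timeout)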