# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pylint: disable=missing-module-docstring, global-statement

import asyncio
import logging
import threading

import anyio
import httpcore
import httpx
from httpx_socks import AsyncProxyTransport
from python_socks import parse_proxy_url, ProxyConnectionError, ProxyTimeoutError, ProxyError

from searx import logger

# Optional uvloop (support Python 3.6)
try:
    import uvloop
except ImportError:
    pass
else:
    uvloop.install()


logger = logger.getChild('searx.network.client')

LOOP = None
SSLCONTEXTS = {}
TRANSPORT_KWARGS = {
    # use anyio:
    # * https://github.com/encode/httpcore/issues/344
    # * https://github.com/encode/httpx/discussions/1511
    'backend': 'anyio',
    'trust_env': False,
}


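# close_connections_for_url() relies on httpcore's private connection-pool API
# (hence the protected-access pragmas around it) to evict every pooled
# connection for a given origin.  The transports below call it whenever a
# connection is known to be broken, so a retry does not pick up the same dead
# connection again.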
# pylint: disable=protected-access
async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
    origin = httpcore._utils.url_to_origin(url)
    logger.debug('Drop connections for %r', origin)

    connections_to_close = connection_pool._connections_for_origin(origin)
    for connection in connections_to_close:
        await connection_pool._remove_from_pool(connection)
        try:
            await connection.aclose()
        except httpx.NetworkError as e:
            logger.warning('Error closing an existing connection', exc_info=e)


# pylint: enable=protected-access


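# SSL contexts are cached per (proxy_url, cert, verify, trust_env, http2) key,
# so every transport that shares the same settings reuses a single context
# instead of building a new one.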
def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False):
    key = (proxy_url, cert, verify, trust_env, http2)
    if key not in SSLCONTEXTS:
        SSLCONTEXTS[key] = httpx.create_ssl_context(cert, verify, trust_env, http2)
    return SSLCONTEXTS[key]


class AsyncHTTPTransportNoHttp(httpx.AsyncHTTPTransport):
    """Block HTTP request"""

    async def handle_async_request(self, method, url, headers=None, stream=None, extensions=None):
        raise httpx.UnsupportedProtocol('HTTP protocol is disabled')


class AsyncProxyTransportFixed(AsyncProxyTransport):
    """Fix httpx_socks.AsyncProxyTransport

    Map python_socks exceptions to httpx.ProxyError / httpx.ConnectError

    Map socket.gaierror to httpx.ConnectError

    Note: AsyncProxyTransport inherits from AsyncConnectionPool
    """

    async def handle_async_request(self, method, url, headers=None, stream=None, extensions=None):
        retry = 2
        while retry > 0:
            retry -= 1
            try:
                return await super().handle_async_request(
                    method, url, headers=headers, stream=stream, extensions=extensions
                )
            except (ProxyConnectionError, ProxyTimeoutError, ProxyError) as e:
                raise httpx.ProxyError from e
            except OSError as e:
                # socket.gaierror when DNS resolution fails
                raise httpx.ConnectError from e
            except httpx.NetworkError as e:
                # an httpx.WriteError on an HTTP/2 connection leaves a newly opened stream,
                # then each new request creates another stream and raises the same WriteError
                await close_connections_for_url(self, url)
                raise e
            except anyio.ClosedResourceError as e:
                await close_connections_for_url(self, url)
                raise httpx.CloseError from e
            except httpx.RemoteProtocolError as e:
                # in case of httpx.RemoteProtocolError: Server disconnected
                await close_connections_for_url(self, url)
                logger.warning('httpx.RemoteProtocolError: retry', exc_info=e)
                # retry


class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
    """Fix httpx.AsyncHTTPTransport"""

    async def handle_async_request(self, method, url, headers=None, stream=None, extensions=None):
        retry = 2
        while retry > 0:
            retry -= 1
            try:
                return await super().handle_async_request(
                    method, url, headers=headers, stream=stream, extensions=extensions
                )
            except OSError as e:
                # socket.gaierror when DNS resolution fails
                raise httpx.ConnectError from e
            except httpx.NetworkError as e:
                # an httpx.WriteError on an HTTP/2 connection leaves a newly opened stream,
                # then each new request creates another stream and raises the same WriteError
                await close_connections_for_url(self._pool, url)
                raise e
            except anyio.ClosedResourceError as e:
                await close_connections_for_url(self._pool, url)
                raise httpx.CloseError from e
            except httpx.RemoteProtocolError as e:
                # in case of httpx.RemoteProtocolError: Server disconnected
                await close_connections_for_url(self._pool, url)
                logger.warning('httpx.RemoteProtocolError: retry', exc_info=e)
                # retry


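# Build the SOCKS transport for a proxy URL.  The socks5h:// scheme (borrowed
# from requests) is rewritten to socks5:// with rdns=True, e.g.
# 'socks5h://127.0.0.1:9050' becomes 'socks5://127.0.0.1:9050' and the hostname
# is resolved on the proxy side (the address here is only an example).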
def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
    # support socks5h (requests compatibility):
    # https://requests.readthedocs.io/en/master/user/advanced/#socks
    # socks5://   hostname is resolved on client side
    # socks5h://  hostname is resolved on proxy side
    rdns = False
    socks5h = 'socks5h://'
    if proxy_url.startswith(socks5h):
        proxy_url = 'socks5://' + proxy_url[len(socks5h):]
        rdns = True

    proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
    verify = get_sslcontexts(proxy_url, None, True, False, http2) if verify is True else verify
    return AsyncProxyTransportFixed(
        proxy_type=proxy_type,
        proxy_host=proxy_host,
        proxy_port=proxy_port,
        username=proxy_username,
        password=proxy_password,
        rdns=rdns,
        loop=get_loop(),
        verify=verify,
        http2=http2,
        local_address=local_address,
        max_connections=limit.max_connections,
        max_keepalive_connections=limit.max_keepalive_connections,
        keepalive_expiry=limit.keepalive_expiry,
        retries=retries,
        **TRANSPORT_KWARGS,
    )


def get_transport(verify, http2, local_address, proxy_url, limit, retries):
    verify = get_sslcontexts(None, None, True, False, http2) if verify is True else verify
    return AsyncHTTPTransportFixed(
        # pylint: disable=protected-access
        verify=verify,
        http2=http2,
        local_address=local_address,
        proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
        limits=limit,
        retries=retries,
        **TRANSPORT_KWARGS,
    )


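# new_client() assembles an httpx.AsyncClient: one mounted transport per proxy
# pattern (SOCKS or plain HTTP), an AsyncHTTPTransportNoHttp mount that blocks
# http:// requests when enable_http is False, and an optional response hook.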
def new_client(
    # pylint: disable=too-many-arguments
    enable_http,
    verify,
    enable_http2,
    max_connections,
    max_keepalive_connections,
    keepalive_expiry,
    proxies,
    local_address,
    retries,
    max_redirects,
    hook_log_response,
):
    limit = httpx.Limits(
        max_connections=max_connections,
        max_keepalive_connections=max_keepalive_connections,
        keepalive_expiry=keepalive_expiry,
    )
    # See https://www.python-httpx.org/advanced/#routing
    mounts = {}
    for pattern, proxy_url in proxies.items():
        if not enable_http and pattern.startswith('http://'):
            continue
        if proxy_url.startswith('socks4://') or proxy_url.startswith('socks5://') or proxy_url.startswith('socks5h://'):
            mounts[pattern] = get_transport_for_socks_proxy(
                verify, enable_http2, local_address, proxy_url, limit, retries
            )
        else:
            mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries)

    if not enable_http:
        mounts['http://'] = AsyncHTTPTransportNoHttp()

    transport = get_transport(verify, enable_http2, local_address, None, limit, retries)

    event_hooks = None
    if hook_log_response:
        event_hooks = {'response': [hook_log_response]}

    return httpx.AsyncClient(
        transport=transport,
        mounts=mounts,
        max_redirects=max_redirects,
        event_hooks=event_hooks,
    )


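# The module runs its own asyncio event loop in a daemon thread: init() starts
# the loop at import time and get_loop() hands it out to callers (for example
# the SOCKS transports above, which are bound to it via loop=get_loop()).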
def get_loop():
    return LOOP


def init():
    # log
    for logger_name in ('hpack.hpack', 'hpack.table', 'httpx._client'):
        logging.getLogger(logger_name).setLevel(logging.WARNING)

    # loop
    def loop_thread():
        global LOOP
        LOOP = asyncio.new_event_loop()
        LOOP.run_forever()

    thread = threading.Thread(
        target=loop_thread,
        name='asyncio_loop',
        daemon=True,
    )
    thread.start()


init()
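

# A minimal usage sketch, not part of the original module: the values below are
# illustrative, not settings taken from SearXNG.
#
#   import asyncio
#
#   client = new_client(
#       enable_http=False,          # refuse plain http:// requests
#       verify=True,
#       enable_http2=True,
#       max_connections=100,
#       max_keepalive_connections=10,
#       keepalive_expiry=5.0,
#       proxies={},                 # e.g. {'all://': 'socks5h://127.0.0.1:9050'}
#       local_address='0.0.0.0',
#       retries=0,
#       max_redirects=30,
#       hook_log_response=None,
#   )
#   # requests must run on the module's private loop, e.g. from synchronous code:
#   response = asyncio.run_coroutine_threadsafe(
#       client.get('https://example.org'), get_loop()
#   ).result()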