# searx/network/client.py
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pylint: disable=missing-module-docstring, global-statement

import asyncio
import logging
import threading

import anyio
import httpcore
import httpx
from httpx_socks import AsyncProxyTransport
from python_socks import (
    parse_proxy_url,
    ProxyConnectionError,
    ProxyTimeoutError,
    ProxyError
)

from searx import logger

# Optional uvloop (support Python 3.6)
try:
    import uvloop
except ImportError:
    pass
else:
    uvloop.install()

logger = logger.getChild('searx.network.client')

# Shared asyncio event loop; created by init() in a dedicated daemon thread.
LOOP = None
# Cache of SSL contexts keyed by (proxy_url, cert, verify, trust_env, http2).
SSLCONTEXTS = {}
# Keyword arguments common to every transport created in this module.
TRANSPORT_KWARGS = {
    # use anyio :
    # * https://github.com/encode/httpcore/issues/344
    # * https://github.com/encode/httpx/discussions/1511
    'backend': 'anyio',
    'trust_env': False,
}
  35. # pylint: disable=protected-access
  36. async def close_connections_for_url(
  37. connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL
  38. ):
  39. origin = httpcore._utils.url_to_origin(url)
  40. logger.debug('Drop connections for %r', origin)
  41. connections_to_close = connection_pool._connections_for_origin(origin)
  42. for connection in connections_to_close:
  43. await connection_pool._remove_from_pool(connection)
  44. try:
  45. await connection.aclose()
  46. except httpx.NetworkError as e:
  47. logger.warning('Error closing an existing connection', exc_info=e)
  48. # pylint: enable=protected-access
  49. def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False):
  50. key = (proxy_url, cert, verify, trust_env, http2)
  51. if key not in SSLCONTEXTS:
  52. SSLCONTEXTS[key] = httpx.create_ssl_context(cert, verify, trust_env, http2)
  53. return SSLCONTEXTS[key]
  54. class AsyncHTTPTransportNoHttp(httpx.AsyncHTTPTransport):
  55. """Block HTTP request"""
  56. async def handle_async_request(
  57. self, method, url, headers=None, stream=None, extensions=None
  58. ):
  59. raise httpx.UnsupportedProtocol('HTTP protocol is disabled')
  60. class AsyncProxyTransportFixed(AsyncProxyTransport):
  61. """Fix httpx_socks.AsyncProxyTransport
  62. Map python_socks exceptions to httpx.ProxyError / httpx.ConnectError
  63. Map socket.gaierror to httpx.ConnectError
  64. Note: AsyncProxyTransport inherit from AsyncConnectionPool
  65. """
  66. async def handle_async_request(
  67. self, method, url, headers=None, stream=None, extensions=None
  68. ):
  69. retry = 2
  70. while retry > 0:
  71. retry -= 1
  72. try:
  73. return await super().handle_async_request(
  74. method, url, headers=headers, stream=stream, extensions=extensions
  75. )
  76. except (ProxyConnectionError, ProxyTimeoutError, ProxyError) as e:
  77. raise httpx.ProxyError from e
  78. except OSError as e:
  79. # socket.gaierror when DNS resolution fails
  80. raise httpx.ConnectError from e
  81. except httpx.NetworkError as e:
  82. # httpx.WriteError on HTTP/2 connection leaves a new opened stream
  83. # then each new request creates a new stream and raise the same WriteError
  84. await close_connections_for_url(self, url)
  85. raise e
  86. except anyio.ClosedResourceError as e:
  87. await close_connections_for_url(self, url)
  88. raise httpx.CloseError from e
  89. except httpx.RemoteProtocolError as e:
  90. # in case of httpx.RemoteProtocolError: Server disconnected
  91. await close_connections_for_url(self, url)
  92. logger.warning('httpx.RemoteProtocolError: retry', exc_info=e)
  93. # retry
  94. class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
  95. """Fix httpx.AsyncHTTPTransport"""
  96. async def handle_async_request(
  97. self, method, url, headers=None, stream=None, extensions=None
  98. ):
  99. retry = 2
  100. while retry > 0:
  101. retry -= 1
  102. try:
  103. return await super().handle_async_request(
  104. method, url, headers=headers, stream=stream, extensions=extensions
  105. )
  106. except OSError as e:
  107. # socket.gaierror when DNS resolution fails
  108. raise httpx.ConnectError from e
  109. except httpx.NetworkError as e:
  110. # httpx.WriteError on HTTP/2 connection leaves a new opened stream
  111. # then each new request creates a new stream and raise the same WriteError
  112. await close_connections_for_url(self._pool, url)
  113. raise e
  114. except anyio.ClosedResourceError as e:
  115. await close_connections_for_url(self._pool, url)
  116. raise httpx.CloseError from e
  117. except httpx.RemoteProtocolError as e:
  118. # in case of httpx.RemoteProtocolError: Server disconnected
  119. await close_connections_for_url(self._pool, url)
  120. logger.warning('httpx.RemoteProtocolError: retry', exc_info=e)
  121. # retry
  122. def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
  123. # support socks5h (requests compatibility):
  124. # https://requests.readthedocs.io/en/master/user/advanced/#socks
  125. # socks5:// hostname is resolved on client side
  126. # socks5h:// hostname is resolved on proxy side
  127. rdns = False
  128. socks5h = 'socks5h://'
  129. if proxy_url.startswith(socks5h):
  130. proxy_url = 'socks5://' + proxy_url[len(socks5h):]
  131. rdns = True
  132. proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
  133. verify = get_sslcontexts(proxy_url, None, True, False, http2) if verify is True else verify
  134. return AsyncProxyTransportFixed(
  135. proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
  136. username=proxy_username, password=proxy_password,
  137. rdns=rdns,
  138. loop=get_loop(),
  139. verify=verify,
  140. http2=http2,
  141. local_address=local_address,
  142. max_connections=limit.max_connections,
  143. max_keepalive_connections=limit.max_keepalive_connections,
  144. keepalive_expiry=limit.keepalive_expiry,
  145. retries=retries,
  146. **TRANSPORT_KWARGS
  147. )
  148. def get_transport(verify, http2, local_address, proxy_url, limit, retries):
  149. verify = get_sslcontexts(None, None, True, False, http2) if verify is True else verify
  150. return AsyncHTTPTransportFixed(
  151. # pylint: disable=protected-access
  152. verify=verify,
  153. http2=http2,
  154. local_address=local_address,
  155. proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
  156. limits=limit,
  157. retries=retries,
  158. **TRANSPORT_KWARGS
  159. )
  160. def new_client(
  161. # pylint: disable=too-many-arguments
  162. enable_http, verify, enable_http2,
  163. max_connections, max_keepalive_connections, keepalive_expiry,
  164. proxies, local_address, retries, max_redirects, hook_log_response ):
  165. limit = httpx.Limits(
  166. max_connections=max_connections,
  167. max_keepalive_connections=max_keepalive_connections,
  168. keepalive_expiry=keepalive_expiry
  169. )
  170. # See https://www.python-httpx.org/advanced/#routing
  171. mounts = {}
  172. for pattern, proxy_url in proxies.items():
  173. if not enable_http and pattern.startswith('http://'):
  174. continue
  175. if (proxy_url.startswith('socks4://')
  176. or proxy_url.startswith('socks5://')
  177. or proxy_url.startswith('socks5h://')
  178. ):
  179. mounts[pattern] = get_transport_for_socks_proxy(
  180. verify, enable_http2, local_address, proxy_url, limit, retries
  181. )
  182. else:
  183. mounts[pattern] = get_transport(
  184. verify, enable_http2, local_address, proxy_url, limit, retries
  185. )
  186. if not enable_http:
  187. mounts['http://'] = AsyncHTTPTransportNoHttp()
  188. transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
  189. event_hooks = None
  190. if hook_log_response:
  191. event_hooks = {'response': [ hook_log_response ]}
  192. return httpx.AsyncClient(
  193. transport=transport,
  194. mounts=mounts,
  195. max_redirects=max_redirects,
  196. event_hooks=event_hooks,
  197. )
def get_loop():
    """Return the shared asyncio event loop started by init() (None before init)."""
    return LOOP
  200. def init():
  201. # log
  202. for logger_name in ('hpack.hpack', 'hpack.table', 'httpx._client'):
  203. logging.getLogger(logger_name).setLevel(logging.WARNING)
  204. # loop
  205. def loop_thread():
  206. global LOOP
  207. LOOP = asyncio.new_event_loop()
  208. LOOP.run_forever()
  209. thread = threading.Thread(
  210. target=loop_thread,
  211. name='asyncio_loop',
  212. daemon=True,
  213. )
  214. thread.start()
  215. init()