# client.py — searx HTTP client helpers (recovered from a numbered source listing)
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pylint: disable=missing-module-docstring, missing-function-docstring, global-statement

import asyncio
import logging
import threading

import httpcore
import httpx
from httpx_socks import AsyncProxyTransport
from python_socks import (
    parse_proxy_url,
    ProxyConnectionError,
    ProxyTimeoutError,
    ProxyError
)

from searx import logger

# Optional uvloop (support Python 3.6)
try:
    import uvloop
except ImportError:
    pass
else:
    # uvloop replaces the default asyncio event-loop policy when available
    uvloop.install()

logger = logger.getChild('searx.http.client')

# Shared asyncio event loop; created by init() inside a daemon thread
LOOP = None
# Cache of SSL contexts keyed by (proxy_url, cert, verify, trust_env, http2)
SSLCONTEXTS = {}
# Keyword arguments common to every transport built in this module
TRANSPORT_KWARGS = {
    'backend': 'asyncio',
    'trust_env': False,
}
# pylint: disable=protected-access
async def close_connections_for_url(
        connection_pool: httpcore.AsyncConnectionPool,
        url: httpcore._utils.URL ):
    """Drop all pooled connections for *url*'s origin.

    Uses httpcore private APIs (_connections_for_origin, _remove_from_pool),
    so it is tied to the httpcore version in use. Errors while closing an
    individual connection are logged and otherwise ignored (best effort).
    """
    origin = httpcore._utils.url_to_origin(url)
    logger.debug('Drop connections for %r', origin)
    connections_to_close = connection_pool._connections_for_origin(origin)
    for connection in connections_to_close:
        # remove first so no new request can pick up the dying connection
        await connection_pool._remove_from_pool(connection)
        try:
            await connection.aclose()
        except httpcore.NetworkError as e:
            logger.warning('Error closing an existing connection', exc_info=e)
# pylint: enable=protected-access
  45. def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False):
  46. global SSLCONTEXTS
  47. key = (proxy_url, cert, verify, trust_env, http2)
  48. if key not in SSLCONTEXTS:
  49. SSLCONTEXTS[key] = httpx.create_ssl_context(cert, verify, trust_env, http2)
  50. return SSLCONTEXTS[key]
class AsyncHTTPTransportNoHttp(httpcore.AsyncHTTPTransport):
    """Block HTTP request

    Mounted for the 'http://' pattern when plain HTTP is disabled, so every
    request over that scheme fails immediately instead of being sent.
    """
    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        # unconditionally refuse: plain HTTP is disabled by configuration
        raise httpcore.UnsupportedProtocol("HTTP protocol is disabled")
  55. class AsyncProxyTransportFixed(AsyncProxyTransport):
  56. """Fix httpx_socks.AsyncProxyTransport
  57. Map python_socks exceptions to httpcore.ProxyError
  58. Map socket.gaierror to httpcore.ConnectError
  59. Note: keepalive_expiry is ignored, AsyncProxyTransport should call:
  60. * self._keepalive_sweep()
  61. * self._response_closed(self, connection)
  62. Note: AsyncProxyTransport inherit from AsyncConnectionPool
  63. Note: the API is going to change on httpx 0.18.0
  64. see https://github.com/encode/httpx/pull/1522
  65. """
  66. async def arequest(self, method, url, headers=None, stream=None, ext=None):
  67. retry = 2
  68. while retry > 0:
  69. retry -= 1
  70. try:
  71. return await super().arequest(method, url, headers, stream, ext)
  72. except (ProxyConnectionError, ProxyTimeoutError, ProxyError) as e:
  73. raise httpcore.ProxyError(e)
  74. except OSError as e:
  75. # socket.gaierror when DNS resolution fails
  76. raise httpcore.NetworkError(e)
  77. except httpcore.RemoteProtocolError as e:
  78. # in case of httpcore.RemoteProtocolError: Server disconnected
  79. await close_connections_for_url(self, url)
  80. logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
  81. # retry
  82. except (httpcore.NetworkError, httpcore.ProtocolError) as e:
  83. # httpcore.WriteError on HTTP/2 connection leaves a new opened stream
  84. # then each new request creates a new stream and raise the same WriteError
  85. await close_connections_for_url(self, url)
  86. raise e
  87. class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
  88. """Fix httpx.AsyncHTTPTransport"""
  89. async def arequest(self, method, url, headers=None, stream=None, ext=None):
  90. retry = 2
  91. while retry > 0:
  92. retry -= 1
  93. try:
  94. return await super().arequest(method, url, headers, stream, ext)
  95. except OSError as e:
  96. # socket.gaierror when DNS resolution fails
  97. raise httpcore.ConnectError(e)
  98. except httpcore.CloseError as e:
  99. # httpcore.CloseError: [Errno 104] Connection reset by peer
  100. # raised by _keepalive_sweep()
  101. # from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198 # pylint: disable=line-too-long
  102. await close_connections_for_url(self._pool, url)
  103. logger.warning('httpcore.CloseError: retry', exc_info=e)
  104. # retry
  105. except httpcore.RemoteProtocolError as e:
  106. # in case of httpcore.RemoteProtocolError: Server disconnected
  107. await close_connections_for_url(self._pool, url)
  108. logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
  109. # retry
  110. except (httpcore.ProtocolError, httpcore.NetworkError) as e:
  111. await close_connections_for_url(self._pool, url)
  112. raise e
  113. def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
  114. global TRANSPORT_KWARGS
  115. # support socks5h (requests compatibility):
  116. # https://requests.readthedocs.io/en/master/user/advanced/#socks
  117. # socks5:// hostname is resolved on client side
  118. # socks5h:// hostname is resolved on proxy side
  119. rdns = False
  120. socks5h = 'socks5h://'
  121. if proxy_url.startswith(socks5h):
  122. proxy_url = 'socks5://' + proxy_url[len(socks5h):]
  123. rdns = True
  124. proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
  125. verify = get_sslcontexts(proxy_url, None, True, False, http2) if verify is True else verify
  126. return AsyncProxyTransportFixed(proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
  127. username=proxy_username, password=proxy_password,
  128. rdns=rdns,
  129. loop=get_loop(),
  130. verify=verify,
  131. http2=http2,
  132. local_address=local_address,
  133. max_connections=limit.max_connections,
  134. max_keepalive_connections=limit.max_keepalive_connections,
  135. keepalive_expiry=limit.keepalive_expiry,
  136. retries=retries,
  137. **TRANSPORT_KWARGS)
  138. def get_transport(verify, http2, local_address, proxy_url, limit, retries):
  139. global TRANSPORT_KWARGS
  140. verify = get_sslcontexts(None, None, True, False, http2) if verify is True else verify
  141. return AsyncHTTPTransportFixed(
  142. # pylint: disable=protected-access
  143. verify=verify,
  144. http2=http2,
  145. local_address=local_address,
  146. proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
  147. limits=limit,
  148. retries=retries,
  149. **TRANSPORT_KWARGS
  150. )
  151. def iter_proxies(proxies):
  152. # https://www.python-httpx.org/compatibility/#proxy-keys
  153. if isinstance(proxies, str):
  154. yield 'all://', proxies
  155. elif isinstance(proxies, dict):
  156. for pattern, proxy_url in proxies.items():
  157. yield pattern, proxy_url
  158. def new_client(
  159. # pylint: disable=too-many-arguments
  160. enable_http, verify, enable_http2,
  161. max_connections, max_keepalive_connections, keepalive_expiry,
  162. proxies, local_address, retries, max_redirects ):
  163. limit = httpx.Limits(max_connections=max_connections,
  164. max_keepalive_connections=max_keepalive_connections,
  165. keepalive_expiry=keepalive_expiry)
  166. # See https://www.python-httpx.org/advanced/#routing
  167. mounts = {}
  168. for pattern, proxy_url in iter_proxies(proxies):
  169. if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
  170. continue
  171. if proxy_url.startswith('socks4://') \
  172. or proxy_url.startswith('socks5://') \
  173. or proxy_url.startswith('socks5h://'):
  174. mounts[pattern] = get_transport_for_socks_proxy(verify, enable_http2, local_address, proxy_url, limit,
  175. retries)
  176. else:
  177. mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries)
  178. if not enable_http:
  179. mounts['http://'] = AsyncHTTPTransportNoHttp()
  180. transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
  181. return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)
  182. def get_loop():
  183. global LOOP
  184. return LOOP
  185. def init():
  186. # log
  187. for logger_name in ('hpack.hpack', 'hpack.table'):
  188. logging.getLogger(logger_name).setLevel(logging.WARNING)
  189. # loop
  190. def loop_thread():
  191. global LOOP
  192. LOOP = asyncio.new_event_loop()
  193. LOOP.run_forever()
  194. thread = threading.Thread(
  195. target=loop_thread,
  196. name='asyncio_loop',
  197. daemon=True,
  198. )
  199. thread.start()
  200. init()