# client.py
# SPDX-License-Identifier: AGPL-3.0-or-later
import asyncio
import logging
import threading

import httpcore
import httpx
from httpx_socks import AsyncProxyTransport
from python_socks import parse_proxy_url
import python_socks._errors

from searx import logger

# Optional uvloop (support Python 3.6)
try:
    import uvloop
except ImportError:
    pass
else:
    uvloop.install()

logger = logger.getChild('searx.http.client')

# Shared asyncio event loop; created by init() inside a dedicated daemon thread.
LOOP = None
# Cache of SSL contexts, keyed by (proxy_url, cert, verify, trust_env, http2).
SSLCONTEXTS = {}
# Keyword arguments common to every transport created in this module.
TRANSPORT_KWARGS = {
    'backend': 'asyncio',
    'trust_env': False,
}
  25. async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
  26. origin = httpcore._utils.url_to_origin(url)
  27. logger.debug('Drop connections for %r', origin)
  28. connections_to_close = connection_pool._connections_for_origin(origin)
  29. for connection in connections_to_close:
  30. await connection_pool._remove_from_pool(connection)
  31. try:
  32. await connection.aclose()
  33. except httpcore.NetworkError as e:
  34. logger.warning('Error closing an existing connection', exc_info=e)
  35. def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False):
  36. global SSLCONTEXTS
  37. key = (proxy_url, cert, verify, trust_env, http2)
  38. if key not in SSLCONTEXTS:
  39. SSLCONTEXTS[key] = httpx.create_ssl_context(cert, verify, trust_env, http2)
  40. return SSLCONTEXTS[key]
  41. class AsyncHTTPTransportNoHttp(httpcore.AsyncHTTPTransport):
  42. """Block HTTP request"""
  43. async def arequest(self, method, url, headers=None, stream=None, ext=None):
  44. raise httpcore.UnsupportedProtocol("HTTP protocol is disabled")
  45. class AsyncProxyTransportFixed(AsyncProxyTransport):
  46. """Fix httpx_socks.AsyncProxyTransport
  47. Map python_socks exceptions to httpcore.ProxyError
  48. Map socket.gaierror to httpcore.ConnectError
  49. Note: keepalive_expiry is ignored, AsyncProxyTransport should call:
  50. * self._keepalive_sweep()
  51. * self._response_closed(self, connection)
  52. Note: AsyncProxyTransport inherit from AsyncConnectionPool
  53. Note: the API is going to change on httpx 0.18.0
  54. see https://github.com/encode/httpx/pull/1522
  55. """
  56. async def arequest(self, method, url, headers=None, stream=None, ext=None):
  57. retry = 2
  58. while retry > 0:
  59. retry -= 1
  60. try:
  61. return await super().arequest(method, url, headers, stream, ext)
  62. except (python_socks._errors.ProxyConnectionError,
  63. python_socks._errors.ProxyTimeoutError,
  64. python_socks._errors.ProxyError) as e:
  65. raise httpcore.ProxyError(e)
  66. except OSError as e:
  67. # socket.gaierror when DNS resolution fails
  68. raise httpcore.NetworkError(e)
  69. except httpcore.RemoteProtocolError as e:
  70. # in case of httpcore.RemoteProtocolError: Server disconnected
  71. await close_connections_for_url(self, url)
  72. logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
  73. # retry
  74. except (httpcore.NetworkError, httpcore.ProtocolError) as e:
  75. # httpcore.WriteError on HTTP/2 connection leaves a new opened stream
  76. # then each new request creates a new stream and raise the same WriteError
  77. await close_connections_for_url(self, url)
  78. raise e
  79. class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
  80. """Fix httpx.AsyncHTTPTransport"""
  81. async def arequest(self, method, url, headers=None, stream=None, ext=None):
  82. retry = 2
  83. while retry > 0:
  84. retry -= 1
  85. try:
  86. return await super().arequest(method, url, headers, stream, ext)
  87. except OSError as e:
  88. # socket.gaierror when DNS resolution fails
  89. raise httpcore.ConnectError(e)
  90. except httpcore.CloseError as e:
  91. # httpcore.CloseError: [Errno 104] Connection reset by peer
  92. # raised by _keepalive_sweep()
  93. # from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198 # noqa
  94. await close_connections_for_url(self._pool, url)
  95. logger.warning('httpcore.CloseError: retry', exc_info=e)
  96. # retry
  97. except httpcore.RemoteProtocolError as e:
  98. # in case of httpcore.RemoteProtocolError: Server disconnected
  99. await close_connections_for_url(self._pool, url)
  100. logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
  101. # retry
  102. except (httpcore.ProtocolError, httpcore.NetworkError) as e:
  103. await close_connections_for_url(self._pool, url)
  104. raise e
  105. def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
  106. # support socks5h (requests compatibility):
  107. # https://requests.readthedocs.io/en/master/user/advanced/#socks
  108. # socks5:// hostname is resolved on client side
  109. # socks5h:// hostname is resolved on proxy side
  110. rdns = False
  111. socks5h = 'socks5h://'
  112. if proxy_url.startswith(socks5h):
  113. proxy_url = 'socks5://' + proxy_url[len(socks5h):]
  114. rdns = True
  115. proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
  116. verify = get_sslcontexts(proxy_url, None, True, False, http2) if verify is True else verify
  117. return AsyncProxyTransportFixed(proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
  118. username=proxy_username, password=proxy_password,
  119. rdns=rdns,
  120. loop=get_loop(),
  121. verify=verify,
  122. http2=http2,
  123. local_address=local_address,
  124. max_connections=limit.max_connections,
  125. max_keepalive_connections=limit.max_keepalive_connections,
  126. keepalive_expiry=limit.keepalive_expiry,
  127. retries=retries,
  128. **TRANSPORT_KWARGS)
  129. def get_transport(verify, http2, local_address, proxy_url, limit, retries):
  130. verify = get_sslcontexts(None, None, True, False, http2) if verify is True else verify
  131. return AsyncHTTPTransportFixed(verify=verify,
  132. http2=http2,
  133. local_address=local_address,
  134. proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
  135. limits=limit,
  136. retries=retries,
  137. **TRANSPORT_KWARGS)
  138. def iter_proxies(proxies):
  139. # https://www.python-httpx.org/compatibility/#proxy-keys
  140. if isinstance(proxies, str):
  141. yield 'all://', proxies
  142. elif isinstance(proxies, dict):
  143. for pattern, proxy_url in proxies.items():
  144. yield pattern, proxy_url
  145. def new_client(enable_http, verify, enable_http2,
  146. max_connections, max_keepalive_connections, keepalive_expiry,
  147. proxies, local_address, retries, max_redirects):
  148. limit = httpx.Limits(max_connections=max_connections,
  149. max_keepalive_connections=max_keepalive_connections,
  150. keepalive_expiry=keepalive_expiry)
  151. # See https://www.python-httpx.org/advanced/#routing
  152. mounts = {}
  153. for pattern, proxy_url in iter_proxies(proxies):
  154. if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
  155. continue
  156. if proxy_url.startswith('socks4://') \
  157. or proxy_url.startswith('socks5://') \
  158. or proxy_url.startswith('socks5h://'):
  159. mounts[pattern] = get_transport_for_socks_proxy(verify, enable_http2, local_address, proxy_url, limit,
  160. retries)
  161. else:
  162. mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries)
  163. if not enable_http:
  164. mounts['http://'] = AsyncHTTPTransportNoHttp()
  165. transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
  166. return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)
  167. def get_loop():
  168. global LOOP
  169. return LOOP
  170. def init():
  171. # log
  172. for logger_name in ('hpack.hpack', 'hpack.table'):
  173. logging.getLogger(logger_name).setLevel(logging.WARNING)
  174. # loop
  175. def loop_thread():
  176. global LOOP
  177. LOOP = asyncio.new_event_loop()
  178. LOOP.run_forever()
  179. th = threading.Thread(
  180. target=loop_thread,
  181. name='asyncio_loop',
  182. daemon=True,
  183. )
  184. th.start()
# Module import side effect: start the asyncio loop thread immediately.
init()