poolrequests.py

import sys
from time import time
from itertools import cycle
from threading import RLock, local

import requests

from searx import settings
from searx import logger
from searx.raise_for_httperror import raise_for_httperror


logger = logger.getChild('poolrequests')

try:
    import ssl
    if ssl.OPENSSL_VERSION_INFO[0:3] < (1, 0, 2):
        # https://github.com/certifi/python-certifi#1024-bit-root-certificates
        logger.critical('You are using an old openssl version ({0}), please upgrade to 1.0.2 or newer!'
                        .format(ssl.OPENSSL_VERSION))
        sys.exit(1)
except ImportError:
    ssl = None

if not getattr(ssl, "HAS_SNI", False):
    try:
        import OpenSSL  # pylint: disable=unused-import
    except ImportError:
        logger.critical("ssl doesn't support SNI and the pyopenssl module is not installed.\n"
                        "Some HTTPS connections will fail")
        sys.exit(1)
class HTTPAdapterWithConnParams(requests.adapters.HTTPAdapter):
    """HTTPAdapter that remembers its pool parameters (including extra
    ``**conn_params`` such as urllib3's ``source_address``) so the pool
    manager can be rebuilt after unpickling."""

    def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
                 pool_maxsize=requests.adapters.DEFAULT_POOLSIZE,
                 max_retries=requests.adapters.DEFAULT_RETRIES,
                 pool_block=requests.adapters.DEFAULT_POOLBLOCK,
                 **conn_params):
        if max_retries == requests.adapters.DEFAULT_RETRIES:
            self.max_retries = requests.adapters.Retry(0, read=False)
        else:
            self.max_retries = requests.adapters.Retry.from_int(max_retries)
        self.config = {}
        self.proxy_manager = {}

        super().__init__()

        self._pool_connections = pool_connections
        self._pool_maxsize = pool_maxsize
        self._pool_block = pool_block
        self._conn_params = conn_params

        self.init_poolmanager(pool_connections, pool_maxsize, block=pool_block, **conn_params)

    def __setstate__(self, state):
        # Can't handle this by adding 'proxy_manager' to self.__attrs__ because
        # self.poolmanager uses a lambda function, which isn't pickleable.
        self.proxy_manager = {}
        self.config = {}

        for attr, value in state.items():
            setattr(self, attr, value)

        self.init_poolmanager(self._pool_connections, self._pool_maxsize,
                              block=self._pool_block, **self._conn_params)
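# Illustrative sketch, not part of the original module: extra keyword arguments
# are forwarded to urllib3's pool manager, so binding outgoing connections to a
# specific local address looks like this (192.0.2.10 is a placeholder):
#
#     adapter = HTTPAdapterWithConnParams(source_address=('192.0.2.10', 0))
#     session = requests.Session()
#     session.mount('http://', adapter)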
threadLocal = local()
connect = settings['outgoing'].get('pool_connections', 100)  # Magic number kept from previous code
maxsize = settings['outgoing'].get('pool_maxsize', requests.adapters.DEFAULT_POOLSIZE)  # Picked from constructor

if settings['outgoing'].get('source_ips'):
    http_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
                                                    source_address=(source_ip, 0))
                          for source_ip in settings['outgoing']['source_ips'])
    https_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
                                                     source_address=(source_ip, 0))
                           for source_ip in settings['outgoing']['source_ips'])
else:
    http_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
    https_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
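# Hypothetical settings.yml fragment matching the branch above; sessions then
# rotate through one adapter per source IP via itertools.cycle:
#
#     outgoing:
#       source_ips:
#         - 192.0.2.10
#         - 192.0.2.11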
# Lock guarding the swap of the shared, module-level adapters into a session.
adapters_lock = RLock()


class SessionSinglePool(requests.Session):

    def __init__(self):
        super().__init__()

        # reuse the same adapters
        with adapters_lock:
            self.adapters.clear()
            self.mount('https://', next(https_adapters))
            self.mount('http://', next(http_adapters))

    def close(self):
        """Call super, but clear adapters since they are managed globally"""
        self.adapters.clear()
        super().close()
def set_timeout_for_thread(timeout, start_time=None):
    threadLocal.timeout = timeout
    threadLocal.start_time = start_time


def reset_time_for_thread():
    threadLocal.total_time = 0


def get_time_for_thread():
    return threadLocal.total_time
def get_proxy_cycles(proxy_settings):
    if not proxy_settings:
        return None
    # Backwards compatibility for single proxy in settings.yml
    for protocol, proxy in proxy_settings.items():
        if isinstance(proxy, str):
            proxy_settings[protocol] = [proxy]

    for protocol in proxy_settings:
        proxy_settings[protocol] = cycle(proxy_settings[protocol])
    return proxy_settings
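# Hypothetical settings.yml fragment accepted by get_proxy_cycles; a bare
# string (the old single-proxy format) is wrapped into a one-element list
# before being turned into a cycle:
#
#     outgoing:
#       proxies:
#         http: socks5h://127.0.0.1:9050
#         https:
#           - socks5h://127.0.0.1:9050
#           - socks5h://127.0.0.1:9051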
GLOBAL_PROXY_CYCLES = get_proxy_cycles(settings['outgoing'].get('proxies'))


def get_proxies(proxy_cycles):
    if proxy_cycles:
        return {protocol: next(proxy_cycle) for protocol, proxy_cycle in proxy_cycles.items()}
    return None


def get_global_proxies():
    return get_proxies(GLOBAL_PROXY_CYCLES)
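# Illustrative: with the hypothetical fragment above, get_global_proxies()
# returns a requests-style mapping such as
# {'http': 'socks5h://127.0.0.1:9050', 'https': 'socks5h://127.0.0.1:9050'},
# advancing each per-protocol cycle by one entry on every call.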
def request(method, url, **kwargs):
    """same as requests/requests/api.py request(...)"""
    time_before_request = time()

    # session start
    session = SessionSinglePool()

    # proxies
    if not kwargs.get('proxies'):
        kwargs['proxies'] = get_global_proxies()

    # timeout
    if 'timeout' in kwargs:
        timeout = kwargs['timeout']
    else:
        timeout = getattr(threadLocal, 'timeout', None)
        if timeout is not None:
            kwargs['timeout'] = timeout

    # raise_for_error
    check_for_httperror = True
    if 'raise_for_httperror' in kwargs:
        check_for_httperror = kwargs['raise_for_httperror']
        del kwargs['raise_for_httperror']

    # do request
    response = session.request(method=method, url=url, **kwargs)

    time_after_request = time()

    # is there a timeout for this engine?
    if timeout is not None:
        timeout_overhead = 0.2  # seconds
        # start_time = when the user request started
        start_time = getattr(threadLocal, 'start_time', time_before_request)
        search_duration = time_after_request - start_time
        if search_duration > timeout + timeout_overhead:
            raise requests.exceptions.Timeout(response=response)

    # session end
    session.close()

    if hasattr(threadLocal, 'total_time'):
        threadLocal.total_time += time_after_request - time_before_request

    # raise an exception
    if check_for_httperror:
        raise_for_httperror(response)

    return response
def get(url, **kwargs):
    kwargs.setdefault('allow_redirects', True)
    return request('get', url, **kwargs)


def options(url, **kwargs):
    kwargs.setdefault('allow_redirects', True)
    return request('options', url, **kwargs)


def head(url, **kwargs):
    kwargs.setdefault('allow_redirects', False)
    return request('head', url, **kwargs)


def post(url, data=None, **kwargs):
    return request('post', url, data=data, **kwargs)


def put(url, data=None, **kwargs):
    return request('put', url, data=data, **kwargs)


def patch(url, data=None, **kwargs):
    return request('patch', url, data=data, **kwargs)


def delete(url, **kwargs):
    return request('delete', url, **kwargs)
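

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module; assumes a working
    # searx configuration and network access (example.org is a placeholder).
    # Shows the per-thread timeout and time accounting around a single request.
    reset_time_for_thread()
    set_timeout_for_thread(3.0, start_time=time())
    resp = get('https://example.org', raise_for_httperror=False)
    print(resp.status_code, get_time_for_thread())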