poolrequests.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. import sys
  2. from time import time
  3. from itertools import cycle
  4. from threading import RLock, local
  5. import requests
  6. from searx import settings
  7. from searx import logger
  8. logger = logger.getChild('poolrequests')
  9. try:
  10. import ssl
  11. if ssl.OPENSSL_VERSION_INFO[0:3] < (1, 0, 2):
  12. # https://github.com/certifi/python-certifi#1024-bit-root-certificates
  13. logger.critical('You are using an old openssl version({0}), please upgrade above 1.0.2!'
  14. .format(ssl.OPENSSL_VERSION))
  15. sys.exit(1)
  16. except ImportError:
  17. ssl = None
  18. if not getattr(ssl, "HAS_SNI", False):
  19. try:
  20. import OpenSSL # pylint: disable=unused-import
  21. except ImportError:
  22. logger.critical("ssl doesn't support SNI and the pyopenssl module is not installed.\n"
  23. "Some HTTPS connections will fail")
  24. sys.exit(1)
  25. class HTTPAdapterWithConnParams(requests.adapters.HTTPAdapter):
  26. def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
  27. pool_maxsize=requests.adapters.DEFAULT_POOLSIZE,
  28. max_retries=requests.adapters.DEFAULT_RETRIES,
  29. pool_block=requests.adapters.DEFAULT_POOLBLOCK,
  30. **conn_params):
  31. if max_retries == requests.adapters.DEFAULT_RETRIES:
  32. self.max_retries = requests.adapters.Retry(0, read=False)
  33. else:
  34. self.max_retries = requests.adapters.Retry.from_int(max_retries)
  35. self.config = {}
  36. self.proxy_manager = {}
  37. super().__init__()
  38. self._pool_connections = pool_connections
  39. self._pool_maxsize = pool_maxsize
  40. self._pool_block = pool_block
  41. self._conn_params = conn_params
  42. self.init_poolmanager(pool_connections, pool_maxsize, block=pool_block, **conn_params)
  43. def __setstate__(self, state):
  44. # Can't handle by adding 'proxy_manager' to self.__attrs__ because
  45. # because self.poolmanager uses a lambda function, which isn't pickleable.
  46. self.proxy_manager = {}
  47. self.config = {}
  48. for attr, value in state.items():
  49. setattr(self, attr, value)
  50. self.init_poolmanager(self._pool_connections, self._pool_maxsize,
  51. block=self._pool_block, **self._conn_params)
  52. threadLocal = local()
  53. connect = settings['outgoing'].get('pool_connections', 100) # Magic number kept from previous code
  54. maxsize = settings['outgoing'].get('pool_maxsize', requests.adapters.DEFAULT_POOLSIZE) # Picked from constructor
  55. if settings['outgoing'].get('source_ips'):
  56. http_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
  57. source_address=(source_ip, 0))
  58. for source_ip in settings['outgoing']['source_ips'])
  59. https_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
  60. source_address=(source_ip, 0))
  61. for source_ip in settings['outgoing']['source_ips'])
  62. else:
  63. http_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
  64. https_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
  65. class SessionSinglePool(requests.Session):
  66. def __init__(self):
  67. super().__init__()
  68. # reuse the same adapters
  69. with RLock():
  70. self.adapters.clear()
  71. self.mount('https://', next(https_adapters))
  72. self.mount('http://', next(http_adapters))
  73. def close(self):
  74. """Call super, but clear adapters since there are managed globaly"""
  75. self.adapters.clear()
  76. super().close()
  77. def set_timeout_for_thread(timeout, start_time=None):
  78. threadLocal.timeout = timeout
  79. threadLocal.start_time = start_time
  80. def reset_time_for_thread():
  81. threadLocal.total_time = 0
  82. def get_time_for_thread():
  83. return threadLocal.total_time
  84. def get_proxy_cycles(proxy_settings):
  85. if not proxy_settings:
  86. return None
  87. # Backwards compatibility for single proxy in settings.yml
  88. for protocol, proxy in proxy_settings.items():
  89. if isinstance(proxy, str):
  90. proxy_settings[protocol] = [proxy]
  91. for protocol in proxy_settings:
  92. proxy_settings[protocol] = cycle(proxy_settings[protocol])
  93. return proxy_settings
  94. GLOBAL_PROXY_CYCLES = get_proxy_cycles(settings['outgoing'].get('proxies'))
  95. def get_proxies(proxy_cycles):
  96. if proxy_cycles:
  97. return {protocol: next(proxy_cycle) for protocol, proxy_cycle in proxy_cycles.items()}
  98. return None
  99. def get_global_proxies():
  100. return get_proxies(GLOBAL_PROXY_CYCLES)
  101. def request(method, url, **kwargs):
  102. """same as requests/requests/api.py request(...)"""
  103. time_before_request = time()
  104. # session start
  105. session = SessionSinglePool()
  106. # proxies
  107. if not kwargs.get('proxies'):
  108. kwargs['proxies'] = get_global_proxies()
  109. # timeout
  110. if 'timeout' in kwargs:
  111. timeout = kwargs['timeout']
  112. else:
  113. timeout = getattr(threadLocal, 'timeout', None)
  114. if timeout is not None:
  115. kwargs['timeout'] = timeout
  116. # do request
  117. response = session.request(method=method, url=url, **kwargs)
  118. time_after_request = time()
  119. # is there a timeout for this engine ?
  120. if timeout is not None:
  121. timeout_overhead = 0.2 # seconds
  122. # start_time = when the user request started
  123. start_time = getattr(threadLocal, 'start_time', time_before_request)
  124. search_duration = time_after_request - start_time
  125. if search_duration > timeout + timeout_overhead:
  126. raise requests.exceptions.Timeout(response=response)
  127. # session end
  128. session.close()
  129. if hasattr(threadLocal, 'total_time'):
  130. threadLocal.total_time += time_after_request - time_before_request
  131. return response
  132. def get(url, **kwargs):
  133. kwargs.setdefault('allow_redirects', True)
  134. return request('get', url, **kwargs)
  135. def options(url, **kwargs):
  136. kwargs.setdefault('allow_redirects', True)
  137. return request('options', url, **kwargs)
  138. def head(url, **kwargs):
  139. kwargs.setdefault('allow_redirects', False)
  140. return request('head', url, **kwargs)
  141. def post(url, data=None, **kwargs):
  142. return request('post', url, data=data, **kwargs)
  143. def put(url, data=None, **kwargs):
  144. return request('put', url, data=data, **kwargs)
  145. def patch(url, data=None, **kwargs):
  146. return request('patch', url, data=data, **kwargs)
  147. def delete(url, **kwargs):
  148. return request('delete', url, **kwargs)