# poolrequests.py — pooled HTTP session management for outgoing searx requests
  1. import sys
  2. from time import time
  3. from itertools import cycle
  4. from threading import RLock, local
  5. import requests
  6. from searx import settings
  7. from searx import logger
logger = logger.getChild('poolrequests')

# Sanity-check the TLS environment at import time: refuse to start with an
# OpenSSL too old for modern certificates, and require SNI support (native
# or via pyopenssl) since many HTTPS engines depend on it.
try:
    import ssl
    if ssl.OPENSSL_VERSION_INFO[0:3] < (1, 0, 2):
        # https://github.com/certifi/python-certifi#1024-bit-root-certificates
        logger.critical('You are using an old openssl version({0}), please upgrade above 1.0.2!'
                        .format(ssl.OPENSSL_VERSION))
        sys.exit(1)
except ImportError:
    # No ssl module at all; the SNI check below will then try pyopenssl.
    ssl = None

if not getattr(ssl, "HAS_SNI", False):
    try:
        # Importing pyopenssl is enough: urllib3 can use it as TLS backend.
        import OpenSSL  # pylint: disable=unused-import
    except ImportError:
        logger.critical("ssl doesn't support SNI and the pyopenssl module is not installed.\n"
                        "Some HTTPS connections will fail")
        sys.exit(1)
  25. class HTTPAdapterWithConnParams(requests.adapters.HTTPAdapter):
  26. def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
  27. pool_maxsize=requests.adapters.DEFAULT_POOLSIZE,
  28. max_retries=requests.adapters.DEFAULT_RETRIES,
  29. pool_block=requests.adapters.DEFAULT_POOLBLOCK,
  30. **conn_params):
  31. if max_retries == requests.adapters.DEFAULT_RETRIES:
  32. self.max_retries = requests.adapters.Retry(0, read=False)
  33. else:
  34. self.max_retries = requests.adapters.Retry.from_int(max_retries)
  35. self.config = {}
  36. self.proxy_manager = {}
  37. super().__init__()
  38. self._pool_connections = pool_connections
  39. self._pool_maxsize = pool_maxsize
  40. self._pool_block = pool_block
  41. self._conn_params = conn_params
  42. self.init_poolmanager(pool_connections, pool_maxsize, block=pool_block, **conn_params)
  43. def __setstate__(self, state):
  44. # Can't handle by adding 'proxy_manager' to self.__attrs__ because
  45. # because self.poolmanager uses a lambda function, which isn't pickleable.
  46. self.proxy_manager = {}
  47. self.config = {}
  48. for attr, value in state.items():
  49. setattr(self, attr, value)
  50. self.init_poolmanager(self._pool_connections, self._pool_maxsize,
  51. block=self._pool_block, **self._conn_params)
  52. threadLocal = local()
  53. connect = settings['outgoing'].get('pool_connections', 100) # Magic number kept from previous code
  54. maxsize = settings['outgoing'].get('pool_maxsize', requests.adapters.DEFAULT_POOLSIZE) # Picked from constructor
  55. if settings['outgoing'].get('source_ips'):
  56. http_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
  57. source_address=(source_ip, 0))
  58. for source_ip in settings['outgoing']['source_ips'])
  59. https_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
  60. source_address=(source_ip, 0))
  61. for source_ip in settings['outgoing']['source_ips'])
  62. else:
  63. http_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
  64. https_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
  65. class SessionSinglePool(requests.Session):
  66. def __init__(self):
  67. super().__init__()
  68. # reuse the same adapters
  69. with RLock():
  70. self.adapters.clear()
  71. self.mount('https://', next(https_adapters))
  72. self.mount('http://', next(http_adapters))
  73. def close(self):
  74. """Call super, but clear adapters since there are managed globaly"""
  75. self.adapters.clear()
  76. super().close()
  77. def set_timeout_for_thread(timeout, start_time=None):
  78. threadLocal.timeout = timeout
  79. threadLocal.start_time = start_time
  80. def reset_time_for_thread():
  81. threadLocal.total_time = 0
  82. def get_time_for_thread():
  83. return threadLocal.total_time
  84. def request(method, url, **kwargs):
  85. """same as requests/requests/api.py request(...)"""
  86. time_before_request = time()
  87. # session start
  88. session = SessionSinglePool()
  89. # proxies
  90. if kwargs.get('proxies') is None:
  91. kwargs['proxies'] = settings['outgoing'].get('proxies')
  92. # timeout
  93. if 'timeout' in kwargs:
  94. timeout = kwargs['timeout']
  95. else:
  96. timeout = getattr(threadLocal, 'timeout', None)
  97. if timeout is not None:
  98. kwargs['timeout'] = timeout
  99. # do request
  100. response = session.request(method=method, url=url, **kwargs)
  101. time_after_request = time()
  102. # is there a timeout for this engine ?
  103. if timeout is not None:
  104. timeout_overhead = 0.2 # seconds
  105. # start_time = when the user request started
  106. start_time = getattr(threadLocal, 'start_time', time_before_request)
  107. search_duration = time_after_request - start_time
  108. if search_duration > timeout + timeout_overhead:
  109. raise requests.exceptions.Timeout(response=response)
  110. # session end
  111. session.close()
  112. if hasattr(threadLocal, 'total_time'):
  113. threadLocal.total_time += time_after_request - time_before_request
  114. return response
  115. def get(url, **kwargs):
  116. kwargs.setdefault('allow_redirects', True)
  117. return request('get', url, **kwargs)
  118. def options(url, **kwargs):
  119. kwargs.setdefault('allow_redirects', True)
  120. return request('options', url, **kwargs)
  121. def head(url, **kwargs):
  122. kwargs.setdefault('allow_redirects', False)
  123. return request('head', url, **kwargs)
  124. def post(url, data=None, **kwargs):
  125. return request('post', url, data=data, **kwargs)
  126. def put(url, data=None, **kwargs):
  127. return request('put', url, data=data, **kwargs)
  128. def patch(url, data=None, **kwargs):
  129. return request('patch', url, data=data, **kwargs)
  130. def delete(url, **kwargs):
  131. return request('delete', url, **kwargs)