|
@@ -10,10 +10,14 @@ engines:
|
|
|
- :ref:`google autocomplete`
|
|
|
|
|
|
"""
|
|
|
+from __future__ import annotations
|
|
|
|
|
|
from typing import TYPE_CHECKING
|
|
|
|
|
|
import re
|
|
|
+import random
|
|
|
+import string
|
|
|
+import time
|
|
|
from urllib.parse import urlencode
|
|
|
from lxml import html
|
|
|
import babel
|
|
@@ -64,11 +68,31 @@ filter_mapping = {0: 'off', 1: 'medium', 2: 'high'}
|
|
|
# from the links not the links itself.
|
|
|
suggestion_xpath = '//div[contains(@class, "EIaa9b")]//a'
|
|
|
|
|
|
-# UI_ASYNC = 'use_ac:true,_fmt:html' # returns a HTTP 500 when user search for
|
|
|
-# # celebrities like '!google natasha allegri'
|
|
|
-# # or '!google chris evans'
|
|
|
-UI_ASYNC = 'use_ac:true,_fmt:prog'
|
|
|
-"""Format of the response from UI's async request."""
|
|
|
+
|
|
|
+_arcid_range = string.ascii_letters + string.digits + "_-"
|
|
|
+_arcid_random: tuple[str, int] | None = None
|
|
|
+
|
|
|
+
|
|
|
+def ui_async(start: int) -> str:
|
|
|
+ """Format of the response from UI's async request.
|
|
|
+
|
|
|
+ - ``arc_id:<...>,use_ac:true,_fmt:prog``
|
|
|
+
|
|
|
+ The arc_id is random generated every hour.
|
|
|
+ """
|
|
|
+ global _arcid_random # pylint: disable=global-statement
|
|
|
+
|
|
|
+ use_ac = "use_ac:true"
|
|
|
+ # _fmt:html returns a HTTP 500 when user search for celebrities like
|
|
|
+ # '!google natasha allegri' or '!google chris evans'
|
|
|
+ _fmt = "_fmt:prog"
|
|
|
+
|
|
|
+ # create a new random arc_id every hour
|
|
|
+ if not _arcid_random or (int(time.time()) - _arcid_random[1]) > 3600:
|
|
|
+ _arcid_random = (''.join(random.choices(_arcid_range, k=23)), int(time.time()))
|
|
|
+ arc_id = f"arc_id:srp_{_arcid_random[0]}_1{start:02}"
|
|
|
+
|
|
|
+ return ",".join([arc_id, use_ac, _fmt])
|
|
|
|
|
|
|
|
|
def get_google_info(params, eng_traits):
|
|
@@ -258,8 +282,10 @@ def detect_google_sorry(resp):
|
|
|
def request(query, params):
|
|
|
"""Google search request"""
|
|
|
# pylint: disable=line-too-long
|
|
|
- offset = (params['pageno'] - 1) * 10
|
|
|
+ start = (params['pageno'] - 1) * 10
|
|
|
+ str_async = ui_async(start)
|
|
|
google_info = get_google_info(params, traits)
|
|
|
+ logger.debug("ARC_ID: %s", str_async)
|
|
|
|
|
|
# https://www.google.de/search?q=corona&hl=de&lr=lang_de&start=0&tbs=qdr%3Ad&safe=medium
|
|
|
query_url = (
|
|
@@ -272,7 +298,7 @@ def request(query, params):
|
|
|
'q': query,
|
|
|
**google_info['params'],
|
|
|
'filter': '0',
|
|
|
- 'start': offset,
|
|
|
+ 'start': start,
|
|
|
# 'vet': '12ahUKEwik3ZbIzfn7AhXMX_EDHbUDBh0QxK8CegQIARAC..i',
|
|
|
# 'ved': '2ahUKEwik3ZbIzfn7AhXMX_EDHbUDBh0Q_skCegQIARAG',
|
|
|
# 'cs' : 1,
|
|
@@ -284,7 +310,7 @@ def request(query, params):
|
|
|
# 'sstk': 'AcOHfVkD7sWCSAheZi-0tx_09XDO55gTWY0JNq3_V26cNN-c8lfD45aZYPI8s_Bqp8s57AHz5pxchDtAGCA_cikAWSjy9kw3kgg'
|
|
|
# formally known as use_mobile_ui
|
|
|
'asearch': 'arc',
|
|
|
- 'async': UI_ASYNC,
|
|
|
+ 'async': str_async,
|
|
|
}
|
|
|
)
|
|
|
)
|
|
@@ -303,15 +329,20 @@ def request(query, params):
|
|
|
# =26;[3,"dimg_ZNMiZPCqE4apxc8P3a2tuAQ_137"]a87;data:image/jpeg;base64,/9j/4AAQSkZJRgABA
|
|
|
# ...6T+9Nl4cnD+gr9OK8I56/tX3l86nWYw//2Q==26;
|
|
|
RE_DATA_IMAGE = re.compile(r'"(dimg_[^"]*)"[^;]*;(data:image[^;]*;[^;]*);')
|
|
|
+RE_DATA_IMAGE_end = re.compile(r'"(dimg_[^"]*)"[^;]*;(data:image[^;]*;[^;]*)$')
|
|
|
|
|
|
|
|
|
-def _parse_data_images(dom):
|
|
|
+def parse_data_images(text: str):
|
|
|
data_image_map = {}
|
|
|
- for img_id, data_image in RE_DATA_IMAGE.findall(dom.text_content()):
|
|
|
+
|
|
|
+ for img_id, data_image in RE_DATA_IMAGE.findall(text):
|
|
|
end_pos = data_image.rfind('=')
|
|
|
if end_pos > 0:
|
|
|
data_image = data_image[: end_pos + 1]
|
|
|
data_image_map[img_id] = data_image
|
|
|
+ last = RE_DATA_IMAGE_end.search(text)
|
|
|
+ if last:
|
|
|
+ data_image_map[last.group(1)] = last.group(2)
|
|
|
logger.debug('data:image objects --> %s', list(data_image_map.keys()))
|
|
|
return data_image_map
|
|
|
|
|
@@ -320,12 +351,12 @@ def response(resp) -> EngineResults:
|
|
|
"""Get response from google's search request"""
|
|
|
# pylint: disable=too-many-branches, too-many-statements
|
|
|
detect_google_sorry(resp)
|
|
|
+ data_image_map = parse_data_images(resp.text)
|
|
|
|
|
|
results = EngineResults()
|
|
|
|
|
|
# convert the text to dom
|
|
|
dom = html.fromstring(resp.text)
|
|
|
- data_image_map = _parse_data_images(dom)
|
|
|
|
|
|
# results --> answer
|
|
|
answer_list = eval_xpath(dom, '//div[contains(@class, "LGOjhe")]')
|