"""This is the implementation of the Google WEB engine.  Some of these
implementations (mainly the :py:obj:`get_google_info`) are shared by other
engines:

- :ref:`google images engine`
- :ref:`google news engine`
- :ref:`google videos engine`
- :ref:`google scholar engine`
- :ref:`google autocomplete`

"""

from typing import TYPE_CHECKING

import re
from urllib.parse import urlencode
from lxml import html
import babel
import babel.core
import babel.languages

from searx.utils import extract_text, eval_xpath, eval_xpath_list, eval_xpath_getindex
from searx.locales import language_tag, region_tag, get_official_locales
from searx.network import get
from searx.exceptions import SearxEngineCaptchaException
from searx.enginelib.traits import EngineTraits

if TYPE_CHECKING:
    import logging

    # NOTE(review): ``logger`` is only declared for type checkers; it is never
    # assigned in this module -- presumably injected by the engine loader.
    logger: logging.Logger

# NOTE(review): declared but not assigned here; ``request`` reads it, so it is
# presumably set by the engine loader before the first request -- confirm.
traits: EngineTraits

# about
about = {
    "website": 'https://www.google.com',
    "wikidata_id": 'Q9366',
    "official_api_documentation": 'https://developers.google.com/custom-search/',
    "use_official_api": False,
    "require_api_key": False,
    "results": 'HTML',
}

# engine dependent config
categories = ['general', 'web']
paging = True
max_page = 50
time_range_support = True
safesearch = True

# Maps the ``time_range`` request value to the suffix of google's ``tbs=qdr:``
# argument (see :py:func:`request`).
time_range_dict = {'day': 'd', 'week': 'w', 'month': 'm', 'year': 'y'}

# Maps the ``safesearch`` request value to google's ``safe`` argument (see
# :py:func:`request`; level 0 is never sent because it is falsy).
filter_mapping = {0: 'off', 1: 'medium', 2: 'high'}

# specific xpath variables
# ------------------------

results_xpath = './/div[contains(@jscontroller, "SC7lYd")]'
title_xpath = './/a/h3[1]'
href_xpath = './/a[h3]/@href'
content_xpath = './/div[@data-sncf="1"]'

# Only the text of the matched links is used as suggestion, not their target.
suggestion_xpath = '//div[contains(@class, "EIaa9b")]//a'

UI_ASYNC = 'use_ac:true,_fmt:prog'
"""Format of the response from UI's async request."""


def get_google_info(params, eng_traits):
    """Composing various (language) properties for the google engines (:ref:`google
    API`).

    This function is called by the various google engines (:ref:`google web
    engine`, :ref:`google images engine`, :ref:`google news engine` and
    :ref:`google videos engine`).

    :param dict params: Request parameters of the engine.  At least
        a ``searxng_locale`` key should be in the dictionary (a missing key is
        treated like ``all``).

    :param eng_traits: Engine's traits fetched from google preferences
        (:py:obj:`searx.enginelib.traits.EngineTraits`)

    :rtype: dict
    :returns:
        Py-Dictionary with the key/value pairs:

        language:
            The language code that is used by google (e.g. ``lang_en`` or
            ``lang_zh-TW``)

        country:
            The country code that is used by google (e.g. ``US`` or ``TW``)

        locale:
            An instance of :py:obj:`babel.core.Locale` build from the
            ``searxng_locale`` value, or ``None`` if babel does not know the
            locale.

        subdomain:
            Google subdomain :py:obj:`google_domains` that fits to the country
            code (``www.google.com`` if there is no domain for the country).

        params:
            Py-Dictionary with additional request arguments (can be passed to
            :py:func:`urllib.parse.urlencode`).

            - ``hl`` parameter: specifies the interface language of user interface.
            - ``lr`` parameter: restricts search results to documents written in
              a particular language.
            - ``cr`` parameter: restricts search results to documents
              originating in a particular country.
            - ``ie`` parameter: sets the character encoding scheme that should
              be used to interpret the query string ('utf8').
            - ``oe`` parameter: sets the character encoding scheme that should
              be used to decode the XML result ('utf8').

        headers:
            Py-Dictionary with additional HTTP headers (can be passed to
            request's headers)

            - ``Accept: '*/*'``

        cookies:
            Py-Dictionary with the cookies to send (``CONSENT`` is set to
            ``YES+``).

    """

    ret_val = {
        'language': None,
        'country': None,
        'subdomain': None,
        'params': {},
        'headers': {},
        'cookies': {},
        'locale': None,
    }

    sxng_locale = params.get('searxng_locale', 'all')
    try:
        locale = babel.Locale.parse(sxng_locale, sep='-')
    except babel.core.UnknownLocaleError:
        locale = None

    eng_lang = eng_traits.get_language(sxng_locale, 'lang_en')
    # strip the ``lang_`` prefix: lang_zh-TW --> zh-TW / lang_en --> en
    lang_code = eng_lang.split('_')[-1]
    country = eng_traits.get_region(sxng_locale, eng_traits.all_locale)

    ret_val['language'] = eng_lang
    ret_val['country'] = country
    ret_val['locale'] = locale
    ret_val['subdomain'] = eng_traits.custom['supported_domains'].get(country.upper(), 'www.google.com')

    # hl parameter: interface language
    ret_val['params']['hl'] = f'{lang_code}-{country}'

    # lr parameter: language restriction, dropped when no locale is selected
    ret_val['params']['lr'] = eng_lang
    if sxng_locale == 'all':
        ret_val['params']['lr'] = ''

    # cr parameter: country restriction, only set when the selected locale
    # carries a region part (e.g. ``fr-CA``)
    ret_val['params']['cr'] = ''
    if len(sxng_locale.split('-')) > 1:
        ret_val['params']['cr'] = 'country' + country

    # ie / oe parameter: character encoding of the query / of the result
    ret_val['params']['ie'] = 'utf8'
    ret_val['params']['oe'] = 'utf8'

    # HTTP headers
    ret_val['headers']['Accept'] = '*/*'

    # Cookies
    ret_val['cookies']['CONSENT'] = "YES+"

    return ret_val


def detect_google_sorry(resp):
    """Raise a captcha exception if ``resp`` comes from google's *sorry* page.

    :param resp: HTTP response; its ``url.host`` and ``url.path`` are inspected.
    :raises SearxEngineCaptchaException: if the host is ``sorry.google.com`` or
        the path starts with ``/sorry``.
    """
    if resp.url.host == 'sorry.google.com' or resp.url.path.startswith('/sorry'):
        raise SearxEngineCaptchaException()


def request(query, params):
    """Google search request"""

    # ten results per page
    offset = (params['pageno'] - 1) * 10
    google_info = get_google_info(params, traits)

    query_url = (
        'https://'
        + google_info['subdomain']
        + '/search'
        + "?"
        + urlencode(
            {
                'q': query,
                **google_info['params'],
                'filter': '0',
                'start': offset,
                'asearch': 'arc',
                'async': UI_ASYNC,
            }
        )
    )

    if params['time_range'] in time_range_dict:
        query_url += '&' + urlencode({'tbs': 'qdr:' + time_range_dict[params['time_range']]})
    if params['safesearch']:
        query_url += '&' + urlencode({'safe': filter_mapping[params['safesearch']]})
    params['url'] = query_url

    params['cookies'] = google_info['cookies']
    params['headers'].update(google_info['headers'])
    return params


# Matches a quoted ``dimg_*`` image id followed (after the next ``;``) by a
# ``data:image...;...;`` payload in the text of the result page.
RE_DATA_IMAGE = re.compile(r'"(dimg_[^"]*)"[^;]*;(data:image[^;]*;[^;]*);')


def _parse_data_images(dom):
    """Collect the inline ``data:image`` payloads of the result page.

    :param dom: parsed HTML document (its ``text_content()`` is scanned).
    :returns: dict that maps an image id (``dimg_*``) to its ``data:image``
        URL.
    """
    data_image_map = {}
    for img_id, data_image in RE_DATA_IMAGE.findall(dom.text_content()):
        # Cut everything behind the last '=' -- presumably the base64 padding,
        # so that trailing non-payload characters are dropped.
        end_pos = data_image.rfind('=')
        if end_pos > 0:
            data_image = data_image[: end_pos + 1]
        data_image_map[img_id] = data_image
    logger.debug('data:image objects --> %s', list(data_image_map.keys()))
    return data_image_map


def response(resp):
    """Get response from google's search request"""

    detect_google_sorry(resp)

    results = []

    # convert the text to dom
    dom = html.fromstring(resp.text)
    data_image_map = _parse_data_images(dom)

    # results --> answer
    answer_list = eval_xpath(dom, '//div[contains(@class, "LGOjhe")]')
    for item in answer_list:
        # remove the nested "nnFGuf" nodes so their text is not part of the answer
        for bubble in eval_xpath(item, './/div[@class="nnFGuf"]'):
            bubble.drop_tree()
        results.append(
            {
                'answer': extract_text(item),
                # first link found below the grandparent node, else None
                'url': (eval_xpath(item, '../..//a/@href') + [None])[0],
            }
        )

    # parse results
    for result in eval_xpath_list(dom, results_xpath):

        try:
            title_tag = eval_xpath_getindex(result, title_xpath, 0, default=None)
            if title_tag is None:
                logger.debug('ignoring item from the result_xpath list: missing title')
                continue
            title = extract_text(title_tag)

            url = eval_xpath_getindex(result, href_xpath, 0, None)
            if url is None:
                logger.debug('ignoring item from the result_xpath list: missing url of title "%s"', title)
                continue

            content_nodes = eval_xpath(result, content_xpath)
            content = extract_text(content_nodes)

            if not content:
                logger.debug('ignoring item from the result_xpath list: missing content of title "%s"', title)
                continue

            thumbnail = content_nodes[0].xpath('.//img/@src')
            if thumbnail:
                thumbnail = thumbnail[0]
                if thumbnail.startswith('data:image'):
                    # the ``src`` is an inline image: replace it by the
                    # payload that has been collected for the image id
                    img_id = content_nodes[0].xpath('.//img/@id')
                    if img_id:
                        thumbnail = data_image_map.get(img_id[0])
            else:
                thumbnail = None

            results.append({'url': url, 'title': title, 'content': content, 'thumbnail': thumbnail})

        except Exception as e:  # pylint: disable=broad-except
            # best effort: a single broken item must not discard the other results
            logger.error(e, exc_info=True)
            continue

    # parse suggestion
    for suggestion in eval_xpath_list(dom, suggestion_xpath):
        # append suggestion
        results.append({'suggestion': extract_text(suggestion)})

    # return results
    return results


# Google country codes that :py:func:`fetch_traits` does not map to a region.
skip_countries = [
    'AL',
    'AZ',
    'BD',
    'BN',
    'BT',
    'ET',
    'GE',
    'GL',
    'KH',
    'LA',
    'LK',
    'ME',
    'MK',
    'MM',
    'MN',
    'MV',
    'MY',
    'NP',
    'TJ',
    'TM',
    'UZ',
]


def fetch_traits(engine_traits: EngineTraits, add_domains: bool = True):
    """Fetch languages from Google.

    Languages are read from the ``hl`` and regions from the ``gl`` select box
    of google's preferences page.  If ``add_domains`` is true, the list of
    supported domains is fetched as well and stored in
    ``engine_traits.custom['supported_domains']`` (region --> domain).

    :param engine_traits: traits object that is filled in place.
    :param add_domains: also fetch google's list of supported domains.
    :raises RuntimeError: if one of the HTTP responses is not OK.
    """

    engine_traits.custom['supported_domains'] = {}

    resp = get('https://www.google.com/preferences')
    if not resp.ok:
        raise RuntimeError("Response from Google's preferences is not OK.")

    dom = html.fromstring(resp.text.replace('<?xml version="1.0" encoding="UTF-8"?>', ''))

    # supported language codes

    lang_map = {'no': 'nb'}
    for x in eval_xpath_list(dom, "//select[@name='hl']/option"):
        eng_lang = x.get("value")
        try:
            locale = babel.Locale.parse(lang_map.get(eng_lang, eng_lang), sep='-')
        except babel.core.UnknownLocaleError:
            # an <option> without text must not break the diagnostic message
            lang_name = (x.text or '').split("(")[0].strip()
            print(f"INFO:  google UI language {eng_lang} ({lang_name}) is unknown by babel")
            continue
        sxng_lang = language_tag(locale)

        conflict = engine_traits.languages.get(sxng_lang)
        if conflict:
            # The stored values carry the 'lang_' prefix: compare like with
            # like, otherwise an identical mapping is reported as a conflict.
            if conflict != 'lang_' + eng_lang:
                print(f"CONFLICT: babel {sxng_lang} --> {conflict}, {eng_lang}")
            continue
        engine_traits.languages[sxng_lang] = 'lang_' + eng_lang

    # alias languages
    engine_traits.languages['zh'] = 'lang_zh-CN'

    # supported region codes

    for x in eval_xpath_list(dom, "//select[@name='gl']/option"):
        eng_country = x.get("value")

        if eng_country in skip_countries:
            continue
        if eng_country == 'ZZ':
            engine_traits.all_locale = 'ZZ'
            continue

        sxng_locales = get_official_locales(eng_country, engine_traits.languages.keys(), regional=True)

        if not sxng_locales:
            print(f"ERROR: can't map from google country {x.get('data-name')} ({eng_country}) to a babel region.")
            continue

        for sxng_locale in sxng_locales:
            engine_traits.regions[region_tag(sxng_locale)] = eng_country

    # alias regions
    engine_traits.regions['zh-CN'] = 'HK'

    # supported domains

    if add_domains:
        resp = get('https://www.google.com/supported_domains')
        if not resp.ok:
            raise RuntimeError("Response from https://www.google.com/supported_domains is not OK.")

        for domain in resp.text.split():
            domain = domain.strip()
            if not domain or domain in [
                '.google.com',
            ]:
                continue
            region = domain.split('.')[-1].upper()
            engine_traits.custom['supported_domains'][region] = 'www' + domain
            if region == 'HK':
                # the HK domain is used for the region CN as well
                engine_traits.custom['supported_domains']['CN'] = 'www' + domain