# SPDX-License-Identifier: AGPL-3.0-or-later
"""Parse the special parts of a raw search query.

A raw query is split on whitespace; every part is offered to a list of
:py:class:`QueryPartParser` implementations (timeout, language, external bang,
bang, "feeling lucky").  Parts accepted by a parser are stored as special
query parts, all the other parts make up the user query.
"""

from abc import abstractmethod, ABC
import re

from searx import settings
from searx.sxng_locales import sxng_locales
from searx.engines import categories, engines, engine_shortcuts
from searx.external_bang import get_bang_definition_and_autocomplete
from searx.search import EngineRef
from searx.webutils import VALID_LANGUAGE_CODE


class QueryPartParser(ABC):
    """Base class of the parsers handling one query part.

    An instance is bound to the :py:class:`RawTextQuery` it updates and knows
    whether it is allowed to add autocomplete suggestions.
    """

    __slots__ = "raw_text_query", "enable_autocomplete"

    @staticmethod
    @abstractmethod
    def check(raw_value):
        """Check if raw_value can be parsed"""

    def __init__(self, raw_text_query, enable_autocomplete):
        self.raw_text_query = raw_text_query
        self.enable_autocomplete = enable_autocomplete

    @abstractmethod
    def __call__(self, raw_value):
        """Try to parse raw_value: set the self.raw_text_query properties

        return True if raw_value has been parsed

        self.raw_text_query.autocomplete_list is also modified
        if self.enable_autocomplete is True
        """

    def _add_autocomplete(self, value):
        """Append ``value`` to the autocomplete list unless it is already there."""
        if value not in self.raw_text_query.autocomplete_list:
            self.raw_text_query.autocomplete_list.append(value)


class TimeoutParser(QueryPartParser):
    """Parse ``<N``: force the timeout of the search."""

    @staticmethod
    def check(raw_value):
        # startswith() instead of raw_value[0]: no IndexError on an empty string
        return raw_value.startswith('<')

    def __call__(self, raw_value):
        value = raw_value[1:]
        found = self._parse(value) if value else False
        # suggestions are only offered for the bare prefix ("<")
        if self.enable_autocomplete and not value:
            self._autocomplete()
        return found

    def _parse(self, value):
        """Set ``timeout_limit`` (in seconds) from ``value``, return True on success."""
        # isdecimal() and not isdigit(): isdigit() also accepts characters like
        # '²' for which int() raises a ValueError.
        if not value.isdecimal():
            return False
        raw_timeout_limit = int(value)
        if raw_timeout_limit < 100:
            # below 100, the unit is the second ( <3 = 3 seconds timeout )
            self.raw_text_query.timeout_limit = float(raw_timeout_limit)
        else:
            # 100 or above, the unit is the millisecond ( <850 = 850 milliseconds timeout )
            self.raw_text_query.timeout_limit = raw_timeout_limit / 1000.0
        return True

    def _autocomplete(self):
        for suggestion in ['<3', '<850']:
            self._add_autocomplete(suggestion)


class LanguageParser(QueryPartParser):
    """Parse ``:lang``: force a search language."""

    @staticmethod
    def check(raw_value):
        # startswith() instead of raw_value[0]: no IndexError on an empty string
        return raw_value.startswith(':')

    def __call__(self, raw_value):
        # "en_US" and "new_zealand" are handled as "en-us" and "new-zealand"
        value = raw_value[1:].lower().replace('_', '-')
        found = self._parse(value) if value else False
        if self.enable_autocomplete and not found:
            self._autocomplete(value)
        return found

    def _parse(self, value):
        """Append the language(s) matching ``value`` to ``languages``.

        Return True if at least one language has been added.
        """
        found = False
        # check if any language-code is equal with
        # declared language-codes
        for lc in sxng_locales:
            lang_id, lang_name, country, english_name, _flag = map(str.lower, lc)

            # if correct language-code is found
            # set it as new search-language

            if (
                value == lang_id or value == lang_name or value == english_name or value.replace('-', ' ') == country
            ) and value not in self.raw_text_query.languages:
                found = True
                lang_parts = lang_id.split('-')
                if len(lang_parts) == 2:
                    # normalize the region to upper case: "en-us" --> "en-US"
                    self.raw_text_query.languages.append(lang_parts[0] + '-' + lang_parts[1].upper())
                else:
                    self.raw_text_query.languages.append(lang_id)
                # to ensure best match (first match is not necessarily the best one)
                if value == lang_id:
                    break

        # user may set a valid, yet not selectable language
        if VALID_LANGUAGE_CODE.match(value) or value == 'auto':
            lang_parts = value.split('-')
            if len(lang_parts) > 1:
                value = lang_parts[0].lower() + '-' + lang_parts[1].upper()
            if value not in self.raw_text_query.languages:
                self.raw_text_query.languages.append(value)
                found = True

        return found

    def _autocomplete(self, value):
        """Suggest languages starting with ``value`` (examples if ``value`` is empty)."""
        selectable_languages = settings['search']['languages']

        if not value:
            # show some example queries
            if len(selectable_languages) < 10:
                for lang in selectable_languages:
                    self._add_autocomplete(':' + lang)
            else:
                for lang in [":en", ":en_us", ":english", ":united_kingdom"]:
                    self._add_autocomplete(lang)
            return

        for lc in sxng_locales:
            # only languages selectable in the settings are suggested
            if lc[0] not in selectable_languages:
                continue
            lang_id, lang_name, country, english_name, _flag = map(str.lower, lc)

            # check if query starts with language-id
            if lang_id.startswith(value):
                if len(value) <= 2:
                    self._add_autocomplete(':' + lang_id.split('-')[0])
                else:
                    self._add_autocomplete(':' + lang_id)

            # check if query starts with language name
            if lang_name.startswith(value) or english_name.startswith(value):
                self._add_autocomplete(':' + lang_name)

            # check if query starts with country
            # here "new_zealand" is "new-zealand" (see __call__)
            if country.startswith(value.replace('-', ' ')):
                self._add_autocomplete(':' + country.replace(' ', '_'))


class ExternalBangParser(QueryPartParser):
    """Parse ``!!bang``: external bang."""

    @staticmethod
    def check(raw_value):
        # a bare "!!" is not an external bang, see FeelingLuckyParser
        return raw_value.startswith('!!') and len(raw_value) > 2

    def __call__(self, raw_value):
        value = raw_value[2:]
        found, bang_ac_list = self._parse(value) if value else (False, [])
        if self.enable_autocomplete:
            self._autocomplete(bang_ac_list)
        return found

    def _parse(self, value):
        """Return ``(found, bang_ac_list)``, set ``external_bang`` if ``value`` is defined."""
        found = False
        bang_definition, bang_ac_list = get_bang_definition_and_autocomplete(value)
        if bang_definition is not None:
            self.raw_text_query.external_bang = value
            found = True
        return found, bang_ac_list

    def _autocomplete(self, bang_ac_list):
        if not bang_ac_list:
            # show some example queries
            bang_ac_list = ['g', 'ddg', 'bing']
        for external_bang in bang_ac_list:
            self._add_autocomplete('!!' + external_bang)


class BangParser(QueryPartParser):
    """Parse ``!name``: force an engine or a category."""

    @staticmethod
    def check(raw_value):
        # make sure it's not any bang with double '!!'
        return raw_value.startswith('!') and not raw_value.startswith('!!')

    def __call__(self, raw_value):
        # "!duck_duck_go" and "!duck-duck-go" are handled as "duck duck go"
        value = raw_value[1:].replace('-', ' ').replace('_', ' ')
        found = self._parse(value) if value else False
        if found and raw_value[0] == '!':
            self.raw_text_query.specific = True
        if self.enable_autocomplete:
            self._autocomplete(raw_value[0], value)
        return found

    def _parse(self, value):
        """Append the engine(s) selected by ``value`` to ``enginerefs``.

        ``value`` is an engine shortcut, an engine name or a category name.
        Return True if ``value`` has been recognized.
        """
        # check if prefix is equal with engine shortcut
        if value in engine_shortcuts:
            value = engine_shortcuts[value]

        # check if prefix is equal with engine name
        if value in engines:
            self.raw_text_query.enginerefs.append(EngineRef(value, 'none'))
            return True

        # check if prefix is equal with category name
        if value in categories:
            # using all engines for that search, which
            # are declared under that category name
            self.raw_text_query.enginerefs.extend(
                EngineRef(engine.name, value)
                for engine in categories[value]
                if (engine.name, value) not in self.raw_text_query.disabled_engines
            )
            return True

        return False

    def _autocomplete(self, first_char, value):
        """Suggest categories, engines and shortcuts starting with ``value``."""
        if not value:
            # show some example queries
            for suggestion in ['images', 'wikipedia', 'osm']:
                # NOTE(review): _parse() looks up (engine name, category) tuples
                # in disabled_engines, a plain string is probably never found
                # in that list --- confirm the intended check.
                if suggestion not in self.raw_text_query.disabled_engines or suggestion in categories:
                    self._add_autocomplete(first_char + suggestion)
            return

        # check if query starts with category name
        for category in categories:
            if category.startswith(value):
                self._add_autocomplete(first_char + category.replace(' ', '_'))

        # check if query starts with engine name
        for engine in engines:
            if engine.startswith(value):
                self._add_autocomplete(first_char + engine.replace(' ', '_'))

        # check if query starts with engine shortcut
        for engine_shortcut in engine_shortcuts:
            if engine_shortcut.startswith(value):
                self._add_autocomplete(first_char + engine_shortcut)


class FeelingLuckyParser(QueryPartParser):
    """Parse a bare ``!!``: redirect to the first result."""

    @staticmethod
    def check(raw_value):
        return raw_value == '!!'

    def __call__(self, raw_value):
        self.raw_text_query.redirect_to_first_result = True
        return True


class RawTextQuery:
    """parse raw text query (the value from the html input)"""

    # The first parser whose check() accepts a query part handles this part.
    PARSER_CLASSES = [
        TimeoutParser,  # force the timeout
        LanguageParser,  # force a language
        ExternalBangParser,  # external bang (must be before BangParser)
        BangParser,  # force an engine or category
        FeelingLuckyParser,  # redirect to the first link in the results list
    ]

    def __init__(self, query, disabled_engines):
        assert isinstance(query, str)
        # input parameters
        self.query = query
        self.disabled_engines = disabled_engines if disabled_engines else []

        # parsed values
        self.enginerefs = []
        self.languages = []
        self.timeout_limit = None
        self.external_bang = None
        self.specific = False
        self.autocomplete_list = []

        # internal properties
        self.query_parts = []  # use self.getFullQuery()
        self.user_query_parts = []  # use self.getQuery()
        self.autocomplete_location = None
        self.redirect_to_first_result = False
        self._parse_query()

    def _parse_query(self):
        """
        parse self.query, if tags are set, which
        change the search engine or search-language
        """

        # split query, including whitespaces
        raw_query_parts = re.split(r'(\s+)', self.query)

        last_index_location = None
        # autocomplete is only enabled for the last item of the split
        autocomplete_index = len(raw_query_parts) - 1

        for i, query_part in enumerate(raw_query_parts):
            # part does only contain spaces, skip
            if query_part.isspace() or query_part == '':
                continue

            # parse special commands
            special_part = False
            for parser_class in RawTextQuery.PARSER_CLASSES:
                if parser_class.check(query_part):
                    special_part = parser_class(self, i == autocomplete_index)(query_part)
                    break

            # append query part to query_part list
            qlist = self.query_parts if special_part else self.user_query_parts
            qlist.append(query_part)
            last_index_location = (qlist, len(qlist) - 1)

        # (list, index) of the last part, None if the query has no part
        self.autocomplete_location = last_index_location

    def get_autocomplete_full_query(self, text):
        """Return the full query where the last part is replaced by ``text``.

        If there is no part to replace (empty query), ``text`` is appended as
        a new user query part.
        """
        location = self.autocomplete_location
        if location is None or not location[0]:
            # unpacking None or assigning to an index of an empty list would raise
            self.user_query_parts.append(text)
            self.autocomplete_location = (self.user_query_parts, len(self.user_query_parts) - 1)
        else:
            qlist, position = location
            qlist[position] = text
        return self.getFullQuery()

    def changeQuery(self, query):
        """Replace the user query by ``query`` (special parts are kept), return self."""
        self.user_query_parts = query.strip().split()
        self.query = self.getFullQuery()
        self.autocomplete_location = (self.user_query_parts, len(self.user_query_parts) - 1)
        self.autocomplete_list = []
        return self

    def getQuery(self):
        """Return the user query: the parts not handled by a parser."""
        return ' '.join(self.user_query_parts)

    def getFullQuery(self):
        """
        get full query including whitespaces
        """
        return f"{' '.join(self.query_parts)} {self.getQuery()}".strip()

    def __str__(self):
        return self.getFullQuery()

    def __repr__(self):
        return (
            f"<{self.__class__.__name__} "
            + f"query={self.query!r} "
            + f"disabled_engines={self.disabled_engines!r}\n  "
            + f"languages={self.languages!r} "
            + f"timeout_limit={self.timeout_limit!r} "
            + f"external_bang={self.external_bang!r} "
            + f"specific={self.specific!r} "
            + f"enginerefs={self.enginerefs!r}\n  "
            + f"autocomplete_list={self.autocomplete_list!r}\n  "
            + f"query_parts={self.query_parts!r}\n  "
            + f"user_query_parts={self.user_query_parts!r}\n  "
            + f"redirect_to_first_result={self.redirect_to_first_result!r} >"
        )