# -*- coding: utf-8 -*-
from __future__ import annotations

import os
import pathlib
import csv
import hashlib
import hmac
import re
import inspect
import itertools
from datetime import datetime, timedelta
from typing import Iterable, List, Tuple, Dict, TYPE_CHECKING
from io import StringIO
from codecs import getincrementalencoder

from flask_babel import gettext, format_date

from searx import logger, settings
from searx.engines import DEFAULT_CATEGORY

if TYPE_CHECKING:
    from searx.enginelib import Engine

VALID_LANGUAGE_CODE = re.compile(r'^[a-z]{2,3}(-[a-zA-Z]{2})?$')

logger = logger.getChild('webutils')


class UnicodeWriter:
    """
    A CSV writer which will write rows to CSV file "f",
    which is encoded in the given encoding.
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # Redirect output to a queue
        self.queue = StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = getincrementalencoder(encoding)()

    def writerow(self, row):
        self.writer.writerow(row)
        # Fetch UTF-8 output from the queue ...
        data = self.queue.getvalue()
        data = data.strip('\x00')
        # ... and re-encode it into the target encoding
        data = self.encoder.encode(data)
        # write to the target stream
        self.stream.write(data.decode())
        # empty the queue: seek back to the start before truncating, otherwise
        # the next write would pad the buffer with NUL bytes up to the old offset
        self.queue.seek(0)
        self.queue.truncate(0)

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)
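
# A minimal usage sketch for ``UnicodeWriter`` (illustration only, not part of
# the module API). It assumes the caller supplies any text-mode stream, here an
# in-memory ``StringIO``:
#
#     csv_buffer = StringIO()
#     writer = UnicodeWriter(csv_buffer)
#     writer.writerow(['title', 'url'])
#     writer.writerows([['SearXNG', 'https://example.org']])
#     csv_text = csv_buffer.getvalue()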


def get_themes(templates_path):
    """Returns the list of available themes."""
    return os.listdir(templates_path)


def get_hash_for_file(file: pathlib.Path) -> str:
    m = hashlib.sha1()
    with file.open('rb') as f:
        m.update(f.read())
    return m.hexdigest()


def get_static_files(static_path: str) -> Dict[str, str]:
    static_files: Dict[str, str] = {}
    static_path_path = pathlib.Path(static_path)

    def walk(path: pathlib.Path):
        for file in path.iterdir():
            if file.name.startswith('.'):
                # ignore hidden files
                continue
            if file.is_file():
                static_files[str(file.relative_to(static_path_path))] = get_hash_for_file(file)
            if file.is_dir() and file.name not in ('node_modules', 'src'):
                # ignore "src" and "node_modules" directories
                walk(file)

    walk(static_path_path)
    return static_files


def get_result_templates(templates_path):
    result_templates = set()
    templates_path_length = len(templates_path) + 1
    for directory, _, files in os.walk(templates_path):
        if directory.endswith('result_templates'):
            for filename in files:
                f = os.path.join(directory[templates_path_length:], filename)
                result_templates.add(f)
    return result_templates


def new_hmac(secret_key, url):
    # ``url`` is expected to be bytes; the secret key is encoded before signing
    return hmac.new(secret_key.encode(), url, hashlib.sha256).hexdigest()


def is_hmac_of(secret_key, value, hmac_to_check):
    hmac_of_value = new_hmac(secret_key, value)
    return len(hmac_of_value) == len(hmac_to_check) and hmac.compare_digest(hmac_of_value, hmac_to_check)


def prettify_url(url, max_length=74):
    if len(url) > max_length:
        chunk_len = int(max_length / 2 + 1)
        return '{0}[...]{1}'.format(url[:chunk_len], url[-chunk_len:])
    return url


def contains_cjko(s: str) -> bool:
    """Checks whether a string contains Chinese, Japanese, or Korean
    characters. It employs a regex and uses Unicode escape sequences to
    match any character in a set of Unicode ranges.

    Args:
        s (str): string to be checked.

    Returns:
        bool: True if the input s contains the characters and False otherwise.
    """
    unicode_ranges = (
        '\u4e00-\u9fff'  # Chinese characters
        '\u3040-\u309f'  # Japanese hiragana
        '\u30a0-\u30ff'  # Japanese katakana
        '\u4e00-\u9faf'  # Japanese kanji
        '\uac00-\ud7af'  # Korean hangul syllables
        '\u1100-\u11ff'  # Korean hangul jamo
    )
    return bool(re.search(fr'[{unicode_ranges}]', s))


def regex_highlight_cjk(word: str) -> str:
    """Generate the regex pattern to match a given word, according to whether
    or not the word contains CJK characters.

    If the word is and/or contains a CJK character, the regex pattern will
    match the standalone word by taking into account the presence of
    whitespace before and after it; if not, it will match any occurrence of
    the word throughout the text, ignoring the whitespace.

    Args:
        word (str): the word to be matched with a regex pattern.

    Returns:
        str: the regex pattern for the word.
    """
    rword = re.escape(word)
    if contains_cjko(rword):
        return fr'({rword})'
    return fr'\b({rword})(?!\w)'


def highlight_content(content, query):
    if not content:
        return None

    # ignoring html contents
    # TODO better html content detection
    if content.find('<') != -1:
        return content

    querysplit = query.split()
    queries = []
    for qs in querysplit:
        qs = qs.replace("'", "").replace('"', '').replace(" ", "")
        if len(qs) > 0:
            queries.extend(re.findall(regex_highlight_cjk(qs), content, flags=re.I | re.U))
    if len(queries) > 0:
        for q in set(queries):
            content = re.sub(
                regex_highlight_cjk(q), f'<span class="highlight">{q}</span>'.replace('\\', r'\\'), content
            )
    return content
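
# A minimal usage sketch for the highlighting helpers (illustration only).
# CJK terms are matched standalone, other terms at word boundaries:
#
#     regex_highlight_cjk('搜索')    # -> '(搜索)'
#     regex_highlight_cjk('search')  # -> '\\b(search)(?!\\w)'
#
#     highlight_content('SearXNG is a metasearch engine', 'metasearch')
#     # -> 'SearXNG is a <span class="highlight">metasearch</span> engine'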
"""    rword = re.escape(word)    if contains_cjko(rword):        return fr'({rword})'    else:        return fr'\b({rword})(?!\w)'def highlight_content(content, query):    if not content:        return None    # ignoring html contents    # TODO better html content detection    if content.find('<') != -1:        return content    querysplit = query.split()    queries = []    for qs in querysplit:        qs = qs.replace("'", "").replace('"', '').replace(" ", "")        if len(qs) > 0:            queries.extend(re.findall(regex_highlight_cjk(qs), content, flags=re.I | re.U))    if len(queries) > 0:        for q in set(queries):            content = re.sub(                regex_highlight_cjk(q), f'<span class="highlight">{q}</span>'.replace('\\', r'\\'), content            )    return contentdef searxng_l10n_timespan(dt: datetime) -> str:  # pylint: disable=invalid-name    """Returns a human-readable and translated string indicating how long ago    a date was in the past / the time span of the date to the present.    On January 1st, midnight, the returned string only indicates how many years    ago the date was.    """    # TODO, check if timezone is calculated right  # pylint: disable=fixme    d = dt.date()    t = dt.time()    if d.month == 1 and d.day == 1 and t.hour == 0 and t.minute == 0 and t.second == 0:        return str(d.year)    if dt.replace(tzinfo=None) >= datetime.now() - timedelta(days=1):        timedifference = datetime.now() - dt.replace(tzinfo=None)        minutes = int((timedifference.seconds / 60) % 60)        hours = int(timedifference.seconds / 60 / 60)        if hours == 0:            return gettext('{minutes} minute(s) ago').format(minutes=minutes)        return gettext('{hours} hour(s), {minutes} minute(s) ago').format(hours=hours, minutes=minutes)    return format_date(dt)def is_flask_run_cmdline():    """Check if the application was started using "flask run" command line    Inspect the callstack.    See https://github.com/pallets/flask/blob/master/src/flask/__main__.py    Returns:        bool: True if the application was started using "flask run".    """    frames = inspect.stack()    if len(frames) < 2:        return False    return frames[-2].filename.endswith('flask/cli.py')NO_SUBGROUPING = 'without further subgrouping'def group_engines_in_tab(engines: Iterable[Engine]) -> List[Tuple[str, Iterable[Engine]]]:    """Groups an Iterable of engines by their first non tab category (first subgroup)"""    def get_subgroup(eng):        non_tab_categories = [c for c in eng.categories if c not in tabs + [DEFAULT_CATEGORY]]        return non_tab_categories[0] if len(non_tab_categories) > 0 else NO_SUBGROUPING    def group_sort_key(group):        return (group[0] == NO_SUBGROUPING, group[0].lower())    def engine_sort_key(engine):        return (engine.about.get('language', ''), engine.name)    tabs = list(settings['categories_as_tabs'].keys())    subgroups = itertools.groupby(sorted(engines, key=get_subgroup), get_subgroup)    sorted_groups = sorted(((name, list(engines)) for name, engines in subgroups), key=group_sort_key)    ret_val = []    for groupname, engines in sorted_groups:        group_bang = '!' + groupname.replace(' ', '_') if groupname != NO_SUBGROUPING else ''        ret_val.append((groupname, group_bang, sorted(engines, key=engine_sort_key)))    return ret_val