"""Wikidata engine: full-text search on Wikidata, rendered as an infobox.

``request`` builds the search URL, ``response`` fetches the entity details
for the hits and ``getDetail`` turns one entity into result dictionaries.
"""

import json
from searx import logger
from searx.poolrequests import get
from searx.utils import format_date_by_locale

from datetime import datetime
from dateutil.parser import parse as dateutil_parse
try:
    from urllib import urlencode
except ImportError:  # Python 3 moved urlencode into urllib.parse
    from urllib.parse import urlencode


logger = logger.getChild('wikidata')
result_count = 1
wikidata_host = 'https://www.wikidata.org'
wikidata_api = wikidata_host + '/w/api.php'
url_search = wikidata_api \
    + '?action=query&list=search&format=json'\
    + '&srnamespace=0&srprop=sectiontitle&{query}'
url_detail = wikidata_api\
    + '?action=wbgetentities&format=json'\
    + '&props=labels%7Cinfo%7Csitelinks'\
    + '%7Csitelinks%2Furls%7Cdescriptions%7Cclaims'\
    + '&{query}'

url_map = 'https://www.openstreetmap.org/'\
    + '?lat={latitude}&lon={longitude}&zoom={zoom}&layers=M'


def request(query, params):
    """Store the Wikidata search URL for ``query`` in ``params['url']``.

    At most ``result_count`` hits are requested. Returns ``params``.
    """
    params['url'] = url_search.format(
        query=urlencode({'srsearch': query,
                         'srlimit': result_count}))
    return params


def response(resp):
    """Convert a Wikidata search response into a list of result dicts.

    The entity ids found in ``resp.text`` are looked up with a second HTTP
    request (``wbgetentities``) and each entity is rendered by ``getDetail``.
    The language is taken from ``resp.search_params['language']``; ``'all'``
    is treated as ``'en'``.
    """
    results = []
    search_res = json.loads(resp.text)

    # Keep the ranking order of the search hits while dropping duplicates
    # and hits without a title.
    wikidata_ids = []
    for hit in search_res.get('query', {}).get('search', []):
        entity_id = hit.get('title')
        if entity_id and entity_id not in wikidata_ids:
            wikidata_ids.append(entity_id)

    # Nothing found: do not issue a pointless detail request.
    if not wikidata_ids:
        return results

    locale = resp.search_params['language']
    language = locale.split('_')[0]
    if language == 'all':
        language = 'en'

    url = url_detail.format(query=urlencode({'ids': '|'.join(wikidata_ids),
                                             'languages': language + '|en'}))

    htmlresponse = get(url)
    jsonresponse = json.loads(htmlresponse.content)
    for wikidata_id in wikidata_ids:
        results.extend(getDetail(jsonresponse, wikidata_id, language, locale))

    return results


def getDetail(jsonresponse, wikidata_id, language, locale):
    """Build the results for one entity of a ``wbgetentities`` response.

    ``language`` selects labels, descriptions and sitelinks (English is the
    fallback); ``locale`` is passed to the date formatting. Returns an empty
    list when the entity has no label in ``language`` or English. Otherwise
    the list holds an optional plain link to the official website followed
    by either a plain link (entity with nothing but two links) or an infobox.
    """
    results = []
    urls = []
    attributes = []

    result = jsonresponse.get('entities', {}).get(wikidata_id, {})

    labels = result.get('labels', {})
    title = labels.get(language, {}).get('value', None)
    if title is None:
        title = labels.get('en', {}).get('value', None)
    if title is None:
        return results

    descriptions = result.get('descriptions', {})
    description = descriptions.get(language, {}).get('value', None)
    if description is None:
        description = descriptions.get('en', {}).get('value', '')

    claims = result.get('claims', {})
    official_website = get_string(claims, 'P856', None)
    if official_website is not None:
        urls.append({'title': 'Official site', 'url': official_website})
        results.append({'title': title, 'url': official_website})

    wikipedia_link_count = 0
    if language != 'en':
        wikipedia_link_count += add_url(urls,
                                        'Wikipedia (' + language + ')',
                                        get_wikilink(result,
                                                     language + 'wiki'))
    wikipedia_en_link = get_wikilink(result, 'enwiki')
    wikipedia_link_count += add_url(urls,
                                    'Wikipedia (en)',
                                    wikipedia_en_link)
    if wikipedia_link_count == 0:
        # No article in the requested language nor in English: fall back to
        # whichever two-letter language edition is found first.
        misc_language = get_wiki_firstlanguage(result, 'wiki')
        if misc_language is not None:
            add_url(urls,
                    'Wikipedia (' + misc_language + ')',
                    get_wikilink(result, misc_language + 'wiki'))

    # Sister projects: requested language first (if not English), then 'en'.
    for project_title, site_suffix in (('Wiki voyage', 'wikivoyage'),
                                       ('Wikiquote', 'wikiquote')):
        if language != 'en':
            add_url(urls,
                    project_title + ' (' + language + ')',
                    get_wikilink(result, language + site_suffix))
        add_url(urls,
                project_title + ' (en)',
                get_wikilink(result, 'en' + site_suffix))

    add_url(urls,
            'Commons wiki',
            get_wikilink(result, 'commonswiki'))

    add_url(urls,
            'Location',
            get_geolink(claims, 'P625', None))

    add_url(urls,
            'Wikidata',
            'https://www.wikidata.org/wiki/'
            + wikidata_id + '?uselang=' + language)

    # MusicBrainz identifiers: (Wikidata property, MusicBrainz entity path).
    # Not handled yet: P982 (area), P1004 (place), P1330 (instrument),
    # P1407 (series).
    for property_id, musicbrainz_kind in (('P435', 'work'),
                                          ('P434', 'artist'),
                                          ('P436', 'release-group'),
                                          ('P966', 'label')):
        musicbrainz_id = get_string(claims, property_id)
        if musicbrainz_id is not None:
            add_url(urls,
                    'MusicBrainz',
                    'http://musicbrainz.org/' + musicbrainz_kind + '/'
                    + musicbrainz_id)

    postal_code = get_string(claims, 'P281', None)
    if postal_code is not None:
        attributes.append({'label': 'Postal code(s)', 'value': postal_code})

    date_of_birth = get_time(claims, 'P569', locale, None)
    if date_of_birth is not None:
        attributes.append({'label': 'Date of birth', 'value': date_of_birth})

    date_of_death = get_time(claims, 'P570', locale, None)
    if date_of_death is not None:
        attributes.append({'label': 'Date of death', 'value': date_of_death})

    if not attributes and len(urls) == 2 and not description:
        # Only two links and nothing to show: emit a plain link result
        # instead of an empty infobox.
        results.append({
                       'url': urls[0]['url'],
                       'title': title,
                       'content': description
                       })
    else:
        results.append({
                       'infobox': title,
                       'id': wikipedia_en_link,
                       'content': description,
                       'attributes': attributes,
                       'urls': urls
                       })

    return results


def add_url(urls, title, url):
    """Append ``{'title': title, 'url': url}`` to ``urls`` unless url is None.

    Returns 1 when the link was appended, 0 otherwise.
    """
    if url is None:
        return 0
    urls.append({'title': title, 'url': url})
    return 1


def get_mainsnak(claims, propertyName):
    """Return the ``mainsnak`` of the first claim of ``propertyName``.

    Returns None when the property has no claim or the claim has no mainsnak.
    """
    propValue = claims.get(propertyName, {})
    if len(propValue) == 0:
        return None

    return propValue[0].get('mainsnak', None)


def get_string(claims, propertyName, defaultValue=None):
    """Return the first value of ``propertyName``, else ``defaultValue``.

    Claims whose snak carries no value (no ``datavalue``, or an empty one)
    are skipped, so an empty string is never returned.
    """
    propValue = claims.get(propertyName, {})
    if len(propValue) == 0:
        return defaultValue

    result = []
    for e in propValue:
        datavalue = (e.get('mainsnak') or {}).get('datavalue') or {}
        value = datavalue.get('value')
        if value is not None and value != '':
            result.append(value)

    if len(result) == 0:
        return defaultValue
    # TODO handle multiple urls
    return result[0]


def get_time(claims, propertyName, locale, defaultValue=None):
    """Return the date(s) of ``propertyName`` as a display string.

    A date that can be parsed is formatted with ``format_date_by_locale``;
    otherwise the part of the raw string before ``'T'`` is returned. Returns
    ``defaultValue`` when the property has no claim carrying a time value.
    """
    propValue = claims.get(propertyName, {})
    if len(propValue) == 0:
        return defaultValue

    result = []
    for e in propValue:
        # Snaks without a value have no 'datavalue' at all.
        datavalue = (e.get('mainsnak') or {}).get('datavalue') or {}
        value = datavalue.get('value')
        if isinstance(value, dict) and value.get('time'):
            result.append(value['time'])

    if len(result) == 0:
        return defaultValue
    date_string = ', '.join(result)

    try:
        parsed_date = datetime.strptime(date_string, "+%Y-%m-%dT%H:%M:%SZ")
    except (ValueError, TypeError):
        # Dates with a leading '-' are returned as their raw date part.
        if date_string.startswith('-'):
            return date_string.split('T')[0]
        try:
            # NOTE(review): default=False is not a datetime; confirm that
            # this is what the installed dateutil version expects.
            parsed_date = dateutil_parse(date_string,
                                         fuzzy=False,
                                         default=False)
        except Exception:
            # Best effort: whatever the parser raised, show the raw date.
            logger.debug('could not parse date %s', date_string)
            return date_string.split('T')[0]

    return format_date_by_locale(parsed_date, locale)


def get_geolink(claims, propertyName, defaultValue=''):
    """Return an OpenStreetMap URL for the coordinate in ``propertyName``.

    Returns ``defaultValue`` when the property has no claim, is not of
    datatype ``globe-coordinate`` or carries no value.
    """
    mainsnak = get_mainsnak(claims, propertyName)

    if mainsnak is None:
        return defaultValue

    if mainsnak.get('datatype', '') != 'globe-coordinate':
        return defaultValue

    value = (mainsnak.get('datavalue') or {}).get('value')
    if not isinstance(value, dict):
        # No coordinate at all: do not link to latitude 0 / longitude 0.
        return defaultValue

    precision = value.get('precision', 0.0002)
    if precision is None:
        # The key can be present with a null value; treat it like a
        # missing one.
        precision = 0.0002

    # there is no zoom information, deduce from precision (error prone)
    # samples :
    # 13 --> 5
    # 1 --> 6
    # 0.016666666666667 --> 9
    # 0.00027777777777778 --> 19
    # wolframalpha :
    # quadratic fit { {13, 5}, {1, 6}, {0.0166666, 9}, {0.0002777777,19}}
    # 14.1186-8.8322 x+0.625447 x^2
    if precision < 0.0003:
        zoom = 19
    else:
        zoom = int(15 - precision*8.8322 + precision*precision*0.625447)
        # The fit is negative between its samples (e.g. precision 10 gives
        # -10) and grows without bound past them: keep zoom within 0..19.
        zoom = max(0, min(19, zoom))

    url = url_map\
        .replace('{latitude}', str(value.get('latitude', 0)))\
        .replace('{longitude}', str(value.get('longitude', 0)))\
        .replace('{zoom}', str(zoom))

    return url


def get_wikilink(result, wikiid):
    """Return the sitelink URL of ``wikiid`` forced to https, or None."""
    url = result.get('sitelinks', {}).get(wikiid, {}).get('url', None)
    if url is None:
        return url
    if url.startswith('http://'):
        # Rewrite the scheme only, not a later 'http://' inside the URL.
        return url.replace('http://', 'https://', 1)
    if url.startswith('//'):
        return 'https:' + url
    return url


def get_wiki_firstlanguage(result, wikipatternid):
    """Return the two-letter prefix of the first matching sitelink key.

    A key matches when it is exactly two characters followed by
    ``wikipatternid`` (e.g. ``'frwiki'`` for ``'wiki'``). Returns None when
    no key matches.
    """
    expected_length = 2 + len(wikipatternid)
    for k in result.get('sitelinks', {}):
        if k.endswith(wikipatternid) and len(k) == expected_length:
            return k[0:2]
    return None