Browse Source

[enh][mod] result handling refactor

Several changes has been made:
 - Parallel result merge
 - Scoring algorithm slightly changed (see result_score())
 - Proper Thread locking on global data manipulation
Adam Tauber 9 years ago
parent
commit
b6c3cb0bdd
6 changed files with 321 additions and 292 deletions
  1. 239 0
      searx/results.py
  2. 16 257
      searx/search.py
  3. 41 0
      searx/tests/test_results.py
  4. 4 19
      searx/tests/test_search.py
  5. 7 1
      searx/tests/test_webapp.py
  6. 14 15
      searx/webapp.py

+ 239 - 0
searx/results.py

@@ -0,0 +1,239 @@
+import re
+from collections import defaultdict
+from operator import itemgetter
+from threading import RLock
+from urlparse import urlparse, unquote
+from searx.engines import engines
+
+CONTENT_LEN_IGNORED_CHARS_REGEX = re.compile('[,;:!?\./\\\\ ()-_]', re.M | re.U)
+WHITESPACE_REGEX = re.compile('( |\t|\n)+', re.M | re.U)
+
+
+# return the meaningful length of the content for a result
+def result_content_len(content):
+    if isinstance(content, basestring):
+        return len(CONTENT_LEN_IGNORED_CHARS_REGEX.sub('', content))
+    else:
+        return 0
+
+
+def compare_urls(url_a, url_b):
+    if url_a.netloc != url_b.netloc or url_a.query != url_b.query:
+        return False
+
+    # remove / from the end of the url if required
+    path_a = url_a.path[:-1]\
+        if url_a.path.endswith('/')\
+        else url_a.path
+    path_b = url_b.path[:-1]\
+        if url_b.path.endswith('/')\
+        else url_b.path
+
+    return unquote(path_a) == unquote(path_b)
+
+
+def merge_two_infoboxes(infobox1, infobox2):
+    if 'urls' in infobox2:
+        urls1 = infobox1.get('urls', None)
+        if urls1 is None:
+            urls1 = []
+            infobox1.set('urls', urls1)
+
+        urlSet = set()
+        for url in infobox1.get('urls', []):
+            urlSet.add(url.get('url', None))
+
+        for url in infobox2.get('urls', []):
+            if url.get('url', None) not in urlSet:
+                urls1.append(url)
+
+    if 'attributes' in infobox2:
+        attributes1 = infobox1.get('attributes', None)
+        if attributes1 is None:
+            attributes1 = []
+            infobox1.set('attributes', attributes1)
+
+        attributeSet = set()
+        for attribute in infobox1.get('attributes', []):
+            if attribute.get('label', None) not in attributeSet:
+                attributeSet.add(attribute.get('label', None))
+
+        for attribute in infobox2.get('attributes', []):
+            attributes1.append(attribute)
+
+    if 'content' in infobox2:
+        content1 = infobox1.get('content', None)
+        content2 = infobox2.get('content', '')
+        if content1 is not None:
+            if result_content_len(content2) > result_content_len(content1):
+                infobox1['content'] = content2
+        else:
+            infobox1.set('content', content2)
+
+
+def result_score(result):
+    weight = 1.0
+
+    for result_engine in result['engines']:
+        if hasattr(engines[result_engine], 'weight'):
+            weight *= float(engines[result_engine].weight)
+
+    occurences = len(result['positions'])
+
+    return sum((occurences * weight) / position for position in result['positions'])
+
+
+class ResultContainer(object):
+    """docstring for ResultContainer"""
+    def __init__(self):
+        super(ResultContainer, self).__init__()
+        self.results = defaultdict(list)
+        self._merged_results = []
+        self.infoboxes = []
+        self._infobox_ids = {}
+        self.suggestions = set()
+        self.answers = set()
+
+    def extend(self, engine_name, results):
+        for result in list(results):
+            if 'suggestion' in result:
+                self.suggestions.add(result['suggestion'])
+                results.remove(result)
+            elif 'answer' in result:
+                self.answers.add(result['suggestion'])
+                results.remve(result)
+            elif 'infobox' in result:
+                self._merge_infobox(result)
+                results.remove(result)
+
+        with RLock():
+            engines[engine_name].stats['search_count'] += 1
+            engines[engine_name].stats['result_count'] += len(results)
+
+        if not results:
+            return
+
+        self.results[engine_name].extend(results)
+
+        for i, result in enumerate(results):
+            position = i + 1
+            self._merge_result(result, position)
+
+    def _merge_infobox(self, infobox):
+        add_infobox = True
+        infobox_id = infobox.get('id', None)
+        if infobox_id is not None:
+            existingIndex = self._infobox_ids.get(infobox_id, None)
+            if existingIndex is not None:
+                merge_two_infoboxes(self.infoboxes[existingIndex], infobox)
+                add_infobox = False
+
+        if add_infobox:
+            self.infoboxes.append(infobox)
+            self._infobox_ids[infobox_id] = len(self.infoboxes) - 1
+
+    def _merge_result(self, result, position):
+        result['parsed_url'] = urlparse(result['url'])
+
+        # if the result has no scheme, use http as default
+        if not result['parsed_url'].scheme:
+            result['parsed_url'] = result['parsed_url']._replace(scheme="http")
+
+        result['host'] = result['parsed_url'].netloc
+
+        if result['host'].startswith('www.'):
+            result['host'] = result['host'].replace('www.', '', 1)
+
+        result['engines'] = [result['engine']]
+
+        # strip multiple spaces and cariage returns from content
+        if result.get('content'):
+            result['content'] = WHITESPACE_REGEX.sub(' ', result['content'])
+
+        # check for duplicates
+        duplicated = False
+        for merged_result in self._merged_results:
+            if compare_urls(result['parsed_url'], merged_result['parsed_url'])\
+               and result.get('template') == merged_result.get('template'):
+                duplicated = merged_result
+                break
+
+        # merge duplicates together
+        if duplicated:
+            # using content with more text
+            if result_content_len(result.get('content', '')) >\
+                    result_content_len(duplicated.get('content', '')):
+                duplicated['content'] = result['content']
+
+            # add the new position
+            duplicated['positions'].append(position)
+
+            # add engine to list of result-engines
+            duplicated['engines'].append(result['engine'])
+
+            # using https if possible
+            if duplicated['parsed_url'].scheme != 'https' and result['parsed_url'].scheme == 'https':
+                duplicated['url'] = result['parsed_url'].geturl()
+                duplicated['parsed_url'] = result['parsed_url']
+
+        # if there is no duplicate found, append result
+        else:
+            result['positions'] = [position]
+            with RLock():
+                self._merged_results.append(result)
+
+    def get_ordered_results(self):
+        for result in self._merged_results:
+            score = result_score(result)
+            result['score'] = score
+            with RLock():
+                for result_engine in result['engines']:
+                    engines[result_engine].stats['score_count'] += score
+
+        results = sorted(self._merged_results, key=itemgetter('score'), reverse=True)
+
+        # pass 2 : group results by category and template
+        gresults = []
+        categoryPositions = {}
+
+        for i, res in enumerate(results):
+            # FIXME : handle more than one category per engine
+            category = engines[res['engine']].categories[0] + ':' + ''\
+                if 'template' not in res\
+                else res['template']
+
+            current = None if category not in categoryPositions\
+                else categoryPositions[category]
+
+            # group with previous results using the same category
+            # if the group can accept more result and is not too far
+            # from the current position
+            if current is not None and (current['count'] > 0)\
+                    and (len(gresults) - current['index'] < 20):
+                # group with the previous results using
+                # the same category with this one
+                index = current['index']
+                gresults.insert(index, res)
+
+                # update every index after the current one
+                # (including the current one)
+                for k in categoryPositions:
+                    v = categoryPositions[k]['index']
+                    if v >= index:
+                        categoryPositions[k]['index'] = v + 1
+
+                # update this category
+                current['count'] -= 1
+
+            else:
+                # same category
+                gresults.append(res)
+
+                # update categoryIndex
+                categoryPositions[category] = {'index': len(gresults), 'count': 8}
+
+        # return gresults
+        return gresults
+
+    def results_length(self):
+        return len(self._merged_results)

+ 16 - 257
searx/search.py

@@ -16,13 +16,8 @@ along with searx. If not, see < http://www.gnu.org/licenses/ >.
 '''
 
 import threading
-import re
 import searx.poolrequests as requests_lib
-from itertools import izip_longest, chain
-from operator import itemgetter
-from Queue import Queue
 from time import time
-from urlparse import urlparse, unquote
 from searx import settings
 from searx.engines import (
     categories, engines
@@ -30,6 +25,7 @@ from searx.engines import (
 from searx.languages import language_codes
 from searx.utils import gen_useragent, get_blocked_engines
 from searx.query import Query
+from searx.results import ResultContainer
 from searx import logger
 
 logger = logger.getChild('search')
@@ -42,7 +38,8 @@ def search_request_wrapper(fn, url, engine_name, **kwargs):
         return fn(url, **kwargs)
     except:
         # increase errors stats
-        engines[engine_name].stats['errors'] += 1
+        with threading.RLock():
+            engines[engine_name].stats['errors'] += 1
 
         # print engine name and specific error message
         logger.exception('engine crash: {0}'.format(engine_name))
@@ -84,7 +81,7 @@ def default_request_params():
 
 
 # create a callback wrapper for the search engine results
-def make_callback(engine_name, results_queue, callback, params):
+def make_callback(engine_name, callback, params, result_container):
 
     # creating a callback wrapper for the search engine results
     def process_callback(response, **kwargs):
@@ -96,12 +93,17 @@ def make_callback(engine_name, results_queue, callback, params):
 
         response.search_params = params
 
-        timeout_overhead = 0.2  # seconds
         search_duration = time() - params['started']
+        # update stats with current page-load-time
+        with threading.RLock():
+            engines[engine_name].stats['page_load_time'] += search_duration
+
+        timeout_overhead = 0.2  # seconds
         timeout_limit = engines[engine_name].timeout + timeout_overhead
+
         if search_duration > timeout_limit:
-            engines[engine_name].stats['page_load_time'] += timeout_limit
-            engines[engine_name].stats['errors'] += 1
+            with threading.RLock():
+                engines[engine_name].stats['errors'] += 1
             return
 
         # callback
@@ -111,212 +113,11 @@ def make_callback(engine_name, results_queue, callback, params):
         for result in search_results:
             result['engine'] = engine_name
 
-        results_queue.put_nowait((engine_name, search_results))
-
-        # update stats with current page-load-time
-        engines[engine_name].stats['page_load_time'] += search_duration
+        result_container.extend(engine_name, search_results)
 
     return process_callback
 
 
-# return the meaningful length of the content for a result
-def content_result_len(content):
-    if isinstance(content, basestring):
-        content = re.sub('[,;:!?\./\\\\ ()-_]', '', content)
-        return len(content)
-    else:
-        return 0
-
-
-# score results and remove duplications
-def score_results(results):
-    # calculate scoring parameters
-    flat_res = filter(
-        None, chain.from_iterable(izip_longest(*results.values())))
-    flat_len = len(flat_res)
-    engines_len = len(results)
-
-    results = []
-
-    # pass 1: deduplication + scoring
-    for i, res in enumerate(flat_res):
-
-        res['parsed_url'] = urlparse(res['url'])
-
-        # if the result has no scheme, use http as default
-        if not res['parsed_url'].scheme:
-            res['parsed_url'] = res['parsed_url']._replace(scheme="http")
-
-        res['host'] = res['parsed_url'].netloc
-
-        if res['host'].startswith('www.'):
-            res['host'] = res['host'].replace('www.', '', 1)
-
-        res['engines'] = [res['engine']]
-
-        weight = 1.0
-
-        # strip multiple spaces and cariage returns from content
-        if res.get('content'):
-            res['content'] = re.sub(' +', ' ',
-                                    res['content'].strip().replace('\n', ''))
-
-        # get weight of this engine if possible
-        if hasattr(engines[res['engine']], 'weight'):
-            weight = float(engines[res['engine']].weight)
-
-        # calculate score for that engine
-        score = int((flat_len - i) / engines_len) * weight + 1
-
-        # check for duplicates
-        duplicated = False
-        for new_res in results:
-            # remove / from the end of the url if required
-            p1 = res['parsed_url'].path[:-1]\
-                if res['parsed_url'].path.endswith('/')\
-                else res['parsed_url'].path
-            p2 = new_res['parsed_url'].path[:-1]\
-                if new_res['parsed_url'].path.endswith('/')\
-                else new_res['parsed_url'].path
-
-            # check if that result is a duplicate
-            if res['host'] == new_res['host'] and\
-               unquote(p1) == unquote(p2) and\
-               res['parsed_url'].query == new_res['parsed_url'].query and\
-               res.get('template') == new_res.get('template'):
-                duplicated = new_res
-                break
-
-        # merge duplicates together
-        if duplicated:
-            # using content with more text
-            if content_result_len(res.get('content', '')) >\
-                    content_result_len(duplicated.get('content', '')):
-                duplicated['content'] = res['content']
-
-            # increase result-score
-            duplicated['score'] += score
-
-            # add engine to list of result-engines
-            duplicated['engines'].append(res['engine'])
-
-            # using https if possible
-            if duplicated['parsed_url'].scheme == 'https':
-                continue
-            elif res['parsed_url'].scheme == 'https':
-                duplicated['url'] = res['parsed_url'].geturl()
-                duplicated['parsed_url'] = res['parsed_url']
-
-        # if there is no duplicate found, append result
-        else:
-            res['score'] = score
-
-            results.append(res)
-
-    results = sorted(results, key=itemgetter('score'), reverse=True)
-
-    # pass 2 : group results by category and template
-    gresults = []
-    categoryPositions = {}
-
-    for i, res in enumerate(results):
-        # FIXME : handle more than one category per engine
-        category = engines[res['engine']].categories[0] + ':' + ''\
-            if 'template' not in res\
-            else res['template']
-
-        current = None if category not in categoryPositions\
-            else categoryPositions[category]
-
-        # group with previous results using the same category
-        # if the group can accept more result and is not too far
-        # from the current position
-        if current is not None and (current['count'] > 0)\
-                and (len(gresults) - current['index'] < 20):
-            # group with the previous results using
-            # the same category with this one
-            index = current['index']
-            gresults.insert(index, res)
-
-            # update every index after the current one
-            # (including the current one)
-            for k in categoryPositions:
-                v = categoryPositions[k]['index']
-                if v >= index:
-                    categoryPositions[k]['index'] = v + 1
-
-            # update this category
-            current['count'] -= 1
-
-        else:
-            # same category
-            gresults.append(res)
-
-            # update categoryIndex
-            categoryPositions[category] = {'index': len(gresults), 'count': 8}
-
-    # return gresults
-    return gresults
-
-
-def merge_two_infoboxes(infobox1, infobox2):
-    if 'urls' in infobox2:
-        urls1 = infobox1.get('urls', None)
-        if urls1 is None:
-            urls1 = []
-            infobox1.set('urls', urls1)
-
-        urlSet = set()
-        for url in infobox1.get('urls', []):
-            urlSet.add(url.get('url', None))
-
-        for url in infobox2.get('urls', []):
-            if url.get('url', None) not in urlSet:
-                urls1.append(url)
-
-    if 'attributes' in infobox2:
-        attributes1 = infobox1.get('attributes', None)
-        if attributes1 is None:
-            attributes1 = []
-            infobox1.set('attributes', attributes1)
-
-        attributeSet = set()
-        for attribute in infobox1.get('attributes', []):
-            if attribute.get('label', None) not in attributeSet:
-                attributeSet.add(attribute.get('label', None))
-
-        for attribute in infobox2.get('attributes', []):
-            attributes1.append(attribute)
-
-    if 'content' in infobox2:
-        content1 = infobox1.get('content', None)
-        content2 = infobox2.get('content', '')
-        if content1 is not None:
-            if content_result_len(content2) > content_result_len(content1):
-                infobox1['content'] = content2
-        else:
-            infobox1.set('content', content2)
-
-
-def merge_infoboxes(infoboxes):
-    results = []
-    infoboxes_id = {}
-    for infobox in infoboxes:
-        add_infobox = True
-        infobox_id = infobox.get('id', None)
-        if infobox_id is not None:
-            existingIndex = infoboxes_id.get(infobox_id, None)
-            if existingIndex is not None:
-                merge_two_infoboxes(results[existingIndex], infobox)
-                add_infobox = False
-
-        if add_infobox:
-            results.append(infobox)
-            infoboxes_id[infobox_id] = len(results) - 1
-
-    return results
-
-
 class Search(object):
 
     """Search information container"""
@@ -334,10 +135,7 @@ class Search(object):
         # set blocked engines
         self.blocked_engines = get_blocked_engines(engines, request.cookies)
 
-        self.results = []
-        self.suggestions = set()
-        self.answers = set()
-        self.infoboxes = []
+        self.result_container = ResultContainer()
         self.request_data = {}
 
         # set specific language if set
@@ -449,8 +247,6 @@ class Search(object):
 
         # init vars
         requests = []
-        results_queue = Queue()
-        results = {}
 
         # increase number of searches
         number_of_searches += 1
@@ -504,9 +300,9 @@ class Search(object):
             # create a callback wrapper for the search engine results
             callback = make_callback(
                 selected_engine['name'],
-                results_queue,
                 engine.response,
-                request_params)
+                request_params,
+                self.result_container)
 
             # create dictionary which contain all
             # informations about the request
@@ -539,42 +335,5 @@ class Search(object):
         # send all search-request
         threaded_requests(requests)
 
-        while not results_queue.empty():
-            engine_name, engine_results = results_queue.get_nowait()
-
-            # TODO type checks
-            [self.suggestions.add(x['suggestion'])
-             for x in list(engine_results)
-             if 'suggestion' in x
-             and engine_results.remove(x) is None]
-
-            [self.answers.add(x['answer'])
-             for x in list(engine_results)
-             if 'answer' in x
-             and engine_results.remove(x) is None]
-
-            self.infoboxes.extend(x for x in list(engine_results)
-                                  if 'infobox' in x
-                                  and engine_results.remove(x) is None)
-
-            results[engine_name] = engine_results
-
-        # update engine-specific stats
-        for engine_name, engine_results in results.items():
-            engines[engine_name].stats['search_count'] += 1
-            engines[engine_name].stats['result_count'] += len(engine_results)
-
-        # score results and remove duplications
-        self.results = score_results(results)
-
-        # merge infoboxes according to their ids
-        self.infoboxes = merge_infoboxes(self.infoboxes)
-
-        # update engine stats, using calculated score
-        for result in self.results:
-            for res_engine in result['engines']:
-                engines[result['engine']]\
-                    .stats['score_count'] += result['score']
-
         # return results, suggestions, answers and infoboxes
         return self

+ 41 - 0
searx/tests/test_results.py

@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+
+from searx.results import ResultContainer
+from searx.testing import SearxTestCase
+
+
+def fake_result(url='https://aa.bb/cc?dd=ee#ff',
+                title='aaa',
+                content='bbb',
+                engine='wikipedia', **kwargs):
+    result = {'url': url,
+              'title': title,
+              'content': content,
+              'engine': engine}
+    result.update(kwargs)
+    return result
+
+
+#  TODO
+class ResultContainerTestCase(SearxTestCase):
+
+    def test_empty(self):
+        c = ResultContainer()
+        self.assertEqual(c.get_ordered_results(), [])
+
+    def test_one_result(self):
+        c = ResultContainer()
+        c.extend('wikipedia', [fake_result()])
+        self.assertEqual(c.results_length(), 1)
+
+    def test_one_suggestion(self):
+        c = ResultContainer()
+        c.extend('wikipedia', [fake_result(suggestion=True)])
+        self.assertEqual(len(c.suggestions), 1)
+        self.assertEqual(c.results_length(), 0)
+
+    def test_result_merge(self):
+        c = ResultContainer()
+        c.extend('wikipedia', [fake_result()])
+        c.extend('wikidata', [fake_result(), fake_result(url='https://example.com/')])
+        self.assertEqual(c.results_length(), 2)

+ 4 - 19
searx/tests/test_search.py

@@ -1,25 +1,10 @@
 # -*- coding: utf-8 -*-
 
-from searx.search import score_results
 from searx.testing import SearxTestCase
 
 
-def fake_result(url='https://aa.bb/cc?dd=ee#ff',
-                title='aaa',
-                content='bbb',
-                engine='wikipedia'):
-    return {'url': url,
-            'title': title,
-            'content': content,
-            'engine': engine}
+#  TODO
+class SearchTestCase(SearxTestCase):
 
-
-class ScoreResultsTestCase(SearxTestCase):
-
-    def test_empty(self):
-        self.assertEqual(score_results(dict()), [])
-
-    def test_urlparse(self):
-        results = score_results(dict(a=[fake_result(url='https://aa.bb/cc?dd=ee#ff')]))
-        parsed_url = results[0]['parsed_url']
-        self.assertEqual(parsed_url.query, 'dd=ee')
+    def test_(self):
+        pass

+ 7 - 1
searx/tests/test_webapp.py

@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 
 import json
+from mock import Mock
 from urlparse import ParseResult
 from searx import webapp
 from searx.testing import SearxTestCase
@@ -33,7 +34,12 @@ class ViewsTestCase(SearxTestCase):
         ]
 
         def search_mock(search_self, *args):
-            search_self.results = self.test_results
+            search_self.result_container = Mock(get_ordered_results=lambda: self.test_results,
+                                                answers=set(),
+                                                suggestions=set(),
+                                                infoboxes=[],
+                                                results=self.test_results,
+                                                results_length=lambda: len(self.test_results))
 
         webapp.Search.search = search_mock
 

+ 14 - 15
searx/webapp.py

@@ -383,7 +383,7 @@ def index():
 
     plugins.call('post_search', request, locals())
 
-    for result in search.results:
+    for result in search.result_container.get_ordered_results():
 
         plugins.call('on_result', request, locals())
         if not search.paging and engines[result['engine']].paging:
@@ -411,7 +411,7 @@ def index():
                 minutes = int((timedifference.seconds / 60) % 60)
                 hours = int(timedifference.seconds / 60 / 60)
                 if hours == 0:
-                    result['publishedDate'] = gettext(u'{minutes} minute(s) ago').format(minutes=minutes)  # noqa
+                    result['publishedDate'] = gettext(u'{minutes} minute(s) ago').format(minutes=minutes)
                 else:
                     result['publishedDate'] = gettext(u'{hours} hour(s), {minutes} minute(s) ago').format(hours=hours, minutes=minutes)  # noqa
             else:
@@ -419,17 +419,16 @@ def index():
 
     if search.request_data.get('format') == 'json':
         return Response(json.dumps({'query': search.query,
-                                    'results': search.results}),
+                                    'results': search.result_container.get_ordered_results()}),
                         mimetype='application/json')
     elif search.request_data.get('format') == 'csv':
         csv = UnicodeWriter(cStringIO.StringIO())
         keys = ('title', 'url', 'content', 'host', 'engine', 'score')
-        if search.results:
-            csv.writerow(keys)
-            for row in search.results:
-                row['host'] = row['parsed_url'].netloc
-                csv.writerow([row.get(key, '') for key in keys])
-            csv.stream.seek(0)
+        csv.writerow(keys)
+        for row in search.result_container.get_ordered_results():
+            row['host'] = row['parsed_url'].netloc
+            csv.writerow([row.get(key, '') for key in keys])
+        csv.stream.seek(0)
         response = Response(csv.stream.read(), mimetype='application/csv')
         cont_disp = 'attachment;Filename=searx_-_{0}.csv'.format(search.query)
         response.headers.add('Content-Disposition', cont_disp)
@@ -437,24 +436,24 @@ def index():
     elif search.request_data.get('format') == 'rss':
         response_rss = render(
             'opensearch_response_rss.xml',
-            results=search.results,
+            results=search.result_container.get_ordered_results(),
             q=search.request_data['q'],
-            number_of_results=len(search.results),
+            number_of_results=search.result_container.results_length(),
             base_url=get_base_url()
         )
         return Response(response_rss, mimetype='text/xml')
 
     return render(
         'results.html',
-        results=search.results,
+        results=search.result_container.get_ordered_results(),
         q=search.request_data['q'],
         selected_categories=search.categories,
         paging=search.paging,
         pageno=search.pageno,
         base_url=get_base_url(),
-        suggestions=search.suggestions,
-        answers=search.answers,
-        infoboxes=search.infoboxes,
+        suggestions=search.result_container.suggestions,
+        answers=search.result_container.answers,
+        infoboxes=search.result_container.infoboxes,
         theme=get_current_theme_name(),
         favicons=global_favicons[themes.index(get_current_theme_name())]
     )