Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6c075ffe authored by Nivesh Krishna's avatar Nivesh Krishna
Browse files

Merge branch '4434-safesearch-ddg' into 'master'

DDG safesearch

Closes e/backlog#4434

See merge request e/cloud/my-spot!95
parents 1866c069 4faebacd
Loading
Loading
Loading
Loading
+94 −109
Original line number Diff line number Diff line
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""DuckDuckGo Lite
"""
 DuckDuckGo (Web)
"""

from json import loads

from lxml.html import fromstring

from searx.utils import (
    dict_subset,
    eval_xpath,
    eval_xpath_getindex,
    extract_text,
    match_language,
)
from urllib.parse import urlencode
from searx.utils import match_language, HTMLTextExtractor
from searx import logger
import re
from searx.network import get

logger = logger.getChild('ddg engine')
# about
about = {
    "website": 'https://lite.duckduckgo.com/lite',
    "website": 'https://duckduckgo.com/',
    "wikidata_id": 'Q12805',
    "official_api_documentation": 'https://duckduckgo.com/api',
    "use_official_api": False,
@@ -29,9 +24,11 @@ about = {
# engine dependent config
categories = ['general']
paging = True
supported_languages_url = 'https://duckduckgo.com/util/u588.js'
supported_languages_url = 'https://duckduckgo.com/util/u172.js'
number_of_results = 10
time_range_support = True

safesearch = True
VQD_REGEX = r"vqd='(\d+-\d+-\d+)'"
language_aliases = {
    'ar-SA': 'ar-XA',
    'es-419': 'es-XL',
@@ -42,16 +39,13 @@ language_aliases = {
    'zh-HK': 'tzh-HK'
}

time_range_dict = {
    'day': 'd',
# search-url
url = 'https://links.duckduckgo.com/d.js?'
url_ping = 'https://duckduckgo.com/t/sl_h'
time_range_dict = {'day': 'd',
                   'week': 'w',
                   'month': 'm',
    'year': 'y'
}

# search-url
url = 'https://lite.duckduckgo.com/lite'
url_ping = 'https://duckduckgo.com/t/sl_l'
                   'year': 'y'}


# match query's language to a region code that duckduckgo will accept
@@ -66,109 +60,100 @@ def get_region_code(lang, lang_list=None):
    return lang_parts[1].lower() + '-' + lang_parts[0].lower()


def get_vqd(query, headers):
    resp = get(f"https://duckduckgo.com/?q={query}&ia=web", headers=headers)
    resp = re.findall(VQD_REGEX, resp.text)
    return resp[0]


def request(query, params):

    params['url'] = url
    params['method'] = 'POST'

    params['data']['q'] = query

    # The API is not documented, so we do some reverse engineering and emulate
    # what https://lite.duckduckgo.com/lite/ does when you press "next Page"
    # link again and again ..

    params['headers']['Content-Type'] = 'application/x-www-form-urlencoded'

    # initial page does not have an offset
    if params['pageno'] == 2:
        # second page does have an offset of 30
        offset = (params['pageno'] - 1) * 30
        params['data']['s'] = offset
        params['data']['dc'] = offset + 1

    elif params['pageno'] > 2:
        # third and following pages do have an offset of 30 + n*50
        offset = 30 + (params['pageno'] - 2) * 50
        params['data']['s'] = offset
        params['data']['dc'] = offset + 1

    # initial page does not have additional data in the input form
    if params['pageno'] > 1:
        # request the second page (and more pages) needs 'o' and 'api' arguments
        params['data']['o'] = 'json'
        params['data']['api'] = 'd.js'

    # initial page does not have additional data in the input form
    if params['pageno'] > 2:
        # request the third page (and more pages) some more arguments
        params['data']['nextParams'] = ''
        params['data']['v'] = ''
        params['data']['vqd'] = ''

    region_code = get_region_code(params['language'], supported_languages)
    if region_code:
        params['data']['kl'] = region_code
        params['cookies']['kl'] = region_code

    params['data']['df'] = ''
    params['method'] = 'GET'

    vqd = get_vqd(query, params["headers"])
    dl, ct = match_language(params["language"], supported_languages, language_aliases, 'wt-WT').split("-")
    query_dict = {
        "q": query,
        't': 'D',
        'l': params["language"],
        'kl': f"{ct}-{dl}",
        's': (params['pageno'] - 1) * number_of_results,
        'dl': dl,
        'ct': ct,
        'ss_mkt': get_region_code(params["language"], supported_languages),
        'df': params['time_range'],
        'vqd': vqd,
        'ex': -2,
        'sp': '1',
        'bpa': '1',
        'biaexp': 'b',
        'msvrtexp': 'b'
    }
    if params['safesearch'] == 2:  # STRICT
        del query_dict['t']
        query_dict['p'] = 1
        query_dict.update({
            'videxp': 'a',
            'nadse': 'b',
            'eclsexp': 'a',
            'stiaexp': 'a',
            'tjsexp': 'b',
            'related': 'b',
            'msnexp': 'a'
        })
    elif params['safesearch'] == 1:  # MODERATE
        query_dict['ex'] = -1
        query_dict.update({
            'nadse': 'b',
            'eclsexp': 'b',
            'tjsexp': 'b'
        })
    else:  # OFF
        query_dict['ex'] = -2
        query_dict.update({
            'nadse': 'b',
            'eclsexp': 'b',
            'tjsexp': 'b'
        })

    params['allow_redirects'] = False
    params["data"] = query_dict
    params['cookies']['kl'] = params["data"]["kl"]
    if params['time_range'] in time_range_dict:
        params['data']['df'] = time_range_dict[params['time_range']]
        params['cookies']['df'] = time_range_dict[params['time_range']]

    params["url"] = url + urlencode(params["data"])
    return params


# get response from search-request
def response(resp):

    headers_ping = dict_subset(resp.request.headers, ['User-Agent', 'Accept-Encoding', 'Accept', 'Cookie'])
    get(url_ping, headers=headers_ping)

    if resp.status_code == 303:
        return []

    # parse the response
    results = []
    doc = fromstring(resp.text)

    result_table = eval_xpath(doc, '//html/body/form/div[@class="filters"]/table')
    if not len(result_table) >= 3:
        # no more results
        return []
    result_table = result_table[2]

    tr_rows = eval_xpath(result_table, './/tr')

    # In the last <tr> is the form of the 'previous/next page' links
    tr_rows = tr_rows[:-1]

    len_tr_rows = len(tr_rows)
    offset = 0

    while len_tr_rows >= offset + 4:
    data = re.findall(r"DDG\.pageLayout\.load\('d',(\[.+\])\);DDG\.duckbar\.load\('images'", str(resp.text))
    try:
        search_data = loads(data[0].replace('/\t/g', '    '))
    except IndexError:
        return

        # assemble table rows we need to scrap
        tr_title = tr_rows[offset]
        tr_content = tr_rows[offset + 1]
        offset += 4
    if len(search_data) == 1 and ('n' not in search_data[0]):
        only_result = search_data[0]
        if ((only_result.get("da") is not None and only_result.get("t") == 'EOF') or
                only_result.get('a') is not None or only_result.get('d') == 'google.com search'):
            return

        # ignore sponsored Adds <tr class="result-sponsored">
        if tr_content.get('class') == 'result-sponsored':
    for search_result in search_data:
        if 'n' in search_result:
            continue

        a_tag = eval_xpath_getindex(tr_title, './/td//a[@class="result-link"]', 0, None)
        if a_tag is None:
            continue

        td_content = eval_xpath_getindex(tr_content, './/td[@class="result-snippet"]', 0, None)
        if td_content is None:
            continue

        results.append({
            'title': a_tag.text_content(),
            'content': extract_text(td_content),
            'url': a_tag.get('href'),
        })

        html2text = HTMLTextExtractor()
        html2text.feed(search_result.get('a'))
        results.append({'title': search_result.get("t"),
                        'content': html2text.get_text(),
                        'url': search_result.get('u')})
    return results


+186 −0
Original line number Diff line number Diff line
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""DuckDuckGo Lite
"""

from json import loads

from lxml.html import fromstring

from searx.utils import (
    dict_subset,
    eval_xpath,
    eval_xpath_getindex,
    extract_text,
    match_language,
)
from searx.network import get

# about
about = {
    "website": 'https://lite.duckduckgo.com/lite',
    "wikidata_id": 'Q12805',
    "official_api_documentation": 'https://duckduckgo.com/api',
    "use_official_api": False,
    "require_api_key": False,
    "results": 'HTML',
}

# engine dependent config
categories = ['general']
paging = True
supported_languages_url = 'https://duckduckgo.com/util/u588.js'
time_range_support = True

language_aliases = {
    'ar-SA': 'ar-XA',
    'es-419': 'es-XL',
    'ja': 'jp-JP',
    'ko': 'kr-KR',
    'sl-SI': 'sl-SL',
    'zh-TW': 'tzh-TW',
    'zh-HK': 'tzh-HK'
}

time_range_dict = {
    'day': 'd',
    'week': 'w',
    'month': 'm',
    'year': 'y'
}

# search-url
url = 'https://lite.duckduckgo.com/lite'
url_ping = 'https://duckduckgo.com/t/sl_l'


# match query's language to a region code that duckduckgo will accept
def get_region_code(lang, lang_list=None):
    if lang == 'all':
        return None

    lang_code = match_language(lang, lang_list or [], language_aliases, 'wt-WT')
    lang_parts = lang_code.split('-')

    # country code goes first
    return lang_parts[1].lower() + '-' + lang_parts[0].lower()


def request(query, params):

    params['url'] = url
    params['method'] = 'POST'

    params['data']['q'] = query

    # The API is not documented, so we do some reverse engineering and emulate
    # what https://lite.duckduckgo.com/lite/ does when you press "next Page"
    # link again and again ..

    params['headers']['Content-Type'] = 'application/x-www-form-urlencoded'

    # initial page does not have an offset
    if params['pageno'] == 2:
        # second page does have an offset of 30
        offset = (params['pageno'] - 1) * 30
        params['data']['s'] = offset
        params['data']['dc'] = offset + 1

    elif params['pageno'] > 2:
        # third and following pages do have an offset of 30 + n*50
        offset = 30 + (params['pageno'] - 2) * 50
        params['data']['s'] = offset
        params['data']['dc'] = offset + 1

    # initial page does not have additional data in the input form
    if params['pageno'] > 1:
        # request the second page (and more pages) needs 'o' and 'api' arguments
        params['data']['o'] = 'json'
        params['data']['api'] = 'd.js'

    # initial page does not have additional data in the input form
    if params['pageno'] > 2:
        # request the third page (and more pages) some more arguments
        params['data']['nextParams'] = ''
        params['data']['v'] = ''
        params['data']['vqd'] = ''

    region_code = get_region_code(params['language'], supported_languages)
    if region_code:
        params['data']['kl'] = region_code
        params['cookies']['kl'] = region_code

    params['data']['df'] = ''
    if params['time_range'] in time_range_dict:
        params['data']['df'] = time_range_dict[params['time_range']]
        params['cookies']['df'] = time_range_dict[params['time_range']]

    return params


# get response from search-request
def response(resp):

    headers_ping = dict_subset(resp.request.headers, ['User-Agent', 'Accept-Encoding', 'Accept', 'Cookie'])
    get(url_ping, headers=headers_ping)

    if resp.status_code == 303:
        return []

    results = []
    doc = fromstring(resp.text)

    result_table = eval_xpath(doc, '//html/body/form/div[@class="filters"]/table')
    if not len(result_table) >= 3:
        # no more results
        return []
    result_table = result_table[2]

    tr_rows = eval_xpath(result_table, './/tr')

    # In the last <tr> is the form of the 'previous/next page' links
    tr_rows = tr_rows[:-1]

    len_tr_rows = len(tr_rows)
    offset = 0

    while len_tr_rows >= offset + 4:

        # assemble table rows we need to scrap
        tr_title = tr_rows[offset]
        tr_content = tr_rows[offset + 1]
        offset += 4

        # ignore sponsored Adds <tr class="result-sponsored">
        if tr_content.get('class') == 'result-sponsored':
            continue

        a_tag = eval_xpath_getindex(tr_title, './/td//a[@class="result-link"]', 0, None)
        if a_tag is None:
            continue

        td_content = eval_xpath_getindex(tr_content, './/td[@class="result-snippet"]', 0, None)
        if td_content is None:
            continue

        results.append({
            'title': a_tag.text_content(),
            'content': extract_text(td_content),
            'url': a_tag.get('href'),
        })

    return results


# get supported languages from their site
def _fetch_supported_languages(resp):

    # response is a js file with regions as an embedded object
    response_page = resp.text
    response_page = response_page[response_page.find('regions:{') + 8:]
    response_page = response_page[:response_page.find('}') + 1]

    regions_json = loads(response_page)
    supported_languages = map((lambda x: x[3:] + '-' + x[:2].upper()), regions_json.keys())

    return list(supported_languages)
+5 −0
Original line number Diff line number Diff line
@@ -415,6 +415,11 @@ engines:
    engine : duckduckgo
    shortcut : ddg

  - name : duckduckgo (lite)
    engine : duckduckgo_lite
    shortcut : ddgl
    disabled : True

  - name : duckduckgo images
    engine : duckduckgo_images
    shortcut : ddi