Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 8ce60438 authored by Adam Tauber's avatar Adam Tauber
Browse files

Merge pull request #346 from Cqoicebordel/youtube

Adds engines : Youtube with or without API and multiple Qwant
parents 9d10277c e0774c84
Loading
Loading
Loading
Loading

searx/engines/qwant.py

0 → 100644
+98 −0
Original line number Diff line number Diff line
"""
 Qwant (Web, Images, News, Social)

 @website     https://qwant.com/
 @provide-api not officially (https://api.qwant.com/api/search/)

 @using-api   yes
 @results     JSON
 @stable      yes
 @parse       url, title, content
"""

from urllib import urlencode
from json import loads
from datetime import datetime

# engine dependent config
# placeholder only: request()/response() read categories[0], so a list is
# presumably assigned from outside (engine settings, tests) before use
categories = None
paging = True
language_support = True

# searx category name -> keyword used in the path of the Qwant API URL
category_to_keyword = {
    'general': 'web',
    'images': 'images',
    'news': 'news',
    'social media': 'social',
}

# search-url; {keyword}, {offset} and {query} are filled in by request()
url = ('https://api.qwant.com/api/search/{keyword}'
       '?count=10&offset={offset}&f=&{query}')


# do search-request
def request(query, params):
    """Store the Qwant API URL for ``query`` in ``params['url']``.

    The API keyword is taken from the first entry of the module-level
    ``categories``; an unknown, empty or unset category falls back to the
    plain ``web`` search.

    :param query: search terms, url-encoded here as the ``q`` parameter
    :param params: request parameters; ``pageno`` (1-based) and ``language``
                   are read, ``url`` is written
    :returns: the same ``params`` object
    """
    # the API is asked for 10 results per page
    offset = (params['pageno'] - 1) * 10

    # `categories` defaults to None at module level, so guard before indexing
    category = categories[0] if categories else None
    keyword = category_to_keyword.get(category, 'web')

    params['url'] = url.format(keyword=keyword,
                               query=urlencode({'q': query}),
                               offset=offset)

    # add language tag if specified
    if params['language'] != 'all':
        params['url'] += '&locale=' + params['language'].lower()

    return params


# get response from search-request
def response(resp):
    """Turn a Qwant API reply into a list of result dicts.

    The shape of each entry depends on the keyword mapped from
    ``categories[0]``: plain entries for ``web``, ``images.html`` entries for
    ``images``, dated entries for ``news`` and ``social``. Items are dropped
    when the category maps to none of these.

    :param resp: object whose ``text`` attribute holds the JSON reply
    :returns: list of result dicts, empty when the reply has no ``data`` key
    """
    payload = loads(resp.text)

    # return empty array if there are no results
    if 'data' not in payload:
        return []

    items = payload.get('data', {}).get('result', {}).get('items', {})

    results = []

    # parse results
    for item in items:
        title = item['title']
        res_url = item['url']
        content = item['desc']

        # one lookup per item decides which kind of entry is built
        keyword = category_to_keyword.get(categories[0], '')

        if keyword == 'web':
            entry = {'title': title,
                     'content': content,
                     'url': res_url}

        elif keyword == 'images':
            entry = {'template': 'images.html',
                     'url': res_url,
                     'title': title,
                     'content': '',
                     'thumbnail_src': item['thumbnail'],
                     'img_src': item['media']}

        elif keyword in ('news', 'social'):
            # `date` is passed straight to fromtimestamp()
            entry = {'url': res_url,
                     'title': title,
                     'publishedDate': datetime.fromtimestamp(item['date'], None),
                     'content': content}

        else:
            # no entry format for this keyword
            continue

        results.append(entry)

    # return results
    return results
+83 −0
Original line number Diff line number Diff line
# Youtube (Videos)
#
# @website     https://www.youtube.com/
# @provide-api yes (https://developers.google.com/apis-explorer/#p/youtube/v3/youtube.search.list)
#
# @using-api   yes
# @results     JSON
# @stable      yes
# @parse       url, title, content, publishedDate, thumbnail, embedded

from json import loads
from urllib import urlencode
from dateutil import parser

# engine dependent config
categories = ['videos', 'music']
paging = False
language_support = True
# request() puts this value into the URL as-is, so it has to be set to a real
# key from outside this module
api_key = None

# search-url
base_url = 'https://www.googleapis.com/youtube/v3/search'
search_url = base_url + ('?part=snippet'
                         '&{query}'
                         '&maxResults=20'
                         '&key={api_key}')

# markup for the embedded player; {videoid} is filled in per result
embedded_url = ('<iframe width="540" height="304" '
                'data-src="//www.youtube-nocookie.com/embed/{videoid}" '
                'frameborder="0" allowfullscreen></iframe>')

# prefix of the watch-page link, completed with the video id
base_youtube_url = 'https://www.youtube.com/watch?v='


# do search-request
def request(query, params):
    """Store the YouTube API search URL for ``query`` in ``params['url']``.

    :param query: search terms, url-encoded here as the ``q`` parameter
    :param params: request parameters; ``language`` is read, ``url`` is written
    :returns: the same ``params`` object
    """
    encoded_query = urlencode({'q': query})
    params['url'] = search_url.format(query=encoded_query, api_key=api_key)

    # no language tag for the catch-all language
    language = params['language']
    if language == 'all':
        return params

    # only the part before the first underscore is sent
    params['url'] += '&relevanceLanguage=' + language.split('_')[0]
    return params


# get response from search-request
def response(resp):
    """Turn a YouTube Data API search reply into a list of video results.

    Items whose ``id`` has no ``videoId`` (the search endpoint can also
    return channels and playlists) are skipped instead of raising KeyError
    and discarding the whole page.

    :param resp: object whose ``text`` attribute holds the JSON reply
    :returns: list of dicts with the keys ``url``, ``title``, ``content``,
              ``template``, ``publishedDate``, ``embedded`` and ``thumbnail``;
              empty when the reply has no ``items`` key
    """
    results = []

    search_results = loads(resp.text)

    # return empty array if there are no results
    if 'items' not in search_results:
        return []

    # parse results
    for result in search_results['items']:
        # only video items carry a videoId; nothing else can be linked/embedded
        videoid = result['id'].get('videoId')
        if not videoid:
            continue

        snippet = result['snippet']

        title = snippet['title']
        publishedDate = parser.parse(snippet['publishedAt'])
        thumbnail = snippet['thumbnails']['high']['url']
        content = snippet['description']

        url = base_youtube_url + videoid

        embedded = embedded_url.format(videoid=videoid)

        # append result
        results.append({'url': url,
                        'title': title,
                        'content': content,
                        'template': 'videos.html',
                        'publishedDate': publishedDate,
                        'embedded': embedded,
                        'thumbnail': thumbnail})

    # return results
    return results
+72 −0
Original line number Diff line number Diff line
# Youtube (Videos)
#
# @website     https://www.youtube.com/
# @provide-api yes (https://developers.google.com/apis-explorer/#p/youtube/v3/youtube.search.list)
#
# @using-api   no
# @results     HTML
# @stable      no
# @parse       url, title, content, publishedDate, thumbnail, embedded

from urllib import quote_plus
from lxml import html
from searx.engines.xpath import extract_text

# engine dependent config
categories = ['videos', 'music']
paging = True
language_support = False

# search-url; {query} and {page} are filled in by request()
base_url = 'https://www.youtube.com/results'
search_url = base_url + ('?search_query={query}'
                         '&page={page}')

# markup for the embedded player; {videoid} is filled in per result
embedded_url = ('<iframe width="540" height="304" '
                'data-src="//www.youtube-nocookie.com/embed/{videoid}" '
                'frameborder="0" allowfullscreen></iframe>')

# prefix of the watch-page link, completed with the video id
base_youtube_url = 'https://www.youtube.com/watch?v='

# specific xpath variables
# one node per video entry; the other expressions are relative to that node
results_xpath = ("//ol/li/div[contains(@class, "
                 "'yt-lockup yt-lockup-tile yt-lockup-video vve-check')]")
url_xpath = './/h3/a/@href'
title_xpath = './/div[@class="yt-lockup-content"]/h3/a'
content_xpath = ('.//div[@class="yt-lockup-content"]'
                 '/div[@class="yt-lockup-description yt-ui-ellipsis yt-ui-ellipsis-2"]')


# do search-request
def request(query, params):
    """Store the YouTube results-page URL for ``query`` in ``params['url']``.

    :param query: search terms, encoded here with ``quote_plus``
    :param params: request parameters; ``pageno`` is read, ``url`` is written
    :returns: the same ``params`` object
    """
    encoded_query = quote_plus(query)
    page_number = params['pageno']

    params['url'] = search_url.format(page=page_number, query=encoded_query)
    return params


# get response from search-request
def response(resp):
    """Scrape video results out of a YouTube results page.

    Entries lacking a video id or a title node are skipped, and a missing
    description yields empty ``content``; previously any of these raised
    IndexError and lost every result of the page.

    :param resp: object whose ``text`` attribute holds the HTML page
    :returns: list of dicts with the keys ``url``, ``title``, ``content``,
              ``template``, ``embedded`` and ``thumbnail``
    """
    results = []

    dom = html.fromstring(resp.text)

    # parse results
    for result in dom.xpath(results_xpath):
        # every xpath() lookup yields a list that may be empty
        video_ids = result.xpath('@data-context-item-id')
        title_nodes = result.xpath(title_xpath)
        if not video_ids or not title_nodes:
            # without id and title there is nothing usable to show
            continue

        videoid = video_ids[0]

        url = base_youtube_url + videoid
        thumbnail = 'https://i.ytimg.com/vi/' + videoid + '/hqdefault.jpg'

        title = extract_text(title_nodes[0])

        # the description block is optional
        content_nodes = result.xpath(content_xpath)
        content = extract_text(content_nodes[0]) if content_nodes else ''

        embedded = embedded_url.format(videoid=videoid)

        # append result
        results.append({'url': url,
                        'title': title,
                        'content': content,
                        'template': 'videos.html',
                        'embedded': embedded,
                        'thumbnail': thumbnail})

    # return results
    return results
+26 −1
Original line number Diff line number Diff line
@@ -168,6 +168,26 @@ engines:
    engine : piratebay
    shortcut : tpb

  - name : qwant
    engine : qwant
    shortcut : qw
    categories : general

  - name : qwant images
    engine : qwant
    shortcut : qwi
    categories : images

  - name : qwant news
    engine : qwant
    shortcut : qwn
    categories : news

  - name : qwant social
    engine : qwant
    shortcut : qws
    categories : social media

  - name : kickass
    engine : kickass
    shortcut : ka
@@ -246,8 +266,13 @@ engines:
    shortcut : yhn

  - name : youtube
    engine : youtube
    shortcut : yt
    # To use the official, stable API you need an API key.
    # See: https://console.developers.google.com/project
    #    engine : youtube_api
    #    api_key: 'apikey' # required!
    # Otherwise, use the unstable HTML-scraping engine, which is enabled by default:
    engine : youtube_noapi

  - name : dailymotion
    engine : dailymotion
+317 −0
Original line number Diff line number Diff line
from collections import defaultdict
import mock
from searx.engines import qwant
from searx.testing import SearxTestCase


class TestQwantEngine(SearxTestCase):
    """Unit tests for the Qwant engine: URL building and response parsing."""

    @staticmethod
    def _api_response(item=None, with_result=True):
        """Build a mocked HTTP response holding a successful Qwant payload.

        :param item: dict of fields added to (or overriding) the default
                     result item; when ``None`` the "result" object carries
                     no "items" key
        :param with_result: when ``False`` the "result" object is left out of
                            "data" altogether
        :returns: ``mock.Mock`` whose ``text`` is the JSON-encoded payload
        """
        # local import: json is not among this module's top-level imports
        from json import dumps

        data = {
            'query': {
                'locale': 'en_us',
                'query': 'Test',
                'offset': 10,
            },
            'cache': {
                'key': 'e66aa864c00147a0e3a16ff7a5efafde',
                'created': 1433092754,
                'expiration': 259200,
                'status': 'miss',
                'age': 0,
            },
        }

        if with_result:
            result = {'filters': []}
            if item is not None:
                fields = {
                    'title': 'Title',
                    'score': 9999,
                    'url': 'http://www.url.xyz',
                    'source': '...',
                    'desc': 'Description',
                    'date': '',
                    '_id': 'db0aadd62c2a8565567ffc382f5c61fa',
                    'favicon': 'https://s.qwant.com/fav.ico',
                }
                fields.update(item)
                result['items'] = [fields]
            data['result'] = result

        return mock.Mock(text=dumps({'status': 'success', 'data': data}))

    def test_request(self):
        """request() builds the API URL from query, category and language."""
        query = 'test_query'
        dicto = defaultdict(dict)
        # page numbers are 1-based: the engine computes (pageno - 1) * 10
        dicto['pageno'] = 1
        dicto['language'] = 'fr_FR'

        # an unmapped category falls back to the web search
        qwant.categories = ['']
        params = qwant.request(query, dicto)
        self.assertIn('url', params)
        self.assertIn(query, params['url'])
        self.assertIn('web', params['url'])
        self.assertIn('qwant.com', params['url'])
        self.assertIn('fr_fr', params['url'])

        # language 'all' must not add a locale
        dicto['language'] = 'all'
        qwant.categories = ['news']
        params = qwant.request(query, dicto)
        self.assertNotIn('fr', params['url'])
        self.assertIn('news', params['url'])

    def test_response(self):
        """response() rejects non-responses and maps items per category."""
        # none of these has a ``text`` attribute
        for invalid in (None, [], '', '[]'):
            self.assertRaises(AttributeError, qwant.response, invalid)

        response = mock.Mock(text='{}')
        self.assertEqual(qwant.response(response), [])

        response = mock.Mock(text='{"data": {}}')
        self.assertEqual(qwant.response(response), [])

        # web results
        qwant.categories = ['general']
        results = qwant.response(self._api_response({}))
        self.assertIsInstance(results, list)
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]['title'], 'Title')
        self.assertEqual(results[0]['url'], 'http://www.url.xyz')
        self.assertEqual(results[0]['content'], 'Description')

        # image results
        qwant.categories = ['images']
        results = qwant.response(self._api_response({
            'media': 'http://image.jpg',
            'desc': '',
            'thumbnail': 'http://thumbnail.jpg',
        }))
        self.assertIsInstance(results, list)
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]['title'], 'Title')
        self.assertEqual(results[0]['url'], 'http://www.url.xyz')
        self.assertEqual(results[0]['content'], '')
        self.assertEqual(results[0]['thumbnail_src'], 'http://thumbnail.jpg')
        self.assertEqual(results[0]['img_src'], 'http://image.jpg')

        # news and social results are dated and otherwise identical
        for category in ('news', 'social media'):
            qwant.categories = [category]
            results = qwant.response(self._api_response({'date': 1433260920}))
            self.assertIsInstance(results, list)
            self.assertEqual(len(results), 1)
            self.assertEqual(results[0]['title'], 'Title')
            self.assertEqual(results[0]['url'], 'http://www.url.xyz')
            self.assertEqual(results[0]['content'], 'Description')
            self.assertIn('publishedDate', results[0])

        # items of an unmapped category are dropped
        qwant.categories = ['']
        results = qwant.response(self._api_response({'date': 1433260920}))
        self.assertIsInstance(results, list)
        self.assertEqual(len(results), 0)

        # "result" without "items"
        results = qwant.response(self._api_response())
        self.assertIsInstance(results, list)
        self.assertEqual(len(results), 0)

        # "data" without "result"
        results = qwant.response(self._api_response(with_result=False))
        self.assertIsInstance(results, list)
        self.assertEqual(len(results), 0)

        # no "data" at all
        response = mock.Mock(text='{"status": "success"}')
        results = qwant.response(response)
        self.assertIsInstance(results, list)
        self.assertEqual(len(results), 0)
Loading