Loading searx/utils.py +157 −38 Original line number Diff line number Diff line Loading @@ -39,12 +39,17 @@ lang_to_lc_cache = dict() def searx_useragent(): """Return the searx User Agent""" return 'searx/{searx_version} {suffix}'.format( searx_version=VERSION_STRING, suffix=settings['outgoing'].get('useragent_suffix', '')) def gen_useragent(os=None): """Return a random browser User Agent See searx/data/useragents.json """ return str(useragents['ua'].format(os=os or choice(useragents['os']), version=choice(useragents['versions']))) Loading Loading @@ -99,24 +104,40 @@ class HTMLTextExtractor(HTMLParser): return ''.join(self.result).strip() def html_to_text(html): html = html.replace('\n', ' ') html = ' '.join(html.split()) def html_to_text(html_str): """Extract text from a HTML string Args: * html_str (str): string HTML Returns: * str: extracted text Examples: >>> html_to_text('Example <span id="42">#2</span>') 'Example #2' >>> html_to_text('<style>.span { color: red; }</style><span>Example</span>') 'Example' """ html_str = html_str.replace('\n', ' ') html_str = ' '.join(html_str.split()) s = HTMLTextExtractor() try: s.feed(html) s.feed(html_str) except HTMLTextExtractorException: logger.debug("HTMLTextExtractor: invalid HTML\n%s", html) logger.debug("HTMLTextExtractor: invalid HTML\n%s", html_str) return s.get_text() def extract_text(xpath_results): ''' if xpath_results is list, extract the text from each result and concat the list if xpath_results is a xml element, extract all the text node from it """Extract text from a lxml result * if xpath_results is list, extract the text from each result and concat the list * if xpath_results is a xml element, extract all the text node from it ( text_content() method from lxml ) if xpath_results is a string element, then it's already done ''' * if xpath_results is a string element, then it's already done """ if type(xpath_results) == list: # it's list of result : concat everything using recursive call result = '' Loading @@ -135,7 +156,58 @@ def extract_text(xpath_results): return ' '.join(text.split()) def normalize_url(url): parsed_url = urlparse(url) # add a / at this end of the url if there is no path if not parsed_url.netloc: raise Exception('Cannot parse url') if not parsed_url.path: url += '/' # FIXME : hack for yahoo if parsed_url.hostname == 'search.yahoo.com'\ and parsed_url.path.startswith('/r'): p = parsed_url.path mark = p.find('/**') if mark != -1: return unquote(p[mark + 3:]).decode() return url def extract_url(xpath_results, search_url): """Extract and normalize URL from lxml Element Args: * xpath_results (Union[List[html.HtmlElement], html.HtmlElement]): lxml Element(s) * search_url (str): Base URL Example: >>> def f(s, search_url): >>> return searx.utils.extract_url(html.fromstring(s), search_url) >>> f('<span id="42">https://example.com</span>', 'http://example.com/') 'https://example.com/' >>> f('https://example.com', 'http://example.com/') 'https://example.com/' >>> f('//example.com', 'http://example.com/') 'http://example.com/' >>> f('//example.com', 'https://example.com/') 'https://example.com/' >>> f('/path?a=1', 'https://example.com') 'https://example.com/path?a=1' >>> f('', 'https://example.com') raise lxml.etree.ParserError >>> searx.utils.extract_url([], 'https://example.com') raise Exception Raises: * Exception * lxml.etree.ParserError Returns: * str: normalized URL """ if xpath_results == []: raise Exception('Empty url resultset') url = extract_text(xpath_results) Loading @@ -158,27 +230,15 @@ def extract_url(xpath_results, search_url): return url def normalize_url(url): parsed_url = urlparse(url) # add a / at this end of the url if there is no path if not parsed_url.netloc: raise Exception('Cannot parse url') if not parsed_url.path: url += '/' # FIXME : hack for yahoo if parsed_url.hostname == 'search.yahoo.com'\ and parsed_url.path.startswith('/r'): p = parsed_url.path mark = p.find('/**') if mark != -1: return unquote(p[mark + 3:]).decode() return url def dict_subset(d, properties): """Extract a subset of a dict Examples: >>> dict_subset({'A': 'a', 'B': 'b', 'C': 'c'}, ['A', 'C']) {'A': 'a', 'C': 'c'} >>> >> dict_subset({'A': 'a', 'B': 'b', 'C': 'c'}, ['A', 'D']) {'A': 'a'} """ result = {} for k in properties: if k in d: Loading @@ -186,8 +246,19 @@ def dict_subset(d, properties): return result # get element in list or default value def list_get(a_list, index, default=None): """Get element in list or default value Examples: >>> list_get(['A', 'B', 'C'], 0) 'A' >>> list_get(['A', 'B', 'C'], 3) None >>> list_get(['A', 'B', 'C'], 3, 'default') 'default' >>> list_get(['A', 'B', 'C'], -1) 'C' """ if len(a_list) > index: return a_list[index] else: Loading @@ -195,6 +266,21 @@ def list_get(a_list, index, default=None): def get_torrent_size(filesize, filesize_multiplier): """ Args: * filesize (str): size * filesize_multiplier (str): TB, GB, .... TiB, GiB... Returns: * int: number of bytes Example: >>> get_torrent_size('5', 'GB') 5368709120 >>> get_torrent_size('3.14', 'MiB') 3140000 """ try: filesize = float(filesize) Loading @@ -221,14 +307,18 @@ def get_torrent_size(filesize, filesize_multiplier): def convert_str_to_int(number_str): """Convert number_str to int or 0 if number_str is not a number.""" if number_str.isdigit(): return int(number_str) else: return 0 # convert a variable to integer or return 0 if it's not a number def int_or_zero(num): """Convert num to int or 0. num can be either a str or a list. If num is a list, the first element is converted to int (or return 0 if the list is empty). If num is a str, see convert_str_to_int """ if isinstance(num, list): if len(num) < 1: return 0 Loading @@ -237,6 +327,22 @@ def int_or_zero(num): def is_valid_lang(lang): """Return language code and name if lang describe a language. Examples: >>> is_valid_lang('zz') False >>> is_valid_lang('uk') (True, 'uk', 'ukrainian') >>> is_valid_lang(b'uk') (True, 'uk', 'ukrainian') >>> is_valid_lang('en') (True, 'en', 'english') >>> searx.utils.is_valid_lang('Español') (True, 'es', 'spanish') >>> searx.utils.is_valid_lang('Spanish') (True, 'es', 'spanish') """ if isinstance(lang, bytes): lang = lang.decode() is_abbr = (len(lang) == 2) Loading Loading @@ -264,8 +370,8 @@ def _get_lang_to_lc_dict(lang_list): return value # auxiliary function to match lang_code in lang_list def _match_language(lang_code, lang_list=[], custom_aliases={}): """auxiliary function to match lang_code in lang_list""" # replace language code with a custom alias if necessary if lang_code in custom_aliases: lang_code = custom_aliases[lang_code] Loading @@ -287,8 +393,8 @@ def _match_language(lang_code, lang_list=[], custom_aliases={}): return _get_lang_to_lc_dict(lang_list).get(lang_code, None) # get the language code from lang_list that best matches locale_code def match_language(locale_code, lang_list=[], custom_aliases={}, fallback='en-US'): """get the language code from lang_list that best matches locale_code""" # try to get language from given locale_code language = _match_language(locale_code, lang_list, custom_aliases) if language: Loading Loading @@ -330,6 +436,7 @@ def load_module(filename, module_dir): def to_string(obj): """Convert obj to its string representation.""" if isinstance(obj, str): return obj if isinstance(obj, Number): Loading @@ -341,13 +448,19 @@ def to_string(obj): def ecma_unescape(s): """ python implementation of the unescape javascript function """Python implementation of the unescape javascript function https://www.ecma-international.org/ecma-262/6.0/#sec-unescape-string https://developer.mozilla.org/fr/docs/Web/JavaScript/Reference/Objets_globaux/unescape Examples: >>> ecma_unescape('%u5409') '吉' >>> ecma_unescape('%20') ' ' >>> ecma_unescape('%F3') 'ó' """ # s = unicode(s) # "%u5409" becomes "吉" s = ecma_unescape4_re.sub(lambda e: chr(int(e.group(1), 16)), s) # "%20" becomes " ", "%F3" becomes "ó" Loading @@ -371,6 +484,11 @@ def get_engine_from_settings(name): def get_xpath(xpath_str): """Return cached compiled XPath There is no thread lock. Worst case scenario, xpath_str is compiled more than one time. """ result = xpath_cache.get(xpath_str, None) if result is None: result = XPath(xpath_str) Loading @@ -379,5 +497,6 @@ def get_xpath(xpath_str): def eval_xpath(element, xpath_str): """Equivalent of element.xpath(xpath_str) but compile xpath_str once for all.""" xpath = get_xpath(xpath_str) return xpath(element) Loading
searx/utils.py +157 −38 Original line number Diff line number Diff line Loading @@ -39,12 +39,17 @@ lang_to_lc_cache = dict() def searx_useragent(): """Return the searx User Agent""" return 'searx/{searx_version} {suffix}'.format( searx_version=VERSION_STRING, suffix=settings['outgoing'].get('useragent_suffix', '')) def gen_useragent(os=None): """Return a random browser User Agent See searx/data/useragents.json """ return str(useragents['ua'].format(os=os or choice(useragents['os']), version=choice(useragents['versions']))) Loading Loading @@ -99,24 +104,40 @@ class HTMLTextExtractor(HTMLParser): return ''.join(self.result).strip() def html_to_text(html): html = html.replace('\n', ' ') html = ' '.join(html.split()) def html_to_text(html_str): """Extract text from a HTML string Args: * html_str (str): string HTML Returns: * str: extracted text Examples: >>> html_to_text('Example <span id="42">#2</span>') 'Example #2' >>> html_to_text('<style>.span { color: red; }</style><span>Example</span>') 'Example' """ html_str = html_str.replace('\n', ' ') html_str = ' '.join(html_str.split()) s = HTMLTextExtractor() try: s.feed(html) s.feed(html_str) except HTMLTextExtractorException: logger.debug("HTMLTextExtractor: invalid HTML\n%s", html) logger.debug("HTMLTextExtractor: invalid HTML\n%s", html_str) return s.get_text() def extract_text(xpath_results): ''' if xpath_results is list, extract the text from each result and concat the list if xpath_results is a xml element, extract all the text node from it """Extract text from a lxml result * if xpath_results is list, extract the text from each result and concat the list * if xpath_results is a xml element, extract all the text node from it ( text_content() method from lxml ) if xpath_results is a string element, then it's already done ''' * if xpath_results is a string element, then it's already done """ if type(xpath_results) == list: # it's list of result : concat everything using recursive call result = '' Loading @@ -135,7 +156,58 @@ def extract_text(xpath_results): return ' '.join(text.split()) def normalize_url(url): parsed_url = urlparse(url) # add a / at this end of the url if there is no path if not parsed_url.netloc: raise Exception('Cannot parse url') if not parsed_url.path: url += '/' # FIXME : hack for yahoo if parsed_url.hostname == 'search.yahoo.com'\ and parsed_url.path.startswith('/r'): p = parsed_url.path mark = p.find('/**') if mark != -1: return unquote(p[mark + 3:]).decode() return url def extract_url(xpath_results, search_url): """Extract and normalize URL from lxml Element Args: * xpath_results (Union[List[html.HtmlElement], html.HtmlElement]): lxml Element(s) * search_url (str): Base URL Example: >>> def f(s, search_url): >>> return searx.utils.extract_url(html.fromstring(s), search_url) >>> f('<span id="42">https://example.com</span>', 'http://example.com/') 'https://example.com/' >>> f('https://example.com', 'http://example.com/') 'https://example.com/' >>> f('//example.com', 'http://example.com/') 'http://example.com/' >>> f('//example.com', 'https://example.com/') 'https://example.com/' >>> f('/path?a=1', 'https://example.com') 'https://example.com/path?a=1' >>> f('', 'https://example.com') raise lxml.etree.ParserError >>> searx.utils.extract_url([], 'https://example.com') raise Exception Raises: * Exception * lxml.etree.ParserError Returns: * str: normalized URL """ if xpath_results == []: raise Exception('Empty url resultset') url = extract_text(xpath_results) Loading @@ -158,27 +230,15 @@ def extract_url(xpath_results, search_url): return url def normalize_url(url): parsed_url = urlparse(url) # add a / at this end of the url if there is no path if not parsed_url.netloc: raise Exception('Cannot parse url') if not parsed_url.path: url += '/' # FIXME : hack for yahoo if parsed_url.hostname == 'search.yahoo.com'\ and parsed_url.path.startswith('/r'): p = parsed_url.path mark = p.find('/**') if mark != -1: return unquote(p[mark + 3:]).decode() return url def dict_subset(d, properties): """Extract a subset of a dict Examples: >>> dict_subset({'A': 'a', 'B': 'b', 'C': 'c'}, ['A', 'C']) {'A': 'a', 'C': 'c'} >>> >> dict_subset({'A': 'a', 'B': 'b', 'C': 'c'}, ['A', 'D']) {'A': 'a'} """ result = {} for k in properties: if k in d: Loading @@ -186,8 +246,19 @@ def dict_subset(d, properties): return result # get element in list or default value def list_get(a_list, index, default=None): """Get element in list or default value Examples: >>> list_get(['A', 'B', 'C'], 0) 'A' >>> list_get(['A', 'B', 'C'], 3) None >>> list_get(['A', 'B', 'C'], 3, 'default') 'default' >>> list_get(['A', 'B', 'C'], -1) 'C' """ if len(a_list) > index: return a_list[index] else: Loading @@ -195,6 +266,21 @@ def list_get(a_list, index, default=None): def get_torrent_size(filesize, filesize_multiplier): """ Args: * filesize (str): size * filesize_multiplier (str): TB, GB, .... TiB, GiB... Returns: * int: number of bytes Example: >>> get_torrent_size('5', 'GB') 5368709120 >>> get_torrent_size('3.14', 'MiB') 3140000 """ try: filesize = float(filesize) Loading @@ -221,14 +307,18 @@ def get_torrent_size(filesize, filesize_multiplier): def convert_str_to_int(number_str): """Convert number_str to int or 0 if number_str is not a number.""" if number_str.isdigit(): return int(number_str) else: return 0 # convert a variable to integer or return 0 if it's not a number def int_or_zero(num): """Convert num to int or 0. num can be either a str or a list. If num is a list, the first element is converted to int (or return 0 if the list is empty). If num is a str, see convert_str_to_int """ if isinstance(num, list): if len(num) < 1: return 0 Loading @@ -237,6 +327,22 @@ def int_or_zero(num): def is_valid_lang(lang): """Return language code and name if lang describe a language. Examples: >>> is_valid_lang('zz') False >>> is_valid_lang('uk') (True, 'uk', 'ukrainian') >>> is_valid_lang(b'uk') (True, 'uk', 'ukrainian') >>> is_valid_lang('en') (True, 'en', 'english') >>> searx.utils.is_valid_lang('Español') (True, 'es', 'spanish') >>> searx.utils.is_valid_lang('Spanish') (True, 'es', 'spanish') """ if isinstance(lang, bytes): lang = lang.decode() is_abbr = (len(lang) == 2) Loading Loading @@ -264,8 +370,8 @@ def _get_lang_to_lc_dict(lang_list): return value # auxiliary function to match lang_code in lang_list def _match_language(lang_code, lang_list=[], custom_aliases={}): """auxiliary function to match lang_code in lang_list""" # replace language code with a custom alias if necessary if lang_code in custom_aliases: lang_code = custom_aliases[lang_code] Loading @@ -287,8 +393,8 @@ def _match_language(lang_code, lang_list=[], custom_aliases={}): return _get_lang_to_lc_dict(lang_list).get(lang_code, None) # get the language code from lang_list that best matches locale_code def match_language(locale_code, lang_list=[], custom_aliases={}, fallback='en-US'): """get the language code from lang_list that best matches locale_code""" # try to get language from given locale_code language = _match_language(locale_code, lang_list, custom_aliases) if language: Loading Loading @@ -330,6 +436,7 @@ def load_module(filename, module_dir): def to_string(obj): """Convert obj to its string representation.""" if isinstance(obj, str): return obj if isinstance(obj, Number): Loading @@ -341,13 +448,19 @@ def to_string(obj): def ecma_unescape(s): """ python implementation of the unescape javascript function """Python implementation of the unescape javascript function https://www.ecma-international.org/ecma-262/6.0/#sec-unescape-string https://developer.mozilla.org/fr/docs/Web/JavaScript/Reference/Objets_globaux/unescape Examples: >>> ecma_unescape('%u5409') '吉' >>> ecma_unescape('%20') ' ' >>> ecma_unescape('%F3') 'ó' """ # s = unicode(s) # "%u5409" becomes "吉" s = ecma_unescape4_re.sub(lambda e: chr(int(e.group(1), 16)), s) # "%20" becomes " ", "%F3" becomes "ó" Loading @@ -371,6 +484,11 @@ def get_engine_from_settings(name): def get_xpath(xpath_str): """Return cached compiled XPath There is no thread lock. Worst case scenario, xpath_str is compiled more than one time. """ result = xpath_cache.get(xpath_str, None) if result is None: result = XPath(xpath_str) Loading @@ -379,5 +497,6 @@ def get_xpath(xpath_str): def eval_xpath(element, xpath_str): """Equivalent of element.xpath(xpath_str) but compile xpath_str once for all.""" xpath = get_xpath(xpath_str) return xpath(element)