Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 3e0b75ce authored by Nivesh Krishna's avatar Nivesh Krishna
Browse files

remove httpx

parent 6f8183be
Loading
Loading
Loading
Loading
+1 −1
Original line number Original line Diff line number Diff line
@@ -8,7 +8,7 @@ from urllib.parse import urlencode
from searx.utils import match_language, HTMLTextExtractor
from searx.utils import match_language, HTMLTextExtractor
from searx import logger
from searx import logger
import re
import re
from searx.network import get
from searx.poolrequests import get


logger = logger.getChild('ddg engine')
logger = logger.getChild('ddg engine')
# about
# about
+1 −2
Original line number Original line Diff line number Diff line
@@ -14,8 +14,7 @@ from searx.utils import (
    extract_text,
    extract_text,
    match_language,
    match_language,
)
)
from searx.network import get
from searx.poolrequests import get

# about
# about
about = {
about = {
    "website": 'https://lite.duckduckgo.com/lite',
    "website": 'https://lite.duckduckgo.com/lite',

searx/network/__init__.py

deleted100644 → 0
+0 −205
Original line number Original line Diff line number Diff line
# SPDX-License-Identifier: AGPL-3.0-or-later

import asyncio
import threading
import concurrent.futures
from time import time
from queue import SimpleQueue
from types import MethodType

import httpx
import h2.exceptions

from .network import get_network, initialize, check_network_configuration
from .client import get_loop
from .raise_for_httperror import raise_for_httperror
from searx import settings
from .cache import redis_cache


THREADLOCAL = threading.local()


def reset_time_for_thread():
    THREADLOCAL.total_time = 0


def get_time_for_thread():
    return THREADLOCAL.total_time


def set_timeout_for_thread(timeout, start_time=None):
    THREADLOCAL.timeout = timeout
    THREADLOCAL.start_time = start_time


def set_context_network_name(network_name):
    THREADLOCAL.network = get_network(network_name)


def get_context_network():
    try:
        return THREADLOCAL.network
    except AttributeError:
        return get_network()


def request(method, url, **kwargs):
    """same as requests/requests/api.py request(...)"""
    time_before_request = time()

    # timeout (httpx)
    if 'timeout' in kwargs:
        timeout = kwargs['timeout']
    else:
        timeout = getattr(THREADLOCAL, 'timeout', None)
        if timeout is not None:
            kwargs['timeout'] = timeout

    # 2 minutes timeout for the requests without timeout
    timeout = timeout or 120

    # ajdust actual timeout
    timeout += 0.2  # overhead
    start_time = getattr(THREADLOCAL, 'start_time', time_before_request)
    if start_time:
        timeout -= time() - start_time

    # raise_for_error
    check_for_httperror = True
    if 'raise_for_httperror' in kwargs:
        check_for_httperror = kwargs['raise_for_httperror']
        del kwargs['raise_for_httperror']

    # requests compatibility
    if isinstance(url, bytes):
        url = url.decode()

    # network
    network = get_context_network()

    # do request
    future = asyncio.run_coroutine_threadsafe(network.request(method, url, **kwargs), get_loop())
    try:
        response = future.result(timeout)
    except concurrent.futures.TimeoutError as e:
        raise httpx.TimeoutException('Timeout', request=None) from e

    # requests compatibility
    # see also https://www.python-httpx.org/compatibility/#checking-for-4xx5xx-responses
    response.ok = not response.is_error

    # update total_time.
    # See get_time_for_thread() and reset_time_for_thread()
    if hasattr(THREADLOCAL, 'total_time'):
        time_after_request = time()
        THREADLOCAL.total_time += time_after_request - time_before_request

    # raise an exception
    if check_for_httperror:
        raise_for_httperror(response)

    return response


if "redis_host" not in settings["server"]:
    def get(url, **kwargs):
        kwargs.setdefault('follow_redirects', True)
        return request('get', url, **kwargs)


def options(url, **kwargs):
    kwargs.setdefault('follow_redirects', True)
    return request('options', url, **kwargs)


def head(url, **kwargs):
    kwargs.setdefault('follow_redirects', False)
    return request('head', url, **kwargs)


if "redis_host" not in settings["server"]:
    def post(url, data=None, **kwargs):
        return request('post', url, data=data, **kwargs)


def put(url, data=None, **kwargs):
    return request('put', url, data=data, **kwargs)


def patch(url, data=None, **kwargs):
    return request('patch', url, data=data, **kwargs)


def delete(url, **kwargs):
    return request('delete', url, **kwargs)


if "redis_host" in settings["server"]:
    @redis_cache()
    def get(url, **kwargs):
        kwargs = kwargs.get("kwargs", kwargs)
        kwargs.setdefault('allow_redirects', True)
        return request('get', url, **kwargs)

    @redis_cache()
    def post(url, data=None, **kwargs):
        kwargs = kwargs.get("kwargs", kwargs)
        return request('post', url, data=data, **kwargs)


async def stream_chunk_to_queue(network, q, method, url, **kwargs):
    try:
        async with await network.stream(method, url, **kwargs) as response:
            q.put(response)
            # aiter_raw: access the raw bytes on the response without applying any HTTP content decoding
            # https://www.python-httpx.org/quickstart/#streaming-responses
            async for chunk in response.aiter_bytes(65536):
                if len(chunk) > 0:
                    q.put(chunk)
    except httpx.ResponseClosed as e:
        # the response was closed
        pass
    except (httpx.HTTPError, OSError, h2.exceptions.ProtocolError) as e:
        q.put(e)
    finally:
        q.put(None)


def _close_response_method(self):
    asyncio.run_coroutine_threadsafe(
        self.aclose(),
        get_loop()
    )


def stream(method, url, **kwargs):
    """Replace httpx.stream.

    Usage:
    stream = poolrequests.stream(...)
    response = next(stream)
    for chunk in stream:
        ...

    httpx.Client.stream requires to write the httpx.HTTPTransport version of the
    the httpx.AsyncHTTPTransport declared above.
    """
    q = SimpleQueue()
    future = asyncio.run_coroutine_threadsafe(stream_chunk_to_queue(get_network(), q, method, url, **kwargs),
                                              get_loop())
    # yield response
    response = q.get()
    if isinstance(response, Exception):
        raise response
    response.close = MethodType(_close_response_method, response)
    yield response

    # yield chunks
    chunk_or_exception = q.get()
    while chunk_or_exception is not None:
        if isinstance(chunk_or_exception, Exception):
            raise chunk_or_exception
        yield chunk_or_exception
        chunk_or_exception = q.get()
    future.result()

searx/network/client.py

deleted100644 → 0
+0 −162
Original line number Original line Diff line number Diff line
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pylint: disable=missing-module-docstring, global-statement

import asyncio
import logging
from ssl import SSLContext
import threading
from typing import Any, Dict
import uvloop

import httpx
from httpx_socks import AsyncProxyTransport
from python_socks import parse_proxy_url, ProxyConnectionError, ProxyTimeoutError, ProxyError

from searx import logger


logger = logger.getChild('searx.network.client')
LOOP = None
SSLCONTEXTS: Dict[Any, SSLContext] = {}
TRANSPORT_KWARGS = {
    'trust_env': False,
}


def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False):
    key = (proxy_url, cert, verify, trust_env, http2)
    if key not in SSLCONTEXTS:
        SSLCONTEXTS[key] = httpx.create_ssl_context(cert, verify, trust_env, http2)
    return SSLCONTEXTS[key]


class AsyncHTTPTransportNoHttp(httpx.AsyncHTTPTransport):
    """Block HTTP request"""

    async def handle_async_request(self, request):
        raise httpx.UnsupportedProtocol('HTTP protocol is disabled')


class AsyncProxyTransportFixed(AsyncProxyTransport):
    """Fix httpx_socks.AsyncProxyTransport

    Map python_socks exceptions to httpx.ProxyError exceptions
    """

    async def handle_async_request(self, request):
        try:
            return await super().handle_async_request(request)
        except ProxyConnectionError as e:
            raise httpx.ProxyError("ProxyConnectionError: " + e.strerror, request=request) from e
        except ProxyTimeoutError as e:
            raise httpx.ProxyError("ProxyTimeoutError: " + e.args[0], request=request) from e
        except ProxyError as e:
            raise httpx.ProxyError("ProxyError: " + e.args[0], request=request) from e


def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
    # support socks5h (requests compatibility):
    # https://requests.readthedocs.io/en/master/user/advanced/#socks
    # socks5://   hostname is resolved on client side
    # socks5h://  hostname is resolved on proxy side
    rdns = False
    socks5h = 'socks5h://'
    if proxy_url.startswith(socks5h):
        proxy_url = 'socks5://' + proxy_url[len(socks5h):]
        rdns = True

    proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
    verify = get_sslcontexts(proxy_url, None, True, False, http2) if verify is True else verify
    return AsyncProxyTransportFixed(
        proxy_type=proxy_type,
        proxy_host=proxy_host,
        proxy_port=proxy_port,
        username=proxy_username,
        password=proxy_password,
        rdns=rdns,
        loop=get_loop(),
        verify=verify,
        http2=http2,
        local_address=local_address,
        limits=limit,
        retries=retries,
        **TRANSPORT_KWARGS,
    )


def get_transport(verify, http2, local_address, proxy_url, limit, retries):
    verify = get_sslcontexts(None, None, True, False, http2) if verify is True else verify
    return httpx.AsyncHTTPTransport(
        # pylint: disable=protected-access
        verify=verify,
        http2=http2,
        limits=limit,
        proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
        local_address=local_address,
        retries=retries,
        **TRANSPORT_KWARGS,
    )


def iter_proxies(proxies):
    # https://www.python-httpx.org/compatibility/#proxy-keys
    if isinstance(proxies, str):
        yield 'all://', proxies
    elif isinstance(proxies, dict):
        for pattern, proxy_url in proxies.items():
            yield pattern, proxy_url


def new_client(enable_http, verify, enable_http2,
               max_connections, max_keepalive_connections, keepalive_expiry,
               proxies, local_address, retries, max_redirects, hook_log_response):
    limit = httpx.Limits(max_connections=max_connections,
                         max_keepalive_connections=max_keepalive_connections,
                         keepalive_expiry=keepalive_expiry)
    # See https://www.python-httpx.org/advanced/#routing
    mounts = {}
    for pattern, proxy_url in proxies.items():
        if not enable_http and pattern.startswith('http://'):
            continue
        if proxy_url.startswith('socks4://') or proxy_url.startswith('socks5://') or proxy_url.startswith('socks5h://'):
            mounts[pattern] = get_transport_for_socks_proxy(
                verify, enable_http2, local_address, proxy_url, limit, retries
            )
        else:
            mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries)

    if not enable_http:
        mounts['http://'] = AsyncHTTPTransportNoHttp()

    transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
    event_hooks = None
    if hook_log_response:
        event_hooks = {'response': [hook_log_response]}
    return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects, event_hooks=event_hooks)


def get_loop():
    return LOOP


def init():
    # log
    for logger_name in ('hpack.hpack', 'hpack.table', 'httpx._client'):
        logging.getLogger(logger_name).setLevel(logging.WARNING)

    # loop
    def loop_thread():
        global LOOP
        LOOP = asyncio.new_event_loop()
        LOOP.run_forever()

    thread = threading.Thread(
        target=loop_thread,
        name='asyncio_loop',
        daemon=True,
    )
    thread.start()


init()

searx/network/network.py

deleted100644 → 0
+0 −404

File deleted.

Preview size limit exceeded, changes collapsed.