Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 5a3c3d8b authored by Israel Yago Pereira's avatar Israel Yago Pereira Committed by Nivesh Krishna
Browse files

Update library versions to fix the TooManyStreams error

parent b7f8aadc
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -7,11 +7,11 @@ lxml==4.6.3
pygments==2.8.0
python-dateutil==2.8.2
pyyaml==5.4.1
httpx[http2]==0.21.2
Brotli==1.0.9
uvloop==0.16.0; python_version >= '3.7'
uvloop==0.14.0; python_version < '3.7'
httpx-socks[asyncio]==0.7.2
langdetect==1.0.9
setproctitle==1.2.2
redis==3.4.1
+4 −4
Original line number Diff line number Diff line
@@ -126,17 +126,17 @@ def request(method, url, **kwargs):

if "redis_host" not in settings["server"]:
    def get(url, **kwargs):
        """Issue a GET through request(); redirects are followed by default.

        httpx >= 0.20 renamed ``allow_redirects`` to ``follow_redirects``;
        the stale default for the removed keyword is dropped so httpx does
        not receive an unknown keyword argument.
        """
        kwargs.setdefault('follow_redirects', True)
        return request('get', url, **kwargs)


def options(url, **kwargs):
    """Issue an OPTIONS through request(); redirects are followed by default.

    Only the httpx >= 0.20 keyword ``follow_redirects`` is set; the removed
    ``allow_redirects`` default is dropped.
    """
    kwargs.setdefault('follow_redirects', True)
    return request('options', url, **kwargs)


def head(url, **kwargs):
    """Issue a HEAD through request(); redirects are NOT followed by default.

    Only the httpx >= 0.20 keyword ``follow_redirects`` is set; the removed
    ``allow_redirects`` default is dropped.
    """
    kwargs.setdefault('follow_redirects', False)
    return request('head', url, **kwargs)


@@ -172,7 +172,7 @@ if "redis_host" in settings["server"]:

async def stream_chunk_to_queue(network, q, method, url, **kwargs):
    try:
        async with network.stream(method, url, **kwargs) as response:
        async with await network.stream(method, url, **kwargs) as response:
            q.put(response)
            async for chunk in response.aiter_bytes(65536):
                if len(chunk) > 0:
+82 −132
Original line number Diff line number Diff line
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pylint: disable=missing-module-docstring, global-statement

import asyncio
import logging
import threading

import httpcore
import httpx
from httpx_socks import AsyncProxyTransport
from python_socks import (
    parse_proxy_url,
    ProxyConnectionError,
    ProxyTimeoutError,
    ProxyError
)
import python_socks._errors
from python_socks import parse_proxy_url, ProxyConnectionError, ProxyTimeoutError, ProxyError

from searx import logger

@@ -26,29 +21,15 @@ else:
    uvloop.install()


# Child logger for this module; the earlier 'searx.http.client' assignment
# was a dead store (immediately overwritten) left over from the module
# rename, and is removed.
logger = logger.getChild('searx.network.client')

# Event loop shared by the whole module; None until init() starts it.
LOOP = None

# Cache of ssl.SSLContext objects keyed by connection parameters
# (see get_sslcontexts()).
SSLCONTEXTS = {}

# Keyword arguments common to every transport built in this module.
TRANSPORT_KWARGS = {
    'backend': 'asyncio',
    'trust_env': False,
}


async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
    """Drop and close every pooled connection matching *url*'s origin.

    NOTE(review): this relies on httpcore private helpers
    (``_utils.url_to_origin``, ``_connections_for_origin``,
    ``_remove_from_pool``) — confirm they still exist in the pinned
    httpcore version before upgrading httpx/httpcore.
    """
    origin = httpcore._utils.url_to_origin(url)
    logger.debug('Drop connections for %r', origin)
    connections_to_close = connection_pool._connections_for_origin(origin)
    for connection in connections_to_close:
        await connection_pool._remove_from_pool(connection)
        try:
            await connection.aclose()
        except httpx.NetworkError as e:
            # Best effort: an already-broken connection may fail to close cleanly.
            logger.warning('Error closing an existing connection', exc_info=e)


def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False):
    global SSLCONTEXTS
    key = (proxy_url, cert, verify, trust_env, http2)
    if key not in SSLCONTEXTS:
        SSLCONTEXTS[key] = httpx.create_ssl_context(cert, verify, trust_env, http2)
@@ -58,74 +39,25 @@ def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http
class AsyncHTTPTransportNoHttp(httpx.AsyncHTTPTransport):
    """Transport that blocks plain-HTTP requests.

    Mounted for the 'http://' pattern so clear-text requests fail fast
    instead of going out on the wire.
    """

    async def handle_async_request(self, request):
        # httpx >= 0.20 passes a single Request object; the old
        # (method, url, headers, stream, extensions) signature is obsolete.
        raise httpx.UnsupportedProtocol('HTTP protocol is disabled')


class AsyncProxyTransportFixed(AsyncProxyTransport):
    """Fix httpx_socks.AsyncProxyTransport

    Map python_socks exceptions to httpx.ProxyError exceptions
    """

    async def handle_async_request(self, request):
        """Forward *request* to the SOCKS transport, translating proxy errors.

        python_socks raises its own exception types; callers expect
        httpx.ProxyError, so each one is re-raised with the original kept
        as ``__cause__``.  Connection retries are no longer hand-rolled
        here: httpx >= 0.20 transports take a ``retries`` parameter (see
        get_transport_for_socks_proxy), and the companion
        AsyncHTTPTransportFixed wrapper is superseded by the stock
        httpx.AsyncHTTPTransport.
        """
        try:
            return await super().handle_async_request(request)
        except ProxyConnectionError as e:
            # e.strerror carries the OS-level reason (e.g. connection refused)
            raise httpx.ProxyError("ProxyConnectionError: " + e.strerror, request=request) from e
        except ProxyTimeoutError as e:
            raise httpx.ProxyError("ProxyTimeoutError: " + e.args[0], request=request) from e
        except ProxyError as e:
            raise httpx.ProxyError("ProxyError: " + e.args[0], request=request) from e


def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
@@ -141,56 +73,65 @@ def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit

    proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
    verify = get_sslcontexts(proxy_url, None, True, False, http2) if verify is True else verify
    return AsyncProxyTransportFixed(proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
                                    username=proxy_username, password=proxy_password,
    return AsyncProxyTransportFixed(
        proxy_type=proxy_type,
        proxy_host=proxy_host,
        proxy_port=proxy_port,
        username=proxy_username,
        password=proxy_password,
        rdns=rdns,
        loop=get_loop(),
        verify=verify,
        http2=http2,
        local_address=local_address,
                                    max_connections=limit.max_connections,
                                    max_keepalive_connections=limit.max_keepalive_connections,
                                    keepalive_expiry=limit.keepalive_expiry,
        limits=limit,
        retries=retries,
                                    **TRANSPORT_KWARGS)
        **TRANSPORT_KWARGS,
    )


def get_transport(verify, http2, local_address, proxy_url, limit, retries):
    """Build an httpx.AsyncHTTPTransport for direct or HTTP-proxy traffic.

    httpx >= 0.20 accepts pool sizing as a single httpx.Limits object and
    has a built-in ``retries`` parameter, so the stock transport replaces
    the old AsyncHTTPTransportFixed wrapper.
    """
    verify = get_sslcontexts(None, None, True, False, http2) if verify is True else verify
    return httpx.AsyncHTTPTransport(
        # pylint: disable=protected-access
        verify=verify,
        http2=http2,
        limits=limit,
        proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
        local_address=local_address,
        retries=retries,
        **TRANSPORT_KWARGS,
    )


def iter_proxies(proxies):
    """Yield (mount pattern, proxy URL) pairs from a str or dict *proxies*."""
    # https://www.python-httpx.org/compatibility/#proxy-keys
    if isinstance(proxies, str):
        yield 'all://', proxies
    elif isinstance(proxies, dict):
        for pattern, proxy_url in proxies.items():
            yield pattern, proxy_url


def new_client(enable_http, verify, enable_http2,
               max_connections, max_keepalive_connections, keepalive_expiry,
               proxies, local_address, retries, max_redirects):
    limit = httpx.Limits(max_connections=max_connections,
def new_client(
    # pylint: disable=too-many-arguments
    enable_http,
    verify,
    enable_http2,
    max_connections,
    max_keepalive_connections,
    keepalive_expiry,
    proxies,
    local_address,
    retries,
    max_redirects,
    hook_log_response,
):
    limit = httpx.Limits(
        max_connections=max_connections,
        max_keepalive_connections=max_keepalive_connections,
                         keepalive_expiry=keepalive_expiry)
        keepalive_expiry=keepalive_expiry,
    )
    # See https://www.python-httpx.org/advanced/#routing
    mounts = {}
    for pattern, proxy_url in iter_proxies(proxies):
        if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
    for pattern, proxy_url in proxies.items():
        if not enable_http and pattern.startswith('http://'):
            continue
        if proxy_url.startswith('socks4://') \
           or proxy_url.startswith('socks5://') \
           or proxy_url.startswith('socks5h://'):
            mounts[pattern] = get_transport_for_socks_proxy(verify, enable_http2, local_address, proxy_url, limit,
                                                            retries)
        if proxy_url.startswith('socks4://') or proxy_url.startswith('socks5://') or proxy_url.startswith('socks5h://'):
            mounts[pattern] = get_transport_for_socks_proxy(
                verify, enable_http2, local_address, proxy_url, limit, retries
            )
        else:
            mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries)

@@ -198,17 +139,26 @@ def new_client(enable_http, verify, enable_http2,
        mounts['http://'] = AsyncHTTPTransportNoHttp()

    transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
    return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)

    event_hooks = None
    if hook_log_response:
        event_hooks = {'response': [hook_log_response]}

    return httpx.AsyncClient(
        transport=transport,
        mounts=mounts,
        max_redirects=max_redirects,
        event_hooks=event_hooks,
    )


def get_loop():
    """Return the module-wide asyncio event loop (None until init() ran)."""
    # Reading a module global needs no `global` statement.
    return LOOP


def init():
    """Start the module's private asyncio event loop in a daemon thread.

    Called once at import time (see the call at the bottom of the module).
    The duplicated diff lines are resolved to the post-commit state:
    'httpx._client' is silenced alongside the hpack loggers, and the
    thread variable is spelled out as ``thread``.
    """
    # log: silence noisy third-party loggers
    for logger_name in ('hpack.hpack', 'hpack.table', 'httpx._client'):
        logging.getLogger(logger_name).setLevel(logging.WARNING)

    # loop: run a dedicated event loop forever on a daemon thread so
    # synchronous callers can submit coroutines to it.
    def loop_thread():
        global LOOP
        LOOP = asyncio.new_event_loop()
        LOOP.run_forever()

    thread = threading.Thread(
        target=loop_thread,
        name='asyncio_loop',
        daemon=True,
    )
    thread.start()


init()
+182 −85

File changed.

Preview size limit exceeded, changes collapsed.

+7 −0
Original line number Diff line number Diff line
@@ -75,6 +75,13 @@ outgoing: # communication with search engines
    pool_connections : 100 # The maximum number of concurrent connections that may be established.
    pool_maxsize : 20 # Allow the connection pool to maintain keep-alive connections below this point.
    enable_http2: True  # See https://www.python-httpx.org/http2/
    keepalive_expiry: 5.0
    source_ips: []
    using_tor_proxy: False
    proxies: {}
    max_redirects: 30
    retries: 0
    networks: {}
# uncomment below section if you want to use a proxy
# see https://2.python-requests.org/en/latest/user/advanced/#proxies
# SOCKS proxies are also supported: see https://2.python-requests.org/en/latest/user/advanced/#socks
Loading