
Commit 4bcb8983 authored by Nivesh Krishna

Merge branch '4172-fix-crash' into 'master'

Update lib versions to fix the TooManyStreams error

Closes e/backlog#4172

See merge request e/cloud/my-spot!96
parents b7f8aadc 5a3c3d8b
+2 −2
@@ -7,11 +7,11 @@ lxml==4.6.3
 pygments==2.8.0
 python-dateutil==2.8.2
 pyyaml==5.4.1
-httpx[http2]==0.19.0
+httpx[http2]==0.21.2
 Brotli==1.0.9
 uvloop==0.16.0; python_version >= '3.7'
 uvloop==0.14.0; python_version < '3.7'
-httpx-socks[asyncio]==0.4.1
+httpx-socks[asyncio]==0.7.2
 langdetect==1.0.9
 setproctitle==1.2.2
 redis==3.4.1
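The two bumped pins above are what the rest of this merge request adapts to. A minimal sketch (assuming it is run inside the project's virtualenv after pip install -r requirements.txt) to confirm the upgraded libraries are the ones actually installed:

# Hedged sketch: print the installed versions of the two packages bumped above.
# Expected output after this change: httpx 0.21.2, httpx-socks 0.7.2.
from importlib.metadata import version  # stdlib, Python >= 3.8

for package in ("httpx", "httpx-socks"):
    print(package, version(package))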
+4 −4
@@ -126,17 +126,17 @@ def request(method, url, **kwargs):

 if "redis_host" not in settings["server"]:
     def get(url, **kwargs):
-        kwargs.setdefault('allow_redirects', True)
+        kwargs.setdefault('follow_redirects', True)
         return request('get', url, **kwargs)


 def options(url, **kwargs):
-    kwargs.setdefault('allow_redirects', True)
+    kwargs.setdefault('follow_redirects', True)
     return request('options', url, **kwargs)


 def head(url, **kwargs):
-    kwargs.setdefault('allow_redirects', False)
+    kwargs.setdefault('follow_redirects', False)
     return request('head', url, **kwargs)
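The renames above track httpx's own API change: from httpx 0.20 on, the requests-style allow_redirects keyword is called follow_redirects, and the old name is no longer accepted. A minimal sketch of the new keyword used directly against httpx, outside the project's get/options/head wrappers:

# Hedged sketch of the follow_redirects keyword this hunk switches to.
import httpx

response = httpx.get("https://example.org", follow_redirects=True)
print(response.status_code, response.url)

# HEAD requests keep redirects off, mirroring the head() wrapper above.
head_response = httpx.head("https://example.org", follow_redirects=False)
print(head_response.status_code)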




@@ -172,7 +172,7 @@ if "redis_host" in settings["server"]:

 async def stream_chunk_to_queue(network, q, method, url, **kwargs):
     try:
-        async with network.stream(method, url, **kwargs) as response:
+        async with await network.stream(method, url, **kwargs) as response:
             q.put(response)
             async for chunk in response.aiter_bytes(65536):
                 if len(chunk) > 0:
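The added await reflects that the project's network.stream(...) wrapper (its definition is not part of this hunk) now has to be awaited before it yields the async context manager. For comparison, a hedged sketch of the plain httpx streaming pattern, where AsyncClient.stream() is entered directly:

# Hedged sketch: stream a response body chunk by chunk with httpx.
import asyncio
import httpx

async def download(url: str) -> int:
    total = 0
    async with httpx.AsyncClient(follow_redirects=True) as client:
        async with client.stream("GET", url) as response:
            # the hunk above passes 65536 to aiter_bytes() as an explicit chunk size
            async for chunk in response.aiter_bytes():
                total += len(chunk)
    return total

print(asyncio.run(download("https://example.org")))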
+82 −132
 # SPDX-License-Identifier: AGPL-3.0-or-later
-# lint: pylint
+# pylint: disable=missing-module-docstring, global-statement

 import asyncio
 import logging
 import threading

-import httpcore
 import httpx
 from httpx_socks import AsyncProxyTransport
-from python_socks import (
-    parse_proxy_url,
-    ProxyConnectionError,
-    ProxyTimeoutError,
-    ProxyError
-)
+from python_socks import parse_proxy_url, ProxyConnectionError, ProxyTimeoutError, ProxyError
+import python_socks._errors

 from searx import logger


@@ -26,29 +21,15 @@ else:
     uvloop.install()


-logger = logger.getChild('searx.http.client')
+logger = logger.getChild('searx.network.client')
 LOOP = None
 SSLCONTEXTS = {}
 TRANSPORT_KWARGS = {
-    'backend': 'asyncio',
     'trust_env': False,
 }


-async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
-    origin = httpcore._utils.url_to_origin(url)
-    logger.debug('Drop connections for %r', origin)
-    connections_to_close = connection_pool._connections_for_origin(origin)
-    for connection in connections_to_close:
-        await connection_pool._remove_from_pool(connection)
-        try:
-            await connection.aclose()
-        except httpx.NetworkError as e:
-            logger.warning('Error closing an existing connection', exc_info=e)
-
-
 def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False):
-    global SSLCONTEXTS
     key = (proxy_url, cert, verify, trust_env, http2)
     if key not in SSLCONTEXTS:
         SSLCONTEXTS[key] = httpx.create_ssl_context(cert, verify, trust_env, http2)
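The removed close_connections_for_url() helper relied on httpcore internals (note the underscore-prefixed attributes), while the surviving get_sslcontexts() keeps one SSL context per parameter tuple so clients built with the same options share it. A hedged, standalone sketch of that caching pattern using only the standard library (names here are illustrative, not the project's):

# Hedged sketch: memoize ssl.SSLContext objects by configuration, mirroring
# the SSLCONTEXTS dictionary kept in the hunk above.
import ssl

_SSL_CACHE = {}

def cached_sslcontext(verify=True, http2=False):
    key = (verify, http2)
    if key not in _SSL_CACHE:
        context = ssl.create_default_context()
        if not verify:
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        if http2:
            # HTTP/2 is negotiated through ALPN
            context.set_alpn_protocols(["h2", "http/1.1"])
        _SSL_CACHE[key] = context
    return _SSL_CACHE[key]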
@@ -58,74 +39,25 @@ def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False):
 class AsyncHTTPTransportNoHttp(httpx.AsyncHTTPTransport):
     """Block HTTP request"""

-    async def handle_async_request(self, method, url, headers=None, stream=None, extensions=None):
-        raise httpx.UnsupportedProtocol("HTTP protocol is disabled")
+    async def handle_async_request(self, request):
+        raise httpx.UnsupportedProtocol('HTTP protocol is disabled')


 class AsyncProxyTransportFixed(AsyncProxyTransport):
     """Fix httpx_socks.AsyncProxyTransport

-    Map python_socks exceptions to httpx.ProxyError
-
-    Map socket.gaierror to httpx.ConnectError
-
-    Note: keepalive_expiry is ignored, AsyncProxyTransport should call:
-    * self._keepalive_sweep()
-    * self._response_closed(self, connection)
-
-    Note: AsyncProxyTransport inherit from AsyncConnectionPool
+    Map python_socks exceptions to httpx.ProxyError exceptions
     """

-    async def handle_async_request(self, method, url, headers=None, stream=None, extensions=None):
-        retry = 2
-        while retry > 0:
-            retry -= 1
-            try:
-                return await super().handle_async_request(method, url, headers, stream, extensions)
-            except (ProxyConnectionError, ProxyTimeoutError, ProxyError) as e:
-                raise httpx.ProxyError(e)
-            except OSError as e:
-                # socket.gaierror when DNS resolution fails
-                raise httpx.NetworkError(e)
-            except httpx.RemoteProtocolError as e:
-                # in case of httpx.RemoteProtocolError: Server disconnected
-                await close_connections_for_url(self, url)
-                logger.warning('httpx.RemoteProtocolError: retry', exc_info=e)
-                # retry
-            except (httpx.NetworkError, httpx.ProtocolError) as e:
-                # httpx.WriteError on HTTP/2 connection leaves a new opened stream
-                # then each new request creates a new stream and raise the same WriteError
-                await close_connections_for_url(self, url)
-                raise e
+    async def handle_async_request(self, request):
+        try:
+            return await super().handle_async_request(request)
+        except ProxyConnectionError as e:
+            raise httpx.ProxyError("ProxyConnectionError: " + e.strerror, request=request) from e
+        except ProxyTimeoutError as e:
+            raise httpx.ProxyError("ProxyTimeoutError: " + e.args[0], request=request) from e
+        except ProxyError as e:
+            raise httpx.ProxyError("ProxyError: " + e.args[0], request=request) from e
-
-
-class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
-    """Fix httpx.AsyncHTTPTransport"""
-
-    async def handle_async_request(self, method, url, headers=None, stream=None, extensions=None):
-        retry = 2
-        while retry > 0:
-            retry -= 1
-            try:
-                return await super().handle_async_request(method, url, headers, stream, extensions)
-            except OSError as e:
-                # socket.gaierror when DNS resolution fails
-                raise httpx.ConnectError(e)
-            except httpx.CloseError as e:
-                # httpx.CloseError: [Errno 104] Connection reset by peer
-                # raised by _keepalive_sweep()
-                #   from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198  # noqa
-                await close_connections_for_url(self._pool, url)
-                logger.warning('httpx.CloseError: retry', exc_info=e)
-                # retry
-            except httpx.RemoteProtocolError as e:
-                # in case of httpx.RemoteProtocolError: Server disconnected
-                await close_connections_for_url(self._pool, url)
-                logger.warning('httpx.RemoteProtocolError: retry', exc_info=e)
-                # retry
-            except (httpx.ProtocolError, httpx.NetworkError) as e:
-                await close_connections_for_url(self._pool, url)
-                raise e


 def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
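Both transport classes lose the old five-argument handle_async_request(): the httpx versions targeted here pass a single request object to the transport and expect a response back (or an exception), and the manual retry loop plus connection-dropping workaround goes away with it. A hedged sketch of a custom transport in that style; BlockInsecureTransport and its behaviour are illustrative and not part of this merge request:

# Hedged sketch of a request-object-based custom async transport.
import asyncio
import httpx

class BlockInsecureTransport(httpx.AsyncBaseTransport):
    """Refuse plain-HTTP requests, similar in spirit to AsyncHTTPTransportNoHttp above."""

    async def handle_async_request(self, request):
        if request.url.scheme != "https":
            raise httpx.UnsupportedProtocol("HTTP protocol is disabled")
        # A real transport would forward the request; this sketch answers directly.
        return httpx.Response(204, request=request)

async def main():
    async with httpx.AsyncClient(transport=BlockInsecureTransport()) as client:
        response = await client.get("https://example.org")
        print(response.status_code)  # 204, served by the sketch transport

asyncio.run(main())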
@@ -141,56 +73,65 @@ def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):

     proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
     verify = get_sslcontexts(proxy_url, None, True, False, http2) if verify is True else verify
-    return AsyncProxyTransportFixed(proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
-                                    username=proxy_username, password=proxy_password,
+    return AsyncProxyTransportFixed(
+        proxy_type=proxy_type,
+        proxy_host=proxy_host,
+        proxy_port=proxy_port,
+        username=proxy_username,
+        password=proxy_password,
         rdns=rdns,
         loop=get_loop(),
         verify=verify,
         http2=http2,
         local_address=local_address,
-                                    max_connections=limit.max_connections,
-                                    max_keepalive_connections=limit.max_keepalive_connections,
-                                    keepalive_expiry=limit.keepalive_expiry,
+        limits=limit,
         retries=retries,
-                                    **TRANSPORT_KWARGS)
+        **TRANSPORT_KWARGS,
+    )


 def get_transport(verify, http2, local_address, proxy_url, limit, retries):
     verify = get_sslcontexts(None, None, True, False, http2) if verify is True else verify
-    return AsyncHTTPTransportFixed(verify=verify,
+    return httpx.AsyncHTTPTransport(
+        # pylint: disable=protected-access
+        verify=verify,
         http2=http2,
-                                   local_address=local_address,
-                                   proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
         limits=limit,
+        proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
+        local_address=local_address,
         retries=retries,
-                                   **TRANSPORT_KWARGS)
+        **TRANSPORT_KWARGS,
+    )
-
-
-def iter_proxies(proxies):
-    # https://www.python-httpx.org/compatibility/#proxy-keys
-    if isinstance(proxies, str):
-        yield 'all://', proxies
-    elif isinstance(proxies, dict):
-        for pattern, proxy_url in proxies.items():
-            yield pattern, proxy_url


-def new_client(enable_http, verify, enable_http2,
-               max_connections, max_keepalive_connections, keepalive_expiry,
-               proxies, local_address, retries, max_redirects):
-    limit = httpx.Limits(max_connections=max_connections,
+def new_client(
+    # pylint: disable=too-many-arguments
+    enable_http,
+    verify,
+    enable_http2,
+    max_connections,
+    max_keepalive_connections,
+    keepalive_expiry,
+    proxies,
+    local_address,
+    retries,
+    max_redirects,
+    hook_log_response,
+):
+    limit = httpx.Limits(
+        max_connections=max_connections,
         max_keepalive_connections=max_keepalive_connections,
-                         keepalive_expiry=keepalive_expiry)
+        keepalive_expiry=keepalive_expiry,
+    )
     # See https://www.python-httpx.org/advanced/#routing
     mounts = {}
-    for pattern, proxy_url in iter_proxies(proxies):
+    for pattern, proxy_url in proxies.items():
-        if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
+        if not enable_http and pattern.startswith('http://'):
             continue
-        if proxy_url.startswith('socks4://') \
-           or proxy_url.startswith('socks5://') \
-           or proxy_url.startswith('socks5h://'):
-            mounts[pattern] = get_transport_for_socks_proxy(verify, enable_http2, local_address, proxy_url, limit,
-                                                            retries)
+        if proxy_url.startswith('socks4://') or proxy_url.startswith('socks5://') or proxy_url.startswith('socks5h://'):
+            mounts[pattern] = get_transport_for_socks_proxy(
+                verify, enable_http2, local_address, proxy_url, limit, retries
+            )
         else:
             mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries)
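With iter_proxies() gone, new_client() expects proxies to already be a dict of URL patterns and mounts one transport per pattern, following httpx's routing rules (https://www.python-httpx.org/advanced/#routing). A hedged sketch of the same mounting idea using stock httpx transports; the proxy URLs are placeholders, and the project builds its SOCKS transports through httpx_socks instead:

# Hedged sketch: one transport per URL pattern via the mounts mechanism.
import httpx

proxies = {
    "all://": "http://10.0.0.1:3128",                # placeholder default proxy
    "all://*.example.org": "http://10.0.0.2:3128",   # placeholder per-pattern override
}

mounts = {}
for pattern, proxy_url in proxies.items():
    mounts[pattern] = httpx.AsyncHTTPTransport(proxy=httpx.Proxy(proxy_url))

client = httpx.AsyncClient(mounts=mounts)  # requests are routed by the matching pattern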


@@ -198,17 +139,26 @@ def new_client(enable_http, verify, enable_http2,
         mounts['http://'] = AsyncHTTPTransportNoHttp()

     transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
-    return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)
+
+    event_hooks = None
+    if hook_log_response:
+        event_hooks = {'response': [hook_log_response]}
+
+    return httpx.AsyncClient(
+        transport=transport,
+        mounts=mounts,
+        max_redirects=max_redirects,
+        event_hooks=event_hooks,
+    )


 def get_loop():
-    global LOOP
     return LOOP


 def init():
     # log
-    for logger_name in ('hpack.hpack', 'hpack.table'):
+    for logger_name in ('hpack.hpack', 'hpack.table', 'httpx._client'):
         logging.getLogger(logger_name).setLevel(logging.WARNING)

     # loop
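new_client() also gains a hook_log_response argument that is wired into the client as an httpx response event hook; the hook itself is not defined in the hunks shown here. A hedged sketch of such an async response-logging hook (logger name and message format are illustrative):

# Hedged sketch: log every response through an httpx event hook.
import asyncio
import logging

import httpx

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("network")

async def log_response(response):
    request = response.request
    logger.debug("HTTP %s %s -> %d", request.method, request.url, response.status_code)

async def main():
    async with httpx.AsyncClient(event_hooks={'response': [log_response]}) as client:
        await client.get("https://example.org")

asyncio.run(main())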
@@ -217,12 +167,12 @@ def init():
         LOOP = asyncio.new_event_loop()
         LOOP.run_forever()

-    th = threading.Thread(
+    thread = threading.Thread(
         target=loop_thread,
         name='asyncio_loop',
         daemon=True,
     )
-    th.start()
+    thread.start()


 init()
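The renamed thread still does the same job: init() starts a dedicated asyncio event loop in a daemon thread at import time, so coroutines can be submitted to it from synchronous code elsewhere in the module. A hedged, self-contained sketch of that pattern (function and variable names here are illustrative):

# Hedged sketch: a background asyncio loop fed from synchronous callers.
import asyncio
import threading

LOOP = asyncio.new_event_loop()

def _loop_thread():
    asyncio.set_event_loop(LOOP)
    LOOP.run_forever()

threading.Thread(target=_loop_thread, name="asyncio_loop", daemon=True).start()

async def answer():
    await asyncio.sleep(0.1)
    return 42

# Schedule a coroutine on the background loop and wait for its result.
future = asyncio.run_coroutine_threadsafe(answer(), LOOP)
print(future.result(timeout=5))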
+182 −85
File changed. Preview size limit exceeded, changes collapsed.

+7 −0
@@ -75,6 +75,13 @@ outgoing: # communication with search engines
     pool_connections : 100 # The maximum number of concurrent connections that may be established.
     pool_maxsize : 20 # Allow the connection pool to maintain keep-alive connections below this point.
     enable_http2: True  # See https://www.python-httpx.org/http2/
+    keepalive_expiry: 5.0
+    source_ips: []
+    using_tor_proxy: False
+    proxies: {}
+    max_redirects: 30
+    retries: 0
+    networks: {}
 # uncomment below section if you want to use a proxy
 # see https://2.python-requests.org/en/latest/user/advanced/#proxies
 # SOCKS proxies are also supported: see https://2.python-requests.org/en/latest/user/advanced/#socks
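The added keys give the outgoing: block explicit defaults for the options the reworked network code reads. A hedged sketch of how such values map onto httpx objects; the dict literal stands in for the parsed YAML and is not the project's settings loader:

# Hedged sketch: translate the outgoing settings above into httpx equivalents.
import httpx

outgoing = {  # stand-in for the parsed 'outgoing:' section of settings.yml
    "pool_connections": 100,   # maximum concurrent connections
    "pool_maxsize": 20,        # maximum keep-alive connections
    "enable_http2": True,      # needs the httpx[http2] extra pinned in requirements.txt
    "keepalive_expiry": 5.0,
    "max_redirects": 30,
}

limits = httpx.Limits(
    max_connections=outgoing["pool_connections"],
    max_keepalive_connections=outgoing["pool_maxsize"],
    keepalive_expiry=outgoing["keepalive_expiry"],
)
client = httpx.AsyncClient(
    http2=outgoing["enable_http2"],
    limits=limits,
    max_redirects=outgoing["max_redirects"],
)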