Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a68ee005 authored by Charlie Boutier's avatar Charlie Boutier
Browse files

avatar: add hfpProtocol class into test file

Hfp is being reworks into Bumble hence this class will be reworks.

Bug: 296471045
Test: atest avatar
Change-Id: I28b328a8bf4f66524cba7a6ff6adca77fe3b6602
parent 1c233fa9
Loading
Loading
Loading
Loading
+55 −2
Original line number Diff line number Diff line
@@ -24,7 +24,7 @@ from bumble.core import (
    BT_L2CAP_PROTOCOL_ID,
    BT_RFCOMM_PROTOCOL_ID,
)
from bumble.hfp import HfpProtocol
from bumble import rfcomm
from bumble.rfcomm import DLC, Server as RfcommServer
from bumble.sdp import (
    SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
@@ -34,6 +34,7 @@ from bumble.sdp import (
    DataElement,
    ServiceAttribute,
)
import collections
from mobly import base_test, test_runner
from mobly.asserts import assert_equal  # type: ignore
from mobly.asserts import assert_in  # type: ignore
@@ -41,7 +42,7 @@ from mobly.asserts import assert_not_equal # type: ignore
from mobly.asserts import assert_not_in  # type: ignore
from pandora.host_pb2 import Connection as PandoraConnection
from pandora.security_pb2 import LEVEL2
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple, Union

SDP_PROFILE_SUPPORTED_FEATURES_ID = 0x0311

@@ -58,6 +59,57 @@ PROPERTY_HF_INDICATOR_ENHANCED_DRIVER_SAFETY = 'bluetooth.headset_client.indicat
HFP_VERSION_1_7 = 0x0107


# Stub for Audio Gateway implementation
# TODO: b/296471045
class HfpProtocol:
    dlc: rfcomm.DLC
    buffer: str
    lines: collections.deque
    lines_available: asyncio.Event

    def __init__(self, dlc: rfcomm.DLC) -> None:
        self.dlc = dlc
        self.buffer = ''
        self.lines = collections.deque()
        self.lines_available = asyncio.Event()

        dlc.sink = self.feed

    def feed(self, data: Union[bytes, str]) -> None:
        # Convert the data to a string if needed
        if isinstance(data, bytes):
            data = data.decode('utf-8')

        logger.debug(f'<<< Data received: {data}')

        # Add to the buffer and look for lines
        self.buffer += data
        while (separator := self.buffer.find('\r')) >= 0:
            line = self.buffer[:separator].strip()
            self.buffer = self.buffer[separator + 1:]
            if len(line) > 0:
                self.on_line(line)

    def on_line(self, line: str) -> None:
        self.lines.append(line)
        self.lines_available.set()

    def send_command_line(self, line: str) -> None:
        logger.debug(color(f'>>> {line}', 'yellow'))
        self.dlc.write(line + '\r')

    def send_response_line(self, line: str) -> None:
        logger.debug(color(f'>>> {line}', 'yellow'))
        self.dlc.write('\r\n' + line + '\r\n')

    async def next_line(self) -> str:
        await self.lines_available.wait()
        line = self.lines.popleft()
        if not self.lines:
            self.lines_available.clear()
        logger.debug(color(f'<<< {line}', 'green'))
        return line

class HfpClientTest(base_test.BaseTestClass):  # type: ignore[misc]
    devices: Optional[PandoraDevices] = None

@@ -306,6 +358,7 @@ class HfpAgServer:
        self.send_response_line('OK')



if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore