Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 4f7a9728 authored by John Lai's avatar John Lai Committed by Gerrit Code Review
Browse files

Merge changes I648da896,I76df131e,I5ddfe446,I0e26708e into main

* changes:
  Floss: Implement Pandora HFP profile (part2).
  Floss: Implement Pandora HFP profile (part1).
  Floss: Implement Pandora modem profile
  Floss: Implements BluetoothTelephony D-Bus client
parents a25efa37 9da407a9
Loading
Loading
Loading
Loading
+334 −0
Original line number Diff line number Diff line
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Client class to access the Floss telephony interface."""
import logging

from floss.pandora.floss import observer_base
from floss.pandora.floss import utils


class BluetoothTelephonyCallbacks:
    """Callbacks for the telephony interface.

    Implement this to observe these callbacks when exporting callbacks via register_callback.
    """

    def on_telephony_use(self, addr, state):
        """Called when telephony is in use.

        Args:
            addr: The address of the telephony device.
            state: The boolean value indicating the telephony state.
        """
        pass


class FlossTelephonyClient:
    """Handles method calls and callbacks from the telephony interface."""

    TELEPHONY_SERVICE = 'org.chromium.bluetooth'
    TELEPHONY_INTERFACE = 'org.chromium.bluetooth.BluetoothTelephony'
    TELEPHONY_OBJECT_PATTERN = '/org/chromium/bluetooth/hci{}/telephony'
    TELEPHONY_CB_INTF = 'org.chromium.bluetooth.BluetoothTelephonyCallback'
    TELEPHONY_CB_OBJ_NAME = 'test_telephony_client'

    class ExportedTelephonyCallbacks(observer_base.ObserverBase):
        """
        <node>
            <interface name="org.chromium.bluetooth.BluetoothTelephonyCallback">
                <method name="OnTelephonyUse">
                    <arg type="s" name="add" direction="in" />
                    <arg type="b" name="state" direction="in" />
                </method>
            </interface>
        </node>
        """

        def __init__(self):
            """Constructs exported callbacks object."""
            observer_base.ObserverBase.__init__(self)

        def OnTelephonyUse(self, addr, state):
            """Handles telephony use callback.

            Args:
                addr: The address of the telephony device.
                state: The boolean value indicating the telephony state.
            """

            for observer in self.observers.values():
                observer.on_telephony_use(addr, state)

    def __init__(self, bus, hci):
        """Constructs the client.

        Args:
            bus: D-Bus bus over which we'll establish connections.
            hci: HCI adapter index. Get this value from `get_default_adapter` on FlossManagerClient.
        """
        self.bus = bus
        self.hci = hci
        self.objpath = self.TELEPHONY_OBJECT_PATTERN.format(hci)

        # We don't register callbacks by default.
        self.callbacks = None

    def __del__(self):
        """Destructor."""
        del self.callbacks

    @utils.glib_callback()
    def on_telephony_use(self, addr, state):
        """Handles telephony use callback.

        Args:
            addr: The address of the telephony device.
            state: The boolean value indicating the telephony state.
        """
        logging.debug('on_telephony_use: addr: %s, state: %s', addr, state)

    def _make_dbus_phone_number(self, number):
        """Makes struct for phone number D-Bus.

        Args:
            number : The phone number to use.

        Returns:
            Dictionary of phone number.
        """
        return utils.dbus_optional_value('s', number)

    @utils.glib_call(False)
    def has_proxy(self):
        """Checks whether telephony proxy can be acquired."""
        return bool(self.proxy())

    def proxy(self):
        """Gets proxy object to telephony interface for method calls."""
        return self.bus.get(self.TELEPHONY_SERVICE, self.objpath)[self.TELEPHONY_INTERFACE]

    @utils.glib_call(None)
    def register_telephony_callback(self):
        """Registers telephony callback for this client if one doesn't already exist.

        Returns:
            True on success, False on failure, None on DBus error.
        """
        if self.callbacks:
            return True

        # Create and publish callbacks
        self.callbacks = self.ExportedTelephonyCallbacks()
        self.callbacks.add_observer('telephony_client', self)
        objpath = utils.generate_dbus_cb_objpath(self.TELEPHONY_CB_OBJ_NAME, self.hci)
        self.bus.register_object(objpath, self.callbacks, None)

        # Register published callbacks with manager daemon
        return self.proxy().RegisterTelephonyCallback(objpath)

    @utils.glib_call(False)
    def set_network_available(self, network_available):
        """Sets network availability status.

        Args:
            network_available: A boolean value indicating whether the device is connected to the cellular network.

        Returns:
            True on success, False otherwise.
        """
        self.proxy().SetNetworkAvailable(network_available)
        return True

    @utils.glib_call(False)
    def set_roaming(self, roaming):
        """Sets roaming mode.

        Args:
            roaming: A boolean value indicating whether the device is in roaming mode.

        Returns:
            True on success, False otherwise.
        """
        self.proxy().SetRoaming(roaming)
        return True

    @utils.glib_call(None)
    def set_signal_strength(self, signal_strength):
        """Sets signal strength.

        Args:
            signal_strength: The signal strength value to be set, ranging from 0 to 5.

        Returns:
            True on success, False on failure, None on DBus error.
        """
        return self.proxy().SetSignalStrength(signal_strength)

    @utils.glib_call(None)
    def set_battery_level(self, battery_level):
        """Sets battery level.

        Args:
            battery_level: The battery level value to be set, ranging from 0 to 5.

        Returns:
            True on success, False on failure, None on DBus error.
        """
        return self.proxy().SetBatteryLevel(battery_level)

    @utils.glib_call(False)
    def set_phone_ops_enabled(self, enable):
        """Sets phone operations status.

        Args:
            enable: A boolean value indicating whether phone operations are enabled.

        Returns:
            True on success, False otherwise.
        """
        self.proxy().SetPhoneOpsEnabled(enable)
        return True

    @utils.glib_call(False)
    def set_mps_qualification_enabled(self, enable):
        """Sets MPS qualification status.

        Args:
            enable: A boolean value indicating whether MPS qualification is enabled.

        Returns:
            True on success, False otherwise.
        """
        self.proxy().SetMpsQualificationEnabled(enable)
        return True

    @utils.glib_call(None)
    def incoming_call(self, number):
        """Initiates an incoming call with the specified phone number.

        Args:
            number: The phone number of the incoming call.

        Returns:
            True on success, False on failure, None on DBus error.
        """

        return self.proxy().IncomingCall(number)

    @utils.glib_call(None)
    def dialing_call(self, number):
        """Initiates a dialing call with the specified phone number.

        Args:
            number: The phone number to dial.

        Returns:
            True on success, False on failure, None on DBus error.
        """
        return self.proxy().DialingCall(number)

    @utils.glib_call(None)
    def answer_call(self):
        """Answers an incoming or dialing call.

        Returns:
            True on success, False on failure, None on DBus error.
        """
        return self.proxy().AnswerCall()

    @utils.glib_call(None)
    def hangup_call(self):
        """Hangs up an active, incoming, or dialing call.

        Returns:
            True on success, False on failure, None on DBus error.
        """
        return self.proxy().HangupCall()

    @utils.glib_call(None)
    def set_last_call(self, number=None):
        """Sets last call with the specified phone number.

        Args:
            number: Optional phone number value to be set as the last call, Defaults to None if not provided.
        Returns:
            True on success, False on failure, None on DBus error.
        """
        number = self._make_dbus_phone_number(number)
        return self.proxy().SetLastCall(number)

    @utils.glib_call(None)
    def set_memory_call(self, number=None):
        """Sets memory call with the specified phone number.

        Args:
            number: Optional phone number value to be set as the last call, Defaults to None if not provided.

        Returns:
            True on success, False on failure, None on DBus error.
        """
        number = self._make_dbus_phone_number(number)
        return self.proxy().SetMemoryCall(number)

    @utils.glib_call(None)
    def release_held(self):
        """Releases all of the held calls.

        Returns:
            True on success, False on failure, None on DBus error.
        """
        return self.proxy().ReleaseHeld()

    @utils.glib_call(None)
    def release_active_accept_held(self):
        """Releases the active call and accepts a held call.

        Returns:
            True on success, False on failure, None on DBus error.
        """
        return self.proxy().ReleaseActiveAcceptHeld()

    @utils.glib_call(None)
    def hold_active_accept_held(self):
        """Holds the active call and accepts a held call.

        Returns:
            True on success, False on failure, None on DBus error.
        """
        return self.proxy().HoldActiveAcceptHeld()

    @utils.glib_call(None)
    def audio_connect(self, address):
        """Initiates an audio connection to the remote device.

        Args:
            address: The address of the remote device for audio connection.

        Returns:
            True on success, False on failure, None on DBus error.
        """
        return self.proxy().AudioConnect(address)

    @utils.glib_call(False)
    def audio_disconnect(self, address):
        """Disconnects the audio connection to the remote device.

        Args:
            address: The address of the remote device for audio disconnection.

        Returns:
            True on success, False otherwise.
        """
        self.proxy().AudioDisconnect(address)
        return True
+50 −1
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@ from floss.pandora.floss import media_client
from floss.pandora.floss import qa_client
from floss.pandora.floss import scanner_client
from floss.pandora.floss import socket_manager
from floss.pandora.floss import telephony_client
from floss.pandora.floss import utils
from gi.repository import GLib
import pydbus
@@ -71,6 +72,7 @@ class Bluetooth(object):
        self.gatt_client = gatt_client.FlossGattClient(self.bus, self.DEFAULT_ADAPTER)
        self.gatt_server = gatt_server.FlossGattServer(self.bus, self.DEFAULT_ADAPTER)
        self.socket_manager = socket_manager.FlossSocketManagerClient(self.bus, self.DEFAULT_ADAPTER)
        self.telephony_client = telephony_client.FlossTelephonyClient(self.bus, self.DEFAULT_ADAPTER)

    def __del__(self):
        if not self.is_clean:
@@ -147,6 +149,9 @@ class Bluetooth(object):
        if not self.socket_manager.register_callbacks():
            logging.error('scanner_client: Failed to register callbacks')
            return False
        if not self.telephony_client.register_telephony_callback():
            logging.error('telephony_client: Failed to register callbacks')
            return False
        return True

    def is_bluetoothd_proxy_valid(self):
@@ -161,7 +166,8 @@ class Bluetooth(object):
            self.media_client.has_proxy(),
            self.gatt_client.has_proxy(),
            self.gatt_server.has_proxy(),
            self.socket_manager.has_proxy()
            self.socket_manager.has_proxy(),
            self.telephony_client.has_proxy()
        ])

        if not proxy_ready:
@@ -199,6 +205,7 @@ class Bluetooth(object):
            self.gatt_client = gatt_client.FlossGattClient(self.bus, default_adapter)
            self.gatt_server = gatt_server.FlossGattServer(self.bus, default_adapter)
            self.socket_manager = socket_manager.FlossSocketManagerClient(self.bus, default_adapter)
            self.telephony_client = telephony_client.FlossTelephonyClient(self.bus, default_adapter)

            try:
                utils.poll_for_condition(
@@ -352,6 +359,45 @@ class Bluetooth(object):
    def write_characteristic(self, address, handle, write_type, auth_req, value):
        return self.gatt_client.write_characteristic(address, handle, write_type, auth_req, value)

    def set_mps_qualification_enabled(self, enable):
        return self.telephony_client.set_mps_qualification_enabled(enable)

    def incoming_call(self, number):
        return self.telephony_client.incoming_call(number)

    def set_phone_ops_enabled(self, enable):
        return self.telephony_client.set_phone_ops_enabled(enable)

    def dial_call(self, number):
        return self.telephony_client.dialing_call(number)

    def answer_call(self):
        return self.telephony_client.answer_call()

    def swap_active_call(self):
        return self.telephony_client.hold_active_accept_held()

    def set_last_call(self, number=None):
        return self.telephony_client.set_last_call(number)

    def set_memory_call(self, number=None):
        return self.telephony_client.set_memory_call(number)

    def get_connected_audio_devices(self):
        return self.media_client.devices

    def audio_connect(self, address):
        return self.telephony_client.audio_connect(address)

    def audio_disconnect(self, address):
        return self.telephony_client.audio_disconnect(address)

    def hangup_call(self):
        return self.telephony_client.hangup_call()

    def set_battery_level(self, battery_level):
        return self.telephony_client.set_battery_level(battery_level)

    def gatt_connect(self, address, is_direct, transport):
        return self.gatt_client.connect_client(address, is_direct, transport)

@@ -410,3 +456,6 @@ class Bluetooth(object):

    def disconnect_media(self, address):
        return self.media_client.disconnect(address)

    def incoming_call(self, number):
        return self.telephony_client.incoming_call(number)
+191 −0

File added.

Preview size limit exceeded, changes collapsed.

+54 −0
Original line number Diff line number Diff line
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Modem grpc interface."""

from floss.pandora.server import bluetooth as bluetooth_module
import grpc
from pandora_experimental import modem_grpc_aio
from pandora_experimental import modem_pb2


class Modem(modem_grpc_aio.ModemServicer):
    """Service to trigger modem procedures.

    This class implements the Pandora bluetooth test interfaces,
    where the meta class definition is automatically generated by the protobuf.
    The interface definition can be found in:
    https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/modem.proto
    """

    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
        self.bluetooth = bluetooth

    async def Call(self, request: modem_pb2.CallRequest, context: grpc.ServicerContext) -> modem_pb2.CallResponse:
        phone_number = request.phone_number
        if phone_number is None or len(phone_number) == 0:
            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Cannot call empty number.')

        call_result = self.bluetooth.incoming_call(phone_number)
        if not call_result:
            await context.abort(grpc.StatusCode.INTERNAL, 'Failed to receive a call.')

        return modem_pb2.CallResponse()

    async def AnswerCall(self, request: modem_pb2.AnswerCallRequest,
                         context: grpc.ServicerContext) -> modem_pb2.AnswerCallResponse:
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details('Method not implemented!')  # type: ignore
        raise NotImplementedError('Method not implemented!')

    async def Close(self, request: modem_pb2.CloseRequest, context: grpc.ServicerContext) -> modem_pb2.CloseResponse:
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details('Method not implemented!')  # type: ignore
        raise NotImplementedError('Method not implemented!')
+10 −0
Original line number Diff line number Diff line
@@ -19,9 +19,11 @@ import logging
from floss.pandora.server import a2dp
from floss.pandora.server import bluetooth as bluetooth_module
from floss.pandora.server import gatt
from floss.pandora.server import hfp
from floss.pandora.server import hid
from floss.pandora.server import host
from floss.pandora.server import l2cap
from floss.pandora.server import modem
from floss.pandora.server import rfcomm
from floss.pandora.server import security
import grpc
@@ -29,8 +31,10 @@ from pandora import a2dp_grpc_aio
from pandora import host_grpc_aio
from pandora import security_grpc_aio
from pandora_experimental import gatt_grpc_aio
from pandora_experimental import hfp_grpc_aio
from pandora_experimental import hid_grpc_aio
from pandora_experimental import l2cap_grpc_aio
from pandora_experimental import modem_grpc_aio
from pandora_experimental import rfcomm_grpc_aio


@@ -58,6 +62,12 @@ async def serve(port):
            gatt_service = gatt.GATTService(bluetooth)
            gatt_grpc_aio.add_GATTServicer_to_server(gatt_service, server)

            modem_service = modem.Modem(bluetooth)
            modem_grpc_aio.add_ModemServicer_to_server(modem_service, server)

            hfp_service = hfp.HFPService(bluetooth)
            hfp_grpc_aio.add_HFPServicer_to_server(hfp_service, server)

            hid_service = hid.HIDService(bluetooth)
            hid_grpc_aio.add_HIDServicer_to_server(hid_service, server)