Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 9b217496 authored by Mohammad Sabri's avatar Mohammad Sabri Committed by JohnLai
Browse files

Floss: Implement Pandora HFP profile (part1).

Bug: 300954040
Bug: 301036159
Bug: 300954041
Bug: 300954043
Bug: 300954042
Bug: 300942872
Bug: 300942873
Bug: 301000128
Bug: 300954044
Bug: 301000132
Bug: 301000135

Test: mma packages/modules/Bluetooth && pts-bot HFP
Tag: #floss
Flag: EXEMPT floss only changes
Change-Id: I76df131eeb6ccd18d8a2e5b324f76af571c6e805
parent 8e6a4445
Loading
Loading
Loading
Loading
+33 −0
Original line number Diff line number Diff line
@@ -359,6 +359,39 @@ class Bluetooth(object):
    def write_characteristic(self, address, handle, write_type, auth_req, value):
        return self.gatt_client.write_characteristic(address, handle, write_type, auth_req, value)

    def set_mps_qualification_enabled(self, enable):
        return self.telephony_client.set_mps_qualification_enabled(enable)

    def incoming_call(self, number):
        return self.telephony_client.incoming_call(number)

    def set_phone_ops_enabled(self, enable):
        return self.telephony_client.set_phone_ops_enabled(enable)

    def dial_call(self, number):
        return self.telephony_client.dialing_call(number)

    def answer_call(self):
        return self.telephony_client.answer_call()

    def swap_active_call(self):
        return self.telephony_client.hold_active_accept_held()

    def set_last_call(self, number=None):
        return self.telephony_client.set_last_call(number)

    def set_memory_call(self, number=None):
        return self.telephony_client.set_memory_call(number)

    def get_connected_audio_devices(self):
        return self.media_client.devices

    def audio_connect(self, address):
        return self.telephony_client.audio_connect(address)

    def audio_disconnect(self, address):
        return self.telephony_client.audio_disconnect(address)

    def gatt_connect(self, address, is_direct, transport):
        return self.gatt_client.connect_client(address, is_direct, transport)

+118 −0
Original line number Diff line number Diff line
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""HFP grpc interface."""

from floss.pandora.server import bluetooth as bluetooth_module
import grpc
from pandora_experimental import hfp_grpc_aio
from pandora_experimental import hfp_pb2


class HFPService(hfp_grpc_aio.HFPServicer):
    """Service to trigger Bluetooth HFP procedures.

    This class implements the Pandora bluetooth test interfaces,
    where the meta class definition is automatically generated by the protobuf.
    The interface definition can be found in:
    https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/hfp.proto
    """

    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
        self.bluetooth = bluetooth

    def enable_phone_for_testing(self):
        self.bluetooth.set_phone_ops_enabled(True)
        self.bluetooth.set_mps_qualification_enabled(True)

    async def MakeCall(self, request: hfp_pb2.MakeCallRequest,
                       context: grpc.ServicerContext) -> hfp_pb2.MakeCallResponse:
        self.enable_phone_for_testing()
        number = request.number
        if number is None or len(number) == 0:
            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Cannot call empty number.')
        call_result = self.bluetooth.dial_call(number)
        if call_result is None or not call_result:
            await context.abort(grpc.StatusCode.INTERNAL, 'Failed to make a call.')
        return hfp_pb2.MakeCallResponse()

    async def MakeCallAsHandsfree(self, request: hfp_pb2.MakeCallAsHandsfreeRequest,
                                  context: grpc.ServicerContext) -> hfp_pb2.MakeCallAsHandsfreeResponse:
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details('Method not implemented!')  # type: ignore
        raise NotImplementedError('Method not implemented!')

    async def AnswerCall(self, request: hfp_pb2.AnswerCallRequest,
                         context: grpc.ServicerContext) -> hfp_pb2.AnswerCallResponse:
        self.enable_phone_for_testing()
        answer_result = self.bluetooth.answer_call()
        if answer_result is None or not answer_result:
            await context.abort(grpc.StatusCode.INTERNAL, 'Failed to answer call.')
        return hfp_pb2.AnswerCallResponse()

    async def AnswerCallAsHandsfree(self, request: hfp_pb2.AnswerCallAsHandsfreeRequest,
                                    context: grpc.ServicerContext) -> hfp_pb2.AnswerCallAsHandsfreeResponse:
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details('Method not implemented!')  # type: ignore
        raise NotImplementedError('Method not implemented!')

    async def SetAudioPath(self, request: hfp_pb2.SetAudioPathRequest,
                           context: grpc.ServicerContext) -> hfp_pb2.SetAudioPathResponse:
        self.enable_phone_for_testing()
        connected_devices = self.bluetooth.get_connected_audio_devices()
        if len(connected_devices) == 0:
            await context.abort(grpc.StatusCode.INTERNAL, 'No connected devices.')
        if request.audio_path == hfp_pb2.AUDIO_PATH_SPEAKERS:
            self.bluetooth.audio_disconnect(connected_devices[0])
        elif request.audio_path == hfp_pb2.AUDIO_PATH_HANDSFREE:
            self.bluetooth.audio_connect(connected_devices[0])
        return hfp_pb2.SetAudioPathResponse()

    async def SwapActiveCall(self, request: hfp_pb2.SwapActiveCallRequest,
                             context: grpc.ServicerContext) -> hfp_pb2.SwapActiveCallResponse:
        self.enable_phone_for_testing()
        swap_result = self.bluetooth.swap_active_call()
        if swap_result is None or not swap_result:
            await context.abort(grpc.StatusCode.INTERNAL, 'Failed to swap active call.')
        return hfp_pb2.SwapActiveCallResponse()

    async def SetInBandRingtone(self, request: hfp_pb2.SetInBandRingtoneRequest,
                                context: grpc.ServicerContext) -> hfp_pb2.SetInBandRingtoneResponse:
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details('Method not implemented!')  # type: ignore
        raise NotImplementedError('Method not implemented!')

    async def ClearCallHistory(self, request: hfp_pb2.ClearCallHistoryRequest,
                               context: grpc.ServicerContext) -> hfp_pb2.ClearCallHistoryResponse:
        self.enable_phone_for_testing()
        if not all((self.bluetooth.set_memory_call(), self.bluetooth.set_last_call())):
            await context.abort(grpc.StatusCode.INTERNAL, 'Failed to clear hall history.')
        return hfp_pb2.ClearCallHistoryResponse()

    async def EndCallAsHandsfree(self, request: hfp_pb2.EndCallAsHandsfreeRequest,
                                 context: grpc.ServicerContext) -> hfp_pb2.EndCallAsHandsfreeResponse:
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details('Method not implemented!')  # type: ignore
        raise NotImplementedError('Method not implemented!')

    async def CallTransferAsHandsfree(self, request: hfp_pb2.CallTransferAsHandsfreeRequest,
                                      context: grpc.ServicerContext) -> hfp_pb2.CallTransferAsHandsfreeResponse:
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details('Method not implemented!')  # type: ignore
        raise NotImplementedError('Method not implemented!')

    async def SendDtmfFromHandsfree(self, request: hfp_pb2.SendDtmfFromHandsfreeRequest,
                                    context: grpc.ServicerContext) -> hfp_pb2.SendDtmfFromHandsfreeResponse:
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details('Method not implemented!')  # type: ignore
        raise NotImplementedError('Method not implemented!')
+5 −0
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ import logging
from floss.pandora.server import a2dp
from floss.pandora.server import bluetooth as bluetooth_module
from floss.pandora.server import gatt
from floss.pandora.server import hfp
from floss.pandora.server import hid
from floss.pandora.server import host
from floss.pandora.server import l2cap
@@ -30,6 +31,7 @@ from pandora import a2dp_grpc_aio
from pandora import host_grpc_aio
from pandora import security_grpc_aio
from pandora_experimental import gatt_grpc_aio
from pandora_experimental import hfp_grpc_aio
from pandora_experimental import hid_grpc_aio
from pandora_experimental import l2cap_grpc_aio
from pandora_experimental import modem_grpc_aio
@@ -63,6 +65,9 @@ async def serve(port):
            modem_service = modem.Modem(bluetooth)
            modem_grpc_aio.add_ModemServicer_to_server(modem_service, server)

            hfp_service = hfp.HFPService(bluetooth)
            hfp_grpc_aio.add_HFPServicer_to_server(hfp_service, server)

            hid_service = hid.HIDService(bluetooth)
            hid_grpc_aio.add_HIDServicer_to_server(hid_service, server)