Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit bc43487e authored by John Lai's avatar John Lai Committed by Gerrit Code Review
Browse files

Merge changes I0254ece0,I3f67b3a5,I5e160eca into main

* changes:
  Floss: Pandora GATT: Fix ReadCharacteristicsFromUuid return status
  Floss: Pandora GATT: Implement GATT server
  Floss: Implement Pandora A2DP profile
parents 865f0328 5fbf8782
Loading
Loading
Loading
Loading
+50 −0
Original line number Diff line number Diff line
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module provides audio test data."""

import os


class AudioTestDataException(Exception):
    """Exception for audio test data."""
    pass


class AudioTestData(object):
    """Class to represent audio test data."""

    def __init__(self, data_format=None, path=None, frequencies=None, duration_secs=None):
        """Initializes an audio test file.

        Args:
            data_format: A dict containing data format including
                         file_type, sample_format, channel, and rate.
                         file_type: file type e.g. 'raw' or 'wav'.
                         sample_format: One of the keys in audio_utils.SAMPLE_FORMAT.
                         channel: number of channels.
                         rate: sampling rate.
            path: The path to the file.
            frequencies: A list containing the frequency of each channel in this file.
                         Only applicable to data of sine tone.
            duration_secs: Duration of test file in seconds.

        Raises:
            AudioTestDataException if the path does not exist.
        """
        self.data_format = data_format
        if not os.path.exists(path):
            raise AudioTestDataException('Can not find path %s' % path)
        self.path = path
        self.frequencies = frequencies
        self.duration_secs = duration_secs
+187 −0
Original line number Diff line number Diff line
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import subprocess
import wave

from floss.pandora.floss import audio_test_data
from floss.pandora.floss import cras_utils
from floss.pandora.floss import sox_utils
from floss.pandora.floss import utils

CRAS_BLUETOOTH_OUTPUT_NODE_TYPE = 'BLUETOOTH'

AUDIO_TEST_DIR = '/tmp/audio'

A2DP_TEST_DATA = {
    'rate': 48000,
    'channels': 2,
    'frequencies': (440, 20000),
    'file': os.path.join(AUDIO_TEST_DIR, 'binaural_sine_440hz_20000hz_rate48000_5secs.wav'),
    'recorded_by_sink': os.path.join(AUDIO_TEST_DIR, 'a2dp_recorded_by_sink.wav'),
    'chunk_in_secs': 5,
    'bit_width': 16,
    'format': 'S16_LE',
    'duration': 5,
}

A2DP_PLAYBACK_DATA = {
    'rate': 44100,
    'channels': 2,
    'file': os.path.join(AUDIO_TEST_DIR, 'audio_playback.wav'),
    'sample_width': 2
}

SAMPLE_FORMATS = dict(S32_LE=dict(message='Signed 32-bit integer, little-endian', dtype_str='<i', size_bytes=4),
                      S16_LE=dict(message='Signed 16-bit integer, little-endian', dtype_str='<i', size_bytes=2))


@utils.dbus_safe(None)
def get_selected_output_device_type():
    """Gets the selected audio output node type.

    Returns:
        The node type of the selected output device.
    """
    return str(cras_utils.get_selected_output_device_type())


@utils.dbus_safe(None)
def select_output_node(node_type):
    """Selects the audio output node.

    Args:
        node_type: The node type of the Bluetooth peer device.

    Returns:
        True if the operation succeeds.
    """
    return cras_utils.set_single_selected_output_node(node_type)


def select_audio_output_node():
    """Selects the audio output node through cras."""

    def bluetooth_type_selected(node_type):
        """Checks if the bluetooth node type is selected."""
        selected = get_selected_output_device_type()
        logging.debug('active output node type: %s, expected %s', selected, node_type)
        return selected == node_type

    node_type = CRAS_BLUETOOTH_OUTPUT_NODE_TYPE
    if not select_output_node(node_type):
        return False

    desc = 'waiting for %s as active cras audio output node type' % node_type
    try:
        utils.poll_for_condition(condition=lambda: bluetooth_type_selected(node_type),
                                 timeout=20,
                                 sleep_interval=1,
                                 desc=desc)
    except TimeoutError:
        return False
    return True


def generate_audio_test_data(path, data_format=None, frequencies=None, duration_secs=None, volume_scale=None):
    """Generates audio test data with specified format and frequencies.

    Args:
        path: The path to the file.
        data_format: A dict containing data format including
                     file_type, sample_format, channel, and rate.
                     file_type: file type e.g. 'raw' or 'wav'.
                     sample_format: One of the keys in audio_data.SAMPLE_FORMAT.
                     channel: number of channels.
                     ate: sampling rate.
        frequencies: A list containing the frequency of each channel in this file.
                     Only applicable to data of sine tone.
        duration_secs: Duration of test file in seconds.
        volume_scale: A float for volume scale used in sox command.
                      E.g. 0.5 to scale volume by half. -1.0 to invert.

    Returns:
        An AudioTestData object.
    """
    sox_file_path = path

    if data_format is None:
        data_format = dict(file_type='wav', sample_format='S16_LE', channel=2, rate=48000)

    sample_format = SAMPLE_FORMATS[data_format['sample_format']]
    bits = sample_format['size_bytes'] * 8

    command = sox_utils.generate_sine_tone_cmd(filename=sox_file_path,
                                               channels=data_format['channel'],
                                               bits=bits,
                                               rate=data_format['rate'],
                                               duration=duration_secs,
                                               frequencies=frequencies,
                                               vol=volume_scale,
                                               raw=(data_format['file_type'] == 'raw'))

    logging.info(' '.join(command))
    subprocess.check_call(command)

    test_data = audio_test_data.AudioTestData(data_format=data_format,
                                              path=sox_file_path,
                                              frequencies=frequencies,
                                              duration_secs=duration_secs)

    return test_data


def generate_playback_file(audio_data):
    """Generates the playback file if it does not exist yet.

    Some audio test files may be large. Generate them on the fly to save the storage of the source tree.

    Args:
        audio_data: The audio test data.
    """
    directory = os.path.dirname(audio_data['file'])
    if not os.path.exists(directory):
        os.makedirs(directory)

    if not os.path.exists(audio_data['file']):
        data_format = dict(file_type='wav',
                           sample_format='S16_LE',
                           channel=audio_data['channels'],
                           rate=audio_data['rate'])
        generate_audio_test_data(data_format=data_format,
                                 path=audio_data['file'],
                                 duration_secs=audio_data['duration'],
                                 frequencies=audio_data['frequencies'])
        logging.debug('Audio file generated: %s', audio_data['file'])


def generate_playback_file_from_binary_data(audio_data):
    """Generates wav audio file from binary audio data.

    Args:
        audio_data: The binary audio data.
    """
    directory = os.path.dirname(A2DP_PLAYBACK_DATA['file'])
    if not os.path.exists(directory):
        os.makedirs(directory)

    with wave.open(A2DP_PLAYBACK_DATA['file'], 'wb') as wav_file:
        wav_file.setnchannels(A2DP_PLAYBACK_DATA['channels'])
        wav_file.setframerate(A2DP_PLAYBACK_DATA['rate'])
        wav_file.setsampwidth(A2DP_PLAYBACK_DATA['sample_width'])
        wav_file.writeframes(audio_data)

    logging.debug('wav file generated from binary data: %s', A2DP_PLAYBACK_DATA['file'])
+181 −0
Original line number Diff line number Diff line
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Class to hold the GATT service/characteristic/descriptor object."""

import uuid


class Service:
    """Class represents Bluetooth GATT service."""

    def __init__(self,
                 instance_id=None,
                 service_type=None,
                 uuid=None,
                 characteristics=None,
                 included_services=None,
                 value=None):
        self.instance_id = instance_id
        self.service_type = service_type
        self.uuid = uuid
        self.characteristics = characteristics
        self.included_services = included_services
        self.value = value

    def to_dict(self):
        """Converts service object to dictionary.

        Returns:
            GATT service as dict.
        """
        return {
            'instance_id': self.instance_id,
            'service_type': self.service_type,
            'uuid': self.uuid,
            'included_services': [service.to_dict() for service in self.included_services],
            'characteristics': [characteristic.to_dict() for characteristic in self.characteristics],
            'value': self.value
        }


class Characteristic:
    """Class represents Bluetooth GATT characteristic."""

    def __init__(self,
                 instance_id=None,
                 permissions=None,
                 write_type=None,
                 descriptors=None,
                 uuid=None,
                 key_size=None,
                 properties=None,
                 value=None):
        self.instance_id = instance_id
        self.permissions = permissions
        self.write_type = write_type
        self.descriptors = descriptors
        self.uuid = uuid
        self.key_size = key_size
        self.properties = properties
        self.value = value

    def to_dict(self):
        """Converts characteristic object to dictionary.

        Returns:
            GATT characteristic as dict.
        """
        return {
            'properties': self.properties,
            'permissions': self.permissions,
            'uuid': self.uuid,
            'instance_id': self.instance_id,
            'descriptors': [descriptor.to_dict() for descriptor in self.descriptors],
            'key_size': self.key_size,
            'write_type': self.write_type,
            'value': self.value
        }


class Descriptor:
    """Class represents Bluetooth GATT descriptor."""

    def __init__(self, permissions=None, uuid=None, instance_id=None, value=None):
        self.permissions = permissions
        self.uuid = uuid
        self.instance_id = instance_id
        self.value = value

    def to_dict(self):
        """Converts descriptor object to dictionary.

        Returns:
            GATT descriptor as dict.
        """
        return {
            'instance_id': self.instance_id,
            'permissions': self.permissions,
            'uuid': self.uuid,
            'value': self.value
        }


def create_gatt_service(service):
    """Creates GATT service from a dictionary.

    Args:
        service: Bluetooth GATT service as a dictionary.

    Returns:
        Bluetooth GATT service object.
    """
    return Service(
        instance_id=service['instance_id'],
        service_type=service['service_type'],
        uuid=str(uuid.UUID(bytes=bytes(service['uuid']))).upper(),
        included_services=[create_gatt_service(included_service) for included_service in service['included_services']],
        characteristics=[create_gatt_characteristic(characteristic) for characteristic in service['characteristics']],
        value=service.get('value'))


def create_gatt_characteristic(characteristic):
    """Creates GATT characteristic from a dictionary.

    Args:
        characteristic: Bluetooth GATT characteristic as a dictionary.

    Returns:
        Bluetooth GATT characteristic object.
    """
    return Characteristic(
        properties=characteristic['properties'],
        permissions=characteristic['permissions'],
        uuid=str(uuid.UUID(bytes=bytes(characteristic['uuid']))).upper(),
        instance_id=characteristic['instance_id'],
        descriptors=[create_gatt_characteristic_descriptor(descriptor) for descriptor in characteristic['descriptors']],
        key_size=characteristic['key_size'],
        write_type=characteristic['write_type'],
        value=characteristic.get('value'))


def create_gatt_characteristic_descriptor(descriptor):
    """Creates GATT descriptor from a dictionary.

    Args:
        descriptor: Bluetooth GATT descriptor as a dictionary.

    Returns:
        Bluetooth GATT descriptor object.
    """
    return Descriptor(instance_id=descriptor['instance_id'],
                      permissions=descriptor['permissions'],
                      uuid=str(uuid.UUID(bytes=bytes(descriptor['uuid']))).upper(),
                      value=descriptor.get('value'))


def convert_object_to_dict(obj):
    """Coverts object to dictionary.

    Args:
        obj: Service/Characteristic/Descriptor object.

    Returns:
        A dictionary represents the object.
    """
    if isinstance(obj, (Descriptor, Characteristic, Service)):
        return obj.to_dict()
    elif isinstance(obj, list):
        return [convert_object_to_dict(item) for item in obj]
    else:
        return obj
+228 −0
Original line number Diff line number Diff line
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import atexit
import itertools
import logging
import os
import pipes
import pwd
import select
import subprocess
import threading

TEE_TO_LOGS = object()
_popen_lock = threading.Lock()
_logging_service = None
_command_serial_number = itertools.count(1)

_LOG_BUFSIZE = 4096
_PIPE_CLOSED = -1


class _LoggerProxy(object):

    def __init__(self, logger):
        self._logger = logger

    def fileno(self):
        """Returns the fileno of the logger pipe."""
        return self._logger._pipe[1]

    def __del__(self):
        self._logger.close()


class _PipeLogger(object):

    def __init__(self, level, prefix):
        self._pipe = list(os.pipe())
        self._level = level
        self._prefix = prefix

    def close(self):
        """Closes the logger."""
        if self._pipe[1] != _PIPE_CLOSED:
            os.close(self._pipe[1])
            self._pipe[1] = _PIPE_CLOSED


class _LoggingService(object):

    def __init__(self):
        # Python's list is thread safe
        self._loggers = []

        # Change tuple to list so that we can change the value when
        # closing the pipe.
        self._pipe = list(os.pipe())
        self._thread = threading.Thread(target=self._service_run)
        self._thread.daemon = True
        self._thread.start()

    def _service_run(self):
        terminate_loop = False
        while not terminate_loop:
            rlist = [l._pipe[0] for l in self._loggers]
            rlist.append(self._pipe[0])
            for r in select.select(rlist, [], [])[0]:
                data = os.read(r, _LOG_BUFSIZE)
                if r != self._pipe[0]:
                    self._output_logger_message(r, data)
                elif len(data) == 0:
                    terminate_loop = True
        # Release resources.
        os.close(self._pipe[0])
        for logger in self._loggers:
            os.close(logger._pipe[0])

    def _output_logger_message(self, r, data):
        logger = next(l for l in self._loggers if l._pipe[0] == r)

        if len(data) == 0:
            os.close(logger._pipe[0])
            self._loggers.remove(logger)
            return

        for line in data.split('\n'):
            logging.log(logger._level, '%s%s', logger._prefix, line)

    def create_logger(self, level=logging.DEBUG, prefix=''):
        """Creates a new logger.

        Args:
            level: The desired logging level.
            prefix: The prefix to add to each log entry.
        """
        logger = _PipeLogger(level=level, prefix=prefix)
        self._loggers.append(logger)
        os.write(self._pipe[1], '\0')
        return _LoggerProxy(logger)

    def shutdown(self):
        """Shuts down the logger."""
        if self._pipe[1] != _PIPE_CLOSED:
            os.close(self._pipe[1])
            self._pipe[1] = _PIPE_CLOSED
            self._thread.join()


def create_logger(level=logging.DEBUG, prefix=''):
    """Creates a new logger.

    Args:
        level: The desired logging level.
        prefix: The prefix to add to each log entry.
    """
    global _logging_service
    if _logging_service is None:
        _logging_service = _LoggingService()
        atexit.register(_logging_service.shutdown)
    return _logging_service.create_logger(level=level, prefix=prefix)


def wait_and_check_returncode(*popens):
    """Waits for all the Popens and check the return code is 0.

    Args:
        popens: The Popens to be checked.

    Raises:
        RuntimeError if the return code is not 0.
    """
    error_message = None
    for p in popens:
        if p.wait() != 0:
            error_message = ('Command failed(%d, %d): %s' % (p.pid, p.returncode, p.command))
            logging.error(error_message)
    if error_message:
        raise RuntimeError(error_message)


def execute(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS, run_as=None):
    """Executes a child command and wait for it.

    Args:
        args: The command to be executed.
        stdin: The executed program's standard input.
        stdout: The executed program's standard output.
        stderr: The executed program's standard error.
        run_as: If not None, run the command as the given user.

    Returns:
        The output from standard output if 'stdout' is subprocess.PIPE.

    Raises:
        RuntimeException if the return code of the child command is not 0.
    """
    ps = popen(args, stdin=stdin, stdout=stdout, stderr=stderr, run_as=run_as)
    out = ps.communicate()[0] if stdout == subprocess.PIPE else None
    wait_and_check_returncode(ps)
    return out


def _run_as(user):
    """Changes the uid and gid of the running process to be that of the
    given user and configures its supplementary groups.

    Don't call this function directly, instead wrap it in a lambda and
    pass that to the preexec_fn argument of subprocess.Popen.

    Example usage:
    subprocess.Popen(..., preexec_fn=lambda: _run_as('chronos'))

    Args:
        user: The user to run as.
    """
    pw = pwd.getpwnam(user)
    os.setgid(pw.pw_gid)
    os.initgroups(user, pw.pw_gid)
    os.setuid(pw.pw_uid)


def popen(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS, env=None, run_as=None):
    """Returns a Popen object just as subprocess.Popen does but with the
    executed command stored in Popen.command.

    Args:
        args: The command to be executed.
        stdin: The executed program's standard input.
        stdout: The executed program's standard output.
        stderr: The executed program's standard error.
        env: The executed program's environment.
        run_as: If not None, run the command as the given user.
    """
    command_id = next(_command_serial_number)
    prefix = '[%04d] ' % command_id

    if stdout is TEE_TO_LOGS:
        stdout = create_logger(level=logging.DEBUG, prefix=prefix)
    if stderr is TEE_TO_LOGS:
        stderr = create_logger(level=logging.ERROR, prefix=prefix)

    command = ' '.join(pipes.quote(x) for x in args)
    logging.info('%sRunning: %s', prefix, command)

    preexec_fn = None
    if run_as is not None:
        preexec_fn = lambda: _run_as(run_as)

    # The lock is required for http://crbug.com/323843.
    with _popen_lock:
        ps = subprocess.Popen(args, stdin=stdin, stdout=stdout, stderr=stderr, env=env, preexec_fn=preexec_fn)

    logging.info('%spid is %d', prefix, ps.pid)
    ps.command_id = command_id
    ps.command = command
    return ps
+852 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading