Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0ce66856 authored by JohnLai's avatar JohnLai
Browse files

floss: Support parsing raw advertising data

Support parsing advertising data to return the scan results to the
Pandora client.

Bug: 313363858
Tag: #floss
Test: m Bluetooth && pts-bot GAP
Flag: EXEMPT floss only changes
Change-Id: Ida766e8ea653bcccd22e1c32d07bb46cb7d78340
parent 59eb7b15
Loading
Loading
Loading
Loading
+28 −0
Original line number Diff line number Diff line
@@ -184,3 +184,31 @@ class CompanyIdentifiers(enum.IntEnum):
    Bluetooth SIG official document: https://www.bluetooth.com/specifications/assigned-numbers/
    """
    GOOGLE = 0x00E0


class AdvertisingDataType(enum.IntEnum):
    FLAGS = 0x01
    INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x02
    COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x03
    INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x04
    COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x05
    INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x06
    COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x07
    SHORTENED_LOCAL_NAME = 0x08
    COMPLETE_LOCAL_NAME = 0x09
    TX_POWER_LEVEL = 0x0A
    CLASS_OF_DEVICE = 0x0D
    SLAVE_CONNECTION_INTERVAL_RANGE = 0x12
    LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = 0x14
    LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = 0x15
    SERVICE_DATA_16_BIT_UUID = 0x16
    PUBLIC_TARGET_ADDRESS = 0x17
    RANDOM_TARGET_ADDRESS = 0x18
    APPEARANCE = 0x19
    ADVERTISING_INTERVAL = 0x1A
    LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS = 0x1F
    SERVICE_DATA_32_BIT_UUID = 0x20
    SERVICE_DATA_128_BIT_UUID = 0x21
    URI = 0x24
    LE_SUPPORTED_FEATURES = 0x27
    MANUFACTURER_SPECIFIC_DATA = 0xFF
+42 −0
Original line number Diff line number Diff line
@@ -13,6 +13,8 @@
# limitations under the License.
"""All floss utils functions."""

from typing import List, Optional

import functools
import logging
import threading
@@ -452,3 +454,43 @@ def create_observer_name(observer):
# when we are able to use it.
async def anext(ait):
    return await ait.__anext__()


def parse_advertiging_data(adv_data: List[int]) -> host_pb2.DataTypes:
    index = 0
    data = host_pb2.DataTypes()

    # advertising data packet is repeated by many advertising data and each one with the following format:
    # | Length (0) | Type (1) | Data Payload (2~N-1)
    # Take an advertising data packet [2, 1, 6, 5, 3, 0, 24, 1, 24] as an example,
    # The first 3 numbers 2, 1, 6:
    # 2 is the data length is 2 including the data type,
    # 1 is the data type is Flags (0x01),
    # 6 is the data payload.
    while index < len(adv_data):
        # Extract data length.
        data_length = adv_data[index]
        index = index + 1

        if data_length <= 0:
            break

        # Extract data type.
        data_type = adv_data[index]
        index = index + 1

        # Extract data payload.
        if data_type == floss_enums.AdvertisingDataType.COMPLETE_LOCAL_NAME:
            data.complete_local_name = parse_complete_local_name(adv_data[index:index + data_length - 1])
            logging.info('complete_local_name: %s', data.complete_local_name)
        else:
            logging.debug('Unsupported advertising data type to parse: %s', data_type)

        index = index + data_length - 1

    logging.info('Parsed data: %s', data)
    return data


def parse_complete_local_name(data: List[int]) -> Optional[str]:
    return ''.join(chr(char) for char in data) if data else None
+4 −0
Original line number Diff line number Diff line
@@ -474,6 +474,9 @@ class HostService(host_grpc_aio.HostServicer):
                elif scan_result['addr_type'] == floss_enums.BleAddressType.BLE_ADDR_RANDOM_ID:
                    response.random_static_identity = address

                data = utils.parse_advertiging_data(scan_result['adv_data'])
                response.data.CopyFrom(data)

                # TODO: b/289480188 - Support more data if needed.
                mode = host_pb2.NOT_DISCOVERABLE
                if scan_result['flags'] & (1 << 0):
@@ -483,6 +486,7 @@ class HostService(host_grpc_aio.HostServicer):
                else:
                    mode = host_pb2.NOT_DISCOVERABLE
                response.data.le_discoverability_mode = mode

                yield response
        finally:
            if scanner_id is not None: