Loading android/pandora/server/src/com/android/pandora/Host.kt +108 −6 Original line number Diff line number Diff line Loading @@ -66,6 +66,41 @@ import kotlinx.coroutines.runBlocking import pandora.HostGrpc.HostImplBase import pandora.HostProto.* object ByteArrayOps { public fun getUShortAt(input: ByteArray, index: Int): UShort { return ( ((input[index + 1].toUInt() and 0xffU) shl 8) or (input[index].toUInt() and 0xffU)).toUShort() } public fun getShortAt(input: ByteArray, index: Int): Short { return getUShortAt(input, index).toShort() } public fun getUIntAt(input: ByteArray, index: Int): UInt { return ( ((input[index + 3].toUInt() and 0xffU) shl 24) or ((input[index + 2].toUInt() and 0xffU) shl 16) or ((input[index + 1].toUInt() and 0xffU) shl 8) or (input[index].toUInt() and 0xffU)) } public fun getIntAt(input: ByteArray, index: Int): Int { return getUIntAt(input, index).toInt() } public fun getUInt24At(input: ByteArray, index: Int): UInt { return ( ((input[index + 2].toUInt() and 0xffU) shl 16) or ((input[index + 1].toUInt() and 0xffU) shl 8) or (input[index].toUInt() and 0xffU)) } public fun getInt24At(input: ByteArray, index: Int): Int { return getUInt24At(input, index).toInt() } } @kotlinx.coroutines.ExperimentalCoroutinesApi class Host( private val context: Context, Loading Loading @@ -564,15 +599,46 @@ class Host( var dataTypesBuilder = DataTypes.newBuilder().setTxPowerLevel(scanRecord.getTxPowerLevel()) scanData[ScanRecord.DATA_TYPE_LOCAL_NAME_SHORT]?.let { dataTypesBuilder.setShortenedLocalName(it.decodeToString()) } ?: run { dataTypesBuilder.setIncludeShortenedLocalName(false) } scanData[ScanRecord.DATA_TYPE_LOCAL_NAME_COMPLETE]?.let { dataTypesBuilder.setCompleteLocalName(it.decodeToString()) } ?: run { dataTypesBuilder.setIncludeCompleteLocalName(false) } scanData[ScanRecord.DATA_TYPE_ADVERTISING_INTERVAL]?.let { dataTypesBuilder.setAdvertisingInterval(ByteArrayOps.getShortAt(it, 0).toInt()) } scanData[ScanRecord.DATA_TYPE_ADVERTISING_INTERVAL_LONG]?.let { dataTypesBuilder.setAdvertisingInterval(ByteArrayOps.getIntAt(it, 0)) } scanData[ScanRecord.DATA_TYPE_APPEARANCE]?.let { dataTypesBuilder.setAppearance(ByteArrayOps.getShortAt(it, 0).toInt()) } scanData[ScanRecord.DATA_TYPE_CLASS_OF_DEVICE]?.let { dataTypesBuilder.setClassOfDevice(ByteArrayOps.getInt24At(it, 0)) } scanData[ScanRecord.DATA_TYPE_URI]?.let { dataTypesBuilder.setUri(it.decodeToString()) } scanData[ScanRecord.DATA_TYPE_LE_SUPPORTED_FEATURES]?.let { dataTypesBuilder.setLeSupportedFeatures(ByteString.copyFrom(it)) } scanData[ScanRecord.DATA_TYPE_SLAVE_CONNECTION_INTERVAL_RANGE]?.let { dataTypesBuilder.setPeripheralConnectionIntervalMin(ByteArrayOps.getShortAt(it, 0).toInt()) dataTypesBuilder.setPeripheralConnectionIntervalMax(ByteArrayOps.getShortAt(it, 2).toInt()) } for (serviceDataEntry in serviceData) { val parcelUuid = serviceDataEntry.key Log.d(TAG, parcelUuid.uuid.toString()) Loading @@ -580,27 +646,53 @@ class Host( // use upper case uuid as the key if (BluetoothUuid.is16BitUuid(parcelUuid)) { val uuid16 = parcelUuid.uuid.toString().substring(4, 8).uppercase() dataTypesBuilder.addIncompleteServiceClassUuids16(uuid16) dataTypesBuilder.putServiceDataUuid16( uuid16, ByteString.copyFrom(serviceDataEntry.value) ) } else if (BluetoothUuid.is32BitUuid(parcelUuid)) { val uuid32 = parcelUuid.uuid.toString().substring(0, 8).uppercase() dataTypesBuilder.addIncompleteServiceClassUuids32(uuid32) dataTypesBuilder.putServiceDataUuid32( uuid32, ByteString.copyFrom(serviceDataEntry.value) ) } else { val uuid128 = parcelUuid.uuid.toString().uppercase() dataTypesBuilder.addIncompleteServiceClassUuids128(uuid128) dataTypesBuilder.putServiceDataUuid128( uuid128, ByteString.copyFrom(serviceDataEntry.value) ) } } for (serviceUuid in scanRecord.serviceSolicitationUuids ?: listOf<ParcelUuid>()) { Log.d(TAG, serviceUuid.uuid.toString()) if (BluetoothUuid.is16BitUuid(serviceUuid)) { val uuid16 = serviceUuid.uuid.toString().substring(4, 8).uppercase() dataTypesBuilder.addServiceSolicitationUuids16(uuid16) } else if (BluetoothUuid.is32BitUuid(serviceUuid)) { val uuid32 = serviceUuid.uuid.toString().substring(0, 8).uppercase() dataTypesBuilder.addServiceSolicitationUuids32(uuid32) } else { val uuid128 = serviceUuid.uuid.toString().uppercase() dataTypesBuilder.addServiceSolicitationUuids128(uuid128) } } for (serviceUuid in scanRecord.serviceUuids ?: listOf<ParcelUuid>()) { Log.d(TAG, serviceUuid.uuid.toString()) if (BluetoothUuid.is16BitUuid(serviceUuid)) { val uuid16 = serviceUuid.uuid.toString().substring(4, 8).uppercase() dataTypesBuilder.addIncompleteServiceClassUuids16(uuid16) } else if (BluetoothUuid.is32BitUuid(serviceUuid)) { val uuid32 = serviceUuid.uuid.toString().substring(0, 8).uppercase() dataTypesBuilder.addIncompleteServiceClassUuids32(uuid32) } else { val uuid128 = serviceUuid.uuid.toString().uppercase() dataTypesBuilder.addIncompleteServiceClassUuids128(uuid128) } } // Flags DataTypes CSSv10 1.3 Flags val mode: DiscoverabilityMode = when (result.scanRecord.advertiseFlags and 0b11) { Loading Loading @@ -628,12 +720,22 @@ class Host( BluetoothDevice.PHY_LE_CODED -> PrimaryPhy.PRIMARY_CODED else -> PrimaryPhy.UNRECOGNIZED } val secondaryPhy = when (result.getSecondaryPhy()) { ScanResult.PHY_UNUSED -> SecondaryPhy.SECONDARY_NONE BluetoothDevice.PHY_LE_1M -> SecondaryPhy.SECONDARY_1M BluetoothDevice.PHY_LE_2M -> SecondaryPhy.SECONDARY_2M BluetoothDevice.PHY_LE_CODED -> SecondaryPhy.SECONDARY_CODED else -> SecondaryPhy.UNRECOGNIZED } var scanningResponseBuilder = ScanningResponse.newBuilder() .setLegacy(result.isLegacy()) .setConnectable(result.isConnectable()) .setSid(result.getPeriodicAdvertisingInterval()) .setTruncated(result.getDataStatus() == ScanResult.DATA_TRUNCATED) .setSid(result.getAdvertisingSid()) .setPrimaryPhy(primaryPhy) .setSecondaryPhy(secondaryPhy) .setTxPower(result.getTxPower()) .setRssi(result.getRssi()) .setPeriodicAdvertisingInterval(result.getPeriodicAdvertisingInterval().toFloat()) Loading android/pandora/test/le_advertising_test.py +55 −1 Original line number Diff line number Diff line Loading @@ -27,7 +27,7 @@ from mobly.asserts import assert_is_not_none # type: ignore from mobly.asserts import assert_true # type: ignore from mobly.asserts import fail # type: ignore from pandora.host_pb2 import PUBLIC, DataTypes from typing import Optional from typing import Any, Dict, Optional class AdvertisingEventProperties(enum.IntEnum): Loading Loading @@ -119,6 +119,60 @@ class LeAdvertisingTest(base_test.BaseTestClass): # type: ignore[misc] assert_equal(report.data.manufacturer_specific_data, manufacturer_specific_data) assert_false(report.truncated, msg='expected non-truncated advertising report') @parameterized( (dict(incomplete_service_class_uuids16=["183A", "181F"]),), # (dict(complete_service_class_uuids16=["183A", "181F"]),), (dict(incomplete_service_class_uuids32=["FFFF183A", "FFFF181F"]),), # (dict(complete_service_class_uuids32=["FFFF183A", "FFFF181F"]),), (dict(incomplete_service_class_uuids128=["FFFF181F-FFFF-1000-8000-00805F9B34FB"]),), # (dict(complete_service_class_uuids128=["FFFF183A-FFFF-1000-8000-00805F9B34FB"]),), (dict(shortened_local_name="avatar"),), (dict(complete_local_name="avatar_the_last_test_blender"),), (dict(tx_power_level=20),), (dict(class_of_device=0x40680),), (dict(peripheral_connection_interval_min=0x0006, peripheral_connection_interval_max=0x0C80),), (dict(service_solicitation_uuids16=["183A", "181F"]),), (dict(service_solicitation_uuids32=["FFFF183A", "FFFF181F"]),), (dict(service_solicitation_uuids128=["FFFF183A-FFFF-1000-8000-00805F9B34FB"]),), (dict(service_data_uuid16={"183A": bytes([1, 2, 3, 4])}),), (dict(service_data_uuid32={"FFFF183A": bytes([1, 2, 3, 4])}),), (dict(service_data_uuid128={"FFFF181F-FFFF-1000-8000-00805F9B34FB": bytes([1, 2, 3, 4])}),), # (dict(public_target_addresses=[bytes([1, 2, 3, 4, 5, 6]), # bytes([6, 5, 2, 4, 3, 1])]),), # (dict(random_target_addresses=[bytes([1, 2, 3, 4, 5, 6]), # bytes([6, 5, 2, 4, 3, 1])]),), (dict(appearance=0x0591),), (dict(advertising_interval=0x1000),), # (dict(advertising_interval=0x100000),), (dict(uri="https://www.google.com"),), (dict(le_supported_features=bytes([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x10, 0x9F])),), (dict(manufacturer_specific_data=bytes([0, 1, 2, 3, 4])),), # (dict(le_discoverability_mode=DISCOVERABLE_GENERAL),), ) # type: ignore[misc] def test_advertising_data_types(self, advertising_data: Dict[str, Any]) -> None: # Advertise from the Ref device with the specified advertising data. # Validate that the Ref generates the correct advertising data, # and that the dut presents the correct advertising data in the scan # result. advertiser = self.ref.host.Advertise( legacy=True, connectable=True, data=DataTypes(**advertising_data), own_address_type=PUBLIC, ) scanner = self.dut.host.Scan(legacy=False, passive=False) report = next((x for x in scanner if x.public == self.ref.address)) scanner.cancel() advertiser.cancel() assert_true(report.legacy, msg='expected legacy advertising report') assert_equal(report.connectable, True) for (key, value) in advertising_data.items(): # type: ignore [misc] assert_equal(getattr(report.data, key), value) # type: ignore [misc] assert_false(report.truncated, msg='expected non-truncated advertising report') if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) Loading Loading
android/pandora/server/src/com/android/pandora/Host.kt +108 −6 Original line number Diff line number Diff line Loading @@ -66,6 +66,41 @@ import kotlinx.coroutines.runBlocking import pandora.HostGrpc.HostImplBase import pandora.HostProto.* object ByteArrayOps { public fun getUShortAt(input: ByteArray, index: Int): UShort { return ( ((input[index + 1].toUInt() and 0xffU) shl 8) or (input[index].toUInt() and 0xffU)).toUShort() } public fun getShortAt(input: ByteArray, index: Int): Short { return getUShortAt(input, index).toShort() } public fun getUIntAt(input: ByteArray, index: Int): UInt { return ( ((input[index + 3].toUInt() and 0xffU) shl 24) or ((input[index + 2].toUInt() and 0xffU) shl 16) or ((input[index + 1].toUInt() and 0xffU) shl 8) or (input[index].toUInt() and 0xffU)) } public fun getIntAt(input: ByteArray, index: Int): Int { return getUIntAt(input, index).toInt() } public fun getUInt24At(input: ByteArray, index: Int): UInt { return ( ((input[index + 2].toUInt() and 0xffU) shl 16) or ((input[index + 1].toUInt() and 0xffU) shl 8) or (input[index].toUInt() and 0xffU)) } public fun getInt24At(input: ByteArray, index: Int): Int { return getUInt24At(input, index).toInt() } } @kotlinx.coroutines.ExperimentalCoroutinesApi class Host( private val context: Context, Loading Loading @@ -564,15 +599,46 @@ class Host( var dataTypesBuilder = DataTypes.newBuilder().setTxPowerLevel(scanRecord.getTxPowerLevel()) scanData[ScanRecord.DATA_TYPE_LOCAL_NAME_SHORT]?.let { dataTypesBuilder.setShortenedLocalName(it.decodeToString()) } ?: run { dataTypesBuilder.setIncludeShortenedLocalName(false) } scanData[ScanRecord.DATA_TYPE_LOCAL_NAME_COMPLETE]?.let { dataTypesBuilder.setCompleteLocalName(it.decodeToString()) } ?: run { dataTypesBuilder.setIncludeCompleteLocalName(false) } scanData[ScanRecord.DATA_TYPE_ADVERTISING_INTERVAL]?.let { dataTypesBuilder.setAdvertisingInterval(ByteArrayOps.getShortAt(it, 0).toInt()) } scanData[ScanRecord.DATA_TYPE_ADVERTISING_INTERVAL_LONG]?.let { dataTypesBuilder.setAdvertisingInterval(ByteArrayOps.getIntAt(it, 0)) } scanData[ScanRecord.DATA_TYPE_APPEARANCE]?.let { dataTypesBuilder.setAppearance(ByteArrayOps.getShortAt(it, 0).toInt()) } scanData[ScanRecord.DATA_TYPE_CLASS_OF_DEVICE]?.let { dataTypesBuilder.setClassOfDevice(ByteArrayOps.getInt24At(it, 0)) } scanData[ScanRecord.DATA_TYPE_URI]?.let { dataTypesBuilder.setUri(it.decodeToString()) } scanData[ScanRecord.DATA_TYPE_LE_SUPPORTED_FEATURES]?.let { dataTypesBuilder.setLeSupportedFeatures(ByteString.copyFrom(it)) } scanData[ScanRecord.DATA_TYPE_SLAVE_CONNECTION_INTERVAL_RANGE]?.let { dataTypesBuilder.setPeripheralConnectionIntervalMin(ByteArrayOps.getShortAt(it, 0).toInt()) dataTypesBuilder.setPeripheralConnectionIntervalMax(ByteArrayOps.getShortAt(it, 2).toInt()) } for (serviceDataEntry in serviceData) { val parcelUuid = serviceDataEntry.key Log.d(TAG, parcelUuid.uuid.toString()) Loading @@ -580,27 +646,53 @@ class Host( // use upper case uuid as the key if (BluetoothUuid.is16BitUuid(parcelUuid)) { val uuid16 = parcelUuid.uuid.toString().substring(4, 8).uppercase() dataTypesBuilder.addIncompleteServiceClassUuids16(uuid16) dataTypesBuilder.putServiceDataUuid16( uuid16, ByteString.copyFrom(serviceDataEntry.value) ) } else if (BluetoothUuid.is32BitUuid(parcelUuid)) { val uuid32 = parcelUuid.uuid.toString().substring(0, 8).uppercase() dataTypesBuilder.addIncompleteServiceClassUuids32(uuid32) dataTypesBuilder.putServiceDataUuid32( uuid32, ByteString.copyFrom(serviceDataEntry.value) ) } else { val uuid128 = parcelUuid.uuid.toString().uppercase() dataTypesBuilder.addIncompleteServiceClassUuids128(uuid128) dataTypesBuilder.putServiceDataUuid128( uuid128, ByteString.copyFrom(serviceDataEntry.value) ) } } for (serviceUuid in scanRecord.serviceSolicitationUuids ?: listOf<ParcelUuid>()) { Log.d(TAG, serviceUuid.uuid.toString()) if (BluetoothUuid.is16BitUuid(serviceUuid)) { val uuid16 = serviceUuid.uuid.toString().substring(4, 8).uppercase() dataTypesBuilder.addServiceSolicitationUuids16(uuid16) } else if (BluetoothUuid.is32BitUuid(serviceUuid)) { val uuid32 = serviceUuid.uuid.toString().substring(0, 8).uppercase() dataTypesBuilder.addServiceSolicitationUuids32(uuid32) } else { val uuid128 = serviceUuid.uuid.toString().uppercase() dataTypesBuilder.addServiceSolicitationUuids128(uuid128) } } for (serviceUuid in scanRecord.serviceUuids ?: listOf<ParcelUuid>()) { Log.d(TAG, serviceUuid.uuid.toString()) if (BluetoothUuid.is16BitUuid(serviceUuid)) { val uuid16 = serviceUuid.uuid.toString().substring(4, 8).uppercase() dataTypesBuilder.addIncompleteServiceClassUuids16(uuid16) } else if (BluetoothUuid.is32BitUuid(serviceUuid)) { val uuid32 = serviceUuid.uuid.toString().substring(0, 8).uppercase() dataTypesBuilder.addIncompleteServiceClassUuids32(uuid32) } else { val uuid128 = serviceUuid.uuid.toString().uppercase() dataTypesBuilder.addIncompleteServiceClassUuids128(uuid128) } } // Flags DataTypes CSSv10 1.3 Flags val mode: DiscoverabilityMode = when (result.scanRecord.advertiseFlags and 0b11) { Loading Loading @@ -628,12 +720,22 @@ class Host( BluetoothDevice.PHY_LE_CODED -> PrimaryPhy.PRIMARY_CODED else -> PrimaryPhy.UNRECOGNIZED } val secondaryPhy = when (result.getSecondaryPhy()) { ScanResult.PHY_UNUSED -> SecondaryPhy.SECONDARY_NONE BluetoothDevice.PHY_LE_1M -> SecondaryPhy.SECONDARY_1M BluetoothDevice.PHY_LE_2M -> SecondaryPhy.SECONDARY_2M BluetoothDevice.PHY_LE_CODED -> SecondaryPhy.SECONDARY_CODED else -> SecondaryPhy.UNRECOGNIZED } var scanningResponseBuilder = ScanningResponse.newBuilder() .setLegacy(result.isLegacy()) .setConnectable(result.isConnectable()) .setSid(result.getPeriodicAdvertisingInterval()) .setTruncated(result.getDataStatus() == ScanResult.DATA_TRUNCATED) .setSid(result.getAdvertisingSid()) .setPrimaryPhy(primaryPhy) .setSecondaryPhy(secondaryPhy) .setTxPower(result.getTxPower()) .setRssi(result.getRssi()) .setPeriodicAdvertisingInterval(result.getPeriodicAdvertisingInterval().toFloat()) Loading
android/pandora/test/le_advertising_test.py +55 −1 Original line number Diff line number Diff line Loading @@ -27,7 +27,7 @@ from mobly.asserts import assert_is_not_none # type: ignore from mobly.asserts import assert_true # type: ignore from mobly.asserts import fail # type: ignore from pandora.host_pb2 import PUBLIC, DataTypes from typing import Optional from typing import Any, Dict, Optional class AdvertisingEventProperties(enum.IntEnum): Loading Loading @@ -119,6 +119,60 @@ class LeAdvertisingTest(base_test.BaseTestClass): # type: ignore[misc] assert_equal(report.data.manufacturer_specific_data, manufacturer_specific_data) assert_false(report.truncated, msg='expected non-truncated advertising report') @parameterized( (dict(incomplete_service_class_uuids16=["183A", "181F"]),), # (dict(complete_service_class_uuids16=["183A", "181F"]),), (dict(incomplete_service_class_uuids32=["FFFF183A", "FFFF181F"]),), # (dict(complete_service_class_uuids32=["FFFF183A", "FFFF181F"]),), (dict(incomplete_service_class_uuids128=["FFFF181F-FFFF-1000-8000-00805F9B34FB"]),), # (dict(complete_service_class_uuids128=["FFFF183A-FFFF-1000-8000-00805F9B34FB"]),), (dict(shortened_local_name="avatar"),), (dict(complete_local_name="avatar_the_last_test_blender"),), (dict(tx_power_level=20),), (dict(class_of_device=0x40680),), (dict(peripheral_connection_interval_min=0x0006, peripheral_connection_interval_max=0x0C80),), (dict(service_solicitation_uuids16=["183A", "181F"]),), (dict(service_solicitation_uuids32=["FFFF183A", "FFFF181F"]),), (dict(service_solicitation_uuids128=["FFFF183A-FFFF-1000-8000-00805F9B34FB"]),), (dict(service_data_uuid16={"183A": bytes([1, 2, 3, 4])}),), (dict(service_data_uuid32={"FFFF183A": bytes([1, 2, 3, 4])}),), (dict(service_data_uuid128={"FFFF181F-FFFF-1000-8000-00805F9B34FB": bytes([1, 2, 3, 4])}),), # (dict(public_target_addresses=[bytes([1, 2, 3, 4, 5, 6]), # bytes([6, 5, 2, 4, 3, 1])]),), # (dict(random_target_addresses=[bytes([1, 2, 3, 4, 5, 6]), # bytes([6, 5, 2, 4, 3, 1])]),), (dict(appearance=0x0591),), (dict(advertising_interval=0x1000),), # (dict(advertising_interval=0x100000),), (dict(uri="https://www.google.com"),), (dict(le_supported_features=bytes([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x10, 0x9F])),), (dict(manufacturer_specific_data=bytes([0, 1, 2, 3, 4])),), # (dict(le_discoverability_mode=DISCOVERABLE_GENERAL),), ) # type: ignore[misc] def test_advertising_data_types(self, advertising_data: Dict[str, Any]) -> None: # Advertise from the Ref device with the specified advertising data. # Validate that the Ref generates the correct advertising data, # and that the dut presents the correct advertising data in the scan # result. advertiser = self.ref.host.Advertise( legacy=True, connectable=True, data=DataTypes(**advertising_data), own_address_type=PUBLIC, ) scanner = self.dut.host.Scan(legacy=False, passive=False) report = next((x for x in scanner if x.public == self.ref.address)) scanner.cancel() advertiser.cancel() assert_true(report.legacy, msg='expected legacy advertising report') assert_equal(report.connectable, True) for (key, value) in advertising_data.items(): # type: ignore [misc] assert_equal(getattr(report.data, key), value) # type: ignore [misc] assert_false(report.truncated, msg='expected non-truncated advertising report') if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) Loading