# Lint as: python3 """Tests for blueberry.tests.bluetooth.bluetooth_latency.""" from __future__ import absolute_import from __future__ import division from __future__ import print_function import logging import math import random import string import time from mobly import asserts from mobly import test_runner from mobly.signals import TestAbortClass # Internal import from blueberry.utils import blueberry_base_test from blueberry.utils import bt_test_utils from blueberry.utils import metrics_utils # Internal import class BluetoothLatencyTest(blueberry_base_test.BlueberryBaseTest): @retry.logged_retry_on_exception( retry_intervals=retry.FuzzedExponentialIntervals( initial_delay_sec=2, factor=5, num_retries=5, max_delay_sec=300)) def _measure_latency(self): """Measures the latency of data transfer over RFCOMM. Sends data from the client device that is read by the server device. Calculates the latency of the transfer. Returns: The latency of the transfer milliseconds. """ # Generates a random message to transfer message = (''.join( random.choice(string.ascii_letters + string.digits) for _ in range(6))) start_time = time.time() write_read_successful = bt_test_utils.write_read_verify_data_sl4a( self.phone, self.derived_bt_device, message, False) end_time = time.time() asserts.assert_true(write_read_successful, 'Failed to send/receive message') return (end_time - start_time) * 1000 def setup_class(self): """Standard Mobly setup class.""" super(BluetoothLatencyTest, self).setup_class() if len(self.android_devices) < 2: raise TestAbortClass( 'Not enough android phones detected (need at least two)') self.phone = self.android_devices[0] self.phone.init_setup() self.phone.sl4a_setup() # We treat the secondary phone as a derived_bt_device in order for the # generic script to work with this android phone properly. Data will be sent # from first phone to the second phone. self.derived_bt_device = self.android_devices[1] self.derived_bt_device.init_setup() self.derived_bt_device.sl4a_setup() self.set_btsnooplogmode_full(self.phone) self.set_btsnooplogmode_full(self.derived_bt_device) self.metrics = ( metrics_utils.BluetoothMetricLogger( metrics_pb2.BluetoothDataTestResult())) self.metrics.add_primary_device_metrics(self.phone) self.metrics.add_connected_device_metrics(self.derived_bt_device) self.data_transfer_type = metrics_pb2.BluetoothDataTestResult.RFCOMM self.iterations = int(self.user_params.get('iterations', 300)) logging.info('Running Bluetooth latency test %s times.', self.iterations) logging.info('Successfully found required devices.') def setup_test(self): """Setup for bluetooth latency test.""" logging.info('Setup Test for test_bluetooth_latency') super(BluetoothLatencyTest, self).setup_test() asserts.assert_true(self.phone.connect_with_rfcomm(self.derived_bt_device), 'Failed to establish RFCOMM connection') def test_bluetooth_latency(self): """Tests the latency for a data transfer over RFCOMM.""" metrics = {} latency_list = [] for _ in range(self.iterations): latency_list.append(self._measure_latency()) metrics['data_transfer_protocol'] = self.data_transfer_type metrics['data_latency_min_millis'] = int(min(latency_list)) metrics['data_latency_max_millis'] = int(max(latency_list)) metrics['data_latency_avg_millis'] = int( math.fsum(latency_list) / float(len(latency_list))) logging.info('Latency: %s', metrics) asserts.assert_true(metrics['data_latency_min_millis'] > 0, 'Minimum latency must be greater than 0!') self.metrics.add_test_metrics(metrics) for metric in metrics: self.record_data({ 'Test Name': 'test_bluetooth_latency', 'sponge_properties': { metric: metrics[metric], } }) def teardown_class(self): logging.info('Factory resetting Bluetooth on devices.') self.phone.sl4a.bluetoothSocketConnStop() self.derived_bt_device.sl4a.bluetoothSocketConnStop() self.phone.factory_reset_bluetooth() self.derived_bt_device.factory_reset_bluetooth() super(BluetoothLatencyTest, self).teardown_class() self.record_data({ 'Test Name': 'test_bluetooth_latency', 'sponge_properties': { 'proto_ascii': self.metrics.proto_message_to_ascii(), 'primary_device_build': self.phone.get_device_info()['android_release_id'] } }) if __name__ == '__main__': test_runner.main()