401 lines
16 KiB
Python
401 lines
16 KiB
Python
|
"""Tests for blueberry.pbap.bluetooth_pbap."""
|
||
|
|
||
|
import os
|
||
|
import random
|
||
|
import time
|
||
|
|
||
|
from mobly import asserts
|
||
|
from mobly import test_runner
|
||
|
from mobly import signals
|
||
|
from mobly import utils
|
||
|
|
||
|
from mobly.controllers import android_device
|
||
|
|
||
|
from blueberry.utils import blueberry_ui_base_test
|
||
|
from blueberry.utils import bt_constants
|
||
|
from blueberry.utils import bt_test_utils
|
||
|
|
||
|
# The path is used to place the created vcf files.
|
||
|
STORAGE_PATH = '/storage/emulated/0'
|
||
|
|
||
|
# URI for contacts database.
|
||
|
CONTACTS_URI = 'content://com.android.contacts/data/phones'
|
||
|
|
||
|
# Number of seconds to wait for contacts and call logs update.
|
||
|
WAITING_TIMEOUT_SEC = 60
|
||
|
|
||
|
# Number of contacts and call logs to be tested.
|
||
|
TEST_DATA_COUNT = 1000
|
||
|
|
||
|
# Permissions for Contacts app.
|
||
|
PERMISSION_LIST = [
|
||
|
'android.permission.READ_CONTACTS',
|
||
|
'android.permission.WRITE_CONTACTS',
|
||
|
]
|
||
|
|
||
|
|
||
|
class BluetoothPbapTest(blueberry_ui_base_test.BlueberryUiBaseTest):
|
||
|
"""Test Class for Bluetooth PBAP Test."""
|
||
|
|
||
|
def __init__(self, configs):
|
||
|
super().__init__(configs)
|
||
|
self.derived_bt_device = None
|
||
|
self.pri_phone = None
|
||
|
self.pse_mac_address = None
|
||
|
|
||
|
def setup_class(self):
|
||
|
"""Standard Mobly setup class."""
|
||
|
super(BluetoothPbapTest, self).setup_class()
|
||
|
|
||
|
# Bluetooth carkit which role is Phone Book Client Equipment (PCE).
|
||
|
self.derived_bt_device = self.derived_bt_devices[0]
|
||
|
|
||
|
# Primary phone which role is Phone Book Server Equipment (PSE).
|
||
|
self.pri_phone = self.android_devices[0]
|
||
|
self.pri_phone.init_setup()
|
||
|
self.pri_phone.sl4a_setup()
|
||
|
self.derived_bt_device.add_sec_ad_device(self.pri_phone)
|
||
|
|
||
|
# Grant the permissions to Contacts app.
|
||
|
for device in [self.pri_phone, self.derived_bt_device]:
|
||
|
required_permissions = PERMISSION_LIST
|
||
|
# App requires READ_EXTERNAL_STORAGE to read contacts if SDK < 30.
|
||
|
if int(device.build_info['build_version_sdk']) < 30:
|
||
|
required_permissions.append('android.permission.READ_EXTERNAL_STORAGE')
|
||
|
for permission in required_permissions:
|
||
|
device.adb.shell('pm grant com.google.android.contacts %s' % permission)
|
||
|
self.pse_mac_address = self.pri_phone.get_bluetooth_mac_address()
|
||
|
mac_address = self.derived_bt_device.get_bluetooth_mac_address()
|
||
|
self.derived_bt_device.activate_pairing_mode()
|
||
|
self.pri_phone.pair_and_connect_bluetooth(mac_address)
|
||
|
# Sleep until the connection stabilizes.
|
||
|
time.sleep(5)
|
||
|
|
||
|
# Allow permission access for PBAP profile.
|
||
|
self.pri_phone.sl4a.bluetoothChangeProfileAccessPermission(
|
||
|
mac_address,
|
||
|
bt_constants.BluetoothProfile.PBAP.value,
|
||
|
bt_constants.BluetoothAccessLevel.ACCESS_ALLOWED.value)
|
||
|
|
||
|
def setup_test(self):
|
||
|
super(BluetoothPbapTest, self).setup_test()
|
||
|
# Make sure PBAP is not connected before running tests.
|
||
|
self._terminate_pbap_connection()
|
||
|
|
||
|
def _import_vcf_to_pse(self, file_name, expected_contact_count):
|
||
|
"""Imports the vcf file to PSE."""
|
||
|
# Open ImportVcardActivity and click "OK" in the pop-up dialog, then
|
||
|
# PickActivity will be launched and browses the existing vcf files.
|
||
|
self.pri_phone.adb.shell(
|
||
|
'am start com.google.android.contacts/'
|
||
|
'com.google.android.apps.contacts.vcard.ImportVCardActivity')
|
||
|
self.pri_phone.aud(text='OK').click()
|
||
|
|
||
|
# Check if the vcf file appears in the PickActivity.
|
||
|
if not self.pri_phone.aud(text=file_name).exists():
|
||
|
raise android_device.DeviceError(
|
||
|
self.pri_phone,
|
||
|
'No file name matches "%s" in PickActivity.' % file_name)
|
||
|
|
||
|
# TODO(user): Remove the check of code name for S build.
|
||
|
if (self.pri_phone.build_info['build_version_codename'] != 'S' and
|
||
|
int(self.pri_phone.build_info['build_version_sdk']) <= 30):
|
||
|
# Since `adb shell input tap` cannot work in PickActivity before R build,
|
||
|
# send TAB and ENETER Key events to select and import the vcf file.
|
||
|
if self.pri_phone.aud(content_desc='Grid view').exists():
|
||
|
# Switch Grid mode since ENTER Key event cannot work in List mode on
|
||
|
# git_rvc-d2-release branch.
|
||
|
self.pri_phone.aud(content_desc='Grid view').click()
|
||
|
self.pri_phone.aud.send_key_code('KEYCODE_TAB')
|
||
|
self.pri_phone.aud.send_key_code('KEYCODE_ENTER')
|
||
|
else:
|
||
|
self.pri_phone.aud(text=file_name).click()
|
||
|
self.pri_phone.log.info('Importing "%s"...' % file_name)
|
||
|
current_count = self._wait_and_get_contact_count(
|
||
|
self.pri_phone, expected_contact_count, WAITING_TIMEOUT_SEC)
|
||
|
if current_count != expected_contact_count:
|
||
|
raise android_device.DeviceError(
|
||
|
self.pri_phone,
|
||
|
'Failed to import %d contact(s) within %ds. Actual count: %d' %
|
||
|
(expected_contact_count, WAITING_TIMEOUT_SEC, current_count))
|
||
|
self.pri_phone.log.info(
|
||
|
'Successfully added %d contact(s).' % current_count)
|
||
|
|
||
|
def _generate_contacts_on_pse(self,
|
||
|
num_of_contacts,
|
||
|
first_name=None,
|
||
|
last_name=None,
|
||
|
phone_number=None):
|
||
|
"""Generates contacts to be tested on PSE."""
|
||
|
vcf_file = bt_test_utils.create_vcf_from_vcard(
|
||
|
output_path=self.pri_phone.log_path,
|
||
|
num_of_contacts=num_of_contacts,
|
||
|
first_name=first_name,
|
||
|
last_name=last_name,
|
||
|
phone_number=phone_number)
|
||
|
self.pri_phone.adb.push([vcf_file, STORAGE_PATH])
|
||
|
# For R build, since the pushed vcf file probably not found when importing
|
||
|
# contacts, do a media scan to recognize the file.
|
||
|
if int(self.pri_phone.build_info['build_version_sdk']) > 29:
|
||
|
self.pri_phone.adb.shell('content call --uri content://media/ --method '
|
||
|
'scan_volume --arg external_primary')
|
||
|
file_name = vcf_file.split('/')[-1]
|
||
|
self._import_vcf_to_pse(file_name, num_of_contacts)
|
||
|
self.pri_phone.adb.shell(
|
||
|
'rm -rf %s' % os.path.join(STORAGE_PATH, file_name))
|
||
|
|
||
|
def _generate_call_logs_on_pse(self, call_log_type, num_of_call_logs):
|
||
|
"""Generates call logs to be tested on PSE."""
|
||
|
self.pri_phone.log.info('Putting %d call log(s) which type are "%s"...' %
|
||
|
(num_of_call_logs, call_log_type))
|
||
|
for _ in range(num_of_call_logs):
|
||
|
self.pri_phone.sl4a.callLogsPut(dict(
|
||
|
type=call_log_type,
|
||
|
number='8809%d' % random.randrange(int(10e8)),
|
||
|
time=int(1000 * float(self.pri_phone.adb.shell('date +%s.%N')))))
|
||
|
current_count = self._wait_and_get_call_log_count(
|
||
|
self.pri_phone,
|
||
|
call_log_type,
|
||
|
num_of_call_logs,
|
||
|
WAITING_TIMEOUT_SEC)
|
||
|
if current_count != num_of_call_logs:
|
||
|
raise android_device.DeviceError(
|
||
|
self.pri_phone,
|
||
|
'Failed to generate %d call log(s) within %ds. '
|
||
|
'Actual count: %d, Call log type: %s' %
|
||
|
(num_of_call_logs, WAITING_TIMEOUT_SEC, current_count, call_log_type))
|
||
|
self.pri_phone.log.info(
|
||
|
'Successfully added %d call log(s).' % current_count)
|
||
|
|
||
|
def _wait_and_get_contact_count(self,
|
||
|
device,
|
||
|
expected_contact_count,
|
||
|
timeout_sec):
|
||
|
"""Waits for contact update for a period time and returns contact count.
|
||
|
|
||
|
This method should be used when a device imports some new contacts. It can
|
||
|
wait some time for contact update until expectation or timeout and then
|
||
|
return contact count.
|
||
|
|
||
|
Args:
|
||
|
device: AndroidDevice, Mobly Android controller class.
|
||
|
expected_contact_count: Int, Number of contacts as expected.
|
||
|
timeout_sec: Int, Number of seconds to wait for contact update.
|
||
|
|
||
|
Returns:
|
||
|
current_count: Int, number of the existing contacts on the device.
|
||
|
"""
|
||
|
start_time = time.time()
|
||
|
end_time = start_time + timeout_sec
|
||
|
current_count = 0
|
||
|
while time.time() < end_time:
|
||
|
current_count = device.sl4a.contactsGetCount()
|
||
|
if current_count == expected_contact_count:
|
||
|
break
|
||
|
# Interval between attempts to get contacts.
|
||
|
time.sleep(1)
|
||
|
if current_count != expected_contact_count:
|
||
|
device.log.warning(
|
||
|
'Failed to get expected contact count: %d. '
|
||
|
'Actual contact count: %d.' %
|
||
|
(expected_contact_count, current_count))
|
||
|
return current_count
|
||
|
|
||
|
def _wait_and_get_call_log_count(self,
|
||
|
device,
|
||
|
call_log_type,
|
||
|
expected_call_log_count,
|
||
|
timeout_sec):
|
||
|
"""Waits for call log update for a period time and returns call log count.
|
||
|
|
||
|
This method should be used when a device adds some new call logs. It can
|
||
|
wait some time for call log update until expectation or timeout and then
|
||
|
return call log count.
|
||
|
|
||
|
Args:
|
||
|
device: AndroidDevice, Mobly Android controller class.
|
||
|
call_log_type: String, Type of the call logs.
|
||
|
expected_call_log_count: Int, Number of call logs as expected.
|
||
|
timeout_sec: Int, Number of seconds to wait for call log update.
|
||
|
|
||
|
Returns:
|
||
|
current_count: Int, number of the existing call logs on the device.
|
||
|
"""
|
||
|
start_time = time.time()
|
||
|
end_time = start_time + timeout_sec
|
||
|
current_count = 0
|
||
|
while time.time() < end_time:
|
||
|
current_count = len(device.sl4a.callLogsGet(call_log_type))
|
||
|
if current_count == expected_call_log_count:
|
||
|
break
|
||
|
# Interval between attempts to get call logs.
|
||
|
time.sleep(1)
|
||
|
if current_count != expected_call_log_count:
|
||
|
device.log.warning(
|
||
|
'Failed to get expected call log count: %d. '
|
||
|
'Actual call log count: %d.' %
|
||
|
(expected_call_log_count, current_count))
|
||
|
return current_count
|
||
|
|
||
|
def _terminate_pbap_connection(self):
|
||
|
status = self.derived_bt_device.sl4a.bluetoothPbapClientGetConnectionStatus(
|
||
|
self.pse_mac_address)
|
||
|
if status == bt_constants.BluetoothConnectionStatus.STATE_DISCONNECTED:
|
||
|
return
|
||
|
self.derived_bt_device.log.info('Disconnecting PBAP...')
|
||
|
self.derived_bt_device.sl4a.bluetoothPbapClientDisconnect(
|
||
|
self.pse_mac_address)
|
||
|
# Buffer for the connection status check.
|
||
|
time.sleep(3)
|
||
|
status = self.derived_bt_device.sl4a.bluetoothPbapClientGetConnectionStatus(
|
||
|
self.pse_mac_address)
|
||
|
if status != bt_constants.BluetoothConnectionStatus.STATE_DISCONNECTED:
|
||
|
raise signals.TestError('PBAP connection failed to be terminated.')
|
||
|
self.derived_bt_device.log.info('Successfully disconnected PBAP.')
|
||
|
|
||
|
def test_download_contacts(self):
|
||
|
"""Test for the feature of downloading contacts.
|
||
|
|
||
|
Tests that PCE can download contacts from PSE.
|
||
|
"""
|
||
|
# Make sure no any contacts exist on the devices.
|
||
|
for device in [self.pri_phone, self.derived_bt_device]:
|
||
|
device.sl4a.contactsEraseAll()
|
||
|
|
||
|
# Add contacts to PSE.
|
||
|
self._generate_contacts_on_pse(TEST_DATA_COUNT)
|
||
|
|
||
|
# When PCE is connected to PSE, it will download PSE's contacts.
|
||
|
self.derived_bt_device.pbap_connect()
|
||
|
self.derived_bt_device.log.info('Downloading contacts from PSE...')
|
||
|
current_count = self._wait_and_get_contact_count(
|
||
|
self.derived_bt_device, TEST_DATA_COUNT, WAITING_TIMEOUT_SEC)
|
||
|
self.derived_bt_device.log.info(
|
||
|
'Successfully downloaded %d contact(s).' % current_count)
|
||
|
|
||
|
asserts.assert_true(
|
||
|
current_count == TEST_DATA_COUNT,
|
||
|
'PCE failed to download %d contact(s) within %ds, '
|
||
|
'actually downloaded %d contact(s).' %
|
||
|
(TEST_DATA_COUNT, WAITING_TIMEOUT_SEC, current_count))
|
||
|
|
||
|
def test_download_call_logs(self):
|
||
|
"""Test for the feature of downloading call logs.
|
||
|
|
||
|
Tests that PCE can download incoming/outgoing/missed call logs from PSE.
|
||
|
"""
|
||
|
# Make sure no any call logs exist on the devices.
|
||
|
for device in [self.pri_phone, self.derived_bt_device]:
|
||
|
device.sl4a.callLogsEraseAll()
|
||
|
|
||
|
call_log_types = [
|
||
|
bt_constants.INCOMING_CALL_LOG_TYPE,
|
||
|
bt_constants.OUTGOING_CALL_LOG_TYPE,
|
||
|
bt_constants.MISSED_CALL_LOG_TYPE,
|
||
|
]
|
||
|
for call_log_type in call_log_types:
|
||
|
# Add call logs to PSE.
|
||
|
self._generate_call_logs_on_pse(call_log_type, TEST_DATA_COUNT)
|
||
|
|
||
|
# When PCE is connected to PSE, it will download PSE's contacts.
|
||
|
self.derived_bt_device.pbap_connect()
|
||
|
self.derived_bt_device.log.info('Downloading call logs...')
|
||
|
|
||
|
for call_log_type in call_log_types:
|
||
|
current_count = self._wait_and_get_call_log_count(
|
||
|
self.derived_bt_device,
|
||
|
call_log_type,
|
||
|
TEST_DATA_COUNT,
|
||
|
WAITING_TIMEOUT_SEC)
|
||
|
self.derived_bt_device.log.info(
|
||
|
'Successfully downloaded %d call log(s) which type are "%s".' %
|
||
|
(current_count, call_log_type))
|
||
|
|
||
|
asserts.assert_true(
|
||
|
current_count == TEST_DATA_COUNT,
|
||
|
'PCE failed to download %d call log(s) which type are "%s" within %ds'
|
||
|
', actually downloaded %d call log(s).' %
|
||
|
(TEST_DATA_COUNT, call_log_type, WAITING_TIMEOUT_SEC, current_count))
|
||
|
|
||
|
def test_show_caller_name(self):
|
||
|
"""Test for caller name of the incoming phone call is correct on PCE.
|
||
|
|
||
|
Tests that caller name matches contact name which is downloaded via PBAP.
|
||
|
"""
|
||
|
# Checks if two android devices exist.
|
||
|
if len(self.android_devices) < 2:
|
||
|
raise signals.TestError('This test requires two Android devices.')
|
||
|
primary_phone = self.pri_phone
|
||
|
secondary_phone = self.android_devices[1]
|
||
|
secondary_phone.init_setup()
|
||
|
for phone in [primary_phone, secondary_phone]:
|
||
|
# Checks if SIM state is loaded for every devices.
|
||
|
if not phone.is_sim_state_loaded():
|
||
|
raise signals.TestError(f'Please insert a SIM Card to the phone '
|
||
|
f'"{phone.serial}".')
|
||
|
# Checks if phone_number is provided in the support dimensions.
|
||
|
phone.phone_number = phone.dimensions.get('phone_number')
|
||
|
if not phone.phone_number:
|
||
|
raise signals.TestError(f'Please add "phone_number" to support '
|
||
|
f'dimensions of the phone "{phone.serial}".')
|
||
|
# Make sure no any contacts exist on the devices.
|
||
|
for device in [primary_phone, self.derived_bt_device]:
|
||
|
device.sl4a.contactsEraseAll()
|
||
|
# Generate a contact name randomly.
|
||
|
first_name = utils.rand_ascii_str(4)
|
||
|
last_name = utils.rand_ascii_str(4)
|
||
|
full_name = f'{first_name} {last_name}'
|
||
|
primary_phone.log.info('Creating a contact "%s"...', full_name)
|
||
|
self._generate_contacts_on_pse(
|
||
|
num_of_contacts=1,
|
||
|
first_name=first_name,
|
||
|
last_name=last_name,
|
||
|
phone_number=secondary_phone.phone_number)
|
||
|
self.derived_bt_device.log.info('Connecting to PSE...')
|
||
|
self.derived_bt_device.pbap_connect()
|
||
|
self.derived_bt_device.log.info('Downloading contacts from PSE...')
|
||
|
current_count = self._wait_and_get_contact_count(
|
||
|
device=self.derived_bt_device,
|
||
|
expected_contact_count=1,
|
||
|
timeout_sec=WAITING_TIMEOUT_SEC)
|
||
|
self.derived_bt_device.log.info('Successfully downloaded %d contact(s).',
|
||
|
current_count)
|
||
|
asserts.assert_equal(
|
||
|
first=current_count,
|
||
|
second=1,
|
||
|
msg=f'Failed to download the contact "{full_name}".')
|
||
|
secondary_phone.sl4a.telecomCallNumber(primary_phone.phone_number)
|
||
|
secondary_phone.log.info('Made a phone call to device "%s".',
|
||
|
primary_phone.serial)
|
||
|
primary_phone.log.info('Waiting for the incoming call from device "%s"...',
|
||
|
secondary_phone.serial)
|
||
|
is_ringing = primary_phone.wait_for_call_state(
|
||
|
bt_constants.CALL_STATE_RINGING,
|
||
|
bt_constants.CALL_STATE_TIMEOUT_SEC)
|
||
|
if not is_ringing:
|
||
|
raise signals.TestError(
|
||
|
f'Timed out after {bt_constants.CALL_STATE_TIMEOUT_SEC}s waiting for '
|
||
|
f'the incoming call from device "{secondary_phone.serial}".')
|
||
|
try:
|
||
|
self.derived_bt_device.aud.open_notification()
|
||
|
hfp_address = primary_phone.get_bluetooth_mac_address()
|
||
|
if not self.derived_bt_device.aud(
|
||
|
text=f'Incoming call via HFP {hfp_address}').exists():
|
||
|
raise signals.TestError('The incoming call was not received from '
|
||
|
'the Handsfree device side.')
|
||
|
# Asserts that caller name of the incoming phone call is correct in the
|
||
|
# notification bar.
|
||
|
asserts.assert_true(
|
||
|
self.derived_bt_device.aud(text=full_name).exists(),
|
||
|
f'Caller name is incorrect. Expectation: "{full_name}"')
|
||
|
finally:
|
||
|
# Takes a screenshot for debugging.
|
||
|
self.derived_bt_device.take_screenshot(self.derived_bt_device.log_path)
|
||
|
# Recovery actions.
|
||
|
self.derived_bt_device.aud.close_notification()
|
||
|
secondary_phone.sl4a.telecomEndCall()
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
test_runner.main()
|