Issue
I've inherited some code that utilizes Python Bleak to scan for advertisements emitted from a certain device. Whenever an advertisement from the Bluetooth mac address and service id we're looking for is detected and a certain condition from the extracted payload information is true, we want to terminate and return. In the attached code, I've masked the Bluetooth and service ID:s.
Not being too familiar with the event loop, is there a way to exit before the timer runs out? I suppose there's probably a better way to approach this problem.
Sample code:
import asyncio
import struct
from bleak import BleakScanner
timeout_seconds = 10
address_to_look_for = 'masked'
service_id_to_look_for = 'masked'
def detection_callback(device, advertisement_data):
if device.address == address_to_look_for:
byte_data = advertisement_data.service_data.get(service_id_to_look_for)
num_to_test = struct.unpack_from('<I', byte_data, 0)
if num_to_test == 1:
print('here we want to terminate')
async def run():
scanner = BleakScanner()
scanner.register_detection_callback(detection_callback)
await scanner.start()
await asyncio.sleep(timeout_seconds)
await scanner.stop()
if __name__=='__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
Solution
I'm sure there are many ways this can be done. A small mod to your code would be rather than having the asyncio.sleep
for the full period before you stop the scan, you could could have a while loop that ends on time elapsed or device found event.
For example:
import asyncio
import struct
from bleak import BleakScanner
timeout_seconds = 20
address_to_look_for = 'F1:D9:3B:39:4D:A2'
service_id_to_look_for = '0000feaa-0000-1000-8000-00805f9b34fb'
class MyScanner:
def __init__(self):
self._scanner = BleakScanner()
self._scanner.register_detection_callback(self.detection_callback)
self.scanning = asyncio.Event()
def detection_callback(self, device, advertisement_data):
# Looking for:
# AdvertisementData(service_data={
# '0000feaa-0000-1000-8000-00805f9b34fb': b'\x00\xf6\x00\x00\x00Jupiter\x00\x00\x00\x00\x00\x0b'},
# service_uuids=['0000feaa-0000-1000-8000-00805f9b34fb'])
if device.address == address_to_look_for:
byte_data = advertisement_data.service_data.get(service_id_to_look_for)
num_to_test, = struct.unpack_from('<I', byte_data, 0)
if num_to_test == 62976:
print('\t\tDevice found so we terminate')
self.scanning.clear()
async def run(self):
await self._scanner.start()
self.scanning.set()
end_time = loop.time() + timeout_seconds
while self.scanning.is_set():
if loop.time() > end_time:
self.scanning.clear()
print('\t\tScan has timed out so we terminate')
await asyncio.sleep(0.1)
await self._scanner.stop()
if __name__ == '__main__':
my_scanner = MyScanner()
loop = asyncio.get_event_loop()
loop.run_until_complete(my_scanner.run())
Answered By - ukBaz
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.