先日、購入しましたM5Stack ATOM Lite(ESP32搭載)でPCとBluetooth通信をするところまでやってみました。
言語は、ATOMLiteはMicroPython、PCはPythonです。
ATOMをペリフェラル(周辺機器)、PCをセントラルとし、ATOM側はMicroPython、PC側はPythonとBleakモジュールで通信させます。
今回、PC側からテキストを送って、ATOMを操作したり、センサ値を取得したりしたかったので、PCからテキスト受信すると、そのまま返して(ループバック)、PC側で受信できるか確かめました。
ATOM側(MicroPython)
まずATOM側ですが、ベースとなるスクリプトは下記のを使わせてもらいました。
ESP32-WROOM-32EでMicroPythonを使い開発(BLE編2)
http://zattouka.net/GarageHouse/micon/ESP/ESP32-WROOM-32E/ESP32E_BLE2.htm
上記サイトの[ESP32_BLE2sample.zip]の中のble_simple_peripheral.pyをほとんどそのままですが、ATOMから一方的にデータを送信する部分をコメントアウトし、テキストを受け取ったらそのまま返す処理(ループバック)を追加しました。ここをテキストの内容によって処理を分岐させれば、ATOMを操作できますね。
なお、このスクリプトの実行には、同封されているble_advertising.pyが必要です。
# この例は、UARTペリフェラルを示しています。
# アドバータイジングはセントラルが応答するまで500ms間隔で送信する
import bluetooth
import random
import struct
import time
from ble_advertising import advertising_payload
from micropython import const
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_MTU_EXCHANGED = const(21)
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (
bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
# (値を通知)
bluetooth.FLAG_NOTIFY,
)
_UART_RX = (
bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
# (書き込み専用) (応答の返答はしません)
bluetooth.FLAG_WRITE | bluetooth.FLAG_WRITE_NO_RESPONSE,
)
_UART_SERVICE = (
_UART_UUID,
(_UART_TX, _UART_RX),
)
class BLESimplePeripheral:
def __init__(self, ble, name="mpy-uart"):
self._ble = ble
self._ble.active(True) # bluetoothをアクティブにする
# MTU値これで変更はできる様だが...送信は128-3で送れるみたいです。
# でもぉ、クライアントからは20byteしか送れないがぁ...なぜだろう??
self._ble.config(mtu=128)
self._ble.irq(self._irq) # イベントのコールバック関数を登録する
# UART サービス(TX,RX)を登録します。
((self._handle_tx, self._handle_rx),) = self._ble.gatts_register_services((_UART_SERVICE,))
self._connections = set() # 空のセット(集合を表すデータ型)を生成する
self._write_callback = None # クライアントの書き込みは無視する設定だと思う?
# アドバータイジングパケットを作成します。
self._payload = advertising_payload(name=name, services=[_UART_UUID])
self._advertise() # アドバタイジングを始める
def _irq(self, event, data):
# イベント通知を受信できる様に接続を追跡します。
if event == _IRQ_CENTRAL_CONNECT:
# セントラルがこのペリフェラル機器に接続しました。
conn_handle, _, _ = data
print("New connection", conn_handle) # 新しい接続
self._connections.add(conn_handle) # セットにコネクションハンドルを追加する
elif event == _IRQ_CENTRAL_DISCONNECT:
# セントラルがこのペリフェラル機器から切断しました。
conn_handle, _, _ = data
print("Disconnected", conn_handle) # 切断されました
self._connections.remove(conn_handle) # セットからコネクションハンドルを削除する
self._advertise() # 新しい接続を許可する為に、アドバタイジングを再開始します。
elif event == _IRQ_GATTS_WRITE:
# クライアントがこのcharacteristic又はdescriptorに書き込みました。
print('GATTS_WRITE')
conn_handle, value_handle = data
value = self._ble.gatts_read(value_handle) # 書き込まれたデータを読み込む
if value_handle == self._handle_rx and self._write_callback:
self._write_callback(value)
elif event == _IRQ_MTU_EXCHANGED:
# ATT MTU交換が完了しました
conn_handle, mtu = data
print("MTU_EXCHANGED", mtu)
# 接続されているクライアントにdataを送信します。
# gatts_write(self._handle_tx, data)で属性(ATT)に書込めばクライアントからの"read"も出来ます。
def send(self, data):
if self.is_connected():
for conn_handle in self._connections:
self._ble.gatts_notify(conn_handle, self._handle_tx, data)
# セットにコネクションハンドルが有れば接続されていると判断する
def is_connected(self):
return len(self._connections) > 0
# アドバタイジングを開始します。
def _advertise(self, interval_us=500000):
print("Starting advertising")
self._ble.gap_advertise(interval_us, adv_data=self._payload)
# クライアントからの書き込みを読み出す為のコールバック設定
def on_write(self, callback):
self._write_callback = callback
# 使い方のサンプルデモ
def demo():
ble = bluetooth.BLE()
p = BLESimplePeripheral(ble)
# クライアントからの書き込みを読み出すコールバック関数
def on_rx(v):
if p.is_connected():
print("RX", v)
time.sleep_ms(100)
p.send(v) #loopback
p.on_write(on_rx)
i = 0
while True:
## if p.is_connected():
## # 接続されたならデータを送信する
## for _ in range(3):
## data = str(i) + "_"
## print("TX", data)
## p.send(data)
## i += 1
# 1秒毎の送信
time.sleep_ms(1000)
if __name__ == "__main__":
demo()
PC側(Python)
mpy-uartを検索して接続し、input()で入力したテキストをそのまま受信して終了です。
import asyncio
from bleak import BleakScanner, BleakClient
UART_UUID = '6E400001-B5A3-F393-E0A9-E50E24DCCA9E'
TX_UUID = '6E400003-B5A3-F393-E0A9-E50E24DCCA9E'
RX_UUID = '6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
def notif_hndlr(_, data):
"""Simple notification handler which prints the data received."""
print(data.decode())
async def main():
print('Searching devices...')
devices = await BleakScanner.discover()
found = False
for d in devices:
print(d.address,d.name,d.rssi,d.metadata)
if d.name == 'mpy-uart':
address = d.address
uuid = d.metadata['uuids'][0]
found = True
print('Found mpy-uart')
break
if not found:
print('Cannot find mpy-urat')
return
print(address,uuid)
print('Connecting...')
async with BleakClient(address) as client:
print("Start notify")
await client.start_notify(TX_UUID, notif_hndlr)
print("Sending Message...")
v = input()
await client.write_gatt_char(RX_UUID, v.encode())
print("Stopping notify after 3s")
await asyncio.sleep(3)
await client.stop_notify(TX_UUID)
print("fin")
asyncio.run(main())
参考サイト:
Bleak -kembo
https://scrapbox.io/kembo/Bleak
M5StackでBLEを使う
https://qiita.com/nsawa/items/2afd4ae9c10af87d0133
余談ですが、PCに入っていたPython3.6ではBleakのインストールができず、新たにPython3.9を入れることでBleakが使えました。
さらに余談ですが、しばし詰まった原因がWindows10の不具合で、bluetoothがオンになっているのに、bluetooth端末を検出しない現象が起きました。スリープや休止状態からの復帰後に起きるようで、デバイスマネージャーからbluetoothを一旦無効にして、有効にし直すと正常に動作します。
ソースコードが悪いのかと思い、悩んだじゃないかー。
