Bluetooth

概要

ATOM Liteで、WindowsPCとBluetooth通信。

部品

  • M5Stack ATOM Lite
  • Windows PC

プログラム

ATOM Lite側(MicroPython)

  • 下記サイトの[ESP32_BLE2sample.zip]の中のble_simple_peripheral.pyを用いる
  • 実行には、同封されているble_advertising.pyが必要

ESP32-WROOM-32EでMicroPythonを使い開発(BLE編2)
http://zattouka.net/GarageHouse/micon/ESP/ESP32-WROOM-32E/ESP32E_BLE2.htm

# この例は、UARTペリフェラルを示しています。
# アドバータイジングはセントラルが応答するまで500ms間隔で送信する
import bluetooth
import random
import struct
import time
from ble_advertising import advertising_payload
from micropython import const
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_MTU_EXCHANGED = const(21)
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (
    bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
    #  (値を通知)
    bluetooth.FLAG_NOTIFY,
)
_UART_RX = (
    bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
    #  (書き込み専用)          (応答の返答はしません)
    bluetooth.FLAG_WRITE | bluetooth.FLAG_WRITE_NO_RESPONSE,
)
_UART_SERVICE = (
    _UART_UUID,
    (_UART_TX, _UART_RX),
)
class BLESimplePeripheral:
    def __init__(self, ble, name="mpy-uart"):
        self._ble = ble
        self._ble.active(True)            # bluetoothをアクティブにする
        # MTU値これで変更はできる様だが...送信は128-3で送れるみたいです。
        # でもぉ、クライアントからは20byteしか送れないがぁ...なぜだろう??
        self._ble.config(mtu=128)
        self._ble.irq(self._irq)          # イベントのコールバック関数を登録する
        # UART サービス(TX,RX)を登録します。
        ((self._handle_tx, self._handle_rx),) = self._ble.gatts_register_services((_UART_SERVICE,))
        self._connections = set()         # 空のセット(集合を表すデータ型)を生成する
        self._write_callback = None       # クライアントの書き込みは無視する設定だと思う?
        # アドバータイジングパケットを作成します。
        self._payload = advertising_payload(name=name, services=[_UART_UUID])
        self._advertise()                 # アドバタイジングを始める
    def _irq(self, event, data):
        # イベント通知を受信できる様に接続を追跡します。
        if event == _IRQ_CENTRAL_CONNECT:
            # セントラルがこのペリフェラル機器に接続しました。
            conn_handle, _, _ = data
            print("New connection", conn_handle)   # 新しい接続
            self._connections.add(conn_handle)     # セットにコネクションハンドルを追加する
        elif event == _IRQ_CENTRAL_DISCONNECT:
            #  セントラルがこのペリフェラル機器から切断しました。
            conn_handle, _, _ = data
            print("Disconnected", conn_handle)     # 切断されました
            self._connections.remove(conn_handle)  # セットからコネクションハンドルを削除する
            self._advertise()                      # 新しい接続を許可する為に、アドバタイジングを再開始します。
        elif event == _IRQ_GATTS_WRITE:
            # クライアントがこのcharacteristic又はdescriptorに書き込みました。
            print('GATTS_WRITE')
            conn_handle, value_handle = data
            value = self._ble.gatts_read(value_handle)  # 書き込まれたデータを読み込む
            if value_handle == self._handle_rx and self._write_callback:
                self._write_callback(value)
        elif event == _IRQ_MTU_EXCHANGED:
            # ATT MTU交換が完了しました
            conn_handle, mtu = data
            print("MTU_EXCHANGED", mtu)
    # 接続されているクライアントにdataを送信します。
    # gatts_write(self._handle_tx, data)で属性(ATT)に書込めばクライアントからの"read"も出来ます。
    def send(self, data):
        if self.is_connected():
            for conn_handle in self._connections:
                self._ble.gatts_notify(conn_handle, self._handle_tx, data)
    # セットにコネクションハンドルが有れば接続されていると判断する
    def is_connected(self):
        return len(self._connections) > 0
    # アドバタイジングを開始します。
    def _advertise(self, interval_us=500000):
        print("Starting advertising")
        self._ble.gap_advertise(interval_us, adv_data=self._payload)
    # クライアントからの書き込みを読み出す為のコールバック設定
    def on_write(self, callback):
        self._write_callback = callback
# 使い方のサンプルデモ
def demo():
    ble = bluetooth.BLE()
    p = BLESimplePeripheral(ble)
    # クライアントからの書き込みを読み出すコールバック関数
    def on_rx(v):
        if p.is_connected():
            print("RX", v)
            time.sleep_ms(100)
            p.send(v) #loopback

    p.on_write(on_rx)
    i = 0
    while True:
##        if p.is_connected():
##            # 接続されたならデータを送信する
##            for _ in range(3):
##                data = str(i) + "_"
##                print("TX", data)
##                p.send(data)
##                i += 1
        # 1秒毎の送信
        time.sleep_ms(1000)
if __name__ == "__main__":
    demo()

PC側(Python)

mpy-uartを検索して接続し、input()で入力したテキストをそのまま受信して終了

import asyncio
from bleak import BleakScanner, BleakClient
UART_UUID = '6E400001-B5A3-F393-E0A9-E50E24DCCA9E'
TX_UUID = '6E400003-B5A3-F393-E0A9-E50E24DCCA9E'
RX_UUID = '6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
def notif_hndlr(_, data):
      """Simple notification handler which prints the data received."""
      print(data.decode())
async def main():
    print('Searching devices...')
    devices = await BleakScanner.discover()
    found = False
    for d in devices:
        print(d.address,d.name,d.rssi,d.metadata)
        if d.name == 'mpy-uart':
            address = d.address
            uuid = d.metadata['uuids'][0]
            found = True
            print('Found mpy-uart')
            break
    if not found:
        print('Cannot find mpy-urat')
        return
    print(address,uuid)
    print('Connecting...')
    async with BleakClient(address) as client:
        print("Start notify")
        await client.start_notify(TX_UUID, notif_hndlr)
        print("Sending Message...")
        v = input()
        await client.write_gatt_char(RX_UUID, v.encode())
        print("Stopping notify after 3s")
        await asyncio.sleep(3)
        await client.stop_notify(TX_UUID)
        print("fin")
asyncio.run(main())

詳細

ATOM Lite:MicroPythonでBluetooth
https://tiblab.net/blog/2022/01/atom-lite_bluetooth/

参照サイト

ESP32-WROOM-32EでMicroPythonを使い開発(BLE編2)
http://zattouka.net/GarageHouse/micon/ESP/ESP32-WROOM-32E/ESP32E_BLE2.htm

Bleak -kembo
https://scrapbox.io/kembo/Bleak

M5StackでBLEを使う
https://qiita.com/nsawa/items/2afd4ae9c10af87d0133

ページトップへ