XBeeモジュールの使い方(画像転送:イベント駆動型receive_callback)


 前回の解説ではコーディネータXBeeが画像データを受信する方法としてreceive()とreceive_callback()を使用した方式を比較し、両者の差異や特徴を解説しました。今回は実際にイベント駆動型のreceive_callback()を使用した画像転送について解説します。

 receive_callbackは XBee3 専用の MicroPython APIであり、他の MicroPython や Python には標準では増備されていません。XBee3 の無線パケット受信をイベント駆動で扱える便利な仕組みです。xbee.receive_callbackはDigi が提供するXBee3のMicroPythonファームウェアならどれでも基本的に使えます。一般のPythonやMicroPythonにない理由は、通常の Python や MicroPython は無線モジュールや通信ファームウェアと直接やりとりしないためです。具体的には、XBee が無線でパケットを受信したときに、自分で用意した関数(コールバック関数)を自動で呼び出してくれる仕組みになっています。

 今回の画像伝送ではルータXBeeとコーディネータXBeeのダームウェアはDigiMeshに設定しました。いずれのXBeeもBD UART Baud Rateは 115200[7]に設定します。

具体的な画像転送の説明の前に、receive_callbackを使用したコードの一例を示します。

  import xbee
  def on_receive(pkt):
      print("Received payload:", pkt['payload'])
  xbee.receive_callback(on_receive)

これで、XBee がパケットを受信した瞬間にon_receiveが呼ばれます。

 以前使用したMonaLisa.png画像を送信するルータXBeeのMicroPythonのサンプルコードを以下に示します。4行目のコーディネータXBeeの64ビットアドレスはご自分のアドレスに変更してください。DigiMeshファームウェアは802.15.4ファームウェアと同様にファイルのパスを指定する際に/flash/が必要です。

import xbee
import os
import utime

COORD_ADDR = b'\x00\x13\xA2\x00\x41\xAE\x36\x69'
CHUNK_SIZE = 64
FILE_PATH = "/flash/MonaLisa.png"

def send_file():
    if FILE_PATH.split('/')[-1] not in os.listdir('/flash'):
        print("No file!")
        return

    with open(FILE_PATH, "rb") as f:
        while True:
            chunk = f.read(CHUNK_SIZE)
            if not chunk:
                break
            xbee.transmit(COORD_ADDR, chunk)
            utime.sleep(0.1)

    # 送信完了後に終了チャンクを送信
    xbee.transmit(COORD_ADDR, b"__EOF__")
    print("Send complete")

send_file()

画像を受信するコーディネータXBeeのMicroPythonのサンプルコードを以下に示します。

import xbee

FILE_PATH = "/flash/MonaLisa.png"
RECEIVED_CHUNKS = []

def on_receive(pkt):
    payload = pkt['payload']
    if not payload:
        print("Invalid payload")
        return

    if payload == b"__EOF__":
        print("EOF detected. Saving file.")
        save_file()
    else:
        RECEIVED_CHUNKS.append(payload)
        print("Received chunk. Total:", len(RECEIVED_CHUNKS))

def save_file():
    with open(FILE_PATH, "wb") as f:
        for chunk in RECEIVED_CHUNKS:
            f.write(chunk)
    print("File save completed!:", FILE_PATH)
    RECEIVED_CHUNKS.clear()

# コールバックを登録
xbee.receive_callback(on_receive)

# スクリプトがすぐ終わらないようにループ
while True:
    pass

 コーディネータXBeeのコードを実行してからルータXBeeのコードを実行した際にMicroPython Terminalの状況です。送信が完了したことを示しています。

 以下の図はコーディネータXBeeのMicroPython Terminalの状況です。複数のチャンクが受信され、画像が保存されたことを示しています。

 コーディネータXBeeのFile System Managerを確認するとMonaLisa.pngが保存されていることを確認できます。