ESP32のMicroPythonスクリプトをWiFi経由で更新

今回、システムのプログラム更新をいちいちモノを回収して行うのが面倒なので、PHPサーバにスクリプト(MicroPython)をアップロードし、ESP32(ATOM Lite)からPHPサーバにアクセスして、スクリプトの更新を行うといったことをしました。

オンラインでプログラムを更新することをOver The Airで、OTAと呼ぶらしい。

 

■ 参考サイト

ESP32/ESP8266: MicroPython OTA Updates via PHP Server
https://randomnerdtutorials.com/esp32-esp8266-micropython-ota-updates/

PHPサーバを使いますが、サーバ立ち上げ手順は取り上げませんので、その解説は他サイト参照してください。
ちなみに、上記、参考サイトでは、RaspberryPiでLAN内サーバを立ち上げる手順を解説してます。

WAN上のPHPサーバでも可能です。セキュリティ上の対策は別途必要ですが。

 

■ PHPスクリプト

PHPサーバサイドのスクリプトは、更新スクリプトを取得するget_ESP_data.phpと、更新済みスクリプトを削除するdelete_ESP_data.phpを作成します。

get_ESP_data.php

<?php
error_reporting(0);
$file = $_GET['file'];
$dir = getcwd();
$file = $dir.'/'.$file;
$myfile = fopen($file, 'r') or die('FAIL');
echo file_get_contents($file);
fclose($myfile);
?>

 

delete_ESP_data.php

<?php
$file = $_GET['file'];
$dir = getcwd();
$file = $dir.'/'.$file;
if(file_exists($file)){
	unlink($file);
	print('DONE');
}else{
	print('FAIL');
}
?>

 

HTTPS(SSL)だとESP32側で複合化処理が必要なため、HTTPでアクセスしますが、通常設定だとHTTPSにリダイレクトしようとするため、リダイレクトしないようにする必要があります。
APACHEサーバの場合、下記の.htacessを同フォルダに置きます。

.htaccess

RewriteEngine off
Header always unset Strict-Transport-Security
Header add Strict-Transport-Security "max-age=0"

 

おまけに、あると便利な簡易アップローダも載せておきます。

upload.php

<?php
if (is_uploaded_file($_FILES["upfile"]["tmp_name"])) {
  if (move_uploaded_file($_FILES["upfile"]["tmp_name"], $_FILES["upfile"]["name"])) {
    chmod($_FILES["upfile"]["name"], 0644);
   $r = $_FILES["upfile"]["name"] . "をアップロードしました。";
  }
}
?>
<!DOCTYPE html>
<html lang="ja">
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>アップロード</title>
<style type="text/css">
<!--
.upload-area {
    margin: auto;
    width: 85%;
    height: 300px;
    position: relative;
    border: 1px dotted rgba(0, 0, 0, .4);
}
#input-files {
    top: 0;
    left: 0;
    position: absolute;
    width: 100%;
    height: 100%;
}
-->
</style>
</head>
<body>
<form action="upload.php" method="post" enctype="multipart/form-data">
  <div class="upload-area">
    <input type="file" name="upfile" size="30" id="input-files" />
  </div>
  <input type="submit" value="アップロード" />
</form>
<p><?php echo $r ?></p>
</body>
</html>

 

■ MicroPythonスクリプト

main.pyはprogram.pyのmain()をループ処理して、一定間隔が過ぎると、サーバにアクセスして新しいprogram.pyが無いかチェックします。新しいのがあった場合は、program.pyを書き換えて再起動します。

main.py

import machine
import network
import urequests
import utime

#URLs for the updates
upd_url = "http://hogehoge/get_ESP_data.php?file=program.py"
del_url = "http://hogehoge/delete_ESP_data.php?file=program.py"

def do_connect():
    print('Connecting to network...')
    sta_if = network.WLAN(network.STA_IF)
    if not sta_if.isconnected():
        sta_if.active(True)
        sta_if.connect('SSID','PASSWORD') #要変更
        while not sta_if.isconnected():
            pass
        print('connected!')
    else:
        print('already connecting')
    print('network config:', sta_if.ifconfig())

def check_for_updates():
    print('Checking if the file has been updated...')
    do_connect()
    try:
        response = urequests.get(upd_url)
        x = response.text.strip()
        if x == "FAIL":
            print('There are no updates available.')
            return False
        else:
            print('There is an update available')
            return True

    except:
        print('unable to reach internet')
        return False

def program_update():
    print('Downloading update files...')
    #download the update and overwrite program.py
    response = urequests.get(upd_url)
    x = response.text.strip()
    if x != "FAIL":
        #download twice and compare for security
        x = response.text
        response = urequests.get(upd_url)
        if response.text == x:
            f = open("program.py","w")
            f.write(response.text)
            f.flush()
            f.close()
            print('done')
            urequests.get(del_url)
            
            print('reboot now')
            machine.deepsleep(5000)

def main():
    check_interval = 5 #sec
    prev_time = utime.time()-check_interval*2
    while True:
        if(utime.time()-prev_time > check_interval):
            if check_for_updates():
                program_update()
            prev_time = utime.time()
        try:
            import program
            program.main()
        except:
            print('PROGRAM RUN ERROR')

main()

program.py

import utime

def main():
    utime.sleep(1)
    print('Tasks completed')
Updated: 2022年7月11日 — 03:04

コメントを残す

メールアドレスが公開されることはありません。