物聯網開發筆記 (九) Python Server
接著來製作 Python Server 所需的功能,這些邏輯最後將運行在 Jetson-Nano 上。
poetry init
poetry env use python
poetry add paho.mqtt
安裝 GPIO
由於 GPIO 的安裝步驟有一點多,我再另外寫一篇。
如果是先在其他平台開發,可以先從 Github 上拉下 zip 解壓到虛擬環境中的 .venv/Lib
MQTT 連線設定
在此之前,我封裝了 paho.mqtt 庫來更方便我使用,目前可能還有諸多問題,之後有時間再完善它
main.py
def mqtt_thread():
# MQTT Settings
broker = 'mqtt.example.com'
port = 8883
client_id = 'test_client'
username = 'user'
password = 'pass'
mqtt = MQTT(client_id, broker, port, MQTTVersion.MQTTv311)
# 註冊需要的 Topic
mqtt.subscribleTopic('Topic01', 0, lambda msg: print(msg);
mqtt.connect(username, password)
# 建立 Thread 在其他執行緒執行,避免卡住主執行緒
mqttThread = threading.Thread(target=mqtt_thread)
mqttThread.start()
# 這裡可以執行其它事情
mqttThread.join()
我在 Topic 上註冊了兩個 Callback,一個是作為 Server Config 更新,一個是作為灑水控制使用(人工控制)。
上一篇有說明抓氣象資料,我這邊也就加上去
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
def func:
# 實作邏輯,我在這裡推送至 Line Notify
pass
trigger = CronTrigger(hour=6)
jobId = 'notifyWeather'
scheduler = BackgroundScheduler()
scheduler.add_job(func, trigger, id=jobId)
伺服器設定值
我新增了一支 watercontrol.py 的檔案來控制灑水
class WaterControl:
# 定義數值
_watering_duration: int = 5 * 60 # 灑水時長 (秒數)
_is_watering: bool = False # 是否正在灑水
_can_water_today: bool = True # 今日是否可以灑水
_auto_watering_enabled: bool = True # 是否啟用自動灑水
# 外部設備
_first_relay_pin: int = 40 # GPIO Pin Number
# 其他定義
_is_init_finish: bool = False # 是否完成初始化
數值預設值設定為取不到伺服器資料前提下,至少還可以每日的灑水。
因為後來的需求增加希望可以定義一週只有 1,3,5會灑水,其餘都不用,為了增加彈性,我將數值拉到伺服器上,屆時想藉由 APP 變成2,4,6也可以。
def __init__(self) -> None:
log('建立 WaterControl,初始化中 ...')
self.getServerData()
這邊的 log
是我自己寫的,因為 log 資訊我不打算儲存在任何的 database,但又希望可以遠端查看 log,所以我將所有 log 透過 http 轉送到 Discord 上,不需要的可以改用 print
。
import request
def getServerData(self) -> None:
"""
呼叫請求伺服器設定
"""
response = requests.get('Server URL')
if (response.status_code == 202):
log('Server 回傳 202,等待資料進行更新動作')
else:
log('請求伺服器設定失敗')
pass # 這邊有空要加防呆,再次請求等
我的邏輯是伺服器接收到請求時,就會先回傳 202,Server 會透過 MQTT 傳來資料,這邊目前也還沒加上防呆,應該要再判斷若沒有收到資料的狀況。
import threading
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
# 定義變數
_water_plant_scheduler = BackgroundScheduler()
_cancel_watering_timer: threading.Timer = None
def receive_server_settings(self, json: dict) -> None:
"""
接收伺服器設定的 function(callable)
"""
if json['watering_duration'] is not None:
self._watering_duration = json['watering_duration']
... 以此類推
if self._is_init_finish is False:
self._is_init_finsh = True
log(f'伺服器設定更新完畢 json = {json}')
# 排程工作
water_plant_job_id = 'waterPlant'
cronData = json['watering_time'].split(':')
trigger = CronTrigger(hour=cronData[0], minute=cronData[1], second=cronData[2]
job = self._water_plant_scheduler.get_job(water_plant_job_id)
# 判斷排程工作是否存在
if job is None:
log('排程工作不存在,已新增')
self._water_plant_scheduler.add_job(self.start, trigger, id=water_plant_job_id)
else:
log('排程工作已存在,進行更新')
self._water_plant_scheduler.reschedule_job(job_id=water_plant_job_id, trigger=trigger)
# 啟動排程
if self._water_plant_scheduler.running is False:
self._water_plant_scheduler.start()
log('排程已執行')
這邊還有一些小缺點,即不管伺服器數值是否有變動,排程工作依然會進行更新,之後有機會再完善一下。
只要將上面的 function 註冊進 topic 更新時觸發就可以。
def start(self, manual: bool = False, user: str = None):
"""
啟動灑水
Args:
manual (bool, optional): 是否為手動控制灑水 Default to False
user (str, optional): 控制的使用者名稱 Default to None
"""
if self._is_watering:
log('灑水已進行中,控制無效')
return
if not self._can_water_today and manual is False:
log('今日為不可灑水的日子,灑水不執行')
return
self._is_watering = True
# 這裡執行 GPIO
# 啟動停止執行緒,於指定時間後調用 stop function
self._cancel_watering_timer = threading.Timer(self._watering_duration, self.stop)
self._cancel_watering_timer.start()
# 推送 Notify
if manual:
msg = f'這是由 {user} 發起的手動強制灑水'
log(msg, transferLine = true)
else:
msg = '系統開始進行灑水'
log(msg, transferLine = true)
這邊之後有需要可以再進行其他偵測,例如濕度達標等等隨時終止灑水。
def stop(self, user: str = None):
"""
關閉灑水
Args:
user (str, optional): 控制的使用者名稱 Default to None
"""
if not self._is_watering:
return
if self._cancel_watering_timer is not None and self._cancel_watering_timer.is_alive():
log('系統已關閉灑水')
self._cancel_watering_timer.cancel()
self._cancel_watering_timer = None
# 這裡執行 GPIO
self._is_watering = False
# 推送 Notify
if user is not None:
msg = f'這是由 {user} 發起的強制關閉灑水'
log(msg, transferLine = true)
else:
msg = '系統關閉灑水'
log(msg, transferLine = true)
以上大概是 Python Server 的處理,本文目前尚未撰寫完,還需要補 GPIO 的地方,因為目前都是在 Jetson Nano 外撰寫,等到我確定 Jetson Nano 上可以運作,再上來補。