物聯網開發筆記 (九) Python Server

接著來製作 Python Server 所需的功能,這些邏輯最後將運行在 Jetson-Nano 上。

poetry init
poetry env use python
poetry add paho.mqtt
初始化專案,匯入所需依賴

安裝 GPIO

由於 GPIO 的安裝步驟有一點多,我再另外寫一篇。

如果是先在其他平台開發,可以先從 Github 上拉下 zip 解壓到虛擬環境中的 .venv/Lib

MQTT 連線設定

在此之前,我封裝了 paho.mqtt 庫來更方便我使用,目前可能還有諸多問題,之後有時間再完善它

GitHub - hsiehyunju/python-paho-mqtt-tools: 封裝 paho mqtt 的功能
封裝 paho mqtt 的功能. Contribute to hsiehyunju/python-paho-mqtt-tools development by creating an account on GitHub.

main.py

def mqtt_thread():
	# MQTT Settings
    broker = 'mqtt.example.com'
    port = 8883
    client_id = 'test_client'
    username = 'user'
    password = 'pass'
    
    mqtt = MQTT(client_id, broker, port, MQTTVersion.MQTTv311)
    
    # 註冊需要的 Topic
    mqtt.subscribleTopic('Topic01', 0, lambda msg: print(msg);
    
    mqtt.connect(username, password)
    
# 建立 Thread 在其他執行緒執行,避免卡住主執行緒
mqttThread = threading.Thread(target=mqtt_thread)
mqttThread.start()

# 這裡可以執行其它事情

mqttThread.join()
連線至 mqtt

我在 Topic 上註冊了兩個 Callback,一個是作為 Server Config 更新,一個是作為灑水控制使用(人工控制)。

物聯網開發筆記 (八) 串中央氣象局 API
因應手動控制的需求,再串上天氣預報等資料,提供手動時給予控制人的判斷。
抓取氣象局 API

上一篇有說明抓氣象資料,我這邊也就加上去

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

def func:
	# 實作邏輯,我在這裡推送至 Line Notify
	pass

trigger = CronTrigger(hour=6)
jobId = 'notifyWeather'

scheduler = BackgroundScheduler()
scheduler.add_job(func, trigger, id=jobId)
設定背景排程

伺服器設定值

我新增了一支 watercontrol.py 的檔案來控制灑水

class WaterControl:
	
    # 定義數值
    _watering_duration: int = 5 * 60 	# 灑水時長 (秒數)
    _is_watering: bool = False 			# 是否正在灑水
    _can_water_today: bool = True 		# 今日是否可以灑水
    _auto_watering_enabled: bool = True # 是否啟用自動灑水
    
    # 外部設備
    _first_relay_pin: int = 40			# GPIO Pin Number
    
    # 其他定義
    _is_init_finish: bool = False 		# 是否完成初始化
上面是關於伺服器控制的數值定義

數值預設值設定為取不到伺服器資料前提下,至少還可以每日的灑水。

因為後來的需求增加希望可以定義一週只有 1,3,5會灑水,其餘都不用,為了增加彈性,我將數值拉到伺服器上,屆時想藉由 APP 變成2,4,6也可以。

def __init__(self) -> None:
	log('建立 WaterControl,初始化中 ...')
    self.getServerData()
建立 Class Init 建構子

這邊的 log 是我自己寫的,因為 log 資訊我不打算儲存在任何的 database,但又希望可以遠端查看 log,所以我將所有 log 透過 http 轉送到 Discord 上,不需要的可以改用 print

import request

def getServerData(self) -> None:
	"""
    呼叫請求伺服器設定
    """
    response = requests.get('Server URL')
    if (response.status_code == 202):
    	log('Server 回傳 202,等待資料進行更新動作')
    else:
    	log('請求伺服器設定失敗')
        pass # 這邊有空要加防呆,再次請求等
建立取得伺服器資料的方法

我的邏輯是伺服器接收到請求時,就會先回傳 202,Server 會透過 MQTT 傳來資料,這邊目前也還沒加上防呆,應該要再判斷若沒有收到資料的狀況。

import threading
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

# 定義變數
_water_plant_scheduler = BackgroundScheduler()
_cancel_watering_timer: threading.Timer = None

def receive_server_settings(self, json: dict) -> None:
	"""
    接收伺服器設定的 function(callable)
    """
    
    if json['watering_duration'] is not None:
    	self._watering_duration = json['watering_duration']
        
    ... 以此類推
    
    if self._is_init_finish is False:
    	self._is_init_finsh = True
        
    log(f'伺服器設定更新完畢 json = {json}')
    
    # 排程工作
    water_plant_job_id = 'waterPlant'
    cronData = json['watering_time'].split(':')
    
    trigger = CronTrigger(hour=cronData[0], minute=cronData[1], second=cronData[2]
    job = self._water_plant_scheduler.get_job(water_plant_job_id)
    
    # 判斷排程工作是否存在
    if job is None:
    	log('排程工作不存在,已新增')
        self._water_plant_scheduler.add_job(self.start, trigger, id=water_plant_job_id)
    else:
    	log('排程工作已存在,進行更新')
        self._water_plant_scheduler.reschedule_job(job_id=water_plant_job_id, trigger=trigger)
        
    # 啟動排程
    if self._water_plant_scheduler.running is False:
    	self._water_plant_scheduler.start()
        log('排程已執行')
定義伺服器更新 callable

這邊還有一些小缺點,即不管伺服器數值是否有變動,排程工作依然會進行更新,之後有機會再完善一下。

只要將上面的 function 註冊進 topic 更新時觸發就可以。

def start(self, manual: bool = False, user: str = None):
	"""
    啟動灑水
    
    Args:
    	manual (bool, optional): 是否為手動控制灑水 Default to False
        user (str, optional): 控制的使用者名稱 Default to None
    """
    
    if self._is_watering:
    	log('灑水已進行中,控制無效')
        return
    
    if not self._can_water_today and manual is False:
    	log('今日為不可灑水的日子,灑水不執行')
        return
        
    self._is_watering = True
    
    # 這裡執行 GPIO
    
    # 啟動停止執行緒,於指定時間後調用 stop function
    self._cancel_watering_timer = threading.Timer(self._watering_duration, self.stop)
    self._cancel_watering_timer.start()
    
    # 推送 Notify
    if manual:
    	msg = f'這是由 {user} 發起的手動強制灑水'
    	log(msg, transferLine = true)
    else:
    	msg = '系統開始進行灑水'
        log(msg, transferLine = true)
    
執行灑水的 function

這邊之後有需要可以再進行其他偵測,例如濕度達標等等隨時終止灑水。

def stop(self, user: str = None):
	"""
    關閉灑水
    
    Args:
    	user (str, optional): 控制的使用者名稱 Default to None
    """
    
    if not self._is_watering:
    	return
    
    if self._cancel_watering_timer is not None and self._cancel_watering_timer.is_alive():
    	log('系統已關閉灑水')
        self._cancel_watering_timer.cancel()
        self._cancel_watering_timer = None
    
    # 這裡執行 GPIO
    
    self._is_watering = False
    
    # 推送 Notify
    if user is not None:
    	msg = f'這是由 {user} 發起的強制關閉灑水'
    	log(msg, transferLine = true)
    else:
    	msg = '系統關閉灑水'
        log(msg, transferLine = true)
    	
關閉灑水 function

以上大概是 Python Server 的處理,本文目前尚未撰寫完,還需要補 GPIO 的地方,因為目前都是在 Jetson Nano 外撰寫,等到我確定 Jetson Nano 上可以運作,再上來補。