基于websocket的跨平台通信——iPhone/iPad/Mac控制树莓派(四):树莓派端代码结构优化

基于websocket的跨平台通信——iPhone/iPad/Mac控制树莓派

瞎扯

我的本科专业是大数据(美其名曰大数据),在各种数据分析任务中只有一个要求:能跑出来结果,或者说能比较快地跑出结果;最多也就考虑考虑时间复杂度,什么继承多态高内聚低耦合从来就没有考虑过;反正这种东西不暴露给用户,都快被算法折磨死了谁还管代码结构。

直到我接触了传统的互联网开发,比如后端和iOS开发,要面对随时增加需求的甲方。我的第一个后端开发项目框架写的稀烂,牵一发而动全身,导致我差点跑路;至此我明白了项目框架的重要性,以及面向对象和函数式是多么的

优雅。

不想看我瞎扯跳到这

优化定时发送数据

在之前我们实现的发送数据中,数据在主线程循环发送,通过time.sleep()来进行延时。

那么问题就来了:
如果我们发送的不止一个MasterControl的数据呢?如果每一种数据发送的时间间隔不一样呢?以及通过while True阻塞了主线程,开销大不说,要干些别的事情怎么办?

多线程肯定是要用上的。

那么问题又来了:
如果某些数据有时需要发送,有时又不需要发送,使用多线程需要额外封装一个Thread的子类(相对方便),还需要额外写一个小框架来维护这些线程…

APScheduler任务管理

全称为Advanced Python Scheduler,作用为在指定的时间规则执行指定的作业。

安装

pip3 install apscheduler

本人不才,写不出来比较好的介绍APScheduler框架的教程,所以各位可以参考以下博客:
花10分钟让你彻底学会Python定时任务框架apscheduler

这里我就默认各位看完了上面这篇博客了。

使用

最简单的使用当然就是定义一个全局的scheduler,然后在封装的websocket类的on_open()函数中添加发送网络延迟的任务:

# 使用BackgroundScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


scheduler = BackgroundScheduler({
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '50'                             # 最大进程数
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '20'                             # 最大进程数
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '10',     # 作业最大实例数
    'apscheduler.timezone': 'UTC',
})

# ......

class WmSocket:
	# ......
	def on_open(ws):
		scheduler.add_job(
            ws.send,
            args=[netDelay.get(),], 
            id='NetWorkDelay', 
            trigger='interval', 
            seconds=netDelay.sendTime, 
            replace_existing=True
        )
		
		scheduler.start()
	# ......

这样发送网络延迟的过程就变成多线程,也可以同时管理发送多条数据。

当然,这么写还是略显简陋了,我们稍后再对它做进一步的优化;先来看到之前我提到(但是为了赶稿子就一笔带过的)的 平台与设备 的概念。

以下仅代表个人思路


平台(Platform)与设备(Device)

在之前我简单提及了一下我关于设备(Device)和平台(Platform)的设计思路;当时为了图快(写文章不比写代码轻松)就简简单单带过了;这里我重新细说。

举个例子;一辆无人小车就是一个平台,这个平台拥有很多设备,例如:用树莓派做的主控、驱动小车移动的电机、转向舵机、摄像头,等等。

每个设备仅属于一个平台,平台的作用是维护设备的字典(一个设备在一个平台中名称唯一);平台的名称唯一,作用是让后端区分数据的来源和去向。

设备Device

在树莓派的Python代码中,我定义了一个Device父类:

import json

class Device:
	# 设备名称
    deviceName = "Device"
	# 间隔多少秒向后端发送一次该设备的数据,后面会提到;为0时不主动发送
    sendTime = 3
    # 以JSON格式返回该设备待发送给后端的数据
    def get(self):
        return(dict(msg = "empty"))
	# 发送数据;通常这个函数不需要重写
	# ws是什么后面会解释,现在把它当成传入的websocket调用发送函数就行了
    def send(self, ws):
        ws.send(self.get())
    # 该设备接收到数据时调用,msg为JSON格式数据,之后会说到
    def onMsg(self, msg):
        print(msg)

接下来我们再来写一个获取树莓派CPU使用率、内存使用率等信息的类:

MasterControl.py

# MasterControl.py

# pip3 install psutil
import psutil
import json
from device.Device import Device


class MasterControl(Device):
    deviceName = "MasterControl"
    isPerRate = False
    sendTime = 3        # 默认n秒发送一次

    # CPU使用率
    def getCPURate(self):
        return psutil.cpu_percent(interval=1,percpu=self.isPerRate)

    # 内存使用率
    def getVitMemoryRate(self):
        return psutil.virtual_memory()[2]

    # 电池状态,返回(电量, 是否充电)
    def getBatteryMsg(self):
        # return psutil.sensors_battery()
        # 树莓派没有电池(至少我暂时没有外接)
        return [0, 0, True]

    # CPU温度 (M1暂时没有办法读取)
    def getCPUTemp(self):
        temps = psutil.sensors_temperatures()
        return temps['cpu_thermal'][0].current

    def get(self):
        batteryMsg = self.getBatteryMsg()
        _text = json.dumps(dict(
            cpuRate = self.getCPURate(),
            virtualMemory = self.getVitMemoryRate(),
            batteryQuantity = batteryMsg[0],
            isCharging = batteryMsg[2],
            CPUTemp = self.getCPUTemp()))

        return json.dumps(dict(type = 1, toPlatform = ["WMBP"], msgType = "MasterControl", msg = _text))

然后我们定义一个枚举保存所有设备的名称,同时定义一个字典,key为设备名称,value为对应的设备类:

Devices.py

# Deivces.py
from enum import Enum
from device import NetworkDelay, MasterControl


class DeviceName(Enum):
    NetDelay = 1
    MasterControl = 2
    
class Devices:
    deviceList = {
        DeviceName.NetDelay.name: NetworkDelay.netDelay,
        DeviceName.MasterControl.name: MasterControl.masterControl,
    }

devices = Devices()

这样一切准备工作都做好了。让我们再次回到APScheduler那去:

APScheduler第二次优化

将APScheduler封装成一个工具类,同时在websocket工具类中删去对于APScheduler的定义:

# APScheduler.py
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

class Apscheduler:
    scheduler = BackgroundScheduler({
        'apscheduler.executors.default': {
            'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
            'max_workers': '50'                             # 最大进程数
        },
        'apscheduler.executors.processpool': {
            'type': 'processpool',
            'max_workers': '20'                             # 最大进程数
        },
        'apscheduler.job_defaults.coalesce': 'false',
        'apscheduler.job_defaults.max_instances': '10',     # 作业最大实例数
        'apscheduler.timezone': 'UTC',
    })
	
	'''
		websocket这个库有一个非常反人类的设计:websocket.WebSocketApp()的实体类是没有.send()函数的,
	必须使用on_open()等方法传入的ws参数调用send()函数...想在on_open()之外调用ws.send()就必须继续传递ws。
		所以__init__()这里的ws必须从on_open()函数中传递过来。
	'''
    def __init__(self, ws) -> None:
        self.ws = ws
        
    # device是传递进来的Device子类实例
    def startTransDevice(self, device):
        self.scheduler.add_job(
            device.send,
            # 将ws传递给Device的send()函数
            args=[self.ws,], 
            id=device.deviceName, 
            trigger='interval', 
            seconds=device.sendTime, 
            replace_existing=True
        )

# 这里传入None是为了既满足aps是全局变量方便调用,又能让它在websocket中初始化以传递ws参数
aps = Apscheduler(None)

然后我们回到websocket工具类的on_open()函数中:

from Apscheduler import aps
from Devices import devices
import websocket

class WmSocket:
	# ......
	def on_open(ws):
		aps.ws = ws
		
		for key, value in devices.deviceList.items():
            if value.sendTime != 0:
                aps.startTransDevice(value)
		
		# print测试一下
		aps.scheduler.print_jobs()
		scheduler.start()
	# ......

这样关于设备数据发送的框架就搭建起来了;如果需要实现新的设备加入,只需要以下两步:

  1. 定义设备类,继承Device,定义设备名称、信息发送间隔、数据获取函数;
  2. 在DeviceName这个枚举中添加设备名称,以及在Devices.deviceList中添加这个设备的键值。

如果设备比较特殊,存在其它的需要定时的功能,例如电机的PID控制,另作讨论。

平台Platform

由于树莓派端肯定是(至少我是这么干的)作为一个设备的主控的,所以在树莓派端的代码中并没有体现平台的概念,因为所有代码的集合就是平台;这个概念更多的是在控制端体现。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>