admin 发布的文章

1 背景

烘焙咖啡豆的过程中,为了更好地测量和记录温度及其变化,使用ESP32C3制作了一个温度监控模块。

2 需求

如果从烘焙咖啡豆的角度去考虑,参照商用机器的设计,这个温度监测会有很多功能要实现。作为起始的设计,还是先简化需求,逐步实现更多功能。所以第一版先确定以下需求:

  • 计时。这个实现起来简单,记录的温度也需要跟时间关联。
  • 读取测量出的温度值。用于展示、分析、记录等。
  • 展示温度变化。显示当前温度值,和温度与时间的曲线。

3 设计

3.1 元件

  • 主控:ESP32C3-Core,刷上MicroPython固件

    • 此开发板很廉价,最低9.9 RMB包邮。
    • 支持WiFi和蓝牙,数据可以很方便地同步到其它设备。
    • 基于MicroPython开发程序,调试很方便。
  • 温度检测模块:MAX6675,K型热电偶温度传感模块,SPI接口。

    • 廉价。
    • 最低精度为0.25 °C,够用。
  • 温度检测探头:K型铠装热电偶,探头为可弯曲、接壳式、304不锈钢材质。

    • 这个探头比温度检测模块附送的灵敏很多。
    • 一般型号为WRNK191。
    • 参考规格:直径1mm,插深50mm,线长500mm。
  • 显示模块:SSD1306,单色OLED屏,0.96英寸,分辨率128x64,两线I2C接口。

    • 廉价。
    • I2C接口的数据线只有两根,减少GPIO的占用。

3.2 接线

这里忽略电源(VCC 3.3V)和接地(GND)的连接。详细如下:

ESP32C3的接口模块接口
GPIO10MAX6675SO
GPIO02MAX6675SCK
GPIO12MAX6675CS
GPIO05SSD1306SCL
GPIO04SSD1306SDA

4 程序

相关代码文件:

  • max6675.py:MAX6675的驱动程序。
  • ssd1306.py:SSD1306的驱动程序。
  • series_list.py:温度数据类,方便后面扩展对温度数据的保存。
  • main.py:主程序。

4.1 MAX6675的驱动程序

# from https://github.com/BetaRavener/micropython-hw-lib/blob/master/MAX6675/max6675.py

import time


class MAX6675:
    MEASUREMENT_PERIOD_MS = 220

    def __init__(self, sck, cs, so):
        """
        Creates new object for controlling MAX6675
        :param sck: SCK (clock) pin, must be configured as Pin.OUT
        :param cs: CS (select) pin, must be configured as Pin.OUT
        :param so: SO (data) pin, must be configured as Pin.IN
        """
        # Thermocouple
        self._sck = sck
        self._sck.off()

        self._cs = cs
        self._cs.on()

        self._so = so
        self._so.off()

        self._last_measurement_start = 0
        self._last_read_temp = 0
        self._error = 0

    def _cycle_sck(self):
        self._sck.on()
        time.sleep_us(1)
        self._sck.off()
        time.sleep_us(1)

    def refresh(self):
        """
        Start a new measurement.
        """
        self._cs.off()
        time.sleep_us(10)
        self._cs.on()
        self._last_measurement_start = time.ticks_ms()

    def ready(self):
        """
        Signals if measurement is finished.
        :return: True if measurement is ready for reading.
        """
        return time.ticks_ms() - self._last_measurement_start > MAX6675.MEASUREMENT_PERIOD_MS

    def error(self):
        """
        Returns error bit of last reading. If this bit is set (=1), there's problem with the
        thermocouple - it can be damaged or loosely connected
        :return: Error bit value
        """
        return self._error

    def read(self):
        """
        Reads last measurement and starts a new one. If new measurement is not ready yet, returns last value.
        Note: The last measurement can be quite old (e.g. since last call to `read`).
        To refresh measurement, call `refresh` and wait for `ready` to become True before reading.
        :return: Measured temperature
        """
        # Check if new reading is available
        if self.ready():
            # Bring CS pin low to start protocol for reading result of
            # the conversion process. Forcing the pin down outputs
            # first (dummy) sign bit 15.
            self._cs.off()
            time.sleep_us(10)

            # Read temperature bits 14-3 from MAX6675.
            value = 0
            for i in range(12):
                # SCK should resemble clock signal and new SO value
                # is presented at falling edge
                self._cycle_sck()
                value += self._so.value() << (11 - i)

            # Read the TC Input pin to check if the input is open
            self._cycle_sck()
            self._error = self._so.value()

            # Read the last two bits to complete protocol
            for i in range(2):
                self._cycle_sck()

            # Finish protocol and start new measurement
            self._cs.on()
            self._last_measurement_start = time.ticks_ms()

            self._last_read_temp = value * 0.25

        return self._last_read_temp

4.2 SSD1306的驱动程序

这个驱动继承了FrameBuffer类,显示的方法,参考FrameBuffer类的文档即可(代码有说明)。

# MicroPython SSD1306 OLED driver, I2C and SPI interfaces
# from https://github.com/micropython/micropython-lib/blob/master/micropython/drivers/display/ssd1306/ssd1306.py

from micropython import const
import framebuf


# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xA4)
SET_NORM_INV = const(0xA6)
SET_DISP = const(0xAE)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xA0)
SET_MUX_RATIO = const(0xA8)
SET_IREF_SELECT = const(0xAD)
SET_COM_OUT_DIR = const(0xC0)
SET_DISP_OFFSET = const(0xD3)
SET_COM_PIN_CFG = const(0xDA)
SET_DISP_CLK_DIV = const(0xD5)
SET_PRECHARGE = const(0xD9)
SET_VCOM_DESEL = const(0xDB)
SET_CHARGE_PUMP = const(0x8D)

# Subclassing FrameBuffer provides support for graphics primitives
# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html
class SSD1306(framebuf.FrameBuffer):
    def __init__(self, width, height, external_vcc):
        self.width = width
        self.height = height
        self.external_vcc = external_vcc
        self.pages = self.height // 8
        self.buffer = bytearray(self.pages * self.width)
        super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
        self.init_display()

    def init_display(self):
        for cmd in (
            SET_DISP,  # display off
            # address setting
            SET_MEM_ADDR,
            0x00,  # horizontal
            # resolution and layout
            SET_DISP_START_LINE,  # start at line 0
            SET_SEG_REMAP | 0x01,  # column addr 127 mapped to SEG0
            SET_MUX_RATIO,
            self.height - 1,
            SET_COM_OUT_DIR | 0x08,  # scan from COM[N] to COM0
            SET_DISP_OFFSET,
            0x00,
            SET_COM_PIN_CFG,
            0x02 if self.width > 2 * self.height else 0x12,
            # timing and driving scheme
            SET_DISP_CLK_DIV,
            0x80,
            SET_PRECHARGE,
            0x22 if self.external_vcc else 0xF1,
            SET_VCOM_DESEL,
            0x30,  # 0.83*Vcc
            # display
            SET_CONTRAST,
            0xFF,  # maximum
            SET_ENTIRE_ON,  # output follows RAM contents
            SET_NORM_INV,  # not inverted
            SET_IREF_SELECT,
            0x30,  # enable internal IREF during display on
            # charge pump
            SET_CHARGE_PUMP,
            0x10 if self.external_vcc else 0x14,
            SET_DISP | 0x01,  # display on
        ):  # on
            self.write_cmd(cmd)
        self.fill(0)
        self.show()

    def poweroff(self):
        self.write_cmd(SET_DISP)

    def poweron(self):
        self.write_cmd(SET_DISP | 0x01)

    def contrast(self, contrast):
        self.write_cmd(SET_CONTRAST)
        self.write_cmd(contrast)

    def invert(self, invert):
        self.write_cmd(SET_NORM_INV | (invert & 1))

    def rotate(self, rotate):
        self.write_cmd(SET_COM_OUT_DIR | ((rotate & 1) << 3))
        self.write_cmd(SET_SEG_REMAP | (rotate & 1))

    def show(self):
        x0 = 0
        x1 = self.width - 1
        if self.width != 128:
            # narrow displays use centred columns
            col_offset = (128 - self.width) // 2
            x0 += col_offset
            x1 += col_offset
        self.write_cmd(SET_COL_ADDR)
        self.write_cmd(x0)
        self.write_cmd(x1)
        self.write_cmd(SET_PAGE_ADDR)
        self.write_cmd(0)
        self.write_cmd(self.pages - 1)
        self.write_data(self.buffer)


class SSD1306_I2C(SSD1306):
    def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False):
        self.i2c = i2c
        self.addr = addr
        self.temp = bytearray(2)
        self.write_list = [b"\x40", None]  # Co=0, D/C#=1
        super().__init__(width, height, external_vcc)

    def write_cmd(self, cmd):
        self.temp[0] = 0x80  # Co=1, D/C#=0
        self.temp[1] = cmd
        self.i2c.writeto(self.addr, self.temp)

    def write_data(self, buf):
        self.write_list[1] = buf
        self.i2c.writevto(self.addr, self.write_list)


class SSD1306_SPI(SSD1306):
    def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
        self.rate = 10 * 1024 * 1024
        dc.init(dc.OUT, value=0)
        res.init(res.OUT, value=0)
        cs.init(cs.OUT, value=1)
        self.spi = spi
        self.dc = dc
        self.res = res
        self.cs = cs
        import time

        self.res(1)
        time.sleep_ms(1)
        self.res(0)
        time.sleep_ms(10)
        self.res(1)
        super().__init__(width, height, external_vcc)

    def write_cmd(self, cmd):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs(1)
        self.dc(0)
        self.cs(0)
        self.spi.write(bytearray([cmd]))
        self.cs(1)

    def write_data(self, buf):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs(1)
        self.dc(1)
        self.cs(0)
        self.spi.write(buf)
        self.cs(1)

4.3 温度数据类

想模拟成时序数据库,目前只是保存了屏幕可以显示的数据数量。

class SeriesList:
    
    def __init__(self, maxLen: int, firstVal: float):
        self._maxLen = 1 if maxLen <= 0 else maxLen
        self._list = [firstVal]
        self._len = len(self._list)
    
    def append(self, val: float):
        self._list.append(val)
        if (self._len + 1) > self._maxLen:
            delVal = self._list.pop(0)
        else:
            self._len += 1
    
    def last(self, index: int = 0) -> int:
        return self._list[self._len - index -1]
    
    def histogram(self, maxRange: int) -> list:
        hList = self._list.copy()
        hMax = int(max(self._list))
        hMin = int(min(self._list))
        hRange = hMax - hMin + 1
        rate = 1 if hRange <= maxRange else (maxRange / hRange)
        if rate == 1:
            for i, v in enumerate(hList):
                hList[i] = int(v) - hMin
        else:
            for i, v in enumerate(hList):
                hList[i] = int((int(v) - hMin) * rate)
        return hList

4.4 主程序

总结一下:

  • 使用了定时任务(Timer)去探测温度和更新显示,每秒执行一次。
  • 每次执行最大耗时约150毫秒,绘画曲线的代码可以再优化。
  • 绘画曲线时,如果把整个区域置黑,再画曲线,性能比较高,但是会闪屏。目前是逐列置黑,再画,虽然慢了点,但观感良好。
  • 展示的曲线是为了直观看到温度变化,如果显示范围内的温度差超过屏幕区域的分辨率,曲线会按比例压缩。
from micropython import const
import time
from machine import Pin, SoftI2C, Timer
from max6675 import MAX6675
from ssd1306 import SSD1306_I2C
from series_list import SeriesList

UNIT_60 = const(60)
SCREEN_W = const(128)
SCREEN_H = const(64)
HISTOGRAM_X = const(0)
HISTOGRAM_Y = const(10)
HISTOGRAM_W = const(SCREEN_W)
HISTOGRAM_H = const(SCREEN_H - HISTOGRAM_Y)

""" init MAX6675 ################################## """
print('init MAX6675')
so = Pin(10, Pin.IN) # GPIO10
sck = Pin(2, Pin.OUT) # GPIO02
cs = Pin(12, Pin.OUT) # GPIO12
max = MAX6675(sck, cs, so)
time.sleep(1)
curTemp = max.read()

""" init OLED ################################## """
print('init OLED')
i2c = SoftI2C(scl=Pin(5), sda=Pin(4))
oled = SSD1306_I2C(SCREEN_W, SCREEN_H, i2c)

""" init Ticks ################################## """
ticksStart = time.ticks_ms()

""" init Temperature list ################################## """
sList = SeriesList(SCREEN_W, curTemp)

""" init Timer ################################## """
def timerRefresh(t):
    global max, curTemp, oled, timeSec, sList
    
    duration = 0 if ticksStart==None else int(time.ticks_diff(time.ticks_ms(), ticksStart) / 1000)
    dSec = duration % UNIT_60
    dMin = int(duration / UNIT_60)
    
    curTemp = max.read() # Current temperature
    sList.append(curTemp)
    
    oled.fill(0)
    oled.text('{:02d}:{:02d} | {:>6.2f} C'.format(dMin, dSec, curTemp) , 0, 0)
    hList = sList.histogram(HISTOGRAM_H)
    hLen = len(hList)
    hPre = 0
    hCur = 0
    startX = SCREEN_W - hLen
    startY = HISTOGRAM_Y + HISTOGRAM_H - 1
    for i, v in enumerate(hList):
        hPre = v if i == 0 else hCur
        hCur = v
        oled.vline(startX + i, startY - hCur, (hCur - hPre if hPre < hCur else 1), 1)
        if hPre > hCur:
            oled.vline(startX + i - 1, startY - hPre, hPre - hCur, 1)
    oled.show()

timerTemp = Timer(0)
timerTemp.init(period=500, mode=Timer.PERIODIC, callback=timerRefresh) # Every 1 second

print('start run')

春节前,以35rmb入手了Kido x3儿童智能手表,刷上基于官方系统修改的第三方ROM,尝试成为日常通讯设备,以失败告终。

1. 背景

随着时代的变化,现在已经日常使用两个手机SIM卡。一个是“保号卡”,以低价月租套餐保留手机号,一般用于注册、认证、社交等。另一个是“流量卡”,主要使用大流量套餐,子卡互打免费等。这个方案方便更换更优惠的手机流量套餐,同时避免更换手机号。

然后就想,是否可以把“保号卡”插上轻量化设备,比如可插卡的智能手表,避免别人联系不上。此时,“流量卡”可以插上其它智能设备,比如平板、笔记本电脑之类。

决定入手Kido x3,去实行这个想法。

2. 硬件介绍

Kido x3相关介绍视频:

其配置不算突出,吸引我的地方主要是:

  • 可破解,有定制系统。
  • 可插SIM卡,支持4G LTE和WiFi。
  • 价格便宜,当时只要35rmb。
  • 电池容量为900mAh,比同类产品高。
  • 防水级别为IP68,日常生活防水。

其详细配置如下:

  • CPU: Qualcomm® Snapdragon Wear™ 2100
  • 内存: 512 MB
  • 闪存: 4 GB(用户可用不到2GB)
  • 电池: 900 mAh
  • 重量: 57.4g
  • 屏幕: 1.55英寸IPS,320 * 360像素
  • 按键: 两个
  • 摄像头: 侧边500万像素、前置200万像素
  • 移动网络: 4G全网通(可插1个SIM卡)
  • WiFi: 只支持2.4GHz
  • 蓝牙: (未知)
  • GPS: L1+L5 Dual-Frequency GPS
  • 防水级别: IP68
  • 充电: 磁吸线、无线充
  • 系统:Android 7.1.1

3. 刷机

主要思路是,重启进入高通9008模式,利用高通刷机工具,刷入userdata、system、boot三个分区。

刷机前,需要制作数据线,把手表背面的4个触点连上USB 2.0的线。

具体过程,可参照ROM维护者的说明文档。

4. 缺点

  1. 定制系统优化不足。

由于原版系统没有开源,只能在原版ROM基础上修修补补。虽然可以获得root权限,但是改得有限。最致命的缺点是不够省电。虽然其电池容量比同类要大,但是开启4G流量后电量雪崩,而且是关屏待机状态,也不能坚持一天。对比孩子的“小天才”电话手表,启用流量后,能待机两天多。

还出现过致命问题,别人打不进电话!虽然重启后恢复正常,但完全没办法知道什么时候又不行了。

  1. 配套软件不足。

主要是屏幕太小,而一般App都是针对手机设计,用起来就是手机屏幕缩小的效果。缓解的办法是,系统设置三击屏幕后放大显示。

虽然有一些小市场收集了专门的App,但是App数量实在少得可怜。

  1. 抬腕操作很累。

抬腕看手表屏幕的动作,持续太久,手臂会累。另外,手表也有点重,毕竟电池容量大,日常佩戴会感到累。

  1. 一般功能的缺失。

很多传感器,比如计步、GPS等,是没有的。估计是驱动问题吧。

5. 收获

  1. 手环更实用

使用“小米手环3 NFC”已经好几年了。没有连上手机,只是作为手表(看时间)、闹钟、计步、久坐提醒、公交卡等,只需一个月充电一次。相比之下,这个手表相差太远。

  1. 体会到儿童手表的优化

以前一直很鄙视那些系统版本低下的儿童手表,例如某品牌基于Android 4.4改造还卖得贼贵。但用过才知道,儿童手表都做了大量优化,特别是省电方面的。比如我孩子的“小天才 Q2”,开启4G流量后,仍能待机两天多。另外,UI优化、配套App等,就不用多说了。

  1. 表带螺丝的替代

由于到手的kido x3只有本体,没有表带。即使购买了合适的表带,也找到合适的表带螺丝。后来才了解到,可以使用“舌钉”、“乳环”之类代替表带螺丝,确实刷新了对世界的认识(奇怪的知识有增加了)。

  1. 破解系统

kido x3的背面,4个触点,就是对应USB 2.0的四条线。接上USB后,就可以使用ADB连上系统。不少儿童手表都可以用这个方法破解。另外,这个手表是基于高通的CPU,也有专用工具刷入分区镜像,实现破解。

  1. 儿童微信

用过儿童微信,不算好用,但是提供了一个实现微信分身的方法。同一个微信号,可以同时登录手机和儿童微信,而且各不影响接收消息。不过安装儿童微信需要license,可从相关的儿童手表获取,或者直接到“咸鱼”购买。

  1. 涨价

可能由于视频的宣传带动,这货居然涨价了。折腾一番后,仍能高于原价卖出,真好~

本来CFO想帮孩子报个关于“竹子“的研学活动,包括挖春笋和做竹筒饭,收费280rmb/人。抠门的她越想越不对劲,本着“我也可以”的想法,自己策划了一场类似的活动。

研学活动的策划如下:

1. 背景

竹子除了用途广泛,还可以食用,深受国人喜爱。特别是农村,都会自家种一些。

2. 第一课时,采竹

  • 地点:种有竹子的山丘、公园
  • 内容:亲身了解竹子的生长环境,挖竹笋、砍竹子
  • 收获:

    • 体验挖竹笋,了解竹笋可食。不能挖的,可菜市场购买竹笋,再现场讲解。
    • 砍了几条竹枝。
    • 收获粗竹子一根(可淘宝购买)。

3. 第二课时,制竹

  • 地点:有编织竹子制品的店铺。
  • 内容:参观竹制品店铺,了解竹子的用途。
  • 收获:

    • 利用上一课时砍的竹枝,制作吸管。
    • 淘宝购买竹制品,因为实体店比较贵。

4. 第三课时,煮竹

  • 地点:可烧烤的阳台、烧烤场、露营场地等。
  • 内容:

    • 劈柴,把干竹枝劈开。
    • 生火,利用劈好的竹枝生火。
    • 煮竹筒饭,把粗竹子锯成段,放入米、水、切片的广式腊肠,放火上烤。
  • 收获:

    • 竹筒饭带着新鲜竹香,很好吃。
    • 体会到竹筒饭还是蒸的靠谱,不会烧焦。最好顺便搞场烧烤。

5. 总结

  • 幼儿园或小学1、2年级,以口述、绘画形式做总结。
  • 小学3到6年级,以记叙文、日记、书信、手抄报等形式做总结。

为了在MicroPython上实现Web服务,找了几个框架,最终选择tinyweb

tinyweb主要是小巧、符合编程习惯、做好错误处理、实现了HTTP 1.0基本功能等。相关信息如下:

但是tinyweb采用logging做log记录,而logging依赖的库有点多,于是改为使用ulooger。修改后的源码如下:

"""
Tiny Web - pretty simple and powerful web server for tiny platforms like ESP8266 / ESP32
MIT license
(C) Konstantin Belyalov 2017-2018

- project: https://github.com/belyalov/tinyweb
- source code: https://github.com/belyalov/tinyweb/blob/master/tinyweb/server.py
- version: e92546054b905de46f42157282677f56764fb2f0

edit: replace logging with ulogger
"""
import ulogger
import uasyncio as asyncio
import uasyncio.core
import ujson as json
import gc
import uos as os
import sys
import uerrno as errno
import usocket as socket


#log = logging.getLogger('WEB')
class UtcClock(ulogger.BaseClock):
    def __call__(self) -> str:
        # UTC date time: (year, month, mday, hour, minute, second, weekday, yearday)
        dt = time.gmtime()
        return f'{dt[0]}{dt[1]:02d}{dt[2]:02d}_{dt[3]:02d}{dt[4]:02d}{dt[5]:02d}'

handler_to_term = ulogger.Handler(
    level=ulogger.INFO,
    colorful=True,
    fmt="&(time)%[&(level)%][&(name)%]: &(msg)%",
    clock=UtcClock(),
    direction=ulogger.TO_TERM,
)

handler_to_file = ulogger.Handler(
    level=ulogger.INFO,
    fmt="&(time)%[&(level)%][&(name)%]: &(msg)%",
    clock=UtcClock(),
    direction=ulogger.TO_FILE,
    file_name="tinyweb.log",
    max_file_size=102400 # max for 100KB
)
logger = ulogger.Logger(
    name = __name__,
    handlers = (
        handler_to_term,
        handler_to_file
    )
)

type_gen = type((lambda: (yield))())

# uasyncio v3 is shipped with MicroPython 1.13, and contains some subtle
# but breaking changes. See also https://github.com/peterhinch/micropython-async/blob/master/v3/README.md
IS_UASYNCIO_V3 = hasattr(asyncio, "__version__") and asyncio.__version__ >= (3,)


def urldecode_plus(s):
    """Decode urlencoded string (including '+' char).

    Returns decoded string
    """
    s = s.replace('+', ' ')
    arr = s.split('%')
    res = arr[0]
    for it in arr[1:]:
        if len(it) >= 2:
            res += chr(int(it[:2], 16)) + it[2:]
        elif len(it) == 0:
            res += '%'
        else:
            res += it
    return res


def parse_query_string(s):
    """Parse urlencoded string into dict.

    Returns dict
    """
    res = {}
    pairs = s.split('&')
    for p in pairs:
        vals = [urldecode_plus(x) for x in p.split('=', 1)]
        if len(vals) == 1:
            res[vals[0]] = ''
        else:
            res[vals[0]] = vals[1]
    return res


class HTTPException(Exception):
    """HTTP protocol exceptions"""

    def __init__(self, code=400):
        self.code = code


class request:
    """HTTP Request class"""

    def __init__(self, _reader):
        self.reader = _reader
        self.headers = {}
        self.method = b''
        self.path = b''
        self.query_string = b''

    async def read_request_line(self):
        """Read and parse first line (AKA HTTP Request Line).
        Function is generator.

        Request line is something like:
        GET /something/script?param1=val1 HTTP/1.1
        """
        while True:
            rl = await self.reader.readline()
            # skip empty lines
            if rl == b'\r\n' or rl == b'\n':
                continue
            break
        rl_frags = rl.split()
        if len(rl_frags) != 3:
            raise HTTPException(400)
        self.method = rl_frags[0]
        url_frags = rl_frags[1].split(b'?', 1)
        self.path = url_frags[0]
        if len(url_frags) > 1:
            self.query_string = url_frags[1]

    async def read_headers(self, save_headers=[]):
        """Read and parse HTTP headers until \r\n\r\n:
        Optional argument 'save_headers' controls which headers to save.
            This is done mostly to deal with memory constrains.

        Function is generator.

        HTTP headers could be like:
        Host: google.com
        Content-Type: blah
        \r\n
        """
        while True:
            gc.collect()
            line = await self.reader.readline()
            if line == b'\r\n':
                break
            frags = line.split(b':', 1)
            if len(frags) != 2:
                raise HTTPException(400)
            if frags[0] in save_headers:
                self.headers[frags[0]] = frags[1].strip()

    async def read_parse_form_data(self):
        """Read HTTP form data (payload), if any.
        Function is generator.

        Returns:
            - dict of key / value pairs
            - None in case of no form data present
        """
        # TODO: Probably there is better solution how to handle
        # request body, at least for simple urlencoded forms - by processing
        # chunks instead of accumulating payload.
        gc.collect()
        if b'Content-Length' not in self.headers:
            return {}
        # Parse payload depending on content type
        if b'Content-Type' not in self.headers:
            # Unknown content type, return unparsed, raw data
            return {}
        size = int(self.headers[b'Content-Length'])
        if size > self.params['max_body_size'] or size < 0:
            raise HTTPException(413)
        data = await self.reader.readexactly(size)
        # Use only string before ';', e.g:
        # application/x-www-form-urlencoded; charset=UTF-8
        ct = self.headers[b'Content-Type'].split(b';', 1)[0]
        try:
            if ct == b'application/json':
                return json.loads(data)
            elif ct == b'application/x-www-form-urlencoded':
                return parse_query_string(data.decode())
        except ValueError:
            # Re-generate exception for malformed form data
            raise HTTPException(400)


class response:
    """HTTP Response class"""

    def __init__(self, _writer):
        self.writer = _writer
        self.send = _writer.awrite
        self.code = 200
        self.version = '1.0'
        self.headers = {}

    async def _send_headers(self):
        """Compose and send:
        - HTTP request line
        - HTTP headers following by \r\n.
        This function is generator.

        P.S.
        Because of usually we have only a few HTTP headers (2-5) it doesn't make sense
        to send them separately - sometimes it could increase latency.
        So combining headers together and send them as single "packet".
        """
        # Request line
        hdrs = 'HTTP/{} {} MSG\r\n'.format(self.version, self.code)
        # Headers
        for k, v in self.headers.items():
            hdrs += '{}: {}\r\n'.format(k, v)
        hdrs += '\r\n'
        # Collect garbage after small mallocs
        gc.collect()
        await self.send(hdrs)

    async def error(self, code, msg=None):
        """Generate HTTP error response
        This function is generator.

        Arguments:
            code - HTTP response code

        Example:
            # Not enough permissions. Send HTTP 403 - Forbidden
            await resp.error(403)
        """
        self.code = code
        if msg:
            self.add_header('Content-Length', len(msg))
        await self._send_headers()
        if msg:
            await self.send(msg)

    async def redirect(self, location, msg=None):
        """Generate HTTP redirect response to 'location'.
        Basically it will generate HTTP 302 with 'Location' header

        Arguments:
            location - URL to redirect to

        Example:
            # Redirect to /something
            await resp.redirect('/something')
        """
        self.code = 302
        self.add_header('Location', location)
        if msg:
            self.add_header('Content-Length', len(msg))
        await self._send_headers()
        if msg:
            await self.send(msg)

    def add_header(self, key, value):
        """Add HTTP response header

        Arguments:
            key - header name
            value - header value

        Example:
            resp.add_header('Content-Encoding', 'gzip')
        """
        self.headers[key] = value

    def add_access_control_headers(self):
        """Add Access Control related HTTP response headers.
        This is required when working with RestApi (JSON requests)
        """
        self.add_header('Access-Control-Allow-Origin', self.params['allowed_access_control_origins'])
        self.add_header('Access-Control-Allow-Methods', self.params['allowed_access_control_methods'])
        self.add_header('Access-Control-Allow-Headers', self.params['allowed_access_control_headers'])

    async def start_html(self):
        """Start response with HTML content type.
        This function is generator.

        Example:
            await resp.start_html()
            await resp.send('<html><h1>Hello, world!</h1></html>')
        """
        self.add_header('Content-Type', 'text/html')
        await self._send_headers()

    async def send_file(self, filename, content_type=None, content_encoding=None, max_age=2592000, buf_size=128):
        """Send local file as HTTP response.
        This function is generator.

        Arguments:
            filename - Name of file which exists in local filesystem
        Keyword arguments:
            content_type - Filetype. By default - None means auto-detect.
            max_age - Cache control. How long browser can keep this file on disk.
                      By default - 30 days
                      Set to 0 - to disable caching.

        Example 1: Default use case:
            await resp.send_file('images/cat.jpg')

        Example 2: Disable caching:
            await resp.send_file('static/index.html', max_age=0)

        Example 3: Override content type:
            await resp.send_file('static/file.bin', content_type='application/octet-stream')
        """
        try:
            # Get file size
            stat = os.stat(filename)
            slen = str(stat[6])
            self.add_header('Content-Length', slen)
            # Find content type
            if content_type:
                self.add_header('Content-Type', content_type)
            # Add content-encoding, if any
            if content_encoding:
                self.add_header('Content-Encoding', content_encoding)
            # Since this is static content is totally make sense
            # to tell browser to cache it, however, you can always
            # override it by setting max_age to zero
            self.add_header('Cache-Control', 'max-age={}, public'.format(max_age))
            with open(filename) as f:
                await self._send_headers()
                gc.collect()
                buf = bytearray(min(stat[6], buf_size))
                while True:
                    size = f.readinto(buf)
                    if size == 0:
                        break
                    await self.send(buf, sz=size)
        except OSError as e:
            # special handling for ENOENT / EACCESS
            if e.args[0] in (errno.ENOENT, errno.EACCES):
                raise HTTPException(404)
            else:
                raise


async def restful_resource_handler(req, resp, param=None):
    """Handler for RESTful API endpoins"""
    # Gather data - query string, JSON in request body...
    data = await req.read_parse_form_data()
    # Add parameters from URI query string as well
    # This one is actually for simply development of RestAPI
    if req.query_string != b'':
        data.update(parse_query_string(req.query_string.decode()))
    # Call actual handler
    _handler, _kwargs = req.params['_callmap'][req.method]
    # Collect garbage before / after handler execution
    gc.collect()
    if param:
        res = _handler(data, param, **_kwargs)
    else:
        res = _handler(data, **_kwargs)
    gc.collect()
    # Handler result could be:
    # 1. generator - in case of large payload
    # 2. string - just string :)
    # 2. dict - meaning client what tinyweb to convert it to JSON
    # it can also return error code together with str / dict
    # res = {'blah': 'blah'}
    # res = {'blah': 'blah'}, 201
    if isinstance(res, type_gen):
        # Result is generator, use chunked response
        # NOTICE: HTTP 1.0 by itself does not support chunked responses, so, making workaround:
        # Response is HTTP/1.1 with Connection: close
        resp.version = '1.1'
        resp.add_header('Connection', 'close')
        resp.add_header('Content-Type', 'application/json')
        resp.add_header('Transfer-Encoding', 'chunked')
        resp.add_access_control_headers()
        await resp._send_headers()
        # Drain generator
        for chunk in res:
            chunk_len = len(chunk.encode('utf-8'))
            await resp.send('{:x}\r\n'.format(chunk_len))
            await resp.send(chunk)
            await resp.send('\r\n')
            gc.collect()
        await resp.send('0\r\n\r\n')
    else:
        if type(res) == tuple:
            resp.code = res[1]
            res = res[0]
        elif res is None:
            raise Exception('Result expected')
        # Send response
        if type(res) is dict:
            res_str = json.dumps(res)
        else:
            res_str = res
        resp.add_header('Content-Type', 'application/json')
        resp.add_header('Content-Length', str(len(res_str)))
        resp.add_access_control_headers()
        await resp._send_headers()
        await resp.send(res_str)


class webserver:

    def __init__(self, request_timeout=3, max_concurrency=3, backlog=16, debug=False):
        """Tiny Web Server class.
        Keyword arguments:
            request_timeout - Time for client to send complete request
                              after that connection will be closed.
            max_concurrency - How many connections can be processed concurrently.
                              It is very important to limit this number because of
                              memory constrain.
                              Default value depends on platform
            backlog         - Parameter to socket.listen() function. Defines size of
                              pending to be accepted connections queue.
                              Must be greater than max_concurrency
            debug           - Whether send exception info (text + backtrace)
                              to client together with HTTP 500 or not.
        """
        self.loop = asyncio.get_event_loop()
        self.request_timeout = request_timeout
        self.max_concurrency = max_concurrency
        self.backlog = backlog
        self.debug = debug
        self.explicit_url_map = {}
        self.catch_all_handler = None
        self.parameterized_url_map = {}
        # Currently opened connections
        self.conns = {}
        # Statistics
        self.processed_connections = 0

    def _find_url_handler(self, req):
        """Helper to find URL handler.
        Returns tuple of (function, opts, param) or (None, None) if not found.
        """
        # First try - lookup in explicit (non parameterized URLs)
        if req.path in self.explicit_url_map:
            return self.explicit_url_map[req.path]
        # Second try - strip last path segment and lookup in another map
        idx = req.path.rfind(b'/') + 1
        path2 = req.path[:idx]
        if len(path2) > 0 and path2 in self.parameterized_url_map:
            # Save parameter into request
            req._param = req.path[idx:].decode()
            return self.parameterized_url_map[path2]

        if self.catch_all_handler:
            return self.catch_all_handler

        # No handler found
        return (None, None)

    async def _handle_request(self, req, resp):
        await req.read_request_line()
        # Find URL handler
        req.handler, req.params = self._find_url_handler(req)
        if not req.handler:
            # No URL handler found - read response and issue HTTP 404
            await req.read_headers()
            raise HTTPException(404)
        # req.params = params
        # req.handler = han
        resp.params = req.params
        # Read / parse headers
        await req.read_headers(req.params['save_headers'])

    async def _handler(self, reader, writer):
        """Handler for TCP connection with
        HTTP/1.0 protocol implementation
        """
        gc.collect()

        try:
            req = request(reader)
            resp = response(writer)
            # Read HTTP Request with timeout
            await asyncio.wait_for(self._handle_request(req, resp),
                                   self.request_timeout)

            # OPTIONS method is handled automatically
            if req.method == b'OPTIONS':
                resp.add_access_control_headers()
                # Since we support only HTTP 1.0 - it is important
                # to tell browser that there is no payload expected
                # otherwise some webkit based browsers (Chrome)
                # treat this behavior as an error
                resp.add_header('Content-Length', '0')
                await resp._send_headers()
                return

            # Ensure that HTTP method is allowed for this path
            if req.method not in req.params['methods']:
                raise HTTPException(405)

            # Handle URL
            gc.collect()
            if hasattr(req, '_param'):
                await req.handler(req, resp, req._param)
            else:
                await req.handler(req, resp)
            # Done here
        except (asyncio.CancelledError, asyncio.TimeoutError):
            pass
        except OSError as e:
            # Do not send response for connection related errors - too late :)
            # P.S. code 32 - is possible BROKEN PIPE error (TODO: is it true?)
            if e.args[0] not in (errno.ECONNABORTED, errno.ECONNRESET, 32):
                try:
                    await resp.error(500)
                except Exception as e:
                    #log.exc(e, "")
                    logger.error(f'OSError: {e}')
        except HTTPException as e:
            try:
                await resp.error(e.code)
            except Exception as e:
                #log.exc(e)
                logger.error(f'HTTPException: {e}')
        except Exception as e:
            # Unhandled expection in user's method
            #log.error(req.path.decode())
            #log.exc(e, "")
            logger.error(f'Unhandled expection. URL: {req.path.decode()}, exception: {e}')
            try:
                await resp.error(500)
                # Send exception info if desired
                if self.debug:
                    sys.print_exception(e, resp.writer.s)
            except Exception as e:
                pass
        finally:
            await writer.aclose()
            # Max concurrency support -
            # if queue is full schedule resume of TCP server task
            if len(self.conns) == self.max_concurrency:
                self.loop.create_task(self._server_coro)
            # Delete connection, using socket as a key
            del self.conns[id(writer.s)]

    def add_route(self, url, f, **kwargs):
        """Add URL to function mapping.

        Arguments:
            url - url to map function with
            f - function to map

        Keyword arguments:
            methods - list of allowed methods. Defaults to ['GET', 'POST']
            save_headers - contains list of HTTP headers to be saved. Case sensitive. Default - empty.
            max_body_size - Max HTTP body size (e.g. POST form data). Defaults to 1024
            allowed_access_control_headers - Default value for the same name header. Defaults to *
            allowed_access_control_origins - Default value for the same name header. Defaults to *
        """
        if url == '' or '?' in url:
            raise ValueError('Invalid URL')
        # Initial params for route
        params = {'methods': ['GET'],
                  'save_headers': [],
                  'max_body_size': 1024,
                  'allowed_access_control_headers': '*',
                  'allowed_access_control_origins': '*',
                  }
        params.update(kwargs)
        params['allowed_access_control_methods'] = ', '.join(params['methods'])
        # Convert methods/headers to bytestring
        params['methods'] = [x.encode() for x in params['methods']]
        params['save_headers'] = [x.encode() for x in params['save_headers']]
        # If URL has a parameter
        if url.endswith('>'):
            idx = url.rfind('<')
            path = url[:idx]
            idx += 1
            param = url[idx:-1]
            if path.encode() in self.parameterized_url_map:
                raise ValueError('URL exists')
            params['_param_name'] = param
            self.parameterized_url_map[path.encode()] = (f, params)

        if url.encode() in self.explicit_url_map:
            raise ValueError('URL exists')
        self.explicit_url_map[url.encode()] = (f, params)

    def add_resource(self, cls, url, **kwargs):
        """Map resource (RestAPI) to URL

        Arguments:
            cls - Resource class to map to
            url - url to map to class
            kwargs - User defined key args to pass to the handler.

        Example:
            class myres():
                def get(self, data):
                    return {'hello': 'world'}


            app.add_resource(myres, '/api/myres')
        """
        methods = []
        callmap = {}
        # Create instance of resource handler, if passed as just class (not instance)
        try:
            obj = cls()
        except TypeError:
            obj = cls
        # Get all implemented HTTP methods and make callmap
        for m in ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']:
            fn = m.lower()
            if hasattr(obj, fn):
                methods.append(m)
                callmap[m.encode()] = (getattr(obj, fn), kwargs)
        self.add_route(url, restful_resource_handler,
                       methods=methods,
                       save_headers=['Content-Length', 'Content-Type'],
                       _callmap=callmap)

    def catchall(self):
        """Decorator for catchall()

        Example:
            @app.catchall()
            def catchall_handler(req, resp):
                response.code = 404
                await response.start_html()
                await response.send('<html><body><h1>My custom 404!</h1></html>\n')
        """
        params = {'methods': [b'GET'], 'save_headers': [], 'max_body_size': 1024, 'allowed_access_control_headers': '*', 'allowed_access_control_origins': '*'}

        def _route(f):
            self.catch_all_handler = (f, params)
            return f
        return _route

    def route(self, url, **kwargs):
        """Decorator for add_route()

        Example:
            @app.route('/')
            def index(req, resp):
                await resp.start_html()
                await resp.send('<html><body><h1>Hello, world!</h1></html>\n')
        """
        def _route(f):
            self.add_route(url, f, **kwargs)
            return f
        return _route

    def resource(self, url, method='GET', **kwargs):
        """Decorator for add_resource() method

        Examples:
            @app.resource('/users')
            def users(data):
                return {'a': 1}

            @app.resource('/messages/<topic_id>')
            async def index(data, topic_id):
                yield '{'
                yield '"topic_id": "{}",'.format(topic_id)
                yield '"message": "test",'
                yield '}'
        """
        def _resource(f):
            self.add_route(url, restful_resource_handler,
                           methods=[method],
                           save_headers=['Content-Length', 'Content-Type'],
                           _callmap={method.encode(): (f, kwargs)})
            return f
        return _resource

    async def _tcp_server(self, host, port, backlog):
        """TCP Server implementation.
        Opens socket for accepting connection and
        creates task for every new accepted connection
        """
        addr = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0][-1]
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(addr)
        sock.listen(backlog)
        try:
            while True:
                if IS_UASYNCIO_V3:
                    yield uasyncio.core._io_queue.queue_read(sock)
                else:
                    yield asyncio.IORead(sock)
                csock, caddr = sock.accept()
                csock.setblocking(False)
                # Start handler / keep it in the map - to be able to
                # shutdown gracefully - by close all connections
                self.processed_connections += 1
                hid = id(csock)
                handler = self._handler(asyncio.StreamReader(csock),
                                        asyncio.StreamWriter(csock, {}))
                self.conns[hid] = handler
                self.loop.create_task(handler)
                # In case of max concurrency reached - temporary pause server:
                # 1. backlog must be greater than max_concurrency, otherwise
                #    client will got "Connection Reset"
                # 2. Server task will be resumed whenever one active connection finished
                if len(self.conns) == self.max_concurrency:
                    # Pause
                    yield False
        except asyncio.CancelledError:
            return
        finally:
            sock.close()

    def run(self, host="127.0.0.1", port=8081, loop_forever=True):
        """Run Web Server. By default it runs forever.

        Keyword arguments:
            host - host to listen on. By default - localhost (127.0.0.1)
            port - port to listen on. By default - 8081
            loop_forever - run loo.loop_forever(), otherwise caller must run it by itself.
        """
        self._server_coro = self._tcp_server(host, port, self.backlog)
        self.loop.create_task(self._server_coro)
        if loop_forever:
            self.loop.run_forever()

    def shutdown(self):
        """Gracefully shutdown Web Server"""
        asyncio.cancel(self._server_coro)
        for hid, coro in self.conns.items():
            asyncio.cancel(coro)

使用时,其实也有不少东西需要注意。

  1. 处理POST请求的方法,其注解需要设置save_headers = ['Content-Length','Content-Type'],才能获取到请求的参数。例如:
app = tinyweb.webserver()

@app.route('/save', methods = ['POST'], save_headers = ['Content-Length','Content-Type'])
async def save(request, response):
    params = await request.read_parse_form_data()
    await response.send(params)
  1. 一个方法处理多个请求链接,加多行注解即可。例如:
app = tinyweb.webserver()

@app.route('/')
@app.route('/index')
async def index(request, response):
    await response.send("Hello world!")
  1. 如果需要浏览器或对方客户端不要缓存,设置响应头即可。相关代码:
    response.add_header('Cache-Control', 'no-cache')  # for HTTP 1.1
    response.add_header('Pragma', 'no-cache')  # for HTTP 1.0
    response.add_header('Expires', '0')  # for proxies
  1. Web服务的max_concurrency不要设置太大,以降低内存占用。
  2. 更多的示例,可参考官方文档。

采用MicroPython编写的定时任务,特别是在实际环境测试,一般不能看到错误信息。因此,需要做log记录。

找到一个实现Logger功能的项目ulogger,其代码没有依赖其它库,使用方式也符合一般的Logger用法。相关信息如下:

相关代码复制了一份过来,没做修改:

"""
- project: micropython-ulogger
  https://github.com/whales-chen/micropython-ulogger
- code from
  https://github.com/whales-chen/micropython-ulogger/blob/main/ulogger/__init__.py
- version
  ec4f6b3842c677fbb457f6bc6d88afd8a82eeed6
"""
try:    import time
except: import utime as time

try:    from micropython import const
except: const = lambda x:x # for debug

from io import TextIOWrapper

__version__ = "v1.2"

DEBUG:    int = const(10)
INFO:     int = const(20)
WARN:     int = const(30)
ERROR:    int = const(40)
CRITICAL: int = const(50)

TO_FILE = const(100)
TO_TERM = const(200)

# fmt map 的可选参数
_level  = const(0)
_msg    = const(1)
_time   = const(2)
_name   = const(3)
_fnname = const(4)


def level_name(level: int, color: bool = False) -> str:
    if not color:
        if level == INFO:
            return "INFO"
        elif level == DEBUG:
            return "DEBUG"
        elif level == WARN:
            return "WARN"
        elif level == ERROR:
            return "ERROR"
        elif level == CRITICAL:
            return "CRITICAL"
    else:
        if level == INFO:
            return "\033[97mINFO\033[0m"
        elif level == DEBUG:
            return "\033[37mDEBUG\033[0m"
        elif level == WARN:
            return "\033[93mWARN\033[0m"
        elif level == ERROR:
            return "\033[35mERROR\033[0m"
        elif level == CRITICAL:
            return "\033[91mCRITICAL\033[0m"


class BaseClock ():
    """
    This is a BaseClock for the logger.
    Please inherit this class by your custom.
    """

    def __call__(self) -> str:
        """
        Acquire the time of now, please inherit this method.
        We will use the return value of this function as the time format of the log,
        such as `2021 - 6 - 13` or `12:45:23` and so on.

        :return: the time string.
        """
        return '%d' % time.time()


class Handler():
    """The Handler for logger.
    """
    _template: str
    _map: bytes
    level: int
    _direction: int
    _clock: BaseClock
    _color: bool
    _file_name: str
    _max_size: int
    _file = TextIOWrapper

    def __init__(self,
        level: int = INFO,
        colorful: bool = True,
        fmt: str = "&(time)% - &(level)% - &(name)% - &(msg)%",
        clock: BaseClock = None,
        direction: int = TO_TERM,
        file_name: str = "logging.log",
        max_file_size: int = 4096
        ):
        """
        Create a Handler that you can add to the logger later

        ## Options available for fmt.
        - &(level)%  : the log level
        - &(msg)%    : the log message
        - &(time)%   : the time acquire from clock, see `BaseClock`
        - &(name)%   : the logger's name
        - &(fnname)%  : the function name which you will pass on.
        - more optional is developing.

        ## Options available for level.
        - DEBUG
        - INFO
        - WARN
        - ERROR
        - CRITICAL

        ## Options available for direction.
        - TO_FILE : output to a file
        - TO_TERM : output to terminal

        :param level: Set a minimum level you want to be log
        :type level: int(see the consts in this module)

        :param colorful: Whether to use color display information to terminal(Not applicable to files)
        :type colorful: bool

        :param fmt: the format string like: "&(time)% - &(level)% - &(name)% - &(fnname)% - &(msg)%"(default)
        :type fmt: str

        :param clock: The Clock which will provide time str. see `BaseClock`
        :type clock: BaseClock(can be inherit )

        :param direction: Set the direction where logger will output
        :type direction: int (`TO_FILE` or `TO_TERM`)

        :param file_name: available when you set `TO_FILE` to param `direction`. (default for `logging.log`)
        :type file_name: str
        :param max_file_size: available when you set `TO_FILE` to param `direction`. The unit is `byte`, (default for 4k)
        :type max_file_size: str
        """
        #TODO: 文件按日期存储, 最大份数的设置.
        self._direction = direction
        self.level = level
        self._clock = clock if clock else BaseClock()
        self._color = colorful
        self._file_name = file_name if direction == TO_FILE else ''
        self._max_size = max_file_size if direction == TO_FILE else 0

        if direction == TO_FILE:
            self._file = open(file_name, 'a+')

        # 特么的re居然不能全局匹配, 烦了, 只能自己来.
        # m = re.match(r"&\((.*?)\)%", fmt)
        # i = 0
        # while True:
        #     # 由于蛋疼的 ure 不能直接获取匹配结果的数量, 只能使用这种蠢蛋方法来循环.
        #     try:
        #         text = m.group(i)

        #     except:
        #         # 发生错误说明已经遍历完毕
        #         break

        #     # 使用指针代替文本来减少开销
        #     if text == "level":
        #         self._map.append(_level)
        #     elif text == "msg":
        #         self._map.append(_msg)
        #     elif text == "time":
        #         self._map.append(_time)
        #     elif text == "name":
        #         self._map.append(_name)
        #     elif text == "fnname":
        #         self._map.append(_fnname)

        #     i += 1

        # 添加映射
        self._map = bytearray()
        idx = 0
        while True:
            idx = fmt.find("&(", idx)
            if idx >= 0:  # 有找到
                a_idx = fmt.find(")%", idx+2)
                if a_idx < 0:
                    # 没找到后缀, 报错
                    raise Exception(
                        "Unable to parse text format successfully.")
                text = fmt[idx+2:a_idx]
                idx = a_idx+2  # 交换位置
                if text == "level":
                    self._map.append(_level)
                elif text == "msg":
                    self._map.append(_msg)
                elif text == "time":
                    self._map.append(_time)
                elif text == "name":
                    self._map.append(_name)
                elif text == "fnname":
                    self._map.append(_fnname)
            else:  # 没找到, 代表后面没有了
                break

        # 将 template 变成可被格式化的文本
        # 确保最后一个是换行字符

        self._template = fmt.replace("&(level)%", "%s")\
            .replace("&(msg)%", "%s")\
            .replace("&(time)%", "%s")\
            .replace("&(name)%", "%s")\
            .replace("&(fnname)%", "%s")\
            + "\n" if fmt[:-1] != '\n' else ''

    def _msg(self, *args, level: int, name: str, fnname: str):
        """
        Log a msg
        """
        
        if level < self.level:
            return
        # generate msg
        temp_map = []
        text = ''
        for item in self._map:
            if item == _msg:
                for text_ in args:  # 将元组内的文本添加到一起
                    text = "%s%s" % (text, text_)  # 防止用户输入其他类型(int, float)
                temp_map.append(text)
            elif item == _level:
                if self._direction == TO_TERM:  # only terminal can use color.
                    temp_map.append(level_name(level, self._color))
                else:
                    temp_map.append(level_name(level))
            elif item == _time:
                temp_map.append(self._clock())
            elif item == _name:
                temp_map.append(name)
            elif item == _fnname:
                temp_map.append(fnname if fnname else "unknownfn")

        if self._direction == TO_TERM:
            self._to_term(tuple(temp_map))
        else:
            self._to_file(tuple(temp_map))
        # TODO: 待验证: 转换为 tuple 和使用 fromat 谁更快

    def _to_term(self, map: tuple):
        print(self._template % map, end='')

    def _to_file(self, map: tuple):
        fp = self._file
        # 检查是否超出大小限制.
        prev_idx = fp.tell()  # 保存原始指针位置
        # 将读写指针跳到文件最大限制的地方,
        # 如果能读出数据, 说明文件大于指定的大小
        fp.seek(self._max_size)
        if fp.read(1):  # 能读到数据, 说明超出大小限制了
            fp = self._file = open(self._file_name, 'w')  # 使用 w 选项清空文件内容
        else:
            # 没有超出限制
            fp.seek(prev_idx)  # 指针回到原来的地方

        # 检查完毕, 开始写入数据
        fp.write(self._template % map)
        fp.flush()


class Logger():
    _handlers: list

    def __init__(self,
        name: str,
        handlers: list = None,
        ):

        self.name = name
        if not handlers:
            # 如果没有指定处理器, 自动创建一个默认的
            self._handlers = [Handler()]
        else:
            self._handlers = handlers

    @property
    def handlers(self):
        return self._handlers

    def _msg(self, *args, level: int, fn: str):

        for item in self._handlers:
            #try:
            item._msg(*args, level=level, fnname=fn, name=self.name)
            #except:
            #    print("Failed while trying to record")

    def debug(self, *args, fn: str = None):
        self._msg(*args, level=DEBUG, fn=fn)

    def info(self, *args, fn: str = None):
        self._msg(*args, level=INFO, fn=fn)

    def warn(self, *args, fn: str = None):
        self._msg(*args, level=WARN, fn=fn)

    def error(self, *args, fn: str = None):
        self._msg(*args, level=ERROR, fn=fn)

    def critical(self, *args, fn: str = None):
        self._msg(*args, level=CRITICAL, fn=fn)


__all__ = [
    Logger,
    Handler,
    BaseClock,


    DEBUG,
    INFO,
    WARN,
    ERROR,
    CRITICAL,

    TO_FILE,
    TO_TERM,

    __version__
]

近来搞电脑的远程启动搞上瘾了。使用网络启动(Wake on Lan),确实带来很多玩法。为了进一步降低电费,减少电脑非使用时段(例如晚上睡觉时段)待机而产生的功耗,采用ESP32C3(刷上MicroPython)来作为远程开机设备。即:

  • 普通x86电脑,在晚上或无需使用的时间正常关机,并开启网络启动功能。
  • ESP32C3开发板保持24小时开机并联网,可在需要时远程启动所需x86电脑。

于是找了下,在MicroPython上发送Wake on Lan的实现代码。参考了以下文章:

整理出可用代码,保存为文件wol.py,如下:

"""
Small module for use with the wake on lan protocol.

Reference:
- https://pypi.org/project/wakeonlan/
- https://www.cnblogs.com/Yogile/p/16488281.html
"""
import socket
import ubinascii


BROADCAST_IP = "255.255.255.255"
DEFAULT_PORT = 9
SO_BROADCAST = 20

def create_magic_packet(macaddress: str) -> bytes:
    """
    Create a magic packet.

    A magic packet is a packet that can be used with the for wake on lan
    protocol to wake up a computer. The packet is constructed from the
    mac address given as a parameter.

    Args:
        macaddress: the mac address that should be parsed into a magic packet.

    """
    if len(macaddress) == 17:
        sep = macaddress[2]
        macaddress = macaddress.replace(sep, "")
    elif len(macaddress) == 14:
        sep = macaddress[4]
        macaddress = macaddress.replace(sep, "")
    if len(macaddress) != 12:
        raise ValueError("Incorrect MAC address format")
    #return bytes.fromhex("F" * 12 + macaddress * 16)
    return ubinascii.unhexlify("F" * 12 + macaddress * 16)


def send_magic_packet(
    *macs: str,
    ip_address: str = BROADCAST_IP,
    port: int = DEFAULT_PORT,
    interface: str = None
) -> None:
    """
    Wake up computers having any of the given mac addresses.

    Wake on lan must be enabled on the host device.

    Args:
        macs: One or more macaddresses of machines to wake.

    Keyword Args:
        ip_address: the ip address of the host to send the magic packet to.
        port: the port of the host to send the magic packet to.
        interface: the ip address of the network adapter to route the magic packet through.

    """
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 1)
        
        if interface is not None:
            sock.bind((interface, 0))
        sock.setsockopt(socket.SOL_SOCKET, SO_BROADCAST, 1)
        sock.connect((ip_address, port))
        for mac in macs:
            packet = create_magic_packet(mac)
            sock.send(packet)
            print("send magic packet to MAC [%s]" % (mac))
    except:
        print('send magic packet failed')
        pass
    finally:
        sock.close()

使用示例:

import wol

mac = '12:ab:12:ab:12:ab' # 必填参数。要启动电脑的网卡MAC
broadcastIp = '192.168.0.255'  # 可选参数。广播的地址,一般填对应网段的255地址

wol.send_magic_packet(mac, ip_address=broadcastIp)

最近做的MicroPython小项目,需要用到ping命令的功能,检查指定IP的电脑是否有开机并联网。

找了一圈,找到这个ping功能的源码。没有深究代码,就是能用。而且跟常用的ping命令差不多。

源码没改,直接搬过来,避免以后找不到:

# µPing (MicroPing) for MicroPython
# copyright (c) 2018 Shawwwn <shawwwn1@gmail.com>
# License: MIT
#
# Internet Checksum Algorithm
# Author: Olav Morken
# https://github.com/olavmrk/python-ping/blob/master/ping.py
# @data: bytes
#
# source code from: https://forum.pycom.io/topic/4930/ping-command/15
#
def checksum(data):
    if len(data) & 0x1: # Odd number of bytes
        data += b'\0'
    cs = 0
    for pos in range(0, len(data), 2):
        b1 = data[pos]
        b2 = data[pos + 1]
        cs += (b1 << 8) + b2
    while cs >= 0x10000:
        cs = (cs & 0xffff) + (cs >> 16)
    cs = ~cs & 0xffff
    return cs

def ping(host, count=4, timeout=5000, interval=10, quiet=False, size=64):
    import utime
    import uselect
    import uctypes
    import usocket
    import ustruct
    import uos

    # prepare packet
    assert size >= 16, "pkt size too small"
    pkt = b'Q'*size
    pkt_desc = {
        "type": uctypes.UINT8 | 0,
        "code": uctypes.UINT8 | 1,
        "checksum": uctypes.UINT16 | 2,
        "id": (uctypes.ARRAY | 4, 2 | uctypes.UINT8),
        "seq": uctypes.INT16 | 6,
        "timestamp": uctypes.UINT64 | 8,
    } # packet header descriptor
    h = uctypes.struct(uctypes.addressof(pkt), pkt_desc, uctypes.BIG_ENDIAN)
    h.type = 8 # ICMP_ECHO_REQUEST
    h.code = 0
    h.checksum = 0
    h.id[0:2] = uos.urandom(2)
    h.seq = 1

    # init socket
    sock = usocket.socket(usocket.AF_INET, usocket.SOCK_RAW, 1)
    sock.setblocking(0)
    sock.settimeout(timeout/1000)
    try:
        addr = usocket.getaddrinfo(host, 1)[0][-1][0] # ip address
    except IndexError:
        not quiet and print("Could not determine the address of", host)
        return None
    sock.connect((addr, 1))
    not quiet and print("PING %s (%s): %u data bytes" % (host, addr, len(pkt)))

    seqs = list(range(1, count+1)) # [1,2,...,count]
    c = 1
    t = 0
    n_trans = 0
    n_recv = 0
    finish = False
    while t < timeout:
        if t==interval and c<=count:
            # send packet
            h.checksum = 0
            h.seq = c
            h.timestamp = utime.ticks_us()
            h.checksum = checksum(pkt)
            if sock.send(pkt) == size:
                n_trans += 1
                t = 0 # reset timeout
            else:
                seqs.remove(c)
            c += 1

        # recv packet
        while 1:
            socks, _, _ = uselect.select([sock], [], [], 0)
            if socks:
                resp = socks[0].recv(4096)
                resp_mv = memoryview(resp)
                h2 = uctypes.struct(uctypes.addressof(resp_mv[20:]), pkt_desc, uctypes.BIG_ENDIAN)
                # TODO: validate checksum (optional)
                seq = h2.seq
                if h2.type==0 and h2.id==h.id and (seq in seqs): # 0: ICMP_ECHO_REPLY
                    t_elasped = (utime.ticks_us()-h2.timestamp) / 1000
                    ttl = ustruct.unpack('!B', resp_mv[8:9])[0] # time-to-live
                    n_recv += 1
                    not quiet and print("%u bytes from %s: icmp_seq=%u, ttl=%u, time=%f ms" % (len(resp), addr, seq, ttl, t_elasped))
                    seqs.remove(seq)
                    if len(seqs) == 0:
                        finish = True
                        break
            else:
                break

        if finish:
            break

        utime.sleep_ms(1)
        t += 1

    # close
    sock.close()
    ret = (n_trans, n_recv)
    not quiet and print("%u packets transmitted, %u packets received" % (n_trans, n_recv))
    return (n_trans, n_recv)

最近利用“合宙ESP32C3-Core”做了个MicroPython的小项目,用到了HTTP请求的功能。

找到了以下文章介绍urllib.urequest。据说代码少、性能好,能满足80%的需求。能支持HTTPS的请求。但使用下来,发现功能不完善,最大问题是不支持超时(timeout)设置。

折腾一番后,改为使用urequests,相关功能都比较完善,可以设置timeout、请求头等。

源码没改,保存为文件名urequests.py即可:

"""
code from: https://github.com/micropython/micropython-lib/blob/master/python-ecosys/urequests/urequests.py
version: a3d6d29b1b9de2bb147e0751c08a39608ebe06c8
"""
import usocket


class Response:
    def __init__(self, f):
        self.raw = f
        self.encoding = "utf-8"
        self._cached = None

    def close(self):
        if self.raw:
            self.raw.close()
            self.raw = None
        self._cached = None

    @property
    def content(self):
        if self._cached is None:
            try:
                self._cached = self.raw.read()
            finally:
                self.raw.close()
                self.raw = None
        return self._cached

    @property
    def text(self):
        return str(self.content, self.encoding)

    def json(self):
        import ujson

        return ujson.loads(self.content)


def request(
    method,
    url,
    data=None,
    json=None,
    headers={},
    stream=None,
    auth=None,
    timeout=None,
    parse_headers=True,
):
    redirect = None  # redirection url, None means no redirection
    chunked_data = data and getattr(data, "__iter__", None) and not getattr(data, "__len__", None)

    if auth is not None:
        import ubinascii

        username, password = auth
        formated = b"{}:{}".format(username, password)
        formated = str(ubinascii.b2a_base64(formated)[:-1], "ascii")
        headers["Authorization"] = "Basic {}".format(formated)

    try:
        proto, dummy, host, path = url.split("/", 3)
    except ValueError:
        proto, dummy, host = url.split("/", 2)
        path = ""
    if proto == "http:":
        port = 80
    elif proto == "https:":
        import ussl

        port = 443
    else:
        raise ValueError("Unsupported protocol: " + proto)

    if ":" in host:
        host, port = host.split(":", 1)
        port = int(port)

    ai = usocket.getaddrinfo(host, port, 0, usocket.SOCK_STREAM)
    ai = ai[0]

    resp_d = None
    if parse_headers is not False:
        resp_d = {}

    s = usocket.socket(ai[0], usocket.SOCK_STREAM, ai[2])

    if timeout is not None:
        # Note: settimeout is not supported on all platforms, will raise
        # an AttributeError if not available.
        s.settimeout(timeout)

    try:
        s.connect(ai[-1])
        if proto == "https:":
            s = ussl.wrap_socket(s, server_hostname=host)
        s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
        if not "Host" in headers:
            s.write(b"Host: %s\r\n" % host)
        # Iterate over keys to avoid tuple alloc
        for k in headers:
            s.write(k)
            s.write(b": ")
            s.write(headers[k])
            s.write(b"\r\n")
        if json is not None:
            assert data is None
            import ujson

            data = ujson.dumps(json)
            s.write(b"Content-Type: application/json\r\n")
        if data:
            if chunked_data:
                s.write(b"Transfer-Encoding: chunked\r\n")
            else:
                s.write(b"Content-Length: %d\r\n" % len(data))
        s.write(b"Connection: close\r\n\r\n")
        if data:
            if chunked_data:
                for chunk in data:
                    s.write(b"%x\r\n" % len(chunk))
                    s.write(chunk)
                    s.write(b"\r\n")
                s.write("0\r\n\r\n")
            else:
                s.write(data)

        l = s.readline()
        # print(l)
        l = l.split(None, 2)
        if len(l) < 2:
            # Invalid response
            raise ValueError("HTTP error: BadStatusLine:\n%s" % l)
        status = int(l[1])
        reason = ""
        if len(l) > 2:
            reason = l[2].rstrip()
        while True:
            l = s.readline()
            if not l or l == b"\r\n":
                break
            # print(l)
            if l.startswith(b"Transfer-Encoding:"):
                if b"chunked" in l:
                    raise ValueError("Unsupported " + str(l, "utf-8"))
            elif l.startswith(b"Location:") and not 200 <= status <= 299:
                if status in [301, 302, 303, 307, 308]:
                    redirect = str(l[10:-2], "utf-8")
                else:
                    raise NotImplementedError("Redirect %d not yet supported" % status)
            if parse_headers is False:
                pass
            elif parse_headers is True:
                l = str(l, "utf-8")
                k, v = l.split(":", 1)
                resp_d[k] = v.strip()
            else:
                parse_headers(l, resp_d)
    except OSError:
        s.close()
        raise

    if redirect:
        s.close()
        if status in [301, 302, 303]:
            return request("GET", redirect, None, None, headers, stream)
        else:
            return request(method, redirect, data, json, headers, stream)
    else:
        resp = Response(s)
        resp.status_code = status
        resp.reason = reason
        if resp_d is not None:
            resp.headers = resp_d
        return resp


def head(url, **kw):
    return request("HEAD", url, **kw)


def get(url, **kw):
    return request("GET", url, **kw)


def post(url, **kw):
    return request("POST", url, **kw)


def put(url, **kw):
    return request("PUT", url, **kw)


def patch(url, **kw):
    return request("PATCH", url, **kw)


def delete(url, **kw):
    return request("DELETE", url, **kw)

最后,用的时候要注意:

  1. 响应结果需要手工关闭,避免再发起请求时会报错。
  2. POST的数据,需要进行URL编码。
  3. 除非POST的数据是JSON,否则最好加上请求头“Content-type”。

示例代码如下:

import urequests

r = urequests.post("https://abc.com/path", data='id=123&name=apple', timeout=10, headers={'User-Agent': 'Micropython(ESP32C3)', 'Content-type': 'application/x-www-form-urlencoded'})
print(r.status_code)  # 打印相应状态,整数,正常为200
print(r.content)  # 打印响应数据
r.close() #  关闭连接

使用Termux多年了。主要是可以安装N多Linux命令,一个应用即可实现N多功能。目前最新版是0.118.0,推荐从F-Droid安装。

这里记录一下相关的初始化动作。

1 相关资料

2 设置国内源

跟大多数Linux发行版类似,软件源设置为国内镜像,有效提高安装、更新软件的效率。官方收录了软件源的中国镜像网站:

Termux Packages - Mirrors Hosted in China

我选择了清华大学的镜像:

Termux | 镜像使用帮助 | 清华大学开源软件镜像站

跟Debian类似,直接修改apt的source.list文件即可。即修改$PREFIX/etc/apt/sources.list文件的内容如下:

# The termux repository mirror from TUNA:
deb https://mirrors.tuna.tsinghua.edu.cn/termux/apt/termux-main stable main
deb https://mirrors.tuna.tsinghua.edu.cn/termux/apt/termux-root root stable
deb https://mirrors.tuna.tsinghua.edu.cn/termux/apt/termux-x11 x11 main

保存后,执行apt update,更新一下本地的缓存。

3 安装Termux工具

执行以下命令,安装Termux的相关工具。如果默认安装了,则不用再装。

pkg install termux-tools

4 开启储存访问

执行termux-setup-storage即可。对应Android的sdcard目录为~/storage/shared

5 关闭震动

本人不喜欢以震动方式提示错误,习惯关闭它。编辑文件~/.termux/termux.properties,末尾添加:

bell-character = ignore

6 设置辅助键盘(the extra keys)

官方详细说明: Termux Wiki - Extra Keys Row

值得一提,可以设置按键上滑,让辅助键盘拥有更多的功能。参考配置,修改文件~/.termux/termux.properties,末尾添加:

extra-keys = [ \
   [{key: ESC, popup: {macro: "CTRL d", display: "exit"}}, \
   {key: '/', popup: '\'}, \
   {key: '-', popup: '_'}, \
   {key: '(', popup: '{'}, \
   {key: UP, popup: PGUP}, \
   {key: ')', popup: '}'}, \
   {key: QUOTE, popup: '*'}], \
   [{key: TAB, popup: ':'}, \
   {key: CTRL}, \
   {key: ALT}, \
   {key: LEFT, popup: HOME}, \
   {key: DOWN, popup: PGDN}, \
   {key: RIGHT, popup: END}, \
   {key: APOSTROPHE, popup: '`'}] \
]

7 开启root

  • 如果系统已root,安装tsu,可以切换root用户,或者使用sudo以root用户执行命令。
pkg install tsu
sudo 命令
  • 如果Android系统没有root,安装proot,即可执行需要root权限的命令。
pkg install proot
proot 命令

8 安装vim及配置

本人习惯vim,其它编辑程序,可以选择nanoemacs等。安装命令如下:

pkg install vim

重点配置,编辑文件~/.vimrc,末尾添加:

" 解决中文显示
set fileencodings=utf-8,gb2312,gb18030,gbk,ucs-bom,cp936,latin1
set enc=utf8
set fencs=utf8,gbk,gb2312,gb18030

" 显示行号
set nu

" 颜色主题
colorscheme desert

" 语法高亮
syntax on

最后使配置生效:

source .vimrc

9 Termux相关应用

  • Termux:API

    从Termux访问Android功能。可以实现脚本控制Android。
  • Termux:Boot

    允许程序在启动时运行的Termux扩展应用。
  • Termux:Float

    在浮动终端窗口中使用Termux。
  • Termux:Styling

    自定义Termux终端的样式。
  • Termux:Tasker

    从Tasker运行Termux脚本的Tasker插件。需要配合Tasker使用。
  • Termux:Widget

    从主屏幕启动Termux命令。实现通过点击桌面图标执行相关的sh脚本。

10 其它常用命令

  • OpenSSH

    • 说明:完整的SSH客户端。
    • 安装命令:pkg install openssh
  • ADB

    • 说明:可以使用ADB连接本机,无需root权限就能执行input命令等。
    • 安装命令:pkg install android-tools

1 背景

受“新冠肺炎疫情”影响,出现不能回办公室上班的问题,所以制定一套安全的远程办公方案。

由于办公室有外网IP,原来的方案就是利用路由器的端口映射功能,把各个台式机(操作系统是Windows)的“远程桌面”端口直接映射到外网。这方案缺点如下:

  • “远程桌面”如果存在漏洞,比如绕过登录,电脑上的资料就可能被随意访问。
  • 需要远程访问的台式机,要24小时开机,否则连不上。
  • 需要远程访问的台式机,起码占用路由器的一个端口。

2 解决方案

结合SSH服务、wake on lan、远程桌面,实现更安全和灵活的远程办公。

  • 部署Linux服务器,只映射其SSH服务端口到外网,作为安全入口。
  • SSH客户端几乎覆盖所有平台(包括移动平台),且其功能强大。
  • 使用SSH的端口转发(Port Forward)功能,连上办公室内网的指定IP的“远程桌面”端口。
  • 各个台式机开启wake on lan功能,实现按需开机,工作完关机。
  • 各种操作系统有对应的远程客户端。Windows,使用微软的“远程桌面”客户端,全平台支持;Linux,使用SSH客户端;Mac操作系统,使用VNC客户端。

但是此方案仍有缺点:

  • 需要用户理解SSH及其功能。
  • 使用Linux远程开机命令(wakeonlan),即使把命令简化为Shell脚本,也不是普通人会用。
  • M系列CPU的Mac电脑,不能使用wake on lan,目前只能长期开机。

3 办公室部署

3.1 路由器

路由器的网络需要可外网访问,并且支持端口映射功能。基本路由器都支持端口映射,具体配置参考路由器说明书。

配置路由器外网端口,映射到Linux服务器的SSH服务端口。

3.2 Linux服务器

  1. 安装wakeonlan命令。

Debian或Ubuntu,执行以下命令安装

sudo apt install wakeonlan
  1. 部署SSH服务,作为安全入口。需要SSH服务的安全配置,例如:
  • 仅使用SSHv2协议

    Protocol 2
  • 禁止root用户登录。

    PermitRootLogin no
  • 禁止用户空密码登录。

    PermitEmptyPasswords no
  • 指定白名单用户。

    AllowUsers user1 user2 user3
  • 指定禁止登录的用户(一般指定白名单即可)。

    DenyUsers root user4 user5
  • 限制身份验证最大重试次数。

    MaxAuthTries 3
  • 登录用户的密码,使用强密码,甚至配置使用“密钥”验证登录。
  • 显示最后一次登录的日期和时间。

    PrintLastLog yes
  • 防止特权升级(一般默认配置)

    UsePrivilegeSeparation sandbox
  • 禁用 GSSAPI 认证

    GSSAPIAuthentication no

更详细的设置,可以搜索“Secure SSH”或者“SSH安全加固”等内容。

另外,最好配置一下服务器保持TCP连接的选项,避免客户端自动断开:

  • 开启保持TCP连接

    TCPKeepAlive yes
  • 向客户端发送是否存活的消息的时间间隔,单位是秒,默认是0,不发送

    ClientAliveInterval 30
  • 请求后客户端无响应则自动断开的最大次数

    ClientAliveCountMax 3

3.3 台式机

  1. 主板开启wake on lan功能。具体BIOS设置,需要查询主板的说明书。一般注意以下几点:

    • 板载有线网卡设置启用。
    • wake on lan设置启用。
    • 启动项,允许PCIE设备启动。
    • 启动项,出现pxe rom可选。
  2. 操作系统开启wake on lan功能。即操作系统执行关机时,让主板不要完全断电,并允许网卡运行于可接收Magic Package的状态,用于网络启动电脑。

  3. 开启远程访问服务。各个操作系统配置如下:

    • windows,开启“远程桌面”服务。
    • Linux,开启SSH服务。一般默认开启的。
    • Mac OS,开启“远程访问”服务,可以SSH客户端访问,即字符界面。
    • Mac OS,开启“远程桌面”服务,可以VNC客户端访问,即图形界面。

4 客户端部署

主要就是SSH客户端 + 远程客户端。

4.1 SSH客户端

4.1.1 Linux

一般Linux操作系统默认安装SSH客户端,如果没有,安装“OpenSSH”或者“Dropbear SSH”的客户端即可。

4.1.2 Windows

Windows 10或11可以通过“WinGet”命令安装“OpenSSH”客户端。例如:

winget install opensssh

Windows 7可以使用“PuTTY”。Windows都可以安装这个。

4.1.3 Android

可以使用“Termux”,再安装“OpenSSH”。

pkg install openssh

或者使用其它SSH客户端App。

4.1.4 iOS

安装Termius。需要注册账号,免费版可以使用SSH客户端和端口转发功能。

4.2 远程桌面客户端

  • Windows,自带“远程桌面”客户端。
  • Linux,推荐安装“Remmina”。
  • Android,安装微软官方“远程桌面”App。
  • iOS,安装微软官方“远程桌面”App。

4.3 VNC客户端

  • Windows,使用开源的“TightVNC”。
  • 其它,待补充。

5 客户端使用

以Windows远程桌面为例,其默认端口为3389,并假设该台式机的IP为192.168.0.123。其它服务类似操作。

  1. 远程开机。

    启动SSH客户端并登录,使用wakeonlan命令 + MAC地址,启动对应的台式机。注意,需要记录该台式机有线网卡的MAC地址。
  2. 开启端口转发。

    启动SSH客户端,设置本地端口(例如 43389)转发到办公室内网指定电脑端口(例如 192.168.0.123:3389)。
  3. 连接远程桌面。

    远程桌面客户端连接到本机端口(例如 127.0.0.1:43389),即可访问。如果是管理员帐号登录,需勾选“管理员模式”。

5.2 远程开机

普通用户执行wakeonlan命令,参数是对应台式机网卡的MAC地址。然后使用ping命令,检查该台式机是否开机成功。

要注意,Windows操作系统,不要使用shutdown /s命令关机,会导致wakeonlan命令无法开机。

5.3 开启端口转发

假设,办公室的外网域名为remote.office.com,SSH映射外网端口为22222,SSH登录用户为r-user,需要通过访问192.168.0.123:3389的“远程桌面”服务,并且本机开启43389端口去访问。

5.3.1 SSH命令

使用SSH客户端(例如OpenSSH客户端)的,直接执行以下命令,然后输入密码,让其一直运行即可。

ssh -f -N -L 43389:192.168.0.123:3389 r-user@remote.office.com -p 22222 -o ServerAliveInterval=30

关键参数说明如下:

  • -f后台运行。
  • -N不执行命令。
  • -L 43389:192.168.0.123:3389是把本机43389端口转发到办公室内网的192.168.0.123:3389端口。
  • -o ServerAliveInterval=30是每30秒向服务器发生一条表示客户端存活的消息,用于保持连接。

关于客户端保持连接,可以修改/etc/ssh/ssh_config文件,在Host *的配置下,加入以下配置。然后运行ssh命令,不用加上-o ServerAliveInterval=30这个参数。

ServerAliveInterval 30
ServerAliveCountMax 3

5.3.2 PuTTY设置

  1. 点Category -> Session,在Host name填remote.office.com,Port填22222,Connection Type选SSH。
  2. 点Category -> Connection -> Data,在Auto-login username填r-user
  3. 点Category -> Connection -> SSH -> Tunnels,Add new forward port下,Source port填43389,Destination填192.168.0.123:3389,勾选下面的“Local”和“Auto”,再点“Add”。
  4. 点Category -> Connection,在Seconds between keepalives (0 to turn off)填10,并勾选Enable TCP keepalives (SO_KEEPALIVE option)选项。这一步是设置客户端保持连接。
  5. 点Category -> Session,在Saved Sessions填remote_office,再点“Save”保存配置。
  6. 连接时,点Category -> Session,选中remote_office,点“Open”。输入密码后让其保持运行即可。

5.3.3 iOS设置Termius

  1. 安装Termius,并注册账户。
  2. 设置保持后台运行。

    • 在Settings -> SESSIONS -> 开启”Active Connect Saver“和”Save Location Data“。
    • 据说是使用了“获取地理位置”权限,实现App保持后台运行。
  3. 新建Hosts。

    • 填写连接到办公室的域名remote.office.com和SSH端口22222,然后命名为remote_office
  4. 新建Port Forwarding。

    • 在Port Forwarding,点“+”新建。
    • -> 选Local,点Continue。
    • ->“Set the local port and binding address”的Port number填写映射到本机的端口,例如3389,点CONTINUE。
    • -> 点Select a host,并选sdoffice。
    • -> “Set the destination host”填写目标电脑的内网IP和远程桌面端口,例如address为192.168.0.123,port为3389,点CONTINUE。
    • -> 最后填写标签,例如101-rdp,点DONE
  5. 连接。

    • 在Port Forwarding,长按101-rdp,点Connect。

5.4 远程桌面客户端

添加电脑,电脑名称为127.0.0.1:43389。如果是使用管理员账号,记得开启“管理员模式”。

6 其它方案

6.1 前端安全替代

  1. 使用虚拟内网,即VPN。连上VPN就等于进入办公室内网。

    • Android和iOS原生支持L2TP、IPSec、IKEv2等协议,不用安装客户端。
    • 路由器同样只需映射VPN服务的端口。
  2. 使用堡垒机做入口。

    • JumpServer。未了解。
    • Next Terminal。了解过,当前版本安全方面考虑不足,手机访问“远程桌面”不支持触屏等。
  3. 其它商业解决方案

    • TeamViewer
    • 向日葵远程控制软件

6.2 网络启动功能替代

可以使用WiFi开关 + 电脑通电启动,实现替代,但需要购买WiFi开关硬件。