2023

Oracle acquired Java mainly to make money, which is hardly good news. The consolation is that Java, or rather OpenJDK, remains open source (under GPLv2).

When choosing a JDK, note the following:

  • Since Java 17, Oracle states that Oracle JDK and OpenJDK are built from the same source.
  • Oracle JDK requires attention to its licensing terms for commercial use.
  • Oracle publishes the latest OpenJDK source but does not provide downloads of every up-to-date build, which is why so many third-party builds of JDK 17 have appeared.


Conclusions:

  1. Best choice: Adoptium Eclipse Temurin, from the Adoptium project.
  2. On a Linux distribution, simply installing the OpenJDK package the distribution maintains is usually fine.

Addendum:

On Linux systems using init.d, running a given shell script at boot only required editing /etc/rc.local. Modern distributions have mostly switched to systemd, and rc.local is gone. To achieve the same effect, there are two options:

  1. Enable the rc-local.service unit.
  2. Create a dedicated service that runs at boot.

Option 2 is recommended, because it lets you declare service dependencies, for example running the script only after the network service has started.

Using Debian 11 as the example, here are the concrete steps for both options.

Option 1: enable the rc-local.service unit

  1. Create /etc/rc.local and make it executable
cat <<EOF >/etc/rc.local
#!/bin/sh -e
# run at the end of each multiuser runlevel

exit 0
EOF
chmod +x /etc/rc.local

Put the commands you want to run before the exit 0 line. Never delete the trailing exit 0.

  2. Enable the service
sudo systemctl enable --now rc-local

Option 2: create a boot-time service

This simply means configuring a systemd service set to start at boot; what it starts is the shell script in question.

  1. Create the unit file.

In /etc/systemd/system, create a file named startup.service (the name is up to you) with the following content:

#################################
# Run at startup
#################################

[Unit]
Description=Run the shell script at system startup.
Wants=network-online.target
After=network.target network-online.target

[Service]
Type=simple
WorkingDirectory=/opt/startup
ExecStart=/opt/startup/run.sh
User=root

[Install]
WantedBy=multi-user.target

ExecStart is where the shell script to run is configured. The example assumes the script needs network access before it can run.

  2. Enable the wait-for-network service.

When a unit depends on network-online.target, the systemd-networkd-wait-online.service must be enabled so the script only runs after the NIC is up and has obtained an IP address.

sudo systemctl enable systemd-networkd-wait-online.service
  3. Enable it at boot.
sudo systemctl daemon-reload
sudo systemctl enable startup.service

Note: after upgrading to Chrome 114.0.5735.90, the problem is resolved.

On Lubuntu 22.04.2, after installing last night's system updates, Chrome stopped rendering pages correctly. Specifically: background colors and layout were fine, and some styling displayed, but no text rendered at all, and some images were missing too.

Running /opt/google/chrome/chrome directly from a shell printed the following:

MESA-INTEL: warning: Haswell Vulkan support is incomplete

Presumably the GPU and its driver do not support some new Chrome feature.

That led me to this article: Linux: Chrome Starts Very Slowly After Enable Nvidia Driver

Based on that article, I found three Chrome command-line switches that restore normal page rendering (below).

| Switch | Description | Result |
|---|---|---|
| --disable-gpu | Disable GPU hardware acceleration. | Renders correctly |
| --disable-gpu-driver-bug-workarounds | Disable the workarounds for various GPU driver bugs. | Renders correctly |
| --enable-features=Vulkan | Enable the Vulkan feature. | Pages render, but some images show as black blocks |

The final fix:

  1. Use the --disable-gpu-driver-bug-workarounds switch.
  2. Edit /usr/bin/google-chrome so that its last line, exec -a "$0" "$HERE/chrome" "$@", includes the switch. That is, change the last line to:
#exec -a "$0" "$HERE/chrome" "$@"
exec -a "$0" "$HERE/chrome" --disable-gpu-driver-bug-workarounds "$@"

Separately, there is a well-maintained reference covering all of Chrome's command-line switches: List of Chromium Command Line Switches

Nowadays, Git is the usual recommendation for version control, but work requires Subversion (SVN), so here are my deployment notes.

1. Overview

Deploying an SVN (Subversion) server on Debian 11 generally follows these steps:

  • Installation. On Debian, apt is enough.
  • Configuration. Create a service user, set up the deployment directories, and configure systemd auto-start.
  • Repository management. Creating, backing up, exporting, and migrating repositories.
  • Pitfalls. Problems encountered along the way.


2. Installation

Install the "subversion" package via apt.

sudo apt install subversion

The "subversion" package contains three groups of programs:

  • Server programs

    • svnserve: serves SVN over the network.
    • svnsync: repository mirroring tool; either repository can be remote or local.
  • Administration programs

    • svnadmin: manages SVN repositories.
    • svnlook: inspects SVN repositories, strictly read-only.
    • svnrdump: remote repository data migration; effectively the network version of svnadmin dump and svnadmin load.
    • svndumpfilter: history filter that removes certain revisions from an SVN dump file.
  • Client programs

    • svn: the SVN command-line client.
    • svnversion: shows working-copy revision information.
    • svnmucc: multiple-URL command client, which lets users commit arbitrary changes to a repository without a working copy.

For usage of these programs, see the Subversion command-line reference manual.

3. Configuration

  • Create a system user "svn" to run the SVN service.
sudo useradd -r -M -s /bin/false svn
  • Create the directory for SVN repositories.
sudo mkdir -p /opt/svn/repos
sudo chown svn /opt/svn/repos
  • Create the systemd unit file /etc/systemd/system/svnserve.service with the following content:
[Unit]
Description=Subversion protocol daemon
After=syslog.target network.target

[Service]
Type=forking
RuntimeDirectory=svnserve
PIDFile=/run/svnserve/svnserve.pid
EnvironmentFile=/etc/default/svnserve
ExecStart=/usr/bin/svnserve $DAEMON_ARGS
User=svn
Group=svn
KillMode=control-group
Restart=on-failure

[Install]
WantedBy=multi-user.target
  • Create /etc/default/svnserve to hold the service's startup arguments. Its content:
# svnserve options
DAEMON_ARGS="--daemon --pid-file /run/svnserve/svnserve.pid --root /opt/svn/repos --log-file /var/log/svnserve/svnserve.log"
  • Create the log directory.
sudo mkdir /var/log/svnserve
sudo chown svn /var/log/svnserve
  • Create the logrotate configuration /etc/logrotate.d/svnserve: rotate and compress the log daily, keeping at most the last 30 days. Its content:
/var/log/svnserve/*.log {
    daily
    missingok
    rotate 30
    compress
    notifempty
    create 640 svn adm
    sharedscripts
    postrotate
            if /bin/systemctl status svnserve > /dev/null ; then
                /bin/systemctl restart svnserve > /dev/null
            fi
    endscript
}
  • Finally, set the SVN service to start at boot, and start it now.
sudo systemctl daemon-reload
sudo systemctl enable svnserve.service
sudo systemctl start svnserve.service

4. Repository management

An SVN repository is the unit of management for a set of directories and files under version control, for example a large project containing several sub-projects. A repository imposes no organizational structure; it simply stores the directories and files as a tree.

  • Create a repository. Under the directory holding all repositories, create the repository "project1".
sudo -u svn svnadmin create /opt/svn/repos/project1
  • Set repository access permissions by editing the files in the repository's "conf" directory.

    • svnserve.conf: configures the repository's authentication, authorization policy, and access control.
    • authz-db: the repository's access-control file, including which users may access it.
    • password-db: passwords for the users allowed to access the repository.
  • Migrate a repository: dump it, then load it into a new repository. Useful for moving existing repositories after upgrading the SVN server software.
# Dump repository project1
sudo -u svn svnadmin dump project1 > project1.dump

# Load into repository project2, which must be created beforehand.
sudo -u svn svnadmin load project2 < project1.dump
  • Back up a repository with "svnadmin hotcopy", which takes an online backup (no need to worry about other processes accessing the repository) that can stand in for the original if it fails. The command can also be used to migrate data between same-version SVN servers on different machines.
# Take the backup
sudo -u svn mkdir -p /var/svn/backups/ 
sudo -u svn svnadmin hotcopy /opt/svn/repos/project1 /var/svn/backups/project1.bak

# Restore from the backup
sudo -u svn svnadmin hotcopy /var/svn/backups/project1.bak /opt/svn/repos/project1

5. Pitfalls

5.1. Default permissions block history browsing

After creating a repository with svnadmin create, its history cannot be viewed by default. Edit the repository's conf/svnserve.conf, set the following two options, then restart the SVN service:

anon-access = none
auth-access = write
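As a minimal sketch of how the three files in the repository's conf directory fit together (the user name, password, and realm below are made up for illustration), svnserve.conf points at the other two via the password-db and authz-db options:

```ini
# conf/svnserve.conf (excerpt)
[general]
anon-access = none
auth-access = write
password-db = passwd
authz-db = authz
realm = project1

# conf/passwd
[users]
alice = alicesecret

# conf/authz
[/]
alice = rw
```

With this in place, a client would authenticate as alice and get read/write access to the whole tree.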

5.2. Interference with a MySQL deployment

On a Debian 11 server with "mysql-server" installed from the official MySQL repository, installing "subversion" afterwards switched the MySQL configuration link to MariaDB's (below), which stopped the MySQL service from running. Fixed by pointing the link back manually.

/etc/alternatives/my.cnf -> /etc/mysql/mariadb.cnf

Needing to develop a mobile app gave me the chance to use a Mac Mini with the Apple M1 CPU. Compared with Windows 11 and Ubuntu 22.04, it feels like the best app development machine available right now.

1. Pros

1.1. The only platform supporting both major mobile ecosystems

No way around it: building an iOS app and submitting to the App Store both require Xcode, and therefore macOS.

Android works anywhere.

1.2. Emulators for both platforms, and they are pleasant

It supports both iOS and Android emulators; apart from slow Android SDK image downloads, creation is quick and they run smoothly. That covers most development and testing scenarios that need to target multiple OS versions, and it is far more convenient than buying and juggling a pile of physical devices.

As for features the emulators lack that would require a physical device for verification, I have not run into any yet.

2. Cons

2.1. Unfamiliar keyboard shortcuts

Probably every macOS newcomer, especially those coming from Windows or Linux, hits this. There are online guides for remapping the shortcuts, but I chose to adapt instead.

2.2. The screen saver pops up at random

The screen saver appears even when disabled in settings. The main problem is that it cannot be dismissed over VNC, which blocks access to the desktop. See the appendix for the fix.

2.3. VNC remote desktop is laggy

Since I connect from a non-Mac machine, VNC is the only option. Mac-to-Mac remoting may be a different experience.

2.4. No concurrent remote desktop sessions

Being used to Windows and Linux servers that allow multiple simultaneous remote users (Windows Remote Desktop in particular), I raised this on a programmers' forum, only to be asked why everyone doesn't just have their own Mac... which would indeed make the question moot.

That said, the Mac's SSH service does allow multiple concurrent logins.

3. Appendix

For the pitfalls, see also: Headless Mac Mini 折腾记

3.1 Remote access

Best to enable both Screen Sharing (VNC) and Remote Login (SSH). As a last resort, you can SSH in and run sudo reboot.

3.2 Disabling the screen saver

Even after turning the screen saver off in the graphical Settings, it still runs at random. It can be disabled with:

sudo defaults write /Library/Preferences/com.apple.screensaver loginWindowIdleTime 0

If the screen saver is running and will not exit, kill its process (this can be done over SSH) to get out of it:

killall ScreenSaverEngine

1 Background

To better measure and record temperature and its changes while roasting coffee beans, I built a temperature monitoring module with an ESP32-C3.

2 Requirements

Looking at it from the coffee-roasting perspective, and by reference to commercial machines, a temperature monitor could grow many features. As a starting design, the requirements are kept simple, with more features to follow. Version one therefore covers:

  • Timing. Simple to implement, and recorded temperatures need to be associated with time.
  • Reading the measured temperature, for display, analysis, and logging.
  • Showing temperature change: the current reading plus a temperature-over-time curve.

3 Design

3.1 Components

  • Controller: ESP32-C3-Core, flashed with MicroPython firmware

    • Very cheap; as low as RMB 9.9 with shipping.
    • WiFi and Bluetooth, so data can easily be synced to other devices.
    • Developing in MicroPython makes debugging convenient.
  • Temperature module: MAX6675, K-type thermocouple sensor module, SPI interface.

    • Cheap.
    • Resolution of 0.25 °C, good enough.
  • Temperature probe: K-type sheathed thermocouple; flexible, grounded-junction, 304 stainless steel.

    • Far more responsive than the probe bundled with the module.
    • Typical model: WRNK191.
    • Reference spec: 1 mm diameter, 50 mm insertion depth, 500 mm lead.
  • Display: SSD1306, 0.96-inch monochrome OLED, 128x64 resolution, two-wire I2C interface.

    • Cheap.
    • I2C needs only two data lines, saving GPIOs.

3.2 Wiring

Power (VCC 3.3 V) and ground (GND) connections are omitted here. Details:

| ESP32-C3 pin | Module | Module pin |
|---|---|---|
| GPIO10 | MAX6675 | SO |
| GPIO02 | MAX6675 | SCK |
| GPIO12 | MAX6675 | CS |
| GPIO05 | SSD1306 | SCL |
| GPIO04 | SSD1306 | SDA |

The wiring is not fixed and can be adjusted as needed, but the matching settings in main.py must be updated accordingly.

4 Code

The code files:

  • max6675.py: MAX6675 driver.
  • ssd1306.py: SSD1306 driver.
  • series_list.py: temperature series class, to make later extensions for storing temperature data easier.
  • main.py: the main program.

4.1 MAX6675 driver

File: max6675.py

# from https://github.com/BetaRavener/micropython-hw-lib/blob/master/MAX6675/max6675.py

import time


class MAX6675:
    MEASUREMENT_PERIOD_MS = 220

    def __init__(self, sck, cs, so):
        """
        Creates new object for controlling MAX6675
        :param sck: SCK (clock) pin, must be configured as Pin.OUT
        :param cs: CS (select) pin, must be configured as Pin.OUT
        :param so: SO (data) pin, must be configured as Pin.IN
        """
        # Thermocouple
        self._sck = sck
        self._sck.off()

        self._cs = cs
        self._cs.on()

        self._so = so
        self._so.off()

        self._last_measurement_start = 0
        self._last_read_temp = 0
        self._error = 0

    def _cycle_sck(self):
        self._sck.on()
        time.sleep_us(1)
        self._sck.off()
        time.sleep_us(1)

    def refresh(self):
        """
        Start a new measurement.
        """
        self._cs.off()
        time.sleep_us(10)
        self._cs.on()
        self._last_measurement_start = time.ticks_ms()

    def ready(self):
        """
        Signals if measurement is finished.
        :return: True if measurement is ready for reading.
        """
        return time.ticks_ms() - self._last_measurement_start > MAX6675.MEASUREMENT_PERIOD_MS

    def error(self):
        """
        Returns error bit of last reading. If this bit is set (=1), there's problem with the
        thermocouple - it can be damaged or loosely connected
        :return: Error bit value
        """
        return self._error

    def read(self):
        """
        Reads last measurement and starts a new one. If new measurement is not ready yet, returns last value.
        Note: The last measurement can be quite old (e.g. since last call to `read`).
        To refresh measurement, call `refresh` and wait for `ready` to become True before reading.
        :return: Measured temperature
        """
        # Check if new reading is available
        if self.ready():
            # Bring CS pin low to start protocol for reading result of
            # the conversion process. Forcing the pin down outputs
            # first (dummy) sign bit 15.
            self._cs.off()
            time.sleep_us(10)

            # Read temperature bits 14-3 from MAX6675.
            value = 0
            for i in range(12):
                # SCK should resemble clock signal and new SO value
                # is presented at falling edge
                self._cycle_sck()
                value += self._so.value() << (11 - i)

            # Read the TC Input pin to check if the input is open
            self._cycle_sck()
            self._error = self._so.value()

            # Read the last two bits to complete protocol
            for i in range(2):
                self._cycle_sck()

            # Finish protocol and start new measurement
            self._cs.on()
            self._last_measurement_start = time.ticks_ms()

            self._last_read_temp = value * 0.25

        return self._last_read_temp
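As a desktop sanity check of the bit handling in the driver above (plain CPython, no hardware; the function name is my own), the read loop assembles temperature bits 14..3 of the 16-bit MAX6675 frame into a 12-bit count worth 0.25 °C per LSB, with bit 2 as the open-thermocouple flag. A standalone decoder for illustration:

```python
def decode_max6675(frame: int) -> tuple:
    """Decode a raw 16-bit MAX6675 frame.

    Bit 15: dummy sign bit (always 0)
    Bits 14..3: 12-bit temperature count, 0.25 degC per LSB
    Bit 2: thermocouple-open flag (1 = probe fault)
    """
    value = (frame >> 3) & 0x0FFF      # temperature bits 14..3
    error = (frame >> 2) & 0x1         # open-thermocouple bit
    return value * 0.25, error

# A count of 100 means 25.0 degC, no fault
print(decode_max6675(100 << 3))   # (25.0, 0)
```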

4.2 SSD1306 driver

File: ssd1306.py

This driver subclasses FrameBuffer; for the text and drawing methods, see the FrameBuffer documentation (noted in the code).

# MicroPython SSD1306 OLED driver, I2C and SPI interfaces
# from https://github.com/micropython/micropython-lib/blob/master/micropython/drivers/display/ssd1306/ssd1306.py

from micropython import const
import framebuf


# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xA4)
SET_NORM_INV = const(0xA6)
SET_DISP = const(0xAE)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xA0)
SET_MUX_RATIO = const(0xA8)
SET_IREF_SELECT = const(0xAD)
SET_COM_OUT_DIR = const(0xC0)
SET_DISP_OFFSET = const(0xD3)
SET_COM_PIN_CFG = const(0xDA)
SET_DISP_CLK_DIV = const(0xD5)
SET_PRECHARGE = const(0xD9)
SET_VCOM_DESEL = const(0xDB)
SET_CHARGE_PUMP = const(0x8D)

# Subclassing FrameBuffer provides support for graphics primitives
# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html
class SSD1306(framebuf.FrameBuffer):
    def __init__(self, width, height, external_vcc):
        self.width = width
        self.height = height
        self.external_vcc = external_vcc
        self.pages = self.height // 8
        self.buffer = bytearray(self.pages * self.width)
        super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
        self.init_display()

    def init_display(self):
        for cmd in (
            SET_DISP,  # display off
            # address setting
            SET_MEM_ADDR,
            0x00,  # horizontal
            # resolution and layout
            SET_DISP_START_LINE,  # start at line 0
            SET_SEG_REMAP | 0x01,  # column addr 127 mapped to SEG0
            SET_MUX_RATIO,
            self.height - 1,
            SET_COM_OUT_DIR | 0x08,  # scan from COM[N] to COM0
            SET_DISP_OFFSET,
            0x00,
            SET_COM_PIN_CFG,
            0x02 if self.width > 2 * self.height else 0x12,
            # timing and driving scheme
            SET_DISP_CLK_DIV,
            0x80,
            SET_PRECHARGE,
            0x22 if self.external_vcc else 0xF1,
            SET_VCOM_DESEL,
            0x30,  # 0.83*Vcc
            # display
            SET_CONTRAST,
            0xFF,  # maximum
            SET_ENTIRE_ON,  # output follows RAM contents
            SET_NORM_INV,  # not inverted
            SET_IREF_SELECT,
            0x30,  # enable internal IREF during display on
            # charge pump
            SET_CHARGE_PUMP,
            0x10 if self.external_vcc else 0x14,
            SET_DISP | 0x01,  # display on
        ):  # on
            self.write_cmd(cmd)
        self.fill(0)
        self.show()

    def poweroff(self):
        self.write_cmd(SET_DISP)

    def poweron(self):
        self.write_cmd(SET_DISP | 0x01)

    def contrast(self, contrast):
        self.write_cmd(SET_CONTRAST)
        self.write_cmd(contrast)

    def invert(self, invert):
        self.write_cmd(SET_NORM_INV | (invert & 1))

    def rotate(self, rotate):
        self.write_cmd(SET_COM_OUT_DIR | ((rotate & 1) << 3))
        self.write_cmd(SET_SEG_REMAP | (rotate & 1))

    def show(self):
        x0 = 0
        x1 = self.width - 1
        if self.width != 128:
            # narrow displays use centred columns
            col_offset = (128 - self.width) // 2
            x0 += col_offset
            x1 += col_offset
        self.write_cmd(SET_COL_ADDR)
        self.write_cmd(x0)
        self.write_cmd(x1)
        self.write_cmd(SET_PAGE_ADDR)
        self.write_cmd(0)
        self.write_cmd(self.pages - 1)
        self.write_data(self.buffer)


class SSD1306_I2C(SSD1306):
    def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False):
        self.i2c = i2c
        self.addr = addr
        self.temp = bytearray(2)
        self.write_list = [b"\x40", None]  # Co=0, D/C#=1
        super().__init__(width, height, external_vcc)

    def write_cmd(self, cmd):
        self.temp[0] = 0x80  # Co=1, D/C#=0
        self.temp[1] = cmd
        self.i2c.writeto(self.addr, self.temp)

    def write_data(self, buf):
        self.write_list[1] = buf
        self.i2c.writevto(self.addr, self.write_list)


class SSD1306_SPI(SSD1306):
    def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
        self.rate = 10 * 1024 * 1024
        dc.init(dc.OUT, value=0)
        res.init(res.OUT, value=0)
        cs.init(cs.OUT, value=1)
        self.spi = spi
        self.dc = dc
        self.res = res
        self.cs = cs
        import time

        self.res(1)
        time.sleep_ms(1)
        self.res(0)
        time.sleep_ms(10)
        self.res(1)
        super().__init__(width, height, external_vcc)

    def write_cmd(self, cmd):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs(1)
        self.dc(0)
        self.cs(0)
        self.spi.write(bytearray([cmd]))
        self.cs(1)

    def write_data(self, buf):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs(1)
        self.dc(1)
        self.cs(0)
        self.spi.write(buf)
        self.cs(1)

4.3 Temperature series class

File: series_list.py

Intended to mimic a time-series database; for now it only keeps as many data points as the screen can display.

class SeriesList:
    
    def __init__(self, maxLen: int, firstVal: float):
        self._maxLen = 1 if maxLen <= 0 else maxLen
        self._list = [firstVal]
        self._len = len(self._list)
    
    def append(self, val: float):
        self._list.append(val)
        if (self._len + 1) > self._maxLen:
            self._list.pop(0)
        else:
            self._len += 1
    
    def last(self, index: int = 0) -> float:
        return self._list[self._len - index - 1]
    
    def histogram(self, maxRange: int) -> list:
        hList = self._list.copy()
        hMax = int(max(self._list))
        hMin = int(min(self._list))
        hRange = hMax - hMin + 1
        rate = 1 if hRange <= maxRange else (maxRange / hRange)
        if rate == 1:
            for i, v in enumerate(hList):
                hList[i] = int(v) - hMin
        else:
            for i, v in enumerate(hList):
                hList[i] = int((int(v) - hMin) * rate)
        return hList
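A quick desktop check of the scaling logic (this re-implements the histogram math from SeriesList in plain Python for illustration; the function name is my own): readings are shifted so the minimum maps to row 0, and compressed by maxRange/range only when the temperature span exceeds the plot area's height:

```python
def scale_series(values, max_range):
    """Map readings into rows 0..max_range-1, as SeriesList.histogram does."""
    lo, hi = int(min(values)), int(max(values))
    span = hi - lo + 1
    # Only compress when the span would not fit in max_range rows
    rate = 1 if span <= max_range else max_range / span
    return [int((int(v) - lo) * rate) for v in values]

print(scale_series([20.0, 25.5, 30.0], 54))   # fits as-is: [0, 5, 10]
print(scale_series([0, 100, 215.9], 54))      # compressed:  [0, 25, 53]
```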

4.4 Main program

File: main.py

Summary:

  • A Timer samples the temperature and refreshes the display once per second.
  • Each run takes at most about 150 ms; the curve-drawing code could be optimized further.
  • Blanking the whole plot area before redrawing the curve is faster but flickers. The current code blanks and redraws column by column: slightly slower, but it looks good.
  • The curve exists to make temperature change visible at a glance; if the temperature span on screen exceeds the plot area's resolution, the curve is compressed proportionally.
from micropython import const
import time
from machine import Pin, SoftI2C, Timer
from max6675 import MAX6675
from ssd1306 import SSD1306_I2C
from series_list import SeriesList

UNIT_60 = const(60)
SCREEN_W = const(128)
SCREEN_H = const(64)
HISTOGRAM_X = const(0)
HISTOGRAM_Y = const(10)
HISTOGRAM_W = const(SCREEN_W)
HISTOGRAM_H = const(SCREEN_H - HISTOGRAM_Y)

""" init MAX6675 ################################## """
print('init MAX6675')
so = Pin(10, Pin.IN) # GPIO10
sck = Pin(2, Pin.OUT) # GPIO02
cs = Pin(12, Pin.OUT) # GPIO12
max = MAX6675(sck, cs, so)
time.sleep(1)
curTemp = max.read()

""" init OLED ################################## """
print('init OLED')
i2c = SoftI2C(scl=Pin(5), sda=Pin(4))
oled = SSD1306_I2C(SCREEN_W, SCREEN_H, i2c)

""" init Ticks ################################## """
ticksStart = time.ticks_ms()

""" init Temperature list ################################## """
sList = SeriesList(SCREEN_W, curTemp)

""" init Timer ################################## """
def timerRefresh(t):
    global max, curTemp, oled, timeSec, sList
    
    duration = 0 if ticksStart is None else int(time.ticks_diff(time.ticks_ms(), ticksStart) / 1000)
    dSec = duration % UNIT_60
    dMin = int(duration / UNIT_60)
    
    curTemp = max.read() # Current temperature
    sList.append(curTemp)
    
    oled.fill(0)
    oled.text('{:02d}:{:02d} | {:>6.2f} C'.format(dMin, dSec, curTemp) , 0, 0)
    hList = sList.histogram(HISTOGRAM_H)
    hLen = len(hList)
    hPre = 0
    hCur = 0
    startX = SCREEN_W - hLen
    startY = HISTOGRAM_Y + HISTOGRAM_H - 1
    for i, v in enumerate(hList):
        hPre = v if i == 0 else hCur
        hCur = v
        oled.vline(startX + i, startY - hCur, (hCur - hPre if hPre < hCur else 1), 1)
        if hPre > hCur:
            oled.vline(startX + i - 1, startY - hPre, hPre - hCur, 1)
    oled.show()

timerTemp = Timer(0)
timerTemp.init(period=1000, mode=Timer.PERIODIC, callback=timerRefresh) # Every 1 second

print('start run')

Before Chinese New Year, I picked up a Kido X3 kids' smartwatch for RMB 35, flashed a third-party ROM based on the official firmware, and tried to make it a daily communication device. It ended in failure.

1. Background

These days I routinely use two phone SIM cards. One is a "number-keeper" card on a cheap monthly plan that preserves my phone number, used for registrations, authentication, and social accounts. The other is a "data" card on a large-data plan with free calls between sub-cards. This setup makes it easy to switch to better data deals without ever changing my phone number.

The idea, then: put the number-keeper card into something lightweight, such as a smartwatch with a SIM slot, so people can still reach me, while the data card goes into other smart devices like a tablet or laptop.

I bought the Kido X3 to try this out.

2. Hardware


Its specs are unremarkable; what attracted me:

  • It can be unlocked, and custom firmware exists.
  • SIM slot, with 4G LTE and WiFi support.
  • Cheap: only RMB 35 at the time.
  • 900 mAh battery, larger than comparable devices.
  • IP68 water resistance, fine for daily life.

Full specs:

  • CPU: Qualcomm® Snapdragon Wear™ 2100
  • RAM: 512 MB
  • Flash: 4 GB (under 2 GB available to the user)
  • Battery: 900 mAh
  • Weight: 57.4 g
  • Screen: 1.55-inch IPS, 320 x 360 pixels
  • Buttons: two
  • Cameras: 5 MP side, 2 MP front
  • Cellular: 4G, all carriers (one SIM slot)
  • WiFi: 2.4 GHz only
  • Bluetooth: (unknown)
  • GPS: L1+L5 dual-frequency GPS
  • Water resistance: IP68
  • Charging: magnetic cable, wireless
  • OS: Android 7.1.1

3. Flashing

The approach: reboot into Qualcomm 9008 (EDL) mode and use the Qualcomm flashing tool to write the userdata, system, and boot partitions.

Before flashing, a data cable must be made by wiring the four contacts on the back of the watch to a USB 2.0 cable.

For the details, follow the ROM maintainer's documentation.

4. Drawbacks

  1. The custom firmware is under-optimized.

Since the stock system is closed source, the custom ROM can only patch on top of it; even with root, the changes are limited. The fatal flaw is power draw: despite the larger battery, enabling 4G data drains it rapidly, and even idling with the screen off it cannot last a day. By comparison, my kid's "Xiaotiancai" (XTC) phone watch stands by for over two days with data on.

There was also a showstopper: incoming calls sometimes could not get through at all! A reboot fixed it, but there is no way to know when it will happen again.

  2. Too little companion software.

The screen is tiny, and ordinary apps are designed for phones, so everything looks like a shrunken phone UI. A partial workaround is the system setting that zooms the display on a triple tap.

A few niche app stores collect dedicated apps, but the selection is pitifully small.

  3. Raised-wrist use is tiring.

Holding your wrist up to look at the screen for long wears your arm out. The watch is also on the heavy side (that big battery), so all-day wear is tiring too.

  4. Missing basic features.

Many sensors, such as the step counter and GPS, do not work, presumably due to missing drivers.

5. Takeaways

  1. A band is more practical

I have used a Mi Band 3 NFC for years, never paired to a phone, purely as a watch, alarm, step counter, sit-too-long reminder, and transit card, charging once a month. This watch is not remotely close.

  2. Kids' watches are seriously optimized

I used to scoff at kids' watches running ancient system versions, such as one brand's pricey model still built on Android 4.4. Having used one, I now know they carry heavy optimization, especially for power: my kid's XTC Q2 still stands by for over two days with 4G data on. The UI polish and companion apps go without saying.

  3. Substituting watch-band screws

My Kido X3 arrived bare, without a band, and even after buying a suitable band I could not find matching screws. It turns out tongue barbells and nipple rings can stand in for watch-band screws, which genuinely updated my model of the world (strange knowledge acquired).

  4. Unlocking the system

The four contacts on the back of the Kido X3 map to the four USB 2.0 wires; once connected, ADB gets you into the system. Many kids' watches can be unlocked this way. Since this watch uses a Qualcomm CPU, dedicated tools can also flash partition images to unlock it.

  5. Kids' WeChat

I tried the kids' edition of WeChat. Not great, but it provides a way to run a second WeChat session: the same account can stay logged in on a phone and on kids' WeChat simultaneously, each receiving messages independently. Installing it needs a license, obtainable from a compatible kids' watch or simply bought on Xianyu.

  6. It appreciated

Probably thanks to promotional videos, this thing actually went up in price. After all the tinkering, I still sold it for more than I paid. Lovely.

The CFO originally wanted to sign our kid up for a bamboo-themed field-study activity, including digging spring bamboo shoots and making bamboo-tube rice, at RMB 280 per person. The more our frugal CFO thought about it, the less right it felt; in an "I could do that" spirit, she planned a similar activity herself.

The plan for the field study:

1. Background

Besides its wide range of uses, bamboo is edible and much loved in China; in the countryside especially, most families grow some.

2. Session one: harvesting bamboo

  • Location: a hill or park with bamboo.
  • Content: experience where bamboo grows first-hand; dig shoots, cut bamboo.
  • Outcomes:

    • Dug bamboo shoots and learned they are edible. Where digging is not possible, buy shoots at the market and explain on site.
    • Cut a few bamboo branches.
    • Obtained one thick bamboo pole (can be bought on Taobao).

3. Session two: bamboo crafts

  • Location: a shop that weaves bamboo products.
  • Content: visit the shop and learn the uses of bamboo.
  • Outcomes:

    • Made drinking straws from the branches cut in session one.
    • Bought bamboo products on Taobao, since the physical shop was pricier.

4. Session three: cooking with bamboo

  • Location: a balcony, barbecue area, or campsite where fires are allowed.
  • Content:

    • Split the dry bamboo branches for firewood.
    • Start a fire with the split bamboo.
    • Make bamboo-tube rice: saw the thick pole into sections, fill with rice, water, and sliced Cantonese sausage, and roast over the fire.
  • Outcomes:

    • The rice picks up a fresh bamboo fragrance; delicious.
    • Lesson learned: steaming the bamboo tubes is more reliable and avoids burning. Best to throw in a barbecue while you are at it.

5. Wrap-up

  • Kindergarten and grades 1-2: summarize orally or by drawing.
  • Grades 3-6: summarize as a narrative essay, diary, letter, or handwritten newsletter.

To run a web service on MicroPython, I looked at several frameworks and settled on tinyweb.

tinyweb's strengths: it is small, matches common programming habits, handles errors properly, and implements the HTTP 1.0 basics.

However, tinyweb uses logging for its log output, and logging pulls in quite a few dependencies, so I switched it to ulogger. The modified source:

"""
Tiny Web - pretty simple and powerful web server for tiny platforms like ESP8266 / ESP32
MIT license
(C) Konstantin Belyalov 2017-2018

- project: https://github.com/belyalov/tinyweb
- source code: https://github.com/belyalov/tinyweb/blob/master/tinyweb/server.py
- version: e92546054b905de46f42157282677f56764fb2f0

edit: replace logging with ulogger
"""
import time
import ulogger
import uasyncio as asyncio
import uasyncio.core
import ujson as json
import gc
import uos as os
import sys
import uerrno as errno
import usocket as socket


#log = logging.getLogger('WEB')
class UtcClock(ulogger.BaseClock):
    def __call__(self) -> str:
        # UTC date time: (year, month, mday, hour, minute, second, weekday, yearday)
        dt = time.gmtime()
        return f'{dt[0]}{dt[1]:02d}{dt[2]:02d}_{dt[3]:02d}{dt[4]:02d}{dt[5]:02d}'

handler_to_term = ulogger.Handler(
    level=ulogger.INFO,
    colorful=True,
    fmt="&(time)%[&(level)%][&(name)%]: &(msg)%",
    clock=UtcClock(),
    direction=ulogger.TO_TERM,
)

handler_to_file = ulogger.Handler(
    level=ulogger.INFO,
    fmt="&(time)%[&(level)%][&(name)%]: &(msg)%",
    clock=UtcClock(),
    direction=ulogger.TO_FILE,
    file_name="tinyweb.log",
    max_file_size=102400 # max for 100KB
)
logger = ulogger.Logger(
    name = __name__,
    handlers = (
        handler_to_term,
        handler_to_file
    )
)

type_gen = type((lambda: (yield))())

# uasyncio v3 is shipped with MicroPython 1.13, and contains some subtle
# but breaking changes. See also https://github.com/peterhinch/micropython-async/blob/master/v3/README.md
IS_UASYNCIO_V3 = hasattr(asyncio, "__version__") and asyncio.__version__ >= (3,)


def urldecode_plus(s):
    """Decode urlencoded string (including '+' char).

    Returns decoded string
    """
    s = s.replace('+', ' ')
    arr = s.split('%')
    res = arr[0]
    for it in arr[1:]:
        if len(it) >= 2:
            res += chr(int(it[:2], 16)) + it[2:]
        elif len(it) == 0:
            res += '%'
        else:
            res += it
    return res


def parse_query_string(s):
    """Parse urlencoded string into dict.

    Returns dict
    """
    res = {}
    pairs = s.split('&')
    for p in pairs:
        vals = [urldecode_plus(x) for x in p.split('=', 1)]
        if len(vals) == 1:
            res[vals[0]] = ''
        else:
            res[vals[0]] = vals[1]
    return res


class HTTPException(Exception):
    """HTTP protocol exceptions"""

    def __init__(self, code=400):
        self.code = code


class request:
    """HTTP Request class"""

    def __init__(self, _reader):
        self.reader = _reader
        self.headers = {}
        self.method = b''
        self.path = b''
        self.query_string = b''

    async def read_request_line(self):
        """Read and parse first line (AKA HTTP Request Line).
        Function is generator.

        Request line is something like:
        GET /something/script?param1=val1 HTTP/1.1
        """
        while True:
            rl = await self.reader.readline()
            # skip empty lines
            if rl == b'\r\n' or rl == b'\n':
                continue
            break
        rl_frags = rl.split()
        if len(rl_frags) != 3:
            raise HTTPException(400)
        self.method = rl_frags[0]
        url_frags = rl_frags[1].split(b'?', 1)
        self.path = url_frags[0]
        if len(url_frags) > 1:
            self.query_string = url_frags[1]

    async def read_headers(self, save_headers=[]):
        """Read and parse HTTP headers until \r\n\r\n:
        Optional argument 'save_headers' controls which headers to save.
            This is done mostly to deal with memory constrains.

        Function is generator.

        HTTP headers could be like:
        Host: google.com
        Content-Type: blah
        \r\n
        """
        while True:
            gc.collect()
            line = await self.reader.readline()
            if line == b'\r\n':
                break
            frags = line.split(b':', 1)
            if len(frags) != 2:
                raise HTTPException(400)
            if frags[0] in save_headers:
                self.headers[frags[0]] = frags[1].strip()

    async def read_parse_form_data(self):
        """Read HTTP form data (payload), if any.
        Function is generator.

        Returns:
            - dict of key / value pairs
            - None in case of no form data present
        """
        # TODO: Probably there is better solution how to handle
        # request body, at least for simple urlencoded forms - by processing
        # chunks instead of accumulating payload.
        gc.collect()
        if b'Content-Length' not in self.headers:
            return {}
        # Parse payload depending on content type
        if b'Content-Type' not in self.headers:
            # Unknown content type, return unparsed, raw data
            return {}
        size = int(self.headers[b'Content-Length'])
        if size > self.params['max_body_size'] or size < 0:
            raise HTTPException(413)
        data = await self.reader.readexactly(size)
        # Use only string before ';', e.g:
        # application/x-www-form-urlencoded; charset=UTF-8
        ct = self.headers[b'Content-Type'].split(b';', 1)[0]
        try:
            if ct == b'application/json':
                return json.loads(data)
            elif ct == b'application/x-www-form-urlencoded':
                return parse_query_string(data.decode())
        except ValueError:
            # Re-generate exception for malformed form data
            raise HTTPException(400)


class response:
    """HTTP Response class"""

    def __init__(self, _writer):
        self.writer = _writer
        self.send = _writer.awrite
        self.code = 200
        self.version = '1.0'
        self.headers = {}

    async def _send_headers(self):
        """Compose and send:
        - HTTP request line
        - HTTP headers following by \r\n.
        This function is generator.

        P.S.
        Because of usually we have only a few HTTP headers (2-5) it doesn't make sense
        to send them separately - sometimes it could increase latency.
        So combining headers together and send them as single "packet".
        """
        # Request line
        hdrs = 'HTTP/{} {} MSG\r\n'.format(self.version, self.code)
        # Headers
        for k, v in self.headers.items():
            hdrs += '{}: {}\r\n'.format(k, v)
        hdrs += '\r\n'
        # Collect garbage after small mallocs
        gc.collect()
        await self.send(hdrs)

    async def error(self, code, msg=None):
        """Generate HTTP error response
        This function is generator.

        Arguments:
            code - HTTP response code

        Example:
            # Not enough permissions. Send HTTP 403 - Forbidden
            await resp.error(403)
        """
        self.code = code
        if msg:
            self.add_header('Content-Length', len(msg))
        await self._send_headers()
        if msg:
            await self.send(msg)

    async def redirect(self, location, msg=None):
        """Generate HTTP redirect response to 'location'.
        Basically it will generate HTTP 302 with 'Location' header

        Arguments:
            location - URL to redirect to

        Example:
            # Redirect to /something
            await resp.redirect('/something')
        """
        self.code = 302
        self.add_header('Location', location)
        if msg:
            self.add_header('Content-Length', len(msg))
        await self._send_headers()
        if msg:
            await self.send(msg)

    def add_header(self, key, value):
        """Add HTTP response header

        Arguments:
            key - header name
            value - header value

        Example:
            resp.add_header('Content-Encoding', 'gzip')
        """
        self.headers[key] = value

    def add_access_control_headers(self):
        """Add Access Control related HTTP response headers.
        This is required when working with RestApi (JSON requests)
        """
        self.add_header('Access-Control-Allow-Origin', self.params['allowed_access_control_origins'])
        self.add_header('Access-Control-Allow-Methods', self.params['allowed_access_control_methods'])
        self.add_header('Access-Control-Allow-Headers', self.params['allowed_access_control_headers'])

    async def start_html(self):
        """Start response with HTML content type.
        This function is generator.

        Example:
            await resp.start_html()
            await resp.send('<html><h1>Hello, world!</h1></html>')
        """
        self.add_header('Content-Type', 'text/html')
        await self._send_headers()

    async def send_file(self, filename, content_type=None, content_encoding=None, max_age=2592000, buf_size=128):
        """Send local file as HTTP response.
        This function is generator.

        Arguments:
            filename - Name of file which exists in local filesystem
        Keyword arguments:
            content_type - Filetype. By default - None means auto-detect.
            max_age - Cache control. How long browser can keep this file on disk.
                      By default - 30 days
                      Set to 0 - to disable caching.

        Example 1: Default use case:
            await resp.send_file('images/cat.jpg')

        Example 2: Disable caching:
            await resp.send_file('static/index.html', max_age=0)

        Example 3: Override content type:
            await resp.send_file('static/file.bin', content_type='application/octet-stream')
        """
        try:
            # Get file size
            stat = os.stat(filename)
            slen = str(stat[6])
            self.add_header('Content-Length', slen)
            # Find content type
            if content_type:
                self.add_header('Content-Type', content_type)
            # Add content-encoding, if any
            if content_encoding:
                self.add_header('Content-Encoding', content_encoding)
            # Since this is static content it makes total sense
            # to tell the browser to cache it; however, you can always
            # override this by setting max_age to zero
            self.add_header('Cache-Control', 'max-age={}, public'.format(max_age))
            with open(filename) as f:
                await self._send_headers()
                gc.collect()
                buf = bytearray(min(stat[6], buf_size))
                while True:
                    size = f.readinto(buf)
                    if size == 0:
                        break
                    await self.send(buf, sz=size)
        except OSError as e:
            # special handling for ENOENT / EACCES
            if e.args[0] in (errno.ENOENT, errno.EACCES):
                raise HTTPException(404)
            else:
                raise


async def restful_resource_handler(req, resp, param=None):
    """Handler for RESTful API endpoins"""
    # Gather data - query string, JSON in request body...
    data = await req.read_parse_form_data()
    # Add parameters from URI query string as well
    # This one is actually for simply development of RestAPI
    if req.query_string != b'':
        data.update(parse_query_string(req.query_string.decode()))
    # Call actual handler
    _handler, _kwargs = req.params['_callmap'][req.method]
    # Collect garbage before / after handler execution
    gc.collect()
    if param:
        res = _handler(data, param, **_kwargs)
    else:
        res = _handler(data, **_kwargs)
    gc.collect()
    # Handler result could be:
    # 1. generator - in case of large payload
    # 2. string - just a string :)
    # 3. dict - meaning the client wants tinyweb to convert it to JSON
    # It can also return an error code together with the str / dict:
    # res = {'blah': 'blah'}
    # res = {'blah': 'blah'}, 201
    if isinstance(res, type_gen):
        # Result is generator, use chunked response
        # NOTICE: HTTP 1.0 by itself does not support chunked responses,
        # so as a workaround the response is sent as HTTP/1.1 with Connection: close
        resp.version = '1.1'
        resp.add_header('Connection', 'close')
        resp.add_header('Content-Type', 'application/json')
        resp.add_header('Transfer-Encoding', 'chunked')
        resp.add_access_control_headers()
        await resp._send_headers()
        # Drain generator
        for chunk in res:
            chunk_len = len(chunk.encode('utf-8'))
            await resp.send('{:x}\r\n'.format(chunk_len))
            await resp.send(chunk)
            await resp.send('\r\n')
            gc.collect()
        await resp.send('0\r\n\r\n')
    else:
        if type(res) == tuple:
            resp.code = res[1]
            res = res[0]
        elif res is None:
            raise Exception('Result expected')
        # Send response
        if type(res) is dict:
            res_str = json.dumps(res)
        else:
            res_str = res
        resp.add_header('Content-Type', 'application/json')
        resp.add_header('Content-Length', str(len(res_str)))
        resp.add_access_control_headers()
        await resp._send_headers()
        await resp.send(res_str)


class webserver:

    def __init__(self, request_timeout=3, max_concurrency=3, backlog=16, debug=False):
        """Tiny Web Server class.
        Keyword arguments:
            request_timeout - Time allowed for the client to send a complete request;
                              after that the connection is closed.
            max_concurrency - How many connections can be processed concurrently.
                              It is very important to limit this number because of
                              memory constraints.
                              Default value depends on platform.
            backlog         - Parameter to the socket.listen() function. Defines the
                              size of the queue of connections pending acceptance.
                              Must be greater than max_concurrency.
            debug           - Whether to send exception info (text + backtrace)
                              to the client together with HTTP 500 or not.
        """
        self.loop = asyncio.get_event_loop()
        self.request_timeout = request_timeout
        self.max_concurrency = max_concurrency
        self.backlog = backlog
        self.debug = debug
        self.explicit_url_map = {}
        self.catch_all_handler = None
        self.parameterized_url_map = {}
        # Currently opened connections
        self.conns = {}
        # Statistics
        self.processed_connections = 0

    def _find_url_handler(self, req):
        """Helper to find URL handler.
        Returns a tuple of (function, params) or (None, None) if not found.
        """
        # First try - lookup in explicit (non parameterized URLs)
        if req.path in self.explicit_url_map:
            return self.explicit_url_map[req.path]
        # Second try - strip last path segment and lookup in another map
        idx = req.path.rfind(b'/') + 1
        path2 = req.path[:idx]
        if len(path2) > 0 and path2 in self.parameterized_url_map:
            # Save parameter into request
            req._param = req.path[idx:].decode()
            return self.parameterized_url_map[path2]

        if self.catch_all_handler:
            return self.catch_all_handler

        # No handler found
        return (None, None)

    async def _handle_request(self, req, resp):
        await req.read_request_line()
        # Find URL handler
        req.handler, req.params = self._find_url_handler(req)
        if not req.handler:
            # No URL handler found - read the headers and issue HTTP 404
            await req.read_headers()
            raise HTTPException(404)
        resp.params = req.params
        # Read / parse headers
        await req.read_headers(req.params['save_headers'])

    async def _handler(self, reader, writer):
        """Handler for TCP connection with
        HTTP/1.0 protocol implementation
        """
        gc.collect()

        try:
            req = request(reader)
            resp = response(writer)
            # Read HTTP Request with timeout
            await asyncio.wait_for(self._handle_request(req, resp),
                                   self.request_timeout)

            # OPTIONS method is handled automatically
            if req.method == b'OPTIONS':
                resp.add_access_control_headers()
                # Since we support only HTTP 1.0 - it is important
                # to tell browser that there is no payload expected
                # otherwise some webkit based browsers (Chrome)
                # treat this behavior as an error
                resp.add_header('Content-Length', '0')
                await resp._send_headers()
                return

            # Ensure that HTTP method is allowed for this path
            if req.method not in req.params['methods']:
                raise HTTPException(405)

            # Handle URL
            gc.collect()
            if hasattr(req, '_param'):
                await req.handler(req, resp, req._param)
            else:
                await req.handler(req, resp)
            # Done here
        except (asyncio.CancelledError, asyncio.TimeoutError):
            pass
        except OSError as e:
            # Do not send response for connection related errors - too late :)
            # P.S. code 32 is errno.EPIPE (broken pipe)
            if e.args[0] not in (errno.ECONNABORTED, errno.ECONNRESET, 32):
                try:
                    await resp.error(500)
                except Exception as e:
                    #log.exc(e, "")
                    logger.error(f'OSError: {e}')
        except HTTPException as e:
            try:
                await resp.error(e.code)
            except Exception as e:
                #log.exc(e)
                logger.error(f'HTTPException: {e}')
        except Exception as e:
            # Unhandled exception in user's method
            #log.error(req.path.decode())
            #log.exc(e, "")
            logger.error(f'Unhandled exception. URL: {req.path.decode()}, exception: {e}')
            try:
                await resp.error(500)
                # Send exception info if desired
                if self.debug:
                    sys.print_exception(e, resp.writer.s)
            except Exception as e:
                pass
        finally:
            await writer.aclose()
            # Max concurrency support -
            # if queue is full schedule resume of TCP server task
            if len(self.conns) == self.max_concurrency:
                self.loop.create_task(self._server_coro)
            # Delete connection, using socket as a key
            del self.conns[id(writer.s)]

    def add_route(self, url, f, **kwargs):
        """Add URL to function mapping.

        Arguments:
            url - url to map function with
            f - function to map

        Keyword arguments:
            methods - list of allowed methods. Defaults to ['GET']
            save_headers - contains list of HTTP headers to be saved. Case sensitive. Default - empty.
            max_body_size - Max HTTP body size (e.g. POST form data). Defaults to 1024
            allowed_access_control_headers - Default value for the same name header. Defaults to *
            allowed_access_control_origins - Default value for the same name header. Defaults to *
        """
        if url == '' or '?' in url:
            raise ValueError('Invalid URL')
        # Initial params for route
        params = {'methods': ['GET'],
                  'save_headers': [],
                  'max_body_size': 1024,
                  'allowed_access_control_headers': '*',
                  'allowed_access_control_origins': '*',
                  }
        params.update(kwargs)
        params['allowed_access_control_methods'] = ', '.join(params['methods'])
        # Convert methods/headers to bytestring
        params['methods'] = [x.encode() for x in params['methods']]
        params['save_headers'] = [x.encode() for x in params['save_headers']]
        # If URL has a parameter
        if url.endswith('>'):
            idx = url.rfind('<')
            path = url[:idx]
            idx += 1
            param = url[idx:-1]
            if path.encode() in self.parameterized_url_map:
                raise ValueError('URL exists')
            params['_param_name'] = param
            self.parameterized_url_map[path.encode()] = (f, params)

        if url.encode() in self.explicit_url_map:
            raise ValueError('URL exists')
        self.explicit_url_map[url.encode()] = (f, params)

    def add_resource(self, cls, url, **kwargs):
        """Map resource (RestAPI) to URL

        Arguments:
            cls - Resource class to map to
            url - url to map to class
            kwargs - User defined key args to pass to the handler.

        Example:
            class myres():
                def get(self, data):
                    return {'hello': 'world'}


            app.add_resource(myres, '/api/myres')
        """
        methods = []
        callmap = {}
        # Create instance of resource handler, if passed as just class (not instance)
        try:
            obj = cls()
        except TypeError:
            obj = cls
        # Get all implemented HTTP methods and make callmap
        for m in ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']:
            fn = m.lower()
            if hasattr(obj, fn):
                methods.append(m)
                callmap[m.encode()] = (getattr(obj, fn), kwargs)
        self.add_route(url, restful_resource_handler,
                       methods=methods,
                       save_headers=['Content-Length', 'Content-Type'],
                       _callmap=callmap)

    def catchall(self):
        """Decorator for catchall()

        Example:
            @app.catchall()
            async def catchall_handler(req, resp):
                resp.code = 404
                await resp.start_html()
                await resp.send('<html><body><h1>My custom 404!</h1></html>\n')
        """
        params = {'methods': [b'GET'], 'save_headers': [], 'max_body_size': 1024, 'allowed_access_control_headers': '*', 'allowed_access_control_origins': '*'}

        def _route(f):
            self.catch_all_handler = (f, params)
            return f
        return _route

    def route(self, url, **kwargs):
        """Decorator for add_route()

        Example:
            @app.route('/')
            async def index(req, resp):
                await resp.start_html()
                await resp.send('<html><body><h1>Hello, world!</h1></html>\n')
        """
        def _route(f):
            self.add_route(url, f, **kwargs)
            return f
        return _route

    def resource(self, url, method='GET', **kwargs):
        """Decorator for add_resource() method

        Examples:
            @app.resource('/users')
            def users(data):
                return {'a': 1}

            @app.resource('/messages/<topic_id>')
            async def index(data, topic_id):
                yield '{'
                yield '"topic_id": "{}",'.format(topic_id)
                yield '"message": "test",'
                yield '}'
        """
        def _resource(f):
            self.add_route(url, restful_resource_handler,
                           methods=[method],
                           save_headers=['Content-Length', 'Content-Type'],
                           _callmap={method.encode(): (f, kwargs)})
            return f
        return _resource

    async def _tcp_server(self, host, port, backlog):
        """TCP Server implementation.
        Opens socket for accepting connection and
        creates task for every new accepted connection
        """
        addr = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0][-1]
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(addr)
        sock.listen(backlog)
        try:
            while True:
                if IS_UASYNCIO_V3:
                    yield uasyncio.core._io_queue.queue_read(sock)
                else:
                    yield asyncio.IORead(sock)
                csock, caddr = sock.accept()
                csock.setblocking(False)
                # Start handler / keep it in the map - to be able to
                # shutdown gracefully - by close all connections
                self.processed_connections += 1
                hid = id(csock)
                handler = self._handler(asyncio.StreamReader(csock),
                                        asyncio.StreamWriter(csock, {}))
                self.conns[hid] = handler
                self.loop.create_task(handler)
                # In case max concurrency is reached - temporarily pause the server:
                # 1. backlog must be greater than max_concurrency, otherwise
                #    clients will get "Connection Reset"
                # 2. Server task will be resumed whenever one active connection finishes
                if len(self.conns) == self.max_concurrency:
                    # Pause
                    yield False
        except asyncio.CancelledError:
            return
        finally:
            sock.close()

    def run(self, host="127.0.0.1", port=8081, loop_forever=True):
        """Run Web Server. By default it runs forever.

        Keyword arguments:
            host - host to listen on. By default - localhost (127.0.0.1)
            port - port to listen on. By default - 8081
            loop_forever - run loop.run_forever(); otherwise the caller must run it by itself.
        """
        self._server_coro = self._tcp_server(host, port, self.backlog)
        self.loop.create_task(self._server_coro)
        if loop_forever:
            self.loop.run_forever()

    def shutdown(self):
        """Gracefully shutdown Web Server"""
        asyncio.cancel(self._server_coro)
        for hid, coro in self.conns.items():
            asyncio.cancel(coro)

In practice, there are quite a few things to watch out for when using it.

  1. A handler for POST requests must set save_headers = ['Content-Length', 'Content-Type'] in its route decorator, or it cannot read the request parameters. For example:
app = tinyweb.webserver()

@app.route('/save', methods = ['POST'], save_headers = ['Content-Length','Content-Type'])
async def save(request, response):
    params = await request.read_parse_form_data()
    await response.send(str(params))
  2. One handler can serve multiple URLs; just stack multiple route decorators. For example:
app = tinyweb.webserver()

@app.route('/')
@app.route('/index')
async def index(request, response):
    await response.send("Hello world!")
  3. If you need the browser or the client not to cache the response, just set the response headers. The relevant code:
    response.add_header('Cache-Control', 'no-cache')  # for HTTP 1.1
    response.add_header('Pragma', 'no-cache')  # for HTTP 1.0
    response.add_header('Expires', '0')  # for proxies
  4. Do not set the web server's max_concurrency too high, to keep memory usage down.
  5. For more examples, refer to the official documentation.
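What read_parse_form_data() roughly does with an application/x-www-form-urlencoded body can be sketched standalone (a simplified illustration that skips percent-decoding; `parse_urlencoded` is my own helper name, not tinyweb's API):

```python
# Simplified standalone sketch of decoding an application/x-www-form-urlencoded
# POST body - the kind of data a POST handler receives once save_headers
# includes Content-Length and Content-Type. Percent-decoding is omitted for
# brevity; parse_urlencoded is my own helper name, not part of tinyweb.
def parse_urlencoded(body):
    params = {}
    for pair in body.split('&'):
        if not pair:
            continue
        key, _, value = pair.partition('=')
        params[key] = value.replace('+', ' ')  # '+' encodes a space
    return params

params = parse_urlencoded('name=led&state=on')
```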

Scheduled tasks written in MicroPython, especially when testing in a real environment, generally give no visible error messages, so logging is needed.

I found ulogger, a project that implements Logger functionality. Its code depends on no other libraries, and its usage follows the conventional Logger pattern. The relevant information:

I copied its code over, removing the reference to TextIOWrapper:

"""
- project: micropython-ulogger
  https://github.com/whales-chen/micropython-ulogger
- code from
  https://github.com/whales-chen/micropython-ulogger/blob/main/ulogger/__init__.py
- version
  ec4f6b3842c677fbb457f6bc6d88afd8a82eeed6
"""
try:    import time
except: import utime as time

try:    from micropython import const
except: const = lambda x:x # for debug

#from io import TextIOWrapper
import io

__version__ = "v1.2"

DEBUG:    int = const(10)
INFO:     int = const(20)
WARN:     int = const(30)
ERROR:    int = const(40)
CRITICAL: int = const(50)

TO_FILE = const(100)
TO_TERM = const(200)

# index constants for the fmt map
_level  = const(0)
_msg    = const(1)
_time   = const(2)
_name   = const(3)
_fnname = const(4)


def level_name(level: int, color: bool = False) -> str:
    if not color:
        if level == INFO:
            return "INFO"
        elif level == DEBUG:
            return "DEBUG"
        elif level == WARN:
            return "WARN"
        elif level == ERROR:
            return "ERROR"
        elif level == CRITICAL:
            return "CRITICAL"
    else:
        if level == INFO:
            return "\033[97mINFO\033[0m"
        elif level == DEBUG:
            return "\033[37mDEBUG\033[0m"
        elif level == WARN:
            return "\033[93mWARN\033[0m"
        elif level == ERROR:
            return "\033[35mERROR\033[0m"
        elif level == CRITICAL:
            return "\033[91mCRITICAL\033[0m"


class BaseClock ():
    """
    This is a BaseClock for the logger.
    Subclass it to provide your own clock.
    """

    def __call__(self) -> str:
        """
        Return the current time as a string; please override this method.
        The return value is used as the time field of the log,
        such as `2021 - 6 - 13` or `12:45:23`.

        :return: the time string.
        """
        return '%d' % time.time()


class Handler():
    """The Handler for logger.
    """
    _template: str
    _map: bytes
    level: int
    _direction: int
    _clock: BaseClock
    _color: bool
    _file_name: str
    _max_size: int
    #_file = TextIOWrapper
    _file = None

    def __init__(self,
        level: int = INFO,
        colorful: bool = True,
        fmt: str = "&(time)% - &(level)% - &(name)% - &(msg)%",
        clock: BaseClock = None,
        direction: int = TO_TERM,
        file_name: str = "logging.log",
        max_file_size: int = 4096
        ):
        """
        Create a Handler that you can add to the logger later

        ## Options available for fmt.
        - &(level)%  : the log level
        - &(msg)%    : the log message
        - &(time)%   : the time acquire from clock, see `BaseClock`
        - &(name)%   : the logger's name
        - &(fnname)% : the function name that you pass in.
        - more options are in development.

        ## Options available for level.
        - DEBUG
        - INFO
        - WARN
        - ERROR
        - CRITICAL

        ## Options available for direction.
        - TO_FILE : output to a file
        - TO_TERM : output to terminal

        :param level: the minimum level you want to be logged
        :type level: int (see the consts in this module)

        :param colorful: whether to use colored output on the terminal (not applicable to files)
        :type colorful: bool

        :param fmt: the format string like: "&(time)% - &(level)% - &(name)% - &(fnname)% - &(msg)%"(default)
        :type fmt: str

        :param clock: the clock that provides the time string, see `BaseClock`
        :type clock: BaseClock (may be subclassed)

        :param direction: Set the direction where logger will output
        :type direction: int (`TO_FILE` or `TO_TERM`)

        :param file_name: used when `direction` is `TO_FILE` (default: `logging.log`)
        :type file_name: str
        :param max_file_size: used when `direction` is `TO_FILE`; the unit is bytes (default: 4096)
        :type max_file_size: int
        """
        # TODO: date-based log files, and a setting for the maximum number of files.
        self._direction = direction
        self.level = level
        self._clock = clock if clock else BaseClock()
        self._color = colorful
        self._file_name = file_name if direction == TO_FILE else ''
        self._max_size = max_file_size if direction == TO_FILE else 0

        if direction == TO_FILE:
            self._file = open(file_name, 'a+')

        # MicroPython's ure cannot do a global match (and cannot report
        # the number of matched groups), so the format string is parsed
        # by hand instead of with a regex.

        # Build the placeholder mapping
        self._map = bytearray()
        idx = 0
        while True:
            idx = fmt.find("&(", idx)
            if idx >= 0:  # found a placeholder prefix
                a_idx = fmt.find(")%", idx+2)
                if a_idx < 0:
                    # no matching suffix - the format string is malformed
                    raise Exception(
                        "Unable to parse text format successfully.")
                text = fmt[idx+2:a_idx]
                idx = a_idx+2  # move past this placeholder
                if text == "level":
                    self._map.append(_level)
                elif text == "msg":
                    self._map.append(_msg)
                elif text == "time":
                    self._map.append(_time)
                elif text == "name":
                    self._map.append(_name)
                elif text == "fnname":
                    self._map.append(_fnname)
            else:  # no more placeholders left
                break

        # Turn the template into %-formattable text and
        # make sure it ends with a newline character

        template = fmt.replace("&(level)%", "%s")\
            .replace("&(msg)%", "%s")\
            .replace("&(time)%", "%s")\
            .replace("&(name)%", "%s")\
            .replace("&(fnname)%", "%s")
        self._template = template if template.endswith('\n') else template + '\n'

    def _msg(self, *args, level: int, name: str, fnname: str):
        """
        Log a msg
        """
        
        if level < self.level:
            return
        # generate msg
        temp_map = []
        text = ''
        for item in self._map:
            if item == _msg:
                for text_ in args:  # join all message parts together
                    text = "%s%s" % (text, text_)  # coerce non-str args (int, float)
                temp_map.append(text)
            elif item == _level:
                if self._direction == TO_TERM:  # only terminal can use color.
                    temp_map.append(level_name(level, self._color))
                else:
                    temp_map.append(level_name(level))
            elif item == _time:
                temp_map.append(self._clock())
            elif item == _name:
                temp_map.append(name)
            elif item == _fnname:
                temp_map.append(fnname if fnname else "unknownfn")

        if self._direction == TO_TERM:
            self._to_term(tuple(temp_map))
        else:
            self._to_file(tuple(temp_map))
        # TODO: to verify: which is faster, converting to a tuple or using format()

    def _to_term(self, map: tuple):
        print(self._template % map, end='')

    def _to_file(self, map: tuple):
        fp = self._file
        # Check whether the file exceeds the size limit.
        prev_idx = fp.tell()  # remember the current position
        # Seek to the maximum allowed size; if anything can be
        # read there, the file is larger than the limit.
        fp.seek(self._max_size)
        if fp.read(1):  # data present - the limit is exceeded
            fp = self._file = open(self._file_name, 'w')  # reopen with 'w' to truncate
        else:
            # within the limit
            fp.seek(prev_idx)  # restore the original position

        # Checks done - write the record
        fp.write(self._template % map)
        fp.flush()


class Logger():
    _handlers: list

    def __init__(self,
        name: str,
        handlers: list = None,
        ):

        self.name = name
        if not handlers:
            # create a default handler if none was supplied
            self._handlers = [Handler()]
        else:
            self._handlers = handlers

    @property
    def handlers(self):
        return self._handlers

    def _msg(self, *args, level: int, fn: str):

        for item in self._handlers:
            #try:
            item._msg(*args, level=level, fnname=fn, name=self.name)
            #except:
            #    print("Failed while trying to record")

    def debug(self, *args, fn: str = None):
        self._msg(*args, level=DEBUG, fn=fn)

    def info(self, *args, fn: str = None):
        self._msg(*args, level=INFO, fn=fn)

    def warn(self, *args, fn: str = None):
        self._msg(*args, level=WARN, fn=fn)

    def error(self, *args, fn: str = None):
        self._msg(*args, level=ERROR, fn=fn)

    def critical(self, *args, fn: str = None):
        self._msg(*args, level=CRITICAL, fn=fn)


__all__ = [
    "Logger",
    "Handler",
    "BaseClock",


    "DEBUG",
    "INFO",
    "WARN",
    "ERROR",
    "CRITICAL",

    "TO_FILE",
    "TO_TERM",

    "__version__",
]
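The `&(name)%` placeholder scan performed in `Handler.__init__` can be illustrated with a self-contained sketch (`parse_fmt` is my own helper name, not part of ulogger):

```python
# Self-contained sketch of the "&(name)%" placeholder scan done in
# Handler.__init__: collect the placeholder names in order, then turn the
# format string into a %-formattable template that ends with a newline.
# parse_fmt is my own helper name, not part of ulogger.
def parse_fmt(fmt):
    names = []
    idx = 0
    while True:
        idx = fmt.find("&(", idx)
        if idx < 0:
            break  # no more placeholders
        end = fmt.find(")%", idx + 2)
        if end < 0:
            raise ValueError("Unable to parse text format successfully.")
        names.append(fmt[idx + 2:end])
        idx = end + 2  # move past this placeholder
    template = fmt
    for name in names:
        template = template.replace("&(%s)%%" % name, "%s")
    if not template.endswith("\n"):
        template += "\n"
    return names, template

names, template = parse_fmt("&(time)% - &(level)% - &(msg)%")
```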