分类 编程相关 下的文章

计划分两部分总结一下Docker的学习。本文先从入门到应用开始,了解Docker并用上它。

0 总结:

本文推荐使用Docker Compose运行和管理容器。

1 概述

1.1 Docker是什么?

Docker是依赖Linux内核,通过建立应用的运行环境,实现虚拟化。其实现方式不是采用硬件级别的虚拟,而是“隔离”。就是通过隔离手段实现软件运行环境的沙箱化。

其底层依赖于Linux的chroot、namespace、cgroup等三种老技术,结合当前系统的Linux内核,建立另一个Linux运行环境,并运行指定软件。试过在Android系统部署Linux分发版(Android也是一个Linux发行版),就是使用了类似的技术。

要注意,Docker的目的不是提供一个完整的虚拟Linux,而是把运行环境和软件组合为服务,去运行和管理。例如Debian官方镜像默认没有提供“服务”(servcie)管理的功能,而是作为一个指定服务的一部分去运行。

参考资料:

1.2 为什么要用Docker?

  • 一次建立,到处运行。

    • 环境和软件一起打包,不用担心测试环境与正式环境的区别。
    • 对于Window平台可以使用WSL运行。
  • 优胜于传统虚拟机。

    • 轻,只虚拟环境。
    • 快,运行快。
    • 小,资源占用小,包括CPU、内存、硬盘等资源。
  • 管理简单。

    • 镜像作为模板,容器作为实例。
    • 多个容器可以关联组合。

例如,部署多个MySQL数据库,并配置不同的参数作对比,那么直接运行多个MySQL的Docker容器即可,其表现为运行在不同IP的多个MySQL服务。

1.3 基本概念

  1. 仓库(Repository)

  2. 镜像(Image)

    • 容器运行时所需的root文件系统(包括程序、库、资源、配置等文件),以及配置参数(包括匿名卷、环境变量、用户等)。
    • 可来理解为容器的模板。其不包含动态数据,构建后不可修改。
  3. 容器(Container)

    • 简单来说,跟面向对象程序设计中的“类”(镜像)和“实例”(容器)一样。
    • 容器包括容器配置、相关文件、相关进程。

1.4 怎样使用?

  1. 再次强调,Docker是建立应用的运行环境,不是虚拟机。所以不要在Docker容器里面跑服务,而是把整个Docker容器作为一个服务进行管理和使用。

    • 例如启动Dokcer容器时,执行一个启动脚本,运行所需软件。
  2. 一般建议把重要数据映射到容器外部的文件系统,避免删除或升级容器时同时删除相关文件,也能减少性能损耗。

    • 例如Docker部署MySQL,建议把数据目录映射出来。
  3. 升级容器时,先下载更新的镜像,再删除该容器,并使用相同的配置文件,建立同名的容器。

1.5 Docker Compose

为了更好地解决多个Docker容器协同部署、工作,诞生了“Docker Compose”。一开始使用“Python”实现的,但到了2.0,Docker官方使用Go重新实现,并以插件形式集成到docker-ce(其软件包名为docker-compose-plugin)。使用docker compose命令(代替1.0的docker-compose命令)进行Docker容器的编排。

Docker Compose另一个的优点是,借助YAML脚本和环境变量,可以轻松配置或修改Docker容器。相比之下,使用docker run命令部署的Docker容器,修改配置需要重启Docker服务(影响其它Docker容器),相当麻烦。所以建议使用Docker Compose的方式运行Docker容器,而且一般Docker会提供Docker Compose的示例配置文件。

1.6 Kubernetes

Kubernetes也称为K8s,是用于自动部署、扩缩和管理容器化应用程序的开源系统。它将组成应用程序的容器组合成逻辑单元(Pod),以便于管理和服务发现。这部分内容,计划另起一个文章再整理。

1.7 参考文档

2 安装部署

2.1 安装

官方安装教程如下:

针对Debian或Ubuntu,总结如下:

  • 推荐使用Docker官方APT仓库进行安装。一般先卸载通过系统仓库安装的自带Docker引擎。
  • 安装Docker CE版,即Community Edition(社区版,免费)。
  • 安装docker-compose插件:sudo apt install docker-compose-plugin

2.2 配置

Docker服务的配置,主要分为命令行参数和配置文档。

详见官方说明文档:

由于这里以SystemD服务的方式运行Docker服务,所以采用配置文件的方式进行配置。在采用SystemD的Linux发行版上(例如Ubuntu 16.04+、Debian 8+、CentOS 7等),对应配置文件为/etc/docker/daemon.json(如没有该文件,需要自行创建)。

修改并保存daemon.json后,一般无需重启Docker服务,发送reload信号即可。如下:

sudo systemctl daemon-reload
sudo systemctl reload docker.service

2.3 设置“仓库”镜像(mirror)网站

如果访问官方“仓库”比较慢,可以使用中国的镜像网站。在配置文件/etc/docker/daemon.json中加入:

{
  "registry-mirrors": ["https://hub-mirror.c.163.com/"]
}

其它可用的加速镜像可参考:

2.4 设置代理

这里是为了解决Docker不能访问第三方“仓库”的网址,导致不能拉取相关“镜像”文件。例如gcr.io/cadvisor/cadvisor.

在配置文件/etc/docker/daemon.json中加入:

{
  "proxies": {
    "http-proxy": "http://proxy.example.com:80",
    "https-proxy": "https://proxy.example.com:443",
    "no-proxy": "*.test.example.com,.example.org,.163.com,192.168.0.0/24,127.0. 0.0/8"
  }
}

其中,http-proxyhttps-proxy是设置相应的HTTP代理服务,no-proxy是设置不走代理的域名或IP。

2.5 设置权限阻隔

默认情况下,“容器”中的进程以root用户权限运行,与“宿主机”中的root是同一个用户。这就意味着一旦“容器”中的进程有了适当的机会,它就可以控制“宿主机”上的一切。

解决方法有两种,一种是使用非root用户运行容器,另一种是用户命名空间(user namespace)方式。

关于用户命名空间的设置,参考下文:

3 使用

3.1 运行“容器”

到“仓库”找所需“镜像”,下载“镜像”后运行对应的“容器”。直接执行“docker run”时,如果本地没有相关“镜像”,会自动在Docker官方“仓库”下载,然后创建“容器”并运行。

一般使用软件官方提交的“镜像”,若要使用第三方“镜像”,最好确认清楚是否安全。

下面以部署Debian容器为例:

# 搜索Debian“镜像”
docker search debian

# 查看“镜像”版本,最好是访问docker官网。例如Debian镜像的版本:https://registry.hub.docker.com/_/debian/tags

# 下载Debian“镜像”。格式是,“镜像”名称:版本名称。如果不填版本名称,默认是Latest。
docker image pull debian:stable

# 列出所有已下载的“镜像”,包括隐藏的。以下三个命令的效果是一样的。
docker image ls -a
docker image list -a
docker images -a

# 查看正在运行的“容器”。以下三个命令的效果是一样的,推荐第一个。
docker container ls
docker container ps
docker container list

# 查看所有“容器”,包括未运行。
docker container ls -a

# 创建并运行“容器”。名为“debian-test”,映射两个端口(本机50080->“容器”40080,本机51080->“容器”41080),并把本地“/opt/docker-files/debian-test/opt”目录映射到“容器”的“/opt”目录。
docker run -it --name debian-test -p 50080:40080 -p 51080:41080 -v /opt/docker-files/debian-test/opt:/opt debian:stable

# 查看“容器”运行日志,检查是否报错
docker logs debian-test

3.2 进入“容器”

进入“容器”。可以使用attachexec命令,但建议使用exec并运行一个新的Bash,避免退出时影响“容器”当前运行的程序。

  1. attach命令。适合“容器”正在运行具有命令行交互的程序,比如“Bash”。以下两个命令是一样的。
# 两个命令是一样的
docker attach debian-test
docker container attach debian-test

要注意,使用attach命令进入“容器”并输入“exit”进行退出时,就等于Bash程序结束执行,“容器”随之停止运行。解决方案:

  • 方案1,“docker attach”进入容器后,按Ctrl + P + Q退出。
  • 方案2,进入容器时,加入参数“--sig-proxy=false”,退出时使用“exit”命令而不停止容器。

    • 例如:docker attach --sig-proxy=false debian-test
  1. exec命令。打开一个具有命令行交互的程序,即进入“容器”。
# 两个命令是一样的,运行Bash并进入
docker exec -it debian-test /bin/bash
docker container exec -it debian-test /bin/bash

# 使用root用户运行Bash,并设置字符编码
docker exec -u root -it debian-test env LANG=C.UTF-8 /bin/bash

退出“容器”就等于退出当前运行的Bash,对“容器”正在运行的程序无影响。

3.4 管理“容器”

# 启动“容器”
docker container start debian-test

# 停止“容器”
docker container stop debian-test

# 重启“容器”
docker container restart debian-test

# 删除“容器”
docker container rm debian-test

3.5 复制“容器”

主要思路是,把指定“容器1”生成“镜像A”,再用“镜像A”创建“容器2”,实现把“容器1“复制为”容器2“。

注意:生成“镜像”时,“容器”外部的文件,不会打包进“镜像”文件。需要手工复制这些文件,并设置映射。

假设已创建“容器”:debian-test,需要复制为另一个“容器”:debian-test2。

# 把“容器” debian-test,生成“镜像”文件
docker export debian-test > /opt/docker_image_debian-test.tar

# 引入“镜像”,并命名为 debian-test:v1
docker import /opt/docker_image_debian-test.tar debian-test:v1

# 创建另一个“容器” debian-test2
docker run -itd --name debian-test2 -p 50081:40080 -p 51081:41080 -v /opt/docker-files/debian-teset2/opt:/opt debian-test:v1 /bin/bash

3.5 Dokcer图形化管理

Docker Desktop下载地址和安装介绍:

4 使用Docker Compose

Docker Compose是以“项目”(project)作为管理单元。一个项目,是由一个或多个“服务”(service,即“容器”)组合而成。

4.1 使用

把Compose配置文件存放到指定目录,进入该目录,并执行docker compose up,即可启动。

4.2 命令说明

格式:docker compose [可选项] 命令。“可选项”和“命令”的说明如下:

可选项说明
--ansi string控制何时打印 ANSI 控制字符。string的可选值为"never"、"always"、"auto",默认为"auto"。
--compatibility以向后兼容模式运行 compose。
--env-file stringArray指定备用环境文件。
-f, --file stringArrayCompose配置文件。
--parallel int控制最大并行度,-1 为无限制。默认-1。
--profile stringArray指定要启用的配置文件。
--project-directory string指定备用工作目录。默认:首先采用所配置的第一个Compose文件的路径。
-p, --project-name string项目名称。
命令说明
build构建或重新构建服务
config以规范格式粘贴、解析和呈现Compose配置文件
cp在服务容器和本地文件系统之间复制文件或文件夹
create为服务创建容器
down停止并移除容器、网络
events从容器接收实时事件
exec在正在运行的容器中执行命令
images列出创建的容器使用的镜像
kill强制停止服务容器
logs查看容器的输出
ls列出正在运行的Compose项目
pause暂停服务
port打印端口绑定的公共端口
ps列出容器
pull拉取服务镜像
push拉取服务镜像
restart重启服务容器
rm删除停止的服务容器
run在服务上运行一次性命令
start启动服务
stop停止服务
top显示正在运行的进程
unpause取消暂停服务
up创建并启动容器
version显示 Docker Compose 版本信息

Oracle接收了Java,主要为了赚钱,确实不是好事。但值得安慰的是,Java,或者说OpenJDK,还是保持开源(基于GPLv2)。

对JDK的选择,需关注:

  • Java 17,Oracle声称,Oracle JDK和OpenJDK,都是采用同一套源码构建。
  • Oracle JDK需要关注商业使用的授权问题。
  • Oracle只提供OpenJDK最新源码,不提供最新构建版的下载,导致冒出很多JDK 17的构建版本。

参考:

结论:

  1. 最佳选择:Adoptium Eclipse Temurin。其官网:Adoptium
  2. Linux分发版,一般直接安装其维护的OpenJDK版本即可。

补充:

Linux系统,使用initd时,想要开机启动时执行指定Shell脚本,只需修改/etc/rc.local。现在的分发版基本改用Systemd,找不到rc.local了。要实现类似的功能,有两个方案:

  1. 启用rc-local.service服务。
  2. 创建一个开机启动的服务。

推荐方案2,可以设置服务关联。比如设置网络服务启动后才执行指定脚本。

以Debian 11为例,记录一下两个方案的具体操作。

方案1,启用rc-local.service服务

  1. 创建/etc/rc.local文件,并设置执行权限
cat <<EOF >/etc/rc.local
#!/bin/sh -e
# run at the end of each multiuser runlevel

exit 0 EOF
chmod +x /etc/rc.local

exit 0之前,编写要执行的Shell脚本。一定不要删除结尾的exit 0

  1. 启用服务
sudo systemctl enable --now rc-local

方案2,创建开机启动服务

就是配置一个Systemd服务,并设置开机启动。其启动的代码,就是执行指定Shell脚本。

  1. 创建配置文件。

在目录/etc/systemd/system,创建文件startup.service(文件名可自取),并添加以下内容:

#################################
# Run at startup
#################################

[Unit]
Description=Run the shell script at system startup.
Wants=network-online.target
After=network.target network-online.target

[Service]
Type=simple
WorkingDirectory=/opt/startup
ExecStart=/opt/startup/run.sh
User=root

[Install]
WantedBy=multi-user.target

其中ExecStart就是配置要执行的Shell脚本。这里假设Shell脚本需要联网后才能执行。

  1. 启用等待网络正常启动的服务。

服务配置依赖network-online.target时,需要开启systemd-networkd-wait-online.service服务,才能确保在网卡正确启动并获取IP地址后执行。

sudo systemctl enable systemd-networkd-wait-online.service
  1. 设置开机启动。
sudo systemctl daemon-reload
sudo enable startup.service

注:升级Chrome 114.0.5735.90后,问题已解决。

Lunbuntu 22.04.2,昨晚升级安装系统升级包后,Chrome浏览器渲染页面不正常了。具体是,页面背景色正常,布局正常,也能显示一些布局样式,但完全不显示相关文字,甚至有些图片也不能显示。

尝试Shell里直接运行/opt/google/chrome/chrome,会弹出以下信息:

MESA-INTEL: warning: Haswell Vulkan support is incomplete

应该是显卡与其驱动,不支持Chrome的新特性吧。

于是找到这个文章:Linux: Chrome Starts Very Slowly After Enable Nvidia Driver

根据文章的相关内容,尝试出三个可以正常渲染页面的Chrome浏览器的启动参数(如下)。

启动参数说明效果
--disable-gpu禁用GPU硬件加速。正常
--disable-gpu-driver-bug-workarounds禁用各种GPU驱动程序错误的解决方法。正常
--enable-features=Vulkan开启Vulkan特性可以渲染页面,但有些图片显示为黑块

最后的解决方案:

  1. 选用启动参数--disable-gpu-driver-bug-workarounds
  2. 修改/usr/bin/google-chrome文件,在最后一行exec -a "$0" "$HERE/chrome" "$@"中添加启动参数。即该文件的最后一行修改为:
#exec -a "$0" "$HERE/chrome" "$@"
exec -a "$0" "$HERE/chrome" --disable-gpu-driver-bug-workarounds "$@"

另外,关于Chrome浏览器的各个启动参数,有个文章整理得很详细:List of Chromium Command Line Switches

当今时代,代码版本管理,一般推荐“Git”。但是工作用到“Subversion”,即“SVN”,所以记录一下相关部署。

1. 概述

在Debian 11上部署SVN(即Subversion)服务,一般按以下步骤执行即可。

  • 安装。Debian使用apt命令即可。
  • 配置。包括创建运行用户、建立部署文件夹、设置Systemd自动启动。
  • “仓库”管理。包括“仓库”的创建、备份、导出、迁移等。
  • “踩坑”。记录遇到的问题。

参考

2. 安装

通过apt命令安装“subversion”即可。

sudo apt install subversion

“subversion”包含3类程序:

  • 服务程序

    • svnserve:提供SVN的网络服务。
    • svnsync:仓库镜像工具,仓库都可以是远程的或本地的。
  • 管理程序

    • svnadmin:管理SVN仓库。
    • svnlook:查看SVN仓库,纯只读。
    • svnrdump:远程仓库数据迁移。相当于svnadmin dumpsvnadmin load的网络版。
    • svndumpfilter:历史过滤工具,从SVN的转储文件中移除某些历史 (即版本号)。
  • 客户端程序

    • svn:SVN命令行客户端。
    • svnversion:显示工作副本版本信息。
    • svnmucc:多URL命令行客户端,允许用户在没有工作副本的情况下,向仓库提交任意修改。

以上程序的使用,可参考:Subversion 命令行参考手册

3. 配置

  • 创建系统用户“svn”,用于运行SVN服务。
useradd -r -M -s /bin/false svn
  • 创建SVN“仓库”的路径。
sudo mkdir -p /opt/svn/repos
sudo chown svn /opt/svn/repos
  • 创建Systemd的服务配置文件/etc/systemd/system/svnserve.service。其内容如下:
[Unit]
Description=Subversion protocol daemon
After=syslog.target network.target

[Service]
Type=forking
RuntimeDirectory=svnserve
PIDFile=/run/svnserve/svnserve.pid
EnvironmentFile=/etc/default/svnserve
ExecStart=/usr/bin/svnserve $DAEMON_ARGS
User=svn
Group=svn
KillMode=control-group
Restart=on-failure

[Install]
WantedBy=multi-user.target
  • 创建设置SVN服务启动参数的配置文件/etc/default/svnserve。其内容如下:
# svnserve options
DAEMON_ARGS="--daemon --pid-file /run/svnserve/svnserve.pid --root /srv/svn/repos --log-file /var/log/svnserve/svnserve.log"
  • 创建Log日志文件目录。
sudo mkdir /var/log/svnserve
sudo chown svn /var/log/svnserve
  • 创建Log日志文件自动转存的配置文件/etc/logrotate.d/svnserve。设置每天按天进行转存并压缩日志,最多保留最近30天。其内容如下:
/var/log/svnserve/*.log {
    daily
    missingok
    rotate 30
    compress
    notifempty
    create 640 svn adm
    sharedscripts
    postrotate
            if /bin/systemctl status svnserve > /dev/null ; then \
                /bin/systemctl restart svnserve > /dev/null; \
            fi;
    endscript
}
  • 最后SVN服务设置为自动开机启动,并立即启动它。
sudo systemctl daemon-reload
sudo systemctl enable svnserve.service
sudo systemctl start svnserve.service

4. 仓库管理

SVN的“仓库”(Repository),是一系列需要进行版本管理的目录和文件的管理单位。例如一个包含几个子项目的大型项目。“仓库”没有限制组织结构,只是以树状形式保存相关目录和文件。

  • 创建“仓库”。在所有“仓库”的存放目录下,建立指定“仓库”目录“project1”。
sudo -u svn svnadmin create /opt/svn/repos/project1
  • 设置“仓库”访问权限。修改仓库的“conf”目录下的文件。

    • svnserve.conf,配置“仓库”的认证、授权策略、访问控制等。
    • authz-db,“仓库”的访问控制文件,包括配置可访问“仓库”的用户。
    • password-db,可访问“仓库”的用户的密码。
  • 迁移“仓库”。把“仓库”导出,再导入到另一个新的仓库。可用于升级SVN服务软件后,把已有“仓库”迁移过去。
# 导出仓库project1
sudo -u svn svnadmin dump project1 > project1.dump

# 导入到仓库project2。project2需要提前新建。
sudo -u svn svnadmin load project2 < project1.dump
  • 备份“仓库”。使用“svnadmin hotcopy”命令,对“仓库”进行在线备份(不用担心是否还有其他进程在访问“仓库”),并能够在原“仓库”出现故障时顶替上去。此命令也可用于不同机器的同版本SVN服务之间迁移数据。
# 执行备份
sudo -u svn mkdir -p /var/svn/backups/ 
sudo -u svn svnadmin hotcopy /opt/svn/repos/project1 /var/svn/backups/project1.bak

# 执行还原
sudo -u svn svnadmin hotcopy /var/svn/backups/project1.bak /opt/svn/repos/project1

5. 踩坑

5.1. 默认权限不能查看历史记录

使用svnadmin create创建“仓库”后,默认不能查看历史记录。需要修改“仓库”下的conf/svnserve.conf,设置以下两个配置项,并重启SVN服务:

anon-access = none
auth-access = write

5.2. 影响MySQL部署

Debian 11的服务器上,通过MySQL官方源安装了“mysql-server”服务,再安装“subversion”后,MySQL的配置文件被改为指向了MariaDB的(如下),导致MySQL服务不能运行。手工修正解决问题。

/etc/alternatives/my.cnf -> /etc/mysql/mariadb.cnf

由于需要开发移动端App,有机会用上了Apple M1 CPU的Mac Mini。对比过Windows 11、Ubuntu 22.04,感觉是目前最好的 App开发机。

1. 优点

1.1. 唯一支持两大移动平台

没办法,Build iOS App和提交Apple Store,都只能使用Xcode ,也就离不开Mac OS。

Android就随意了。

1.2. 支持两大平台的模拟器,且很爽

支持iOS和Android的模拟器,除了Android SDK下载镜像慢,创建很快、运行很流畅。能满足大部分需要兼容多个系统版本的开发测试的场景。相比购买和使用一堆实体机器进行开发测试,这个方便太多。

但是模拟器具体有哪些不支持的功能,需要使用实体机验证的,暂时没遇到。

2. 缺点

2.1. 快捷键不习惯

估计Mac OS新手用户,特别是从Windows或Linux切换过来的,都有这个问题吧。网上有解决方案,调整快捷键。但我选择了去适应它。

2.2. 随机弹出屏幕保护程序

这个屏幕保护程序,即使设置了关闭,也会弹出。主要是使用VNC远程时关不掉,导致不能进入桌面。解决方案见“附录”。

2.3. 使用 VNC 远程桌面比较卡

由于使用非Mac的电脑远程过去,只能使用VNC。用Mac远程到Mac,可能是另一种体验。

2.4. 不支持多人同时远程桌面

由于习惯了Windows、Linux的服务器,可以多人同时远程上去,特别是Windows远程桌面可以多人同时使用,所以提出这个问题。某个程序员的社交网站发过贴请教,结果被吐槽为什么不是人手一台Mac……当然,人手一台Mac的话,就不会有这个应用场景和问题了。

另外,Mac的SSH服务是可以多人同时登录。

3. 附录

踩过的坑,参考:Headless Mac Mini 折腾记

3.1 远程访问

最好是同时开启屏幕共享(vnc)远程登录(ssh)。遇到万不得已的情况,可以SSH进去执行sudo reboot重启。

3.2 关闭屏幕保护

在图形界面“设置”关闭了屏幕保护后,仍然会随机运行。可使用以下命令设置关闭:

sudo defaults write /Library/Preferences/com.apple.screensaver loginWindowIdleTime 0

如果屏幕保护程序在运行且不能退出的情况,可以使用以下命令(可以通过SSH执行)去关闭其进程,实现退出屏幕保护:

killall ScreenSaverEngine

1 背景

烘焙咖啡豆的过程中,为了更好地测量和记录温度及其变化,使用ESP32C3制作了一个温度监控模块。

2 需求

如果从烘焙咖啡豆的角度去考虑,参照商用机器的设计,这个温度监测会有很多功能要实现。作为起始的设计,还是先简化需求,逐步实现更多功能。所以第一版先确定以下需求:

  • 计时。这个实现起来简单,记录的温度也需要跟时间关联。
  • 读取测量出的温度值。用于展示、分析、记录等。
  • 展示温度变化。显示当前温度值,和温度与时间的曲线。

3 设计

3.1 元件

  • 主控:ESP32C3-Core,刷上MicroPython固件

    • 此开发板很廉价,最低9.9 RMB包邮。
    • 支持WiFi和蓝牙,数据可以很方便地同步到其它设备。
    • 基于MicroPython开发程序,调试很方便。
  • 温度检测模块:MAX6675,K型热电偶温度传感模块,SPI接口。

    • 廉价。
    • 最低精度为0.25 °C,够用。
  • 温度检测探头:K型铠装热电偶,探头为可弯曲、接壳式、304不锈钢材质。

    • 这个探头比温度检测模块附送的灵敏很多。
    • 一般型号为WRNK191。
    • 参考规格:直径1mm,插深50mm,线长500mm。
  • 显示模块:SSD1306,单色OLED屏,0.96英寸,分辨率128x64,两线I2C接口。

    • 廉价。
    • I2C接口的数据线只有两根,减少GPIO的占用。

3.2 接线

这里忽略电源(VCC 3.3V)和接地(GND)的连接。详细如下:

ESP32C3的接口模块接口
GPIO10MAX6675SO
GPIO02MAX6675SCK
GPIO12MAX6675CS
GPIO05SSD1306SCL
GPIO04SSD1306SDA

接线不是固定的,可以根据实际调整,但是要改main.py的对应配置。

4 程序

相关代码文件:

  • max6675.py:MAX6675的驱动程序。
  • ssd1306.py:SSD1306的驱动程序。
  • series_list.py:温度数据类,方便后面扩展对温度数据的保存。
  • main.py:主程序。

4.1 MAX6675的驱动程序

文件名max6675.py

# from https://github.com/BetaRavener/micropython-hw-lib/blob/master/MAX6675/max6675.py

import time


class MAX6675:
    MEASUREMENT_PERIOD_MS = 220

    def __init__(self, sck, cs, so):
        """
        Creates new object for controlling MAX6675
        :param sck: SCK (clock) pin, must be configured as Pin.OUT
        :param cs: CS (select) pin, must be configured as Pin.OUT
        :param so: SO (data) pin, must be configured as Pin.IN
        """
        # Thermocouple
        self._sck = sck
        self._sck.off()

        self._cs = cs
        self._cs.on()

        self._so = so
        self._so.off()

        self._last_measurement_start = 0
        self._last_read_temp = 0
        self._error = 0

    def _cycle_sck(self):
        self._sck.on()
        time.sleep_us(1)
        self._sck.off()
        time.sleep_us(1)

    def refresh(self):
        """
        Start a new measurement.
        """
        self._cs.off()
        time.sleep_us(10)
        self._cs.on()
        self._last_measurement_start = time.ticks_ms()

    def ready(self):
        """
        Signals if measurement is finished.
        :return: True if measurement is ready for reading.
        """
        return time.ticks_ms() - self._last_measurement_start > MAX6675.MEASUREMENT_PERIOD_MS

    def error(self):
        """
        Returns error bit of last reading. If this bit is set (=1), there's problem with the
        thermocouple - it can be damaged or loosely connected
        :return: Error bit value
        """
        return self._error

    def read(self):
        """
        Reads last measurement and starts a new one. If new measurement is not ready yet, returns last value.
        Note: The last measurement can be quite old (e.g. since last call to `read`).
        To refresh measurement, call `refresh` and wait for `ready` to become True before reading.
        :return: Measured temperature
        """
        # Check if new reading is available
        if self.ready():
            # Bring CS pin low to start protocol for reading result of
            # the conversion process. Forcing the pin down outputs
            # first (dummy) sign bit 15.
            self._cs.off()
            time.sleep_us(10)

            # Read temperature bits 14-3 from MAX6675.
            value = 0
            for i in range(12):
                # SCK should resemble clock signal and new SO value
                # is presented at falling edge
                self._cycle_sck()
                value += self._so.value() << (11 - i)

            # Read the TC Input pin to check if the input is open
            self._cycle_sck()
            self._error = self._so.value()

            # Read the last two bits to complete protocol
            for i in range(2):
                self._cycle_sck()

            # Finish protocol and start new measurement
            self._cs.on()
            self._last_measurement_start = time.ticks_ms()

            self._last_read_temp = value * 0.25

        return self._last_read_temp

4.2 SSD1306的驱动程序

文件名ssd1306.py

这个驱动继承了FrameBuffer类,显示文字或绘图的方法,参考FrameBuffer类的文档即可(代码有说明)。

# MicroPython SSD1306 OLED driver, I2C and SPI interfaces
# from https://github.com/micropython/micropython-lib/blob/master/micropython/drivers/display/ssd1306/ssd1306.py

from micropython import const
import framebuf


# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xA4)
SET_NORM_INV = const(0xA6)
SET_DISP = const(0xAE)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xA0)
SET_MUX_RATIO = const(0xA8)
SET_IREF_SELECT = const(0xAD)
SET_COM_OUT_DIR = const(0xC0)
SET_DISP_OFFSET = const(0xD3)
SET_COM_PIN_CFG = const(0xDA)
SET_DISP_CLK_DIV = const(0xD5)
SET_PRECHARGE = const(0xD9)
SET_VCOM_DESEL = const(0xDB)
SET_CHARGE_PUMP = const(0x8D)

# Subclassing FrameBuffer provides support for graphics primitives
# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html
class SSD1306(framebuf.FrameBuffer):
    def __init__(self, width, height, external_vcc):
        self.width = width
        self.height = height
        self.external_vcc = external_vcc
        self.pages = self.height // 8
        self.buffer = bytearray(self.pages * self.width)
        super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
        self.init_display()

    def init_display(self):
        for cmd in (
            SET_DISP,  # display off
            # address setting
            SET_MEM_ADDR,
            0x00,  # horizontal
            # resolution and layout
            SET_DISP_START_LINE,  # start at line 0
            SET_SEG_REMAP | 0x01,  # column addr 127 mapped to SEG0
            SET_MUX_RATIO,
            self.height - 1,
            SET_COM_OUT_DIR | 0x08,  # scan from COM[N] to COM0
            SET_DISP_OFFSET,
            0x00,
            SET_COM_PIN_CFG,
            0x02 if self.width > 2 * self.height else 0x12,
            # timing and driving scheme
            SET_DISP_CLK_DIV,
            0x80,
            SET_PRECHARGE,
            0x22 if self.external_vcc else 0xF1,
            SET_VCOM_DESEL,
            0x30,  # 0.83*Vcc
            # display
            SET_CONTRAST,
            0xFF,  # maximum
            SET_ENTIRE_ON,  # output follows RAM contents
            SET_NORM_INV,  # not inverted
            SET_IREF_SELECT,
            0x30,  # enable internal IREF during display on
            # charge pump
            SET_CHARGE_PUMP,
            0x10 if self.external_vcc else 0x14,
            SET_DISP | 0x01,  # display on
        ):  # on
            self.write_cmd(cmd)
        self.fill(0)
        self.show()

    def poweroff(self):
        self.write_cmd(SET_DISP)

    def poweron(self):
        self.write_cmd(SET_DISP | 0x01)

    def contrast(self, contrast):
        self.write_cmd(SET_CONTRAST)
        self.write_cmd(contrast)

    def invert(self, invert):
        self.write_cmd(SET_NORM_INV | (invert & 1))

    def rotate(self, rotate):
        self.write_cmd(SET_COM_OUT_DIR | ((rotate & 1) << 3))
        self.write_cmd(SET_SEG_REMAP | (rotate & 1))

    def show(self):
        x0 = 0
        x1 = self.width - 1
        if self.width != 128:
            # narrow displays use centred columns
            col_offset = (128 - self.width) // 2
            x0 += col_offset
            x1 += col_offset
        self.write_cmd(SET_COL_ADDR)
        self.write_cmd(x0)
        self.write_cmd(x1)
        self.write_cmd(SET_PAGE_ADDR)
        self.write_cmd(0)
        self.write_cmd(self.pages - 1)
        self.write_data(self.buffer)


class SSD1306_I2C(SSD1306):
    def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False):
        self.i2c = i2c
        self.addr = addr
        self.temp = bytearray(2)
        self.write_list = [b"\x40", None]  # Co=0, D/C#=1
        super().__init__(width, height, external_vcc)

    def write_cmd(self, cmd):
        self.temp[0] = 0x80  # Co=1, D/C#=0
        self.temp[1] = cmd
        self.i2c.writeto(self.addr, self.temp)

    def write_data(self, buf):
        self.write_list[1] = buf
        self.i2c.writevto(self.addr, self.write_list)


class SSD1306_SPI(SSD1306):
    def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
        self.rate = 10 * 1024 * 1024
        dc.init(dc.OUT, value=0)
        res.init(res.OUT, value=0)
        cs.init(cs.OUT, value=1)
        self.spi = spi
        self.dc = dc
        self.res = res
        self.cs = cs
        import time

        self.res(1)
        time.sleep_ms(1)
        self.res(0)
        time.sleep_ms(10)
        self.res(1)
        super().__init__(width, height, external_vcc)

    def write_cmd(self, cmd):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs(1)
        self.dc(0)
        self.cs(0)
        self.spi.write(bytearray([cmd]))
        self.cs(1)

    def write_data(self, buf):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs(1)
        self.dc(1)
        self.cs(0)
        self.spi.write(buf)
        self.cs(1)

4.3 温度数据类

文件名series_list.py

想模拟成时序数据库,目前只是保存了屏幕可以显示的数据数量。

class SeriesList:
    
    def __init__(self, maxLen: int, firstVal: float):
        self._maxLen = 1 if maxLen <= 0 else maxLen
        self._list = [firstVal]
        self._len = len(self._list)
    
    def append(self, val: float):
        self._list.append(val)
        if (self._len + 1) > self._maxLen:
            delVal = self._list.pop(0)
        else:
            self._len += 1
    
    def last(self, index: int = 0) -> int:
        return self._list[self._len - index -1]
    
    def histogram(self, maxRange: int) -> list:
        hList = self._list.copy()
        hMax = int(max(self._list))
        hMin = int(min(self._list))
        hRange = hMax - hMin + 1
        rate = 1 if hRange <= maxRange else (maxRange / hRange)
        if rate == 1:
            for i, v in enumerate(hList):
                hList[i] = int(v) - hMin
        else:
            for i, v in enumerate(hList):
                hList[i] = int((int(v) - hMin) * rate)
        return hList

4.4 主程序

文件名main.py

总结一下:

  • 使用了定时任务(Timer)去探测温度和更新显示,每秒执行一次。
  • 每次执行最大耗时约150毫秒,绘画曲线的代码可以再优化。
  • 绘画曲线时,如果把整个区域置黑,再画曲线,性能比较高,但是会闪屏。目前是逐列置黑,再画,虽然慢了点,但观感良好。
  • 展示的曲线是为了直观看到温度变化,如果显示范围内的温度差超过屏幕区域的分辨率,曲线会按比例压缩。
from micropython import const
import time
from machine import Pin, SoftI2C, Timer
from max6675 import MAX6675
from ssd1306 import SSD1306_I2C
from series_list import SeriesList

UNIT_60 = const(60)
SCREEN_W = const(128)
SCREEN_H = const(64)
HISTOGRAM_X = const(0)
HISTOGRAM_Y = const(10)
HISTOGRAM_W = const(SCREEN_W)
HISTOGRAM_H = const(SCREEN_H - HISTOGRAM_Y)

""" init MAX6675 ################################## """
print('init MAX6675')
so = Pin(10, Pin.IN) # GPIO10
sck = Pin(2, Pin.OUT) # GPIO02
cs = Pin(12, Pin.OUT) # GPIO12
max = MAX6675(sck, cs, so)
time.sleep(1)
curTemp = max.read()

""" init OLED ################################## """
print('init OLED')
i2c = SoftI2C(scl=Pin(5), sda=Pin(4))
oled = SSD1306_I2C(SCREEN_W, SCREEN_H, i2c)

""" init Ticks ################################## """
ticksStart = time.ticks_ms()

""" init Temperature list ################################## """
sList = SeriesList(SCREEN_W, curTemp)

""" init Timer ################################## """
def timerRefresh(t):
    global max, curTemp, oled, timeSec, sList
    
    duration = 0 if ticksStart==None else int(time.ticks_diff(time.ticks_ms(), ticksStart) / 1000)
    dSec = duration % UNIT_60
    dMin = int(duration / UNIT_60)
    
    curTemp = max.read() # Current temperature
    sList.append(curTemp)
    
    oled.fill(0)
    oled.text('{:02d}:{:02d} | {:>6.2f} C'.format(dMin, dSec, curTemp) , 0, 0)
    hList = sList.histogram(HISTOGRAM_H)
    hLen = len(hList)
    hPre = 0
    hCur = 0
    startX = SCREEN_W - hLen
    startY = HISTOGRAM_Y + HISTOGRAM_H - 1
    for i, v in enumerate(hList):
        hPre = v if i == 0 else hCur
        hCur = v
        oled.vline(startX + i, startY - hCur, (hCur - hPre if hPre < hCur else 1), 1)
        if hPre > hCur:
            oled.vline(startX + i - 1, startY - hPre, hPre - hCur, 1)
    oled.show()

timerTemp = Timer(0)
timerTemp.init(period=500, mode=Timer.PERIODIC, callback=timerRefresh) # Every 1 second

print('start run')

为了在MicroPython上实现Web服务,找了几个框架,最终选择tinyweb

tinyweb主要是小巧、符合编程习惯、做好错误处理、实现了HTTP 1.0基本功能等。相关信息如下:

但是tinyweb采用logging做log记录,而logging依赖的库有点多,于是改为使用ulooger。修改后的源码如下:

"""
Tiny Web - pretty simple and powerful web server for tiny platforms like ESP8266 / ESP32
MIT license
(C) Konstantin Belyalov 2017-2018

- project: https://github.com/belyalov/tinyweb
- source code: https://github.com/belyalov/tinyweb/blob/master/tinyweb/server.py
- version: e92546054b905de46f42157282677f56764fb2f0

edit: replace logging with ulogger
"""
import ulogger
import uasyncio as asyncio
import uasyncio.core
import ujson as json
import gc
import uos as os
import sys
import uerrno as errno
import usocket as socket


#log = logging.getLogger('WEB')
class UtcClock(ulogger.BaseClock):
    def __call__(self) -> str:
        # UTC date time: (year, month, mday, hour, minute, second, weekday, yearday)
        dt = time.gmtime()
        return f'{dt[0]}{dt[1]:02d}{dt[2]:02d}_{dt[3]:02d}{dt[4]:02d}{dt[5]:02d}'

handler_to_term = ulogger.Handler(
    level=ulogger.INFO,
    colorful=True,
    fmt="&(time)%[&(level)%][&(name)%]: &(msg)%",
    clock=UtcClock(),
    direction=ulogger.TO_TERM,
)

handler_to_file = ulogger.Handler(
    level=ulogger.INFO,
    fmt="&(time)%[&(level)%][&(name)%]: &(msg)%",
    clock=UtcClock(),
    direction=ulogger.TO_FILE,
    file_name="tinyweb.log",
    max_file_size=102400 # max for 100KB
)
logger = ulogger.Logger(
    name = __name__,
    handlers = (
        handler_to_term,
        handler_to_file
    )
)

type_gen = type((lambda: (yield))())

# uasyncio v3 is shipped with MicroPython 1.13, and contains some subtle
# but breaking changes. See also https://github.com/peterhinch/micropython-async/blob/master/v3/README.md
IS_UASYNCIO_V3 = hasattr(asyncio, "__version__") and asyncio.__version__ >= (3,)


def urldecode_plus(s):
    """Decode urlencoded string (including '+' char).

    Returns decoded string
    """
    s = s.replace('+', ' ')
    arr = s.split('%')
    res = arr[0]
    for it in arr[1:]:
        if len(it) >= 2:
            res += chr(int(it[:2], 16)) + it[2:]
        elif len(it) == 0:
            res += '%'
        else:
            res += it
    return res


def parse_query_string(s):
    """Parse urlencoded string into dict.

    Returns dict
    """
    res = {}
    pairs = s.split('&')
    for p in pairs:
        vals = [urldecode_plus(x) for x in p.split('=', 1)]
        if len(vals) == 1:
            res[vals[0]] = ''
        else:
            res[vals[0]] = vals[1]
    return res


class HTTPException(Exception):
    """HTTP protocol exceptions"""

    def __init__(self, code=400):
        self.code = code


class request:
    """HTTP Request class"""

    def __init__(self, _reader):
        self.reader = _reader
        self.headers = {}
        self.method = b''
        self.path = b''
        self.query_string = b''

    async def read_request_line(self):
        """Read and parse first line (AKA HTTP Request Line).
        Function is generator.

        Request line is something like:
        GET /something/script?param1=val1 HTTP/1.1
        """
        while True:
            rl = await self.reader.readline()
            # skip empty lines
            if rl == b'\r\n' or rl == b'\n':
                continue
            break
        rl_frags = rl.split()
        if len(rl_frags) != 3:
            raise HTTPException(400)
        self.method = rl_frags[0]
        url_frags = rl_frags[1].split(b'?', 1)
        self.path = url_frags[0]
        if len(url_frags) > 1:
            self.query_string = url_frags[1]

    async def read_headers(self, save_headers=[]):
        """Read and parse HTTP headers until \r\n\r\n:
        Optional argument 'save_headers' controls which headers to save.
            This is done mostly to deal with memory constrains.

        Function is generator.

        HTTP headers could be like:
        Host: google.com
        Content-Type: blah
        \r\n
        """
        while True:
            gc.collect()
            line = await self.reader.readline()
            if line == b'\r\n':
                break
            frags = line.split(b':', 1)
            if len(frags) != 2:
                raise HTTPException(400)
            if frags[0] in save_headers:
                self.headers[frags[0]] = frags[1].strip()

    async def read_parse_form_data(self):
        """Read HTTP form data (payload), if any.
        Function is generator.

        Returns:
            - dict of key / value pairs
            - None in case of no form data present
        """
        # TODO: Probably there is better solution how to handle
        # request body, at least for simple urlencoded forms - by processing
        # chunks instead of accumulating payload.
        gc.collect()
        if b'Content-Length' not in self.headers:
            return {}
        # Parse payload depending on content type
        if b'Content-Type' not in self.headers:
            # Unknown content type, return unparsed, raw data
            return {}
        size = int(self.headers[b'Content-Length'])
        if size > self.params['max_body_size'] or size < 0:
            raise HTTPException(413)
        data = await self.reader.readexactly(size)
        # Use only string before ';', e.g:
        # application/x-www-form-urlencoded; charset=UTF-8
        ct = self.headers[b'Content-Type'].split(b';', 1)[0]
        try:
            if ct == b'application/json':
                return json.loads(data)
            elif ct == b'application/x-www-form-urlencoded':
                return parse_query_string(data.decode())
        except ValueError:
            # Re-generate exception for malformed form data
            raise HTTPException(400)


class response:
    """HTTP Response class"""

    def __init__(self, _writer):
        self.writer = _writer
        self.send = _writer.awrite
        self.code = 200
        self.version = '1.0'
        self.headers = {}

    async def _send_headers(self):
        """Compose and send:
        - HTTP request line
        - HTTP headers following by \r\n.
        This function is generator.

        P.S.
        Because of usually we have only a few HTTP headers (2-5) it doesn't make sense
        to send them separately - sometimes it could increase latency.
        So combining headers together and send them as single "packet".
        """
        # Request line
        hdrs = 'HTTP/{} {} MSG\r\n'.format(self.version, self.code)
        # Headers
        for k, v in self.headers.items():
            hdrs += '{}: {}\r\n'.format(k, v)
        hdrs += '\r\n'
        # Collect garbage after small mallocs
        gc.collect()
        await self.send(hdrs)

    async def error(self, code, msg=None):
        """Generate HTTP error response
        This function is generator.

        Arguments:
            code - HTTP response code

        Example:
            # Not enough permissions. Send HTTP 403 - Forbidden
            await resp.error(403)
        """
        self.code = code
        if msg:
            self.add_header('Content-Length', len(msg))
        await self._send_headers()
        if msg:
            await self.send(msg)

    async def redirect(self, location, msg=None):
        """Generate HTTP redirect response to 'location'.
        Basically it will generate HTTP 302 with 'Location' header

        Arguments:
            location - URL to redirect to

        Example:
            # Redirect to /something
            await resp.redirect('/something')
        """
        self.code = 302
        self.add_header('Location', location)
        if msg:
            self.add_header('Content-Length', len(msg))
        await self._send_headers()
        if msg:
            await self.send(msg)

    def add_header(self, key, value):
        """Add HTTP response header

        Arguments:
            key - header name
            value - header value

        Example:
            resp.add_header('Content-Encoding', 'gzip')
        """
        self.headers[key] = value

    def add_access_control_headers(self):
        """Add Access Control related HTTP response headers.
        This is required when working with RestApi (JSON requests)
        """
        self.add_header('Access-Control-Allow-Origin', self.params['allowed_access_control_origins'])
        self.add_header('Access-Control-Allow-Methods', self.params['allowed_access_control_methods'])
        self.add_header('Access-Control-Allow-Headers', self.params['allowed_access_control_headers'])

    async def start_html(self):
        """Start response with HTML content type.
        This function is generator.

        Example:
            await resp.start_html()
            await resp.send('<html><h1>Hello, world!</h1></html>')
        """
        self.add_header('Content-Type', 'text/html')
        await self._send_headers()

    async def send_file(self, filename, content_type=None, content_encoding=None, max_age=2592000, buf_size=128):
        """Send local file as HTTP response.
        This function is generator.

        Arguments:
            filename - Name of file which exists in local filesystem
        Keyword arguments:
            content_type - Filetype. By default - None means auto-detect.
            max_age - Cache control. How long browser can keep this file on disk.
                      By default - 30 days
                      Set to 0 - to disable caching.

        Example 1: Default use case:
            await resp.send_file('images/cat.jpg')

        Example 2: Disable caching:
            await resp.send_file('static/index.html', max_age=0)

        Example 3: Override content type:
            await resp.send_file('static/file.bin', content_type='application/octet-stream')
        """
        try:
            # Get file size
            stat = os.stat(filename)
            slen = str(stat[6])
            self.add_header('Content-Length', slen)
            # Find content type
            if content_type:
                self.add_header('Content-Type', content_type)
            # Add content-encoding, if any
            if content_encoding:
                self.add_header('Content-Encoding', content_encoding)
            # Since this is static content is totally make sense
            # to tell browser to cache it, however, you can always
            # override it by setting max_age to zero
            self.add_header('Cache-Control', 'max-age={}, public'.format(max_age))
            with open(filename) as f:
                await self._send_headers()
                gc.collect()
                buf = bytearray(min(stat[6], buf_size))
                while True:
                    size = f.readinto(buf)
                    if size == 0:
                        break
                    await self.send(buf, sz=size)
        except OSError as e:
            # special handling for ENOENT / EACCESS
            if e.args[0] in (errno.ENOENT, errno.EACCES):
                raise HTTPException(404)
            else:
                raise


async def restful_resource_handler(req, resp, param=None):
    """Handler for RESTful API endpoins"""
    # Gather data - query string, JSON in request body...
    data = await req.read_parse_form_data()
    # Add parameters from URI query string as well
    # This one is actually for simply development of RestAPI
    if req.query_string != b'':
        data.update(parse_query_string(req.query_string.decode()))
    # Call actual handler
    _handler, _kwargs = req.params['_callmap'][req.method]
    # Collect garbage before / after handler execution
    gc.collect()
    if param:
        res = _handler(data, param, **_kwargs)
    else:
        res = _handler(data, **_kwargs)
    gc.collect()
    # Handler result could be:
    # 1. generator - in case of large payload
    # 2. string - just string :)
    # 2. dict - meaning client what tinyweb to convert it to JSON
    # it can also return error code together with str / dict
    # res = {'blah': 'blah'}
    # res = {'blah': 'blah'}, 201
    if isinstance(res, type_gen):
        # Result is generator, use chunked response
        # NOTICE: HTTP 1.0 by itself does not support chunked responses, so, making workaround:
        # Response is HTTP/1.1 with Connection: close
        resp.version = '1.1'
        resp.add_header('Connection', 'close')
        resp.add_header('Content-Type', 'application/json')
        resp.add_header('Transfer-Encoding', 'chunked')
        resp.add_access_control_headers()
        await resp._send_headers()
        # Drain generator
        for chunk in res:
            chunk_len = len(chunk.encode('utf-8'))
            await resp.send('{:x}\r\n'.format(chunk_len))
            await resp.send(chunk)
            await resp.send('\r\n')
            gc.collect()
        await resp.send('0\r\n\r\n')
    else:
        if type(res) == tuple:
            resp.code = res[1]
            res = res[0]
        elif res is None:
            raise Exception('Result expected')
        # Send response
        if type(res) is dict:
            res_str = json.dumps(res)
        else:
            res_str = res
        resp.add_header('Content-Type', 'application/json')
        resp.add_header('Content-Length', str(len(res_str)))
        resp.add_access_control_headers()
        await resp._send_headers()
        await resp.send(res_str)


class webserver:

    def __init__(self, request_timeout=3, max_concurrency=3, backlog=16, debug=False):
        """Tiny Web Server class.
        Keyword arguments:
            request_timeout - Time for client to send complete request
                              after that connection will be closed.
            max_concurrency - How many connections can be processed concurrently.
                              It is very important to limit this number because of
                              memory constrain.
                              Default value depends on platform
            backlog         - Parameter to socket.listen() function. Defines size of
                              pending to be accepted connections queue.
                              Must be greater than max_concurrency
            debug           - Whether send exception info (text + backtrace)
                              to client together with HTTP 500 or not.
        """
        self.loop = asyncio.get_event_loop()
        self.request_timeout = request_timeout
        self.max_concurrency = max_concurrency
        self.backlog = backlog
        self.debug = debug
        self.explicit_url_map = {}
        self.catch_all_handler = None
        self.parameterized_url_map = {}
        # Currently opened connections
        self.conns = {}
        # Statistics
        self.processed_connections = 0

    def _find_url_handler(self, req):
        """Helper to find URL handler.
        Returns tuple of (function, opts, param) or (None, None) if not found.
        """
        # First try - lookup in explicit (non parameterized URLs)
        if req.path in self.explicit_url_map:
            return self.explicit_url_map[req.path]
        # Second try - strip last path segment and lookup in another map
        idx = req.path.rfind(b'/') + 1
        path2 = req.path[:idx]
        if len(path2) > 0 and path2 in self.parameterized_url_map:
            # Save parameter into request
            req._param = req.path[idx:].decode()
            return self.parameterized_url_map[path2]

        if self.catch_all_handler:
            return self.catch_all_handler

        # No handler found
        return (None, None)

    async def _handle_request(self, req, resp):
        await req.read_request_line()
        # Find URL handler
        req.handler, req.params = self._find_url_handler(req)
        if not req.handler:
            # No URL handler found - read response and issue HTTP 404
            await req.read_headers()
            raise HTTPException(404)
        # req.params = params
        # req.handler = han
        resp.params = req.params
        # Read / parse headers
        await req.read_headers(req.params['save_headers'])

    async def _handler(self, reader, writer):
        """Handler for TCP connection with
        HTTP/1.0 protocol implementation
        """
        gc.collect()

        try:
            req = request(reader)
            resp = response(writer)
            # Read HTTP Request with timeout
            await asyncio.wait_for(self._handle_request(req, resp),
                                   self.request_timeout)

            # OPTIONS method is handled automatically
            if req.method == b'OPTIONS':
                resp.add_access_control_headers()
                # Since we support only HTTP 1.0 - it is important
                # to tell browser that there is no payload expected
                # otherwise some webkit based browsers (Chrome)
                # treat this behavior as an error
                resp.add_header('Content-Length', '0')
                await resp._send_headers()
                return

            # Ensure that HTTP method is allowed for this path
            if req.method not in req.params['methods']:
                raise HTTPException(405)

            # Handle URL
            gc.collect()
            if hasattr(req, '_param'):
                await req.handler(req, resp, req._param)
            else:
                await req.handler(req, resp)
            # Done here
        except (asyncio.CancelledError, asyncio.TimeoutError):
            pass
        except OSError as e:
            # Do not send response for connection related errors - too late :)
            # P.S. code 32 - is possible BROKEN PIPE error (TODO: is it true?)
            if e.args[0] not in (errno.ECONNABORTED, errno.ECONNRESET, 32):
                try:
                    await resp.error(500)
                except Exception as e:
                    #log.exc(e, "")
                    logger.error(f'OSError: {e}')
        except HTTPException as e:
            try:
                await resp.error(e.code)
            except Exception as e:
                #log.exc(e)
                logger.error(f'HTTPException: {e}')
        except Exception as e:
            # Unhandled expection in user's method
            #log.error(req.path.decode())
            #log.exc(e, "")
            logger.error(f'Unhandled expection. URL: {req.path.decode()}, exception: {e}')
            try:
                await resp.error(500)
                # Send exception info if desired
                if self.debug:
                    sys.print_exception(e, resp.writer.s)
            except Exception as e:
                pass
        finally:
            await writer.aclose()
            # Max concurrency support -
            # if queue is full schedule resume of TCP server task
            if len(self.conns) == self.max_concurrency:
                self.loop.create_task(self._server_coro)
            # Delete connection, using socket as a key
            del self.conns[id(writer.s)]

    def add_route(self, url, f, **kwargs):
        """Add URL to function mapping.

        Arguments:
            url - url to map function with
            f - function to map

        Keyword arguments:
            methods - list of allowed methods. Defaults to ['GET', 'POST']
            save_headers - contains list of HTTP headers to be saved. Case sensitive. Default - empty.
            max_body_size - Max HTTP body size (e.g. POST form data). Defaults to 1024
            allowed_access_control_headers - Default value for the same name header. Defaults to *
            allowed_access_control_origins - Default value for the same name header. Defaults to *
        """
        if url == '' or '?' in url:
            raise ValueError('Invalid URL')
        # Initial params for route
        params = {'methods': ['GET'],
                  'save_headers': [],
                  'max_body_size': 1024,
                  'allowed_access_control_headers': '*',
                  'allowed_access_control_origins': '*',
                  }
        params.update(kwargs)
        params['allowed_access_control_methods'] = ', '.join(params['methods'])
        # Convert methods/headers to bytestring
        params['methods'] = [x.encode() for x in params['methods']]
        params['save_headers'] = [x.encode() for x in params['save_headers']]
        # If URL has a parameter
        if url.endswith('>'):
            idx = url.rfind('<')
            path = url[:idx]
            idx += 1
            param = url[idx:-1]
            if path.encode() in self.parameterized_url_map:
                raise ValueError('URL exists')
            params['_param_name'] = param
            self.parameterized_url_map[path.encode()] = (f, params)

        if url.encode() in self.explicit_url_map:
            raise ValueError('URL exists')
        self.explicit_url_map[url.encode()] = (f, params)

    def add_resource(self, cls, url, **kwargs):
        """Map resource (RestAPI) to URL

        Arguments:
            cls - Resource class to map to
            url - url to map to class
            kwargs - User defined key args to pass to the handler.

        Example:
            class myres():
                def get(self, data):
                    return {'hello': 'world'}


            app.add_resource(myres, '/api/myres')
        """
        methods = []
        callmap = {}
        # Create instance of resource handler, if passed as just class (not instance)
        try:
            obj = cls()
        except TypeError:
            obj = cls
        # Get all implemented HTTP methods and make callmap
        for m in ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']:
            fn = m.lower()
            if hasattr(obj, fn):
                methods.append(m)
                callmap[m.encode()] = (getattr(obj, fn), kwargs)
        self.add_route(url, restful_resource_handler,
                       methods=methods,
                       save_headers=['Content-Length', 'Content-Type'],
                       _callmap=callmap)

    def catchall(self):
        """Decorator for catchall()

        Example:
            @app.catchall()
            def catchall_handler(req, resp):
                response.code = 404
                await response.start_html()
                await response.send('<html><body><h1>My custom 404!</h1></html>\n')
        """
        params = {'methods': [b'GET'], 'save_headers': [], 'max_body_size': 1024, 'allowed_access_control_headers': '*', 'allowed_access_control_origins': '*'}

        def _route(f):
            self.catch_all_handler = (f, params)
            return f
        return _route

    def route(self, url, **kwargs):
        """Decorator for add_route()

        Example:
            @app.route('/')
            def index(req, resp):
                await resp.start_html()
                await resp.send('<html><body><h1>Hello, world!</h1></html>\n')
        """
        def _route(f):
            self.add_route(url, f, **kwargs)
            return f
        return _route

    def resource(self, url, method='GET', **kwargs):
        """Decorator for add_resource() method

        Examples:
            @app.resource('/users')
            def users(data):
                return {'a': 1}

            @app.resource('/messages/<topic_id>')
            async def index(data, topic_id):
                yield '{'
                yield '"topic_id": "{}",'.format(topic_id)
                yield '"message": "test",'
                yield '}'
        """
        def _resource(f):
            self.add_route(url, restful_resource_handler,
                           methods=[method],
                           save_headers=['Content-Length', 'Content-Type'],
                           _callmap={method.encode(): (f, kwargs)})
            return f
        return _resource

    async def _tcp_server(self, host, port, backlog):
        """TCP Server implementation.
        Opens socket for accepting connection and
        creates task for every new accepted connection
        """
        addr = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0][-1]
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(addr)
        sock.listen(backlog)
        try:
            while True:
                if IS_UASYNCIO_V3:
                    yield uasyncio.core._io_queue.queue_read(sock)
                else:
                    yield asyncio.IORead(sock)
                csock, caddr = sock.accept()
                csock.setblocking(False)
                # Start handler / keep it in the map - to be able to
                # shutdown gracefully - by close all connections
                self.processed_connections += 1
                hid = id(csock)
                handler = self._handler(asyncio.StreamReader(csock),
                                        asyncio.StreamWriter(csock, {}))
                self.conns[hid] = handler
                self.loop.create_task(handler)
                # In case of max concurrency reached - temporary pause server:
                # 1. backlog must be greater than max_concurrency, otherwise
                #    client will got "Connection Reset"
                # 2. Server task will be resumed whenever one active connection finished
                if len(self.conns) == self.max_concurrency:
                    # Pause
                    yield False
        except asyncio.CancelledError:
            return
        finally:
            sock.close()

    def run(self, host="127.0.0.1", port=8081, loop_forever=True):
        """Run Web Server. By default it runs forever.

        Keyword arguments:
            host - host to listen on. By default - localhost (127.0.0.1)
            port - port to listen on. By default - 8081
            loop_forever - run loo.loop_forever(), otherwise caller must run it by itself.
        """
        self._server_coro = self._tcp_server(host, port, self.backlog)
        self.loop.create_task(self._server_coro)
        if loop_forever:
            self.loop.run_forever()

    def shutdown(self):
        """Gracefully shutdown Web Server"""
        asyncio.cancel(self._server_coro)
        for hid, coro in self.conns.items():
            asyncio.cancel(coro)

使用时,其实也有不少东西需要注意。

  1. 处理POST请求的方法,其注解需要设置save_headers = ['Content-Length','Content-Type'],才能获取到请求的参数。例如:
app = tinyweb.webserver()

@app.route('/save', methods = ['POST'], save_headers = ['Content-Length','Content-Type'])
async def save(request, response):
    params = await request.read_parse_form_data()
    await response.send(params)
  1. 一个方法处理多个请求链接,加多行注解即可。例如:
app = tinyweb.webserver()

@app.route('/')
@app.route('/index')
async def index(request, response):
    await response.send("Hello world!")
  1. 如果需要浏览器或对方客户端不要缓存,设置响应头即可。相关代码:
    response.add_header('Cache-Control', 'no-cache')  # for HTTP 1.1
    response.add_header('Pragma', 'no-cache')  # for HTTP 1.0
    response.add_header('Expires', '0')  # for proxies
  1. Web服务的max_concurrency不要设置太大,以降低内存占用。
  2. 更多的示例,可参考官方文档。

采用MicroPython编写的定时任务,特别是在实际环境测试,一般不能看到错误信息。因此,需要做log记录。

找到一个实现Logger功能的项目ulogger,其代码没有依赖其它库,使用方式也符合一般的Logger用法。相关信息如下:

相关代码复制了一份过来,取消了TextIOWrapper的引用:

"""
- project: micropython-ulogger
  https://github.com/whales-chen/micropython-ulogger
- code from
  https://github.com/whales-chen/micropython-ulogger/blob/main/ulogger/__init__.py
- version
  ec4f6b3842c677fbb457f6bc6d88afd8a82eeed6
"""
try:    import time
except: import utime as time

try:    from micropython import const
except: const = lambda x:x # for debug

#from io import TextIOWrapper
import io

__version__ = "v1.2"

DEBUG:    int = const(10)
INFO:     int = const(20)
WARN:     int = const(30)
ERROR:    int = const(40)
CRITICAL: int = const(50)

TO_FILE = const(100)
TO_TERM = const(200)

# fmt map 的可选参数
_level  = const(0)
_msg    = const(1)
_time   = const(2)
_name   = const(3)
_fnname = const(4)


def level_name(level: int, color: bool = False) -> str:
    if not color:
        if level == INFO:
            return "INFO"
        elif level == DEBUG:
            return "DEBUG"
        elif level == WARN:
            return "WARN"
        elif level == ERROR:
            return "ERROR"
        elif level == CRITICAL:
            return "CRITICAL"
    else:
        if level == INFO:
            return "\033[97mINFO\033[0m"
        elif level == DEBUG:
            return "\033[37mDEBUG\033[0m"
        elif level == WARN:
            return "\033[93mWARN\033[0m"
        elif level == ERROR:
            return "\033[35mERROR\033[0m"
        elif level == CRITICAL:
            return "\033[91mCRITICAL\033[0m"


class BaseClock ():
    """
    This is a BaseClock for the logger.
    Please inherit this class by your custom.
    """

    def __call__(self) -> str:
        """
        Acquire the time of now, please inherit this method.
        We will use the return value of this function as the time format of the log,
        such as `2021 - 6 - 13` or `12:45:23` and so on.

        :return: the time string.
        """
        return '%d' % time.time()


class Handler():
    """The Handler for logger.
    """
    _template: str
    _map: bytes
    level: int
    _direction: int
    _clock: BaseClock
    _color: bool
    _file_name: str
    _max_size: int
    #_file = TextIOWrapper
    _file = None

    def __init__(self,
        level: int = INFO,
        colorful: bool = True,
        fmt: str = "&(time)% - &(level)% - &(name)% - &(msg)%",
        clock: BaseClock = None,
        direction: int = TO_TERM,
        file_name: str = "logging.log",
        max_file_size: int = 4096
        ):
        """
        Create a Handler that you can add to the logger later

        ## Options available for fmt.
        - &(level)%  : the log level
        - &(msg)%    : the log message
        - &(time)%   : the time acquire from clock, see `BaseClock`
        - &(name)%   : the logger's name
        - &(fnname)%  : the function name which you will pass on.
        - more optional is developing.

        ## Options available for level.
        - DEBUG
        - INFO
        - WARN
        - ERROR
        - CRITICAL

        ## Options available for direction.
        - TO_FILE : output to a file
        - TO_TERM : output to terminal

        :param level: Set a minimum level you want to be log
        :type level: int(see the consts in this module)

        :param colorful: Whether to use color display information to terminal(Not applicable to files)
        :type colorful: bool

        :param fmt: the format string like: "&(time)% - &(level)% - &(name)% - &(fnname)% - &(msg)%"(default)
        :type fmt: str

        :param clock: The Clock which will provide time str. see `BaseClock`
        :type clock: BaseClock(can be inherit )

        :param direction: Set the direction where logger will output
        :type direction: int (`TO_FILE` or `TO_TERM`)

        :param file_name: available when you set `TO_FILE` to param `direction`. (default for `logging.log`)
        :type file_name: str
        :param max_file_size: available when you set `TO_FILE` to param `direction`. The unit is `byte`, (default for 4k)
        :type max_file_size: str
        """
        #TODO: 文件按日期存储, 最大份数的设置.
        self._direction = direction
        self.level = level
        self._clock = clock if clock else BaseClock()
        self._color = colorful
        self._file_name = file_name if direction == TO_FILE else ''
        self._max_size = max_file_size if direction == TO_FILE else 0

        if direction == TO_FILE:
            self._file = open(file_name, 'a+')

        # 特么的re居然不能全局匹配, 烦了, 只能自己来.
        # m = re.match(r"&\((.*?)\)%", fmt)
        # i = 0
        # while True:
        #     # 由于蛋疼的 ure 不能直接获取匹配结果的数量, 只能使用这种蠢蛋方法来循环.
        #     try:
        #         text = m.group(i)

        #     except:
        #         # 发生错误说明已经遍历完毕
        #         break

        #     # 使用指针代替文本来减少开销
        #     if text == "level":
        #         self._map.append(_level)
        #     elif text == "msg":
        #         self._map.append(_msg)
        #     elif text == "time":
        #         self._map.append(_time)
        #     elif text == "name":
        #         self._map.append(_name)
        #     elif text == "fnname":
        #         self._map.append(_fnname)

        #     i += 1

        # 添加映射
        self._map = bytearray()
        idx = 0
        while True:
            idx = fmt.find("&(", idx)
            if idx >= 0:  # 有找到
                a_idx = fmt.find(")%", idx+2)
                if a_idx < 0:
                    # 没找到后缀, 报错
                    raise Exception(
                        "Unable to parse text format successfully.")
                text = fmt[idx+2:a_idx]
                idx = a_idx+2  # 交换位置
                if text == "level":
                    self._map.append(_level)
                elif text == "msg":
                    self._map.append(_msg)
                elif text == "time":
                    self._map.append(_time)
                elif text == "name":
                    self._map.append(_name)
                elif text == "fnname":
                    self._map.append(_fnname)
            else:  # 没找到, 代表后面没有了
                break

        # 将 template 变成可被格式化的文本
        # 确保最后一个是换行字符

        self._template = fmt.replace("&(level)%", "%s")\
            .replace("&(msg)%", "%s")\
            .replace("&(time)%", "%s")\
            .replace("&(name)%", "%s")\
            .replace("&(fnname)%", "%s")\
            + "\n" if fmt[:-1] != '\n' else ''

    def _msg(self, *args, level: int, name: str, fnname: str):
        """
        Log a msg
        """
        
        if level < self.level:
            return
        # generate msg
        temp_map = []
        text = ''
        for item in self._map:
            if item == _msg:
                for text_ in args:  # 将元组内的文本添加到一起
                    text = "%s%s" % (text, text_)  # 防止用户输入其他类型(int, float)
                temp_map.append(text)
            elif item == _level:
                if self._direction == TO_TERM:  # only terminal can use color.
                    temp_map.append(level_name(level, self._color))
                else:
                    temp_map.append(level_name(level))
            elif item == _time:
                temp_map.append(self._clock())
            elif item == _name:
                temp_map.append(name)
            elif item == _fnname:
                temp_map.append(fnname if fnname else "unknownfn")

        if self._direction == TO_TERM:
            self._to_term(tuple(temp_map))
        else:
            self._to_file(tuple(temp_map))
        # TODO: 待验证: 转换为 tuple 和使用 fromat 谁更快

    def _to_term(self, map: tuple):
        print(self._template % map, end='')

    def _to_file(self, map: tuple):
        fp = self._file
        # 检查是否超出大小限制.
        prev_idx = fp.tell()  # 保存原始指针位置
        # 将读写指针跳到文件最大限制的地方,
        # 如果能读出数据, 说明文件大于指定的大小
        fp.seek(self._max_size)
        if fp.read(1):  # 能读到数据, 说明超出大小限制了
            fp = self._file = open(self._file_name, 'w')  # 使用 w 选项清空文件内容
        else:
            # 没有超出限制
            fp.seek(prev_idx)  # 指针回到原来的地方

        # 检查完毕, 开始写入数据
        fp.write(self._template % map)
        fp.flush()


class Logger():
    _handlers: list

    def __init__(self,
        name: str,
        handlers: list = None,
        ):

        self.name = name
        if not handlers:
            # 如果没有指定处理器, 自动创建一个默认的
            self._handlers = [Handler()]
        else:
            self._handlers = handlers

    @property
    def handlers(self):
        return self._handlers

    def _msg(self, *args, level: int, fn: str):

        for item in self._handlers:
            #try:
            item._msg(*args, level=level, fnname=fn, name=self.name)
            #except:
            #    print("Failed while trying to record")

    def debug(self, *args, fn: str = None):
        self._msg(*args, level=DEBUG, fn=fn)

    def info(self, *args, fn: str = None):
        self._msg(*args, level=INFO, fn=fn)

    def warn(self, *args, fn: str = None):
        self._msg(*args, level=WARN, fn=fn)

    def error(self, *args, fn: str = None):
        self._msg(*args, level=ERROR, fn=fn)

    def critical(self, *args, fn: str = None):
        self._msg(*args, level=CRITICAL, fn=fn)


__all__ = [
    Logger,
    Handler,
    BaseClock,


    DEBUG,
    INFO,
    WARN,
    ERROR,
    CRITICAL,

    TO_FILE,
    TO_TERM,

    __version__
]

近来搞电脑的远程启动搞上瘾了。使用网络启动(Wake on Lan),确实带来很多玩法。为了进一步降低电费,减少电脑非使用时段(例如晚上睡觉时段)待机而产生的功耗,采用ESP32C3(刷上MicroPython)来作为远程开机设备。即:

  • 普通x86电脑,在晚上或无需使用的时间正常关机,并开启网络启动功能。
  • ESP32C3开发板保持24小时开机并联网,可在需要时远程启动所需x86电脑。

于是找了下,在MicroPython上发送Wake on Lan的实现代码。参考了以下文章:

整理出可用代码,保存为文件wol.py,如下:

"""
Small module for use with the wake on lan protocol.

Reference:
- https://pypi.org/project/wakeonlan/
- https://www.cnblogs.com/Yogile/p/16488281.html
"""
import socket
import ubinascii


BROADCAST_IP = "255.255.255.255"
DEFAULT_PORT = 9
SO_BROADCAST = 20

def create_magic_packet(macaddress: str) -> bytes:
    """
    Create a magic packet.

    A magic packet is a packet that can be used with the for wake on lan
    protocol to wake up a computer. The packet is constructed from the
    mac address given as a parameter.

    Args:
        macaddress: the mac address that should be parsed into a magic packet.

    """
    if len(macaddress) == 17:
        sep = macaddress[2]
        macaddress = macaddress.replace(sep, "")
    elif len(macaddress) == 14:
        sep = macaddress[4]
        macaddress = macaddress.replace(sep, "")
    if len(macaddress) != 12:
        raise ValueError("Incorrect MAC address format")
    #return bytes.fromhex("F" * 12 + macaddress * 16)
    return ubinascii.unhexlify("F" * 12 + macaddress * 16)


def send_magic_packet(
    *macs: str,
    ip_address: str = BROADCAST_IP,
    port: int = DEFAULT_PORT,
    interface: str = None
) -> None:
    """
    Wake up computers having any of the given mac addresses.

    Wake on lan must be enabled on the host device.

    Args:
        macs: One or more macaddresses of machines to wake.

    Keyword Args:
        ip_address: the ip address of the host to send the magic packet to.
        port: the port of the host to send the magic packet to.
        interface: the ip address of the network adapter to route the magic packet through.

    """
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 1)
        
        if interface is not None:
            sock.bind((interface, 0))
        sock.setsockopt(socket.SOL_SOCKET, SO_BROADCAST, 1)
        sock.connect((ip_address, port))
        for mac in macs:
            packet = create_magic_packet(mac)
            sock.send(packet)
            print("send magic packet to MAC [%s]" % (mac))
    except:
        print('send magic packet failed')
        pass
    finally:
        sock.close()

使用示例:

import wol

mac = '12:ab:12:ab:12:ab' # 必填参数。要启动电脑的网卡MAC
broadcastIp = '192.168.0.255'  # 可选参数。广播的地址,一般填对应网段的255地址

wol.send_magic_packet(mac, ip_address=broadcastIp)