2023年

近来搞电脑的远程启动搞上瘾了。使用网络启动(Wake on Lan),确实带来很多玩法。为了进一步降低电费,减少电脑非使用时段(例如晚上睡觉时段)待机而产生的功耗,采用ESP32C3(刷上MicroPython)来作为远程开机设备。即:

  • 普通x86电脑,在晚上或无需使用的时间正常关机,并开启网络启动功能。
  • ESP32C3开发板保持24小时开机并联网,可在需要时远程启动所需x86电脑。

于是找了下,在MicroPython上发送Wake on Lan的实现代码。参考了以下文章:

整理出可用代码,保存为文件wol.py,如下:

"""
Small module for use with the wake on lan protocol.

Reference:
- https://pypi.org/project/wakeonlan/
- https://www.cnblogs.com/Yogile/p/16488281.html
"""
import socket
import ubinascii


BROADCAST_IP = "255.255.255.255"
DEFAULT_PORT = 9
SO_BROADCAST = 20

def create_magic_packet(macaddress: str) -> bytes:
    """
    Create a magic packet.

    A magic packet is a packet that can be used with the for wake on lan
    protocol to wake up a computer. The packet is constructed from the
    mac address given as a parameter.

    Args:
        macaddress: the mac address that should be parsed into a magic packet.

    """
    if len(macaddress) == 17:
        sep = macaddress[2]
        macaddress = macaddress.replace(sep, "")
    elif len(macaddress) == 14:
        sep = macaddress[4]
        macaddress = macaddress.replace(sep, "")
    if len(macaddress) != 12:
        raise ValueError("Incorrect MAC address format")
    #return bytes.fromhex("F" * 12 + macaddress * 16)
    return ubinascii.unhexlify("F" * 12 + macaddress * 16)


def send_magic_packet(
    *macs: str,
    ip_address: str = BROADCAST_IP,
    port: int = DEFAULT_PORT,
    interface: str = None
) -> None:
    """
    Wake up computers having any of the given mac addresses.

    Wake on lan must be enabled on the host device.

    Args:
        macs: One or more macaddresses of machines to wake.

    Keyword Args:
        ip_address: the ip address of the host to send the magic packet to.
        port: the port of the host to send the magic packet to.
        interface: the ip address of the network adapter to route the magic packet through.

    """
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 1)
        
        if interface is not None:
            sock.bind((interface, 0))
        sock.setsockopt(socket.SOL_SOCKET, SO_BROADCAST, 1)
        sock.connect((ip_address, port))
        for mac in macs:
            packet = create_magic_packet(mac)
            sock.send(packet)
            print("send magic packet to MAC [%s]" % (mac))
    except:
        print('send magic packet failed')
        pass
    finally:
        sock.close()

使用示例:

import wol

mac = '12:ab:12:ab:12:ab' # 必填参数。要启动电脑的网卡MAC
broadcastIp = '192.168.0.255'  # 可选参数。广播的地址,一般填对应网段的255地址

wol.send_magic_packet(mac, ip_address=broadcastIp)

最近做的MicroPython小项目,需要用到ping命令的功能,检查指定IP的电脑是否有开机并联网。

找了一圈,找到这个ping功能的源码。没有深究代码,就是能用。而且跟常用的ping命令差不多。

源码没改,直接搬过来,避免以后找不到:

# µPing (MicroPing) for MicroPython
# copyright (c) 2018 Shawwwn <shawwwn1@gmail.com>
# License: MIT
#
# Internet Checksum Algorithm
# Author: Olav Morken
# https://github.com/olavmrk/python-ping/blob/master/ping.py
# @data: bytes
#
# source code from: https://forum.pycom.io/topic/4930/ping-command/15
#
def checksum(data):
    if len(data) & 0x1: # Odd number of bytes
        data += b'\0'
    cs = 0
    for pos in range(0, len(data), 2):
        b1 = data[pos]
        b2 = data[pos + 1]
        cs += (b1 << 8) + b2
    while cs >= 0x10000:
        cs = (cs & 0xffff) + (cs >> 16)
    cs = ~cs & 0xffff
    return cs

def ping(host, count=4, timeout=5000, interval=10, quiet=False, size=64):
    import utime
    import uselect
    import uctypes
    import usocket
    import ustruct
    import uos

    # prepare packet
    assert size >= 16, "pkt size too small"
    pkt = b'Q'*size
    pkt_desc = {
        "type": uctypes.UINT8 | 0,
        "code": uctypes.UINT8 | 1,
        "checksum": uctypes.UINT16 | 2,
        "id": (uctypes.ARRAY | 4, 2 | uctypes.UINT8),
        "seq": uctypes.INT16 | 6,
        "timestamp": uctypes.UINT64 | 8,
    } # packet header descriptor
    h = uctypes.struct(uctypes.addressof(pkt), pkt_desc, uctypes.BIG_ENDIAN)
    h.type = 8 # ICMP_ECHO_REQUEST
    h.code = 0
    h.checksum = 0
    h.id[0:2] = uos.urandom(2)
    h.seq = 1

    # init socket
    sock = usocket.socket(usocket.AF_INET, usocket.SOCK_RAW, 1)
    sock.setblocking(0)
    sock.settimeout(timeout/1000)
    try:
        addr = usocket.getaddrinfo(host, 1)[0][-1][0] # ip address
    except IndexError:
        not quiet and print("Could not determine the address of", host)
        return None
    sock.connect((addr, 1))
    not quiet and print("PING %s (%s): %u data bytes" % (host, addr, len(pkt)))

    seqs = list(range(1, count+1)) # [1,2,...,count]
    c = 1
    t = 0
    n_trans = 0
    n_recv = 0
    finish = False
    while t < timeout:
        if t==interval and c<=count:
            # send packet
            h.checksum = 0
            h.seq = c
            h.timestamp = utime.ticks_us()
            h.checksum = checksum(pkt)
            if sock.send(pkt) == size:
                n_trans += 1
                t = 0 # reset timeout
            else:
                seqs.remove(c)
            c += 1

        # recv packet
        while 1:
            socks, _, _ = uselect.select([sock], [], [], 0)
            if socks:
                resp = socks[0].recv(4096)
                resp_mv = memoryview(resp)
                h2 = uctypes.struct(uctypes.addressof(resp_mv[20:]), pkt_desc, uctypes.BIG_ENDIAN)
                # TODO: validate checksum (optional)
                seq = h2.seq
                if h2.type==0 and h2.id==h.id and (seq in seqs): # 0: ICMP_ECHO_REPLY
                    t_elasped = (utime.ticks_us()-h2.timestamp) / 1000
                    ttl = ustruct.unpack('!B', resp_mv[8:9])[0] # time-to-live
                    n_recv += 1
                    not quiet and print("%u bytes from %s: icmp_seq=%u, ttl=%u, time=%f ms" % (len(resp), addr, seq, ttl, t_elasped))
                    seqs.remove(seq)
                    if len(seqs) == 0:
                        finish = True
                        break
            else:
                break

        if finish:
            break

        utime.sleep_ms(1)
        t += 1

    # close
    sock.close()
    ret = (n_trans, n_recv)
    not quiet and print("%u packets transmitted, %u packets received" % (n_trans, n_recv))
    return (n_trans, n_recv)

最近利用“合宙ESP32C3-Core”做了个MicroPython的小项目,用到了HTTP请求的功能。

找到了以下文章介绍urllib.urequest。据说代码少、性能好,能满足80%的需求。能支持HTTPS的请求。但使用下来,发现功能不完善,最大问题是不支持超时(timeout)设置。

折腾一番后,改为使用urequests,相关功能都比较完善,可以设置timeout、请求头等。

源码没改,保存为文件名urequests.py即可:

"""
code from: https://github.com/micropython/micropython-lib/blob/master/python-ecosys/urequests/urequests.py
version: a3d6d29b1b9de2bb147e0751c08a39608ebe06c8
"""
import usocket


class Response:
    def __init__(self, f):
        self.raw = f
        self.encoding = "utf-8"
        self._cached = None

    def close(self):
        if self.raw:
            self.raw.close()
            self.raw = None
        self._cached = None

    @property
    def content(self):
        if self._cached is None:
            try:
                self._cached = self.raw.read()
            finally:
                self.raw.close()
                self.raw = None
        return self._cached

    @property
    def text(self):
        return str(self.content, self.encoding)

    def json(self):
        import ujson

        return ujson.loads(self.content)


def request(
    method,
    url,
    data=None,
    json=None,
    headers={},
    stream=None,
    auth=None,
    timeout=None,
    parse_headers=True,
):
    redirect = None  # redirection url, None means no redirection
    chunked_data = data and getattr(data, "__iter__", None) and not getattr(data, "__len__", None)

    if auth is not None:
        import ubinascii

        username, password = auth
        formated = b"{}:{}".format(username, password)
        formated = str(ubinascii.b2a_base64(formated)[:-1], "ascii")
        headers["Authorization"] = "Basic {}".format(formated)

    try:
        proto, dummy, host, path = url.split("/", 3)
    except ValueError:
        proto, dummy, host = url.split("/", 2)
        path = ""
    if proto == "http:":
        port = 80
    elif proto == "https:":
        import ussl

        port = 443
    else:
        raise ValueError("Unsupported protocol: " + proto)

    if ":" in host:
        host, port = host.split(":", 1)
        port = int(port)

    ai = usocket.getaddrinfo(host, port, 0, usocket.SOCK_STREAM)
    ai = ai[0]

    resp_d = None
    if parse_headers is not False:
        resp_d = {}

    s = usocket.socket(ai[0], usocket.SOCK_STREAM, ai[2])

    if timeout is not None:
        # Note: settimeout is not supported on all platforms, will raise
        # an AttributeError if not available.
        s.settimeout(timeout)

    try:
        s.connect(ai[-1])
        if proto == "https:":
            s = ussl.wrap_socket(s, server_hostname=host)
        s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
        if not "Host" in headers:
            s.write(b"Host: %s\r\n" % host)
        # Iterate over keys to avoid tuple alloc
        for k in headers:
            s.write(k)
            s.write(b": ")
            s.write(headers[k])
            s.write(b"\r\n")
        if json is not None:
            assert data is None
            import ujson

            data = ujson.dumps(json)
            s.write(b"Content-Type: application/json\r\n")
        if data:
            if chunked_data:
                s.write(b"Transfer-Encoding: chunked\r\n")
            else:
                s.write(b"Content-Length: %d\r\n" % len(data))
        s.write(b"Connection: close\r\n\r\n")
        if data:
            if chunked_data:
                for chunk in data:
                    s.write(b"%x\r\n" % len(chunk))
                    s.write(chunk)
                    s.write(b"\r\n")
                s.write("0\r\n\r\n")
            else:
                s.write(data)

        l = s.readline()
        # print(l)
        l = l.split(None, 2)
        if len(l) < 2:
            # Invalid response
            raise ValueError("HTTP error: BadStatusLine:\n%s" % l)
        status = int(l[1])
        reason = ""
        if len(l) > 2:
            reason = l[2].rstrip()
        while True:
            l = s.readline()
            if not l or l == b"\r\n":
                break
            # print(l)
            if l.startswith(b"Transfer-Encoding:"):
                if b"chunked" in l:
                    raise ValueError("Unsupported " + str(l, "utf-8"))
            elif l.startswith(b"Location:") and not 200 <= status <= 299:
                if status in [301, 302, 303, 307, 308]:
                    redirect = str(l[10:-2], "utf-8")
                else:
                    raise NotImplementedError("Redirect %d not yet supported" % status)
            if parse_headers is False:
                pass
            elif parse_headers is True:
                l = str(l, "utf-8")
                k, v = l.split(":", 1)
                resp_d[k] = v.strip()
            else:
                parse_headers(l, resp_d)
    except OSError:
        s.close()
        raise

    if redirect:
        s.close()
        if status in [301, 302, 303]:
            return request("GET", redirect, None, None, headers, stream)
        else:
            return request(method, redirect, data, json, headers, stream)
    else:
        resp = Response(s)
        resp.status_code = status
        resp.reason = reason
        if resp_d is not None:
            resp.headers = resp_d
        return resp


def head(url, **kw):
    return request("HEAD", url, **kw)


def get(url, **kw):
    return request("GET", url, **kw)


def post(url, **kw):
    return request("POST", url, **kw)


def put(url, **kw):
    return request("PUT", url, **kw)


def patch(url, **kw):
    return request("PATCH", url, **kw)


def delete(url, **kw):
    return request("DELETE", url, **kw)

最后,用的时候要注意:

  1. 响应结果需要手工关闭,避免再发起请求时会报错。
  2. POST的数据,需要进行URL编码。
  3. 除非POST的数据是JSON,否则最好加上请求头“Content-type”。

示例代码如下:

import urequests

r = urequests.post("https://abc.com/path", data='id=123&name=apple', timeout=10, headers={'User-Agent': 'Micropython(ESP32C3)', 'Content-type': 'application/x-www-form-urlencoded'})
print(r.status_code)  # 打印相应状态,整数,正常为200
print(r.content)  # 打印响应数据
r.close() #  关闭连接

使用Termux多年了。主要是可以安装N多Linux命令,一个应用即可实现N多功能。目前最新版是0.118.0,推荐从F-Droid安装。

这里记录一下相关的初始化动作。

1 相关资料

2 设置国内源

跟大多数Linux发行版类似,软件源设置为国内镜像,有效提高安装、更新软件的效率。官方收录了软件源的中国镜像网站:

Termux Packages - Mirrors Hosted in China

我选择了清华大学的镜像:

Termux | 镜像使用帮助 | 清华大学开源软件镜像站

跟Debian类似,直接修改apt的source.list文件即可。即修改$PREFIX/etc/apt/sources.list文件的内容如下:

# The termux repository mirror from TUNA:
deb https://mirrors.tuna.tsinghua.edu.cn/termux/apt/termux-main stable main
deb https://mirrors.tuna.tsinghua.edu.cn/termux/apt/termux-root root stable
deb https://mirrors.tuna.tsinghua.edu.cn/termux/apt/termux-x11 x11 main

保存后,执行apt update,更新一下本地的缓存。

3 安装Termux工具

执行以下命令,安装Termux的相关工具。如果默认安装了,则不用再装。

pkg install termux-tools

4 开启储存访问

执行termux-setup-storage即可。对应Android的sdcard目录为~/storage/shared

5 关闭震动

本人不喜欢以震动方式提示错误,习惯关闭它。编辑文件~/.termux/termux.properties,末尾添加:

bell-character = ignore

6 设置辅助键盘(the extra keys)

官方详细说明: Termux Wiki - Extra Keys Row

值得一提,可以设置按键上滑,让辅助键盘拥有更多的功能。参考配置,修改文件~/.termux/termux.properties,末尾添加:

extra-keys = [ \
   [{key: ESC, popup: {macro: "CTRL d", display: "exit"}}, \
   {key: '/', popup: '\'}, \
   {key: '-', popup: '_'}, \
   {key: '(', popup: '{'}, \
   {key: UP, popup: PGUP}, \
   {key: ')', popup: '}'}, \
   {key: QUOTE, popup: '*'}], \
   [{key: TAB, popup: ':'}, \
   {key: CTRL}, \
   {key: ALT}, \
   {key: LEFT, popup: HOME}, \
   {key: DOWN, popup: PGDN}, \
   {key: RIGHT, popup: END}, \
   {key: APOSTROPHE, popup: '`'}] \
]

7 开启root

  • 如果系统已root,安装tsu,可以切换root用户,或者使用sudo以root用户执行命令。
pkg install tsu
sudo 命令
  • 如果Android系统没有root,安装proot,即可执行需要root权限的命令。
pkg install proot
proot 命令

8 安装vim及配置

本人习惯vim,其它编辑程序,可以选择nanoemacs等。安装命令如下:

pkg install vim

重点配置,编辑文件~/.vimrc,末尾添加:

" 解决中文显示
set fileencodings=utf-8,gb2312,gb18030,gbk,ucs-bom,cp936,latin1
set enc=utf8
set fencs=utf8,gbk,gb2312,gb18030

" 显示行号
set nu

" 颜色主题
colorscheme desert

" 语法高亮
syntax on

最后使配置生效:

source .vimrc

9 Termux相关应用

  • Termux:API

    从Termux访问Android功能。可以实现脚本控制Android。
  • Termux:Boot

    允许程序在启动时运行的Termux扩展应用。
  • Termux:Float

    在浮动终端窗口中使用Termux。
  • Termux:Styling

    自定义Termux终端的样式。
  • Termux:Tasker

    从Tasker运行Termux脚本的Tasker插件。需要配合Tasker使用。
  • Termux:Widget

    从主屏幕启动Termux命令。实现通过点击桌面图标执行相关的sh脚本。

10 其它常用命令

  • OpenSSH

    • 说明:完整的SSH客户端。
    • 安装命令:pkg install openssh
  • ADB

    • 说明:可以使用ADB连接本机,无需root权限就能执行input命令等。
    • 安装命令:pkg install android-tools