为了在MicroPython上实现Web服务,找了几个框架,最终选择tinyweb

tinyweb主要是小巧、符合编程习惯、做好错误处理、实现了HTTP 1.0基本功能等。相关信息如下:

但是tinyweb采用logging做log记录,而logging依赖的库有点多,于是改为使用ulooger。修改后的源码如下:

"""
Tiny Web - pretty simple and powerful web server for tiny platforms like ESP8266 / ESP32
MIT license
(C) Konstantin Belyalov 2017-2018

- project: https://github.com/belyalov/tinyweb
- source code: https://github.com/belyalov/tinyweb/blob/master/tinyweb/server.py
- version: e92546054b905de46f42157282677f56764fb2f0

edit: replace logging with ulogger
"""
import ulogger
import uasyncio as asyncio
import uasyncio.core
import ujson as json
import gc
import uos as os
import sys
import uerrno as errno
import usocket as socket


#log = logging.getLogger('WEB')
class UtcClock(ulogger.BaseClock):
    def __call__(self) -> str:
        # UTC date time: (year, month, mday, hour, minute, second, weekday, yearday)
        dt = time.gmtime()
        return f'{dt[0]}{dt[1]:02d}{dt[2]:02d}_{dt[3]:02d}{dt[4]:02d}{dt[5]:02d}'

handler_to_term = ulogger.Handler(
    level=ulogger.INFO,
    colorful=True,
    fmt="&(time)%[&(level)%][&(name)%]: &(msg)%",
    clock=UtcClock(),
    direction=ulogger.TO_TERM,
)

handler_to_file = ulogger.Handler(
    level=ulogger.INFO,
    fmt="&(time)%[&(level)%][&(name)%]: &(msg)%",
    clock=UtcClock(),
    direction=ulogger.TO_FILE,
    file_name="tinyweb.log",
    max_file_size=102400 # max for 100KB
)
logger = ulogger.Logger(
    name = __name__,
    handlers = (
        handler_to_term,
        handler_to_file
    )
)

type_gen = type((lambda: (yield))())

# uasyncio v3 is shipped with MicroPython 1.13, and contains some subtle
# but breaking changes. See also https://github.com/peterhinch/micropython-async/blob/master/v3/README.md
IS_UASYNCIO_V3 = hasattr(asyncio, "__version__") and asyncio.__version__ >= (3,)


def urldecode_plus(s):
    """Decode urlencoded string (including '+' char).

    Returns decoded string
    """
    s = s.replace('+', ' ')
    arr = s.split('%')
    res = arr[0]
    for it in arr[1:]:
        if len(it) >= 2:
            res += chr(int(it[:2], 16)) + it[2:]
        elif len(it) == 0:
            res += '%'
        else:
            res += it
    return res


def parse_query_string(s):
    """Parse urlencoded string into dict.

    Returns dict
    """
    res = {}
    pairs = s.split('&')
    for p in pairs:
        vals = [urldecode_plus(x) for x in p.split('=', 1)]
        if len(vals) == 1:
            res[vals[0]] = ''
        else:
            res[vals[0]] = vals[1]
    return res


class HTTPException(Exception):
    """HTTP protocol exceptions"""

    def __init__(self, code=400):
        self.code = code


class request:
    """HTTP Request class"""

    def __init__(self, _reader):
        self.reader = _reader
        self.headers = {}
        self.method = b''
        self.path = b''
        self.query_string = b''

    async def read_request_line(self):
        """Read and parse first line (AKA HTTP Request Line).
        Function is generator.

        Request line is something like:
        GET /something/script?param1=val1 HTTP/1.1
        """
        while True:
            rl = await self.reader.readline()
            # skip empty lines
            if rl == b'\r\n' or rl == b'\n':
                continue
            break
        rl_frags = rl.split()
        if len(rl_frags) != 3:
            raise HTTPException(400)
        self.method = rl_frags[0]
        url_frags = rl_frags[1].split(b'?', 1)
        self.path = url_frags[0]
        if len(url_frags) > 1:
            self.query_string = url_frags[1]

    async def read_headers(self, save_headers=[]):
        """Read and parse HTTP headers until \r\n\r\n:
        Optional argument 'save_headers' controls which headers to save.
            This is done mostly to deal with memory constrains.

        Function is generator.

        HTTP headers could be like:
        Host: google.com
        Content-Type: blah
        \r\n
        """
        while True:
            gc.collect()
            line = await self.reader.readline()
            if line == b'\r\n':
                break
            frags = line.split(b':', 1)
            if len(frags) != 2:
                raise HTTPException(400)
            if frags[0] in save_headers:
                self.headers[frags[0]] = frags[1].strip()

    async def read_parse_form_data(self):
        """Read HTTP form data (payload), if any.
        Function is generator.

        Returns:
            - dict of key / value pairs
            - None in case of no form data present
        """
        # TODO: Probably there is better solution how to handle
        # request body, at least for simple urlencoded forms - by processing
        # chunks instead of accumulating payload.
        gc.collect()
        if b'Content-Length' not in self.headers:
            return {}
        # Parse payload depending on content type
        if b'Content-Type' not in self.headers:
            # Unknown content type, return unparsed, raw data
            return {}
        size = int(self.headers[b'Content-Length'])
        if size > self.params['max_body_size'] or size < 0:
            raise HTTPException(413)
        data = await self.reader.readexactly(size)
        # Use only string before ';', e.g:
        # application/x-www-form-urlencoded; charset=UTF-8
        ct = self.headers[b'Content-Type'].split(b';', 1)[0]
        try:
            if ct == b'application/json':
                return json.loads(data)
            elif ct == b'application/x-www-form-urlencoded':
                return parse_query_string(data.decode())
        except ValueError:
            # Re-generate exception for malformed form data
            raise HTTPException(400)


class response:
    """HTTP Response class"""

    def __init__(self, _writer):
        self.writer = _writer
        self.send = _writer.awrite
        self.code = 200
        self.version = '1.0'
        self.headers = {}

    async def _send_headers(self):
        """Compose and send:
        - HTTP request line
        - HTTP headers following by \r\n.
        This function is generator.

        P.S.
        Because of usually we have only a few HTTP headers (2-5) it doesn't make sense
        to send them separately - sometimes it could increase latency.
        So combining headers together and send them as single "packet".
        """
        # Request line
        hdrs = 'HTTP/{} {} MSG\r\n'.format(self.version, self.code)
        # Headers
        for k, v in self.headers.items():
            hdrs += '{}: {}\r\n'.format(k, v)
        hdrs += '\r\n'
        # Collect garbage after small mallocs
        gc.collect()
        await self.send(hdrs)

    async def error(self, code, msg=None):
        """Generate HTTP error response
        This function is generator.

        Arguments:
            code - HTTP response code

        Example:
            # Not enough permissions. Send HTTP 403 - Forbidden
            await resp.error(403)
        """
        self.code = code
        if msg:
            self.add_header('Content-Length', len(msg))
        await self._send_headers()
        if msg:
            await self.send(msg)

    async def redirect(self, location, msg=None):
        """Generate HTTP redirect response to 'location'.
        Basically it will generate HTTP 302 with 'Location' header

        Arguments:
            location - URL to redirect to

        Example:
            # Redirect to /something
            await resp.redirect('/something')
        """
        self.code = 302
        self.add_header('Location', location)
        if msg:
            self.add_header('Content-Length', len(msg))
        await self._send_headers()
        if msg:
            await self.send(msg)

    def add_header(self, key, value):
        """Add HTTP response header

        Arguments:
            key - header name
            value - header value

        Example:
            resp.add_header('Content-Encoding', 'gzip')
        """
        self.headers[key] = value

    def add_access_control_headers(self):
        """Add Access Control related HTTP response headers.
        This is required when working with RestApi (JSON requests)
        """
        self.add_header('Access-Control-Allow-Origin', self.params['allowed_access_control_origins'])
        self.add_header('Access-Control-Allow-Methods', self.params['allowed_access_control_methods'])
        self.add_header('Access-Control-Allow-Headers', self.params['allowed_access_control_headers'])

    async def start_html(self):
        """Start response with HTML content type.
        This function is generator.

        Example:
            await resp.start_html()
            await resp.send('<html><h1>Hello, world!</h1></html>')
        """
        self.add_header('Content-Type', 'text/html')
        await self._send_headers()

    async def send_file(self, filename, content_type=None, content_encoding=None, max_age=2592000, buf_size=128):
        """Send local file as HTTP response.
        This function is generator.

        Arguments:
            filename - Name of file which exists in local filesystem
        Keyword arguments:
            content_type - Filetype. By default - None means auto-detect.
            max_age - Cache control. How long browser can keep this file on disk.
                      By default - 30 days
                      Set to 0 - to disable caching.

        Example 1: Default use case:
            await resp.send_file('images/cat.jpg')

        Example 2: Disable caching:
            await resp.send_file('static/index.html', max_age=0)

        Example 3: Override content type:
            await resp.send_file('static/file.bin', content_type='application/octet-stream')
        """
        try:
            # Get file size
            stat = os.stat(filename)
            slen = str(stat[6])
            self.add_header('Content-Length', slen)
            # Find content type
            if content_type:
                self.add_header('Content-Type', content_type)
            # Add content-encoding, if any
            if content_encoding:
                self.add_header('Content-Encoding', content_encoding)
            # Since this is static content is totally make sense
            # to tell browser to cache it, however, you can always
            # override it by setting max_age to zero
            self.add_header('Cache-Control', 'max-age={}, public'.format(max_age))
            with open(filename) as f:
                await self._send_headers()
                gc.collect()
                buf = bytearray(min(stat[6], buf_size))
                while True:
                    size = f.readinto(buf)
                    if size == 0:
                        break
                    await self.send(buf, sz=size)
        except OSError as e:
            # special handling for ENOENT / EACCESS
            if e.args[0] in (errno.ENOENT, errno.EACCES):
                raise HTTPException(404)
            else:
                raise


async def restful_resource_handler(req, resp, param=None):
    """Handler for RESTful API endpoins"""
    # Gather data - query string, JSON in request body...
    data = await req.read_parse_form_data()
    # Add parameters from URI query string as well
    # This one is actually for simply development of RestAPI
    if req.query_string != b'':
        data.update(parse_query_string(req.query_string.decode()))
    # Call actual handler
    _handler, _kwargs = req.params['_callmap'][req.method]
    # Collect garbage before / after handler execution
    gc.collect()
    if param:
        res = _handler(data, param, **_kwargs)
    else:
        res = _handler(data, **_kwargs)
    gc.collect()
    # Handler result could be:
    # 1. generator - in case of large payload
    # 2. string - just string :)
    # 2. dict - meaning client what tinyweb to convert it to JSON
    # it can also return error code together with str / dict
    # res = {'blah': 'blah'}
    # res = {'blah': 'blah'}, 201
    if isinstance(res, type_gen):
        # Result is generator, use chunked response
        # NOTICE: HTTP 1.0 by itself does not support chunked responses, so, making workaround:
        # Response is HTTP/1.1 with Connection: close
        resp.version = '1.1'
        resp.add_header('Connection', 'close')
        resp.add_header('Content-Type', 'application/json')
        resp.add_header('Transfer-Encoding', 'chunked')
        resp.add_access_control_headers()
        await resp._send_headers()
        # Drain generator
        for chunk in res:
            chunk_len = len(chunk.encode('utf-8'))
            await resp.send('{:x}\r\n'.format(chunk_len))
            await resp.send(chunk)
            await resp.send('\r\n')
            gc.collect()
        await resp.send('0\r\n\r\n')
    else:
        if type(res) == tuple:
            resp.code = res[1]
            res = res[0]
        elif res is None:
            raise Exception('Result expected')
        # Send response
        if type(res) is dict:
            res_str = json.dumps(res)
        else:
            res_str = res
        resp.add_header('Content-Type', 'application/json')
        resp.add_header('Content-Length', str(len(res_str)))
        resp.add_access_control_headers()
        await resp._send_headers()
        await resp.send(res_str)


class webserver:

    def __init__(self, request_timeout=3, max_concurrency=3, backlog=16, debug=False):
        """Tiny Web Server class.
        Keyword arguments:
            request_timeout - Time for client to send complete request
                              after that connection will be closed.
            max_concurrency - How many connections can be processed concurrently.
                              It is very important to limit this number because of
                              memory constrain.
                              Default value depends on platform
            backlog         - Parameter to socket.listen() function. Defines size of
                              pending to be accepted connections queue.
                              Must be greater than max_concurrency
            debug           - Whether send exception info (text + backtrace)
                              to client together with HTTP 500 or not.
        """
        self.loop = asyncio.get_event_loop()
        self.request_timeout = request_timeout
        self.max_concurrency = max_concurrency
        self.backlog = backlog
        self.debug = debug
        self.explicit_url_map = {}
        self.catch_all_handler = None
        self.parameterized_url_map = {}
        # Currently opened connections
        self.conns = {}
        # Statistics
        self.processed_connections = 0

    def _find_url_handler(self, req):
        """Helper to find URL handler.
        Returns tuple of (function, opts, param) or (None, None) if not found.
        """
        # First try - lookup in explicit (non parameterized URLs)
        if req.path in self.explicit_url_map:
            return self.explicit_url_map[req.path]
        # Second try - strip last path segment and lookup in another map
        idx = req.path.rfind(b'/') + 1
        path2 = req.path[:idx]
        if len(path2) > 0 and path2 in self.parameterized_url_map:
            # Save parameter into request
            req._param = req.path[idx:].decode()
            return self.parameterized_url_map[path2]

        if self.catch_all_handler:
            return self.catch_all_handler

        # No handler found
        return (None, None)

    async def _handle_request(self, req, resp):
        await req.read_request_line()
        # Find URL handler
        req.handler, req.params = self._find_url_handler(req)
        if not req.handler:
            # No URL handler found - read response and issue HTTP 404
            await req.read_headers()
            raise HTTPException(404)
        # req.params = params
        # req.handler = han
        resp.params = req.params
        # Read / parse headers
        await req.read_headers(req.params['save_headers'])

    async def _handler(self, reader, writer):
        """Handler for TCP connection with
        HTTP/1.0 protocol implementation
        """
        gc.collect()

        try:
            req = request(reader)
            resp = response(writer)
            # Read HTTP Request with timeout
            await asyncio.wait_for(self._handle_request(req, resp),
                                   self.request_timeout)

            # OPTIONS method is handled automatically
            if req.method == b'OPTIONS':
                resp.add_access_control_headers()
                # Since we support only HTTP 1.0 - it is important
                # to tell browser that there is no payload expected
                # otherwise some webkit based browsers (Chrome)
                # treat this behavior as an error
                resp.add_header('Content-Length', '0')
                await resp._send_headers()
                return

            # Ensure that HTTP method is allowed for this path
            if req.method not in req.params['methods']:
                raise HTTPException(405)

            # Handle URL
            gc.collect()
            if hasattr(req, '_param'):
                await req.handler(req, resp, req._param)
            else:
                await req.handler(req, resp)
            # Done here
        except (asyncio.CancelledError, asyncio.TimeoutError):
            pass
        except OSError as e:
            # Do not send response for connection related errors - too late :)
            # P.S. code 32 - is possible BROKEN PIPE error (TODO: is it true?)
            if e.args[0] not in (errno.ECONNABORTED, errno.ECONNRESET, 32):
                try:
                    await resp.error(500)
                except Exception as e:
                    #log.exc(e, "")
                    logger.error(f'OSError: {e}')
        except HTTPException as e:
            try:
                await resp.error(e.code)
            except Exception as e:
                #log.exc(e)
                logger.error(f'HTTPException: {e}')
        except Exception as e:
            # Unhandled expection in user's method
            #log.error(req.path.decode())
            #log.exc(e, "")
            logger.error(f'Unhandled expection. URL: {req.path.decode()}, exception: {e}')
            try:
                await resp.error(500)
                # Send exception info if desired
                if self.debug:
                    sys.print_exception(e, resp.writer.s)
            except Exception as e:
                pass
        finally:
            await writer.aclose()
            # Max concurrency support -
            # if queue is full schedule resume of TCP server task
            if len(self.conns) == self.max_concurrency:
                self.loop.create_task(self._server_coro)
            # Delete connection, using socket as a key
            del self.conns[id(writer.s)]

    def add_route(self, url, f, **kwargs):
        """Add URL to function mapping.

        Arguments:
            url - url to map function with
            f - function to map

        Keyword arguments:
            methods - list of allowed methods. Defaults to ['GET', 'POST']
            save_headers - contains list of HTTP headers to be saved. Case sensitive. Default - empty.
            max_body_size - Max HTTP body size (e.g. POST form data). Defaults to 1024
            allowed_access_control_headers - Default value for the same name header. Defaults to *
            allowed_access_control_origins - Default value for the same name header. Defaults to *
        """
        if url == '' or '?' in url:
            raise ValueError('Invalid URL')
        # Initial params for route
        params = {'methods': ['GET'],
                  'save_headers': [],
                  'max_body_size': 1024,
                  'allowed_access_control_headers': '*',
                  'allowed_access_control_origins': '*',
                  }
        params.update(kwargs)
        params['allowed_access_control_methods'] = ', '.join(params['methods'])
        # Convert methods/headers to bytestring
        params['methods'] = [x.encode() for x in params['methods']]
        params['save_headers'] = [x.encode() for x in params['save_headers']]
        # If URL has a parameter
        if url.endswith('>'):
            idx = url.rfind('<')
            path = url[:idx]
            idx += 1
            param = url[idx:-1]
            if path.encode() in self.parameterized_url_map:
                raise ValueError('URL exists')
            params['_param_name'] = param
            self.parameterized_url_map[path.encode()] = (f, params)

        if url.encode() in self.explicit_url_map:
            raise ValueError('URL exists')
        self.explicit_url_map[url.encode()] = (f, params)

    def add_resource(self, cls, url, **kwargs):
        """Map resource (RestAPI) to URL

        Arguments:
            cls - Resource class to map to
            url - url to map to class
            kwargs - User defined key args to pass to the handler.

        Example:
            class myres():
                def get(self, data):
                    return {'hello': 'world'}


            app.add_resource(myres, '/api/myres')
        """
        methods = []
        callmap = {}
        # Create instance of resource handler, if passed as just class (not instance)
        try:
            obj = cls()
        except TypeError:
            obj = cls
        # Get all implemented HTTP methods and make callmap
        for m in ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']:
            fn = m.lower()
            if hasattr(obj, fn):
                methods.append(m)
                callmap[m.encode()] = (getattr(obj, fn), kwargs)
        self.add_route(url, restful_resource_handler,
                       methods=methods,
                       save_headers=['Content-Length', 'Content-Type'],
                       _callmap=callmap)

    def catchall(self):
        """Decorator for catchall()

        Example:
            @app.catchall()
            def catchall_handler(req, resp):
                response.code = 404
                await response.start_html()
                await response.send('<html><body><h1>My custom 404!</h1></html>\n')
        """
        params = {'methods': [b'GET'], 'save_headers': [], 'max_body_size': 1024, 'allowed_access_control_headers': '*', 'allowed_access_control_origins': '*'}

        def _route(f):
            self.catch_all_handler = (f, params)
            return f
        return _route

    def route(self, url, **kwargs):
        """Decorator for add_route()

        Example:
            @app.route('/')
            def index(req, resp):
                await resp.start_html()
                await resp.send('<html><body><h1>Hello, world!</h1></html>\n')
        """
        def _route(f):
            self.add_route(url, f, **kwargs)
            return f
        return _route

    def resource(self, url, method='GET', **kwargs):
        """Decorator for add_resource() method

        Examples:
            @app.resource('/users')
            def users(data):
                return {'a': 1}

            @app.resource('/messages/<topic_id>')
            async def index(data, topic_id):
                yield '{'
                yield '"topic_id": "{}",'.format(topic_id)
                yield '"message": "test",'
                yield '}'
        """
        def _resource(f):
            self.add_route(url, restful_resource_handler,
                           methods=[method],
                           save_headers=['Content-Length', 'Content-Type'],
                           _callmap={method.encode(): (f, kwargs)})
            return f
        return _resource

    async def _tcp_server(self, host, port, backlog):
        """TCP Server implementation.
        Opens socket for accepting connection and
        creates task for every new accepted connection
        """
        addr = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0][-1]
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(addr)
        sock.listen(backlog)
        try:
            while True:
                if IS_UASYNCIO_V3:
                    yield uasyncio.core._io_queue.queue_read(sock)
                else:
                    yield asyncio.IORead(sock)
                csock, caddr = sock.accept()
                csock.setblocking(False)
                # Start handler / keep it in the map - to be able to
                # shutdown gracefully - by close all connections
                self.processed_connections += 1
                hid = id(csock)
                handler = self._handler(asyncio.StreamReader(csock),
                                        asyncio.StreamWriter(csock, {}))
                self.conns[hid] = handler
                self.loop.create_task(handler)
                # In case of max concurrency reached - temporary pause server:
                # 1. backlog must be greater than max_concurrency, otherwise
                #    client will got "Connection Reset"
                # 2. Server task will be resumed whenever one active connection finished
                if len(self.conns) == self.max_concurrency:
                    # Pause
                    yield False
        except asyncio.CancelledError:
            return
        finally:
            sock.close()

    def run(self, host="127.0.0.1", port=8081, loop_forever=True):
        """Run Web Server. By default it runs forever.

        Keyword arguments:
            host - host to listen on. By default - localhost (127.0.0.1)
            port - port to listen on. By default - 8081
            loop_forever - run loo.loop_forever(), otherwise caller must run it by itself.
        """
        self._server_coro = self._tcp_server(host, port, self.backlog)
        self.loop.create_task(self._server_coro)
        if loop_forever:
            self.loop.run_forever()

    def shutdown(self):
        """Gracefully shutdown Web Server"""
        asyncio.cancel(self._server_coro)
        for hid, coro in self.conns.items():
            asyncio.cancel(coro)

使用时,其实也有不少东西需要注意。

  1. 处理POST请求的方法,其注解需要设置save_headers = ['Content-Length','Content-Type'],才能获取到请求的参数。例如:
app = tinyweb.webserver()

@app.route('/save', methods = ['POST'], save_headers = ['Content-Length','Content-Type'])
async def save(request, response):
    params = await request.read_parse_form_data()
    await response.send(params)
  1. 一个方法处理多个请求链接,加多行注解即可。例如:
app = tinyweb.webserver()

@app.route('/')
@app.route('/index')
async def index(request, response):
    await response.send("Hello world!")
  1. 如果需要浏览器或对方客户端不要缓存,设置响应头即可。相关代码:
    response.add_header('Cache-Control', 'no-cache')  # for HTTP 1.1
    response.add_header('Pragma', 'no-cache')  # for HTTP 1.0
    response.add_header('Expires', '0')  # for proxies
  1. Web服务的max_concurrency不要设置太大,以降低内存占用。
  2. 更多的示例,可参考官方文档。

采用MicroPython编写的定时任务,特别是在实际环境测试,一般不能看到错误信息。因此,需要做log记录。

找到一个实现Logger功能的项目ulogger,其代码没有依赖其它库,使用方式也符合一般的Logger用法。相关信息如下:

相关代码复制了一份过来,取消了TextIOWrapper的引用:

"""
- project: micropython-ulogger
  https://github.com/whales-chen/micropython-ulogger
- code from
  https://github.com/whales-chen/micropython-ulogger/blob/main/ulogger/__init__.py
- version
  ec4f6b3842c677fbb457f6bc6d88afd8a82eeed6
"""
try:    import time
except: import utime as time

try:    from micropython import const
except: const = lambda x:x # for debug

#from io import TextIOWrapper
import io

__version__ = "v1.2"

DEBUG:    int = const(10)
INFO:     int = const(20)
WARN:     int = const(30)
ERROR:    int = const(40)
CRITICAL: int = const(50)

TO_FILE = const(100)
TO_TERM = const(200)

# fmt map 的可选参数
_level  = const(0)
_msg    = const(1)
_time   = const(2)
_name   = const(3)
_fnname = const(4)


def level_name(level: int, color: bool = False) -> str:
    if not color:
        if level == INFO:
            return "INFO"
        elif level == DEBUG:
            return "DEBUG"
        elif level == WARN:
            return "WARN"
        elif level == ERROR:
            return "ERROR"
        elif level == CRITICAL:
            return "CRITICAL"
    else:
        if level == INFO:
            return "\033[97mINFO\033[0m"
        elif level == DEBUG:
            return "\033[37mDEBUG\033[0m"
        elif level == WARN:
            return "\033[93mWARN\033[0m"
        elif level == ERROR:
            return "\033[35mERROR\033[0m"
        elif level == CRITICAL:
            return "\033[91mCRITICAL\033[0m"


class BaseClock ():
    """
    This is a BaseClock for the logger.
    Please inherit this class by your custom.
    """

    def __call__(self) -> str:
        """
        Acquire the time of now, please inherit this method.
        We will use the return value of this function as the time format of the log,
        such as `2021 - 6 - 13` or `12:45:23` and so on.

        :return: the time string.
        """
        return '%d' % time.time()


class Handler():
    """The Handler for logger.
    """
    _template: str
    _map: bytes
    level: int
    _direction: int
    _clock: BaseClock
    _color: bool
    _file_name: str
    _max_size: int
    #_file = TextIOWrapper
    _file = None

    def __init__(self,
        level: int = INFO,
        colorful: bool = True,
        fmt: str = "&(time)% - &(level)% - &(name)% - &(msg)%",
        clock: BaseClock = None,
        direction: int = TO_TERM,
        file_name: str = "logging.log",
        max_file_size: int = 4096
        ):
        """
        Create a Handler that you can add to the logger later

        ## Options available for fmt.
        - &(level)%  : the log level
        - &(msg)%    : the log message
        - &(time)%   : the time acquire from clock, see `BaseClock`
        - &(name)%   : the logger's name
        - &(fnname)%  : the function name which you will pass on.
        - more optional is developing.

        ## Options available for level.
        - DEBUG
        - INFO
        - WARN
        - ERROR
        - CRITICAL

        ## Options available for direction.
        - TO_FILE : output to a file
        - TO_TERM : output to terminal

        :param level: Set a minimum level you want to be log
        :type level: int(see the consts in this module)

        :param colorful: Whether to use color display information to terminal(Not applicable to files)
        :type colorful: bool

        :param fmt: the format string like: "&(time)% - &(level)% - &(name)% - &(fnname)% - &(msg)%"(default)
        :type fmt: str

        :param clock: The Clock which will provide time str. see `BaseClock`
        :type clock: BaseClock(can be inherit )

        :param direction: Set the direction where logger will output
        :type direction: int (`TO_FILE` or `TO_TERM`)

        :param file_name: available when you set `TO_FILE` to param `direction`. (default for `logging.log`)
        :type file_name: str
        :param max_file_size: available when you set `TO_FILE` to param `direction`. The unit is `byte`, (default for 4k)
        :type max_file_size: str
        """
        #TODO: 文件按日期存储, 最大份数的设置.
        self._direction = direction
        self.level = level
        self._clock = clock if clock else BaseClock()
        self._color = colorful
        self._file_name = file_name if direction == TO_FILE else ''
        self._max_size = max_file_size if direction == TO_FILE else 0

        if direction == TO_FILE:
            self._file = open(file_name, 'a+')

        # 特么的re居然不能全局匹配, 烦了, 只能自己来.
        # m = re.match(r"&\((.*?)\)%", fmt)
        # i = 0
        # while True:
        #     # 由于蛋疼的 ure 不能直接获取匹配结果的数量, 只能使用这种蠢蛋方法来循环.
        #     try:
        #         text = m.group(i)

        #     except:
        #         # 发生错误说明已经遍历完毕
        #         break

        #     # 使用指针代替文本来减少开销
        #     if text == "level":
        #         self._map.append(_level)
        #     elif text == "msg":
        #         self._map.append(_msg)
        #     elif text == "time":
        #         self._map.append(_time)
        #     elif text == "name":
        #         self._map.append(_name)
        #     elif text == "fnname":
        #         self._map.append(_fnname)

        #     i += 1

        # 添加映射
        self._map = bytearray()
        idx = 0
        while True:
            idx = fmt.find("&(", idx)
            if idx >= 0:  # 有找到
                a_idx = fmt.find(")%", idx+2)
                if a_idx < 0:
                    # 没找到后缀, 报错
                    raise Exception(
                        "Unable to parse text format successfully.")
                text = fmt[idx+2:a_idx]
                idx = a_idx+2  # 交换位置
                if text == "level":
                    self._map.append(_level)
                elif text == "msg":
                    self._map.append(_msg)
                elif text == "time":
                    self._map.append(_time)
                elif text == "name":
                    self._map.append(_name)
                elif text == "fnname":
                    self._map.append(_fnname)
            else:  # 没找到, 代表后面没有了
                break

        # 将 template 变成可被格式化的文本
        # 确保最后一个是换行字符

        self._template = fmt.replace("&(level)%", "%s")\
            .replace("&(msg)%", "%s")\
            .replace("&(time)%", "%s")\
            .replace("&(name)%", "%s")\
            .replace("&(fnname)%", "%s")\
            + "\n" if fmt[:-1] != '\n' else ''

    def _msg(self, *args, level: int, name: str, fnname: str):
        """
        Log a msg
        """
        
        if level < self.level:
            return
        # generate msg
        temp_map = []
        text = ''
        for item in self._map:
            if item == _msg:
                for text_ in args:  # 将元组内的文本添加到一起
                    text = "%s%s" % (text, text_)  # 防止用户输入其他类型(int, float)
                temp_map.append(text)
            elif item == _level:
                if self._direction == TO_TERM:  # only terminal can use color.
                    temp_map.append(level_name(level, self._color))
                else:
                    temp_map.append(level_name(level))
            elif item == _time:
                temp_map.append(self._clock())
            elif item == _name:
                temp_map.append(name)
            elif item == _fnname:
                temp_map.append(fnname if fnname else "unknownfn")

        if self._direction == TO_TERM:
            self._to_term(tuple(temp_map))
        else:
            self._to_file(tuple(temp_map))
        # TODO: 待验证: 转换为 tuple 和使用 fromat 谁更快

    def _to_term(self, map: tuple):
        print(self._template % map, end='')

    def _to_file(self, map: tuple):
        fp = self._file
        # 检查是否超出大小限制.
        prev_idx = fp.tell()  # 保存原始指针位置
        # 将读写指针跳到文件最大限制的地方,
        # 如果能读出数据, 说明文件大于指定的大小
        fp.seek(self._max_size)
        if fp.read(1):  # 能读到数据, 说明超出大小限制了
            fp = self._file = open(self._file_name, 'w')  # 使用 w 选项清空文件内容
        else:
            # 没有超出限制
            fp.seek(prev_idx)  # 指针回到原来的地方

        # 检查完毕, 开始写入数据
        fp.write(self._template % map)
        fp.flush()


class Logger():
    _handlers: list

    def __init__(self,
        name: str,
        handlers: list = None,
        ):

        self.name = name
        if not handlers:
            # 如果没有指定处理器, 自动创建一个默认的
            self._handlers = [Handler()]
        else:
            self._handlers = handlers

    @property
    def handlers(self):
        return self._handlers

    def _msg(self, *args, level: int, fn: str):

        for item in self._handlers:
            #try:
            item._msg(*args, level=level, fnname=fn, name=self.name)
            #except:
            #    print("Failed while trying to record")

    def debug(self, *args, fn: str = None):
        self._msg(*args, level=DEBUG, fn=fn)

    def info(self, *args, fn: str = None):
        self._msg(*args, level=INFO, fn=fn)

    def warn(self, *args, fn: str = None):
        self._msg(*args, level=WARN, fn=fn)

    def error(self, *args, fn: str = None):
        self._msg(*args, level=ERROR, fn=fn)

    def critical(self, *args, fn: str = None):
        self._msg(*args, level=CRITICAL, fn=fn)


__all__ = [
    Logger,
    Handler,
    BaseClock,


    DEBUG,
    INFO,
    WARN,
    ERROR,
    CRITICAL,

    TO_FILE,
    TO_TERM,

    __version__
]

近来搞电脑的远程启动搞上瘾了。使用网络启动(Wake on Lan),确实带来很多玩法。为了进一步降低电费,减少电脑非使用时段(例如晚上睡觉时段)待机而产生的功耗,采用ESP32C3(刷上MicroPython)来作为远程开机设备。即:

  • 普通x86电脑,在晚上或无需使用的时间正常关机,并开启网络启动功能。
  • ESP32C3开发板保持24小时开机并联网,可在需要时远程启动所需x86电脑。

于是找了下,在MicroPython上发送Wake on Lan的实现代码。参考了以下文章:

整理出可用代码,保存为文件wol.py,如下:

"""
Small module for use with the wake on lan protocol.

Reference:
- https://pypi.org/project/wakeonlan/
- https://www.cnblogs.com/Yogile/p/16488281.html
"""
import socket
import ubinascii


BROADCAST_IP = "255.255.255.255"
DEFAULT_PORT = 9
SO_BROADCAST = 20

def create_magic_packet(macaddress: str) -> bytes:
    """
    Create a magic packet.

    A magic packet is a packet that can be used with the for wake on lan
    protocol to wake up a computer. The packet is constructed from the
    mac address given as a parameter.

    Args:
        macaddress: the mac address that should be parsed into a magic packet.

    """
    if len(macaddress) == 17:
        sep = macaddress[2]
        macaddress = macaddress.replace(sep, "")
    elif len(macaddress) == 14:
        sep = macaddress[4]
        macaddress = macaddress.replace(sep, "")
    if len(macaddress) != 12:
        raise ValueError("Incorrect MAC address format")
    #return bytes.fromhex("F" * 12 + macaddress * 16)
    return ubinascii.unhexlify("F" * 12 + macaddress * 16)


def send_magic_packet(
    *macs: str,
    ip_address: str = BROADCAST_IP,
    port: int = DEFAULT_PORT,
    interface: str = None
) -> None:
    """
    Wake up computers having any of the given mac addresses.

    Wake on lan must be enabled on the host device.

    Args:
        macs: One or more macaddresses of machines to wake.

    Keyword Args:
        ip_address: the ip address of the host to send the magic packet to.
        port: the port of the host to send the magic packet to.
        interface: the ip address of the network adapter to route the magic packet through.

    """
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 1)
        
        if interface is not None:
            sock.bind((interface, 0))
        sock.setsockopt(socket.SOL_SOCKET, SO_BROADCAST, 1)
        sock.connect((ip_address, port))
        for mac in macs:
            packet = create_magic_packet(mac)
            sock.send(packet)
            print("send magic packet to MAC [%s]" % (mac))
    except:
        print('send magic packet failed')
        pass
    finally:
        sock.close()

使用示例:

import wol

mac = '12:ab:12:ab:12:ab' # 必填参数。要启动电脑的网卡MAC
broadcastIp = '192.168.0.255'  # 可选参数。广播的地址,一般填对应网段的255地址

wol.send_magic_packet(mac, ip_address=broadcastIp)

最近做的MicroPython小项目,需要用到ping命令的功能,检查指定IP的电脑是否有开机并联网。

找了一圈,找到这个ping功能的源码。没有深究代码,就是能用。而且跟常用的ping命令差不多。

源码没改,直接搬过来,避免以后找不到:

# µPing (MicroPing) for MicroPython
# copyright (c) 2018 Shawwwn <shawwwn1@gmail.com>
# License: MIT
#
# Internet Checksum Algorithm
# Author: Olav Morken
# https://github.com/olavmrk/python-ping/blob/master/ping.py
# @data: bytes
#
# source code from: https://forum.pycom.io/topic/4930/ping-command/15
#
def checksum(data):
    if len(data) & 0x1: # Odd number of bytes
        data += b'\0'
    cs = 0
    for pos in range(0, len(data), 2):
        b1 = data[pos]
        b2 = data[pos + 1]
        cs += (b1 << 8) + b2
    while cs >= 0x10000:
        cs = (cs & 0xffff) + (cs >> 16)
    cs = ~cs & 0xffff
    return cs

def ping(host, count=4, timeout=5000, interval=10, quiet=False, size=64):
    import utime
    import uselect
    import uctypes
    import usocket
    import ustruct
    import uos

    # prepare packet
    assert size >= 16, "pkt size too small"
    pkt = b'Q'*size
    pkt_desc = {
        "type": uctypes.UINT8 | 0,
        "code": uctypes.UINT8 | 1,
        "checksum": uctypes.UINT16 | 2,
        "id": (uctypes.ARRAY | 4, 2 | uctypes.UINT8),
        "seq": uctypes.INT16 | 6,
        "timestamp": uctypes.UINT64 | 8,
    } # packet header descriptor
    h = uctypes.struct(uctypes.addressof(pkt), pkt_desc, uctypes.BIG_ENDIAN)
    h.type = 8 # ICMP_ECHO_REQUEST
    h.code = 0
    h.checksum = 0
    h.id[0:2] = uos.urandom(2)
    h.seq = 1

    # init socket
    sock = usocket.socket(usocket.AF_INET, usocket.SOCK_RAW, 1)
    sock.setblocking(0)
    sock.settimeout(timeout/1000)
    try:
        addr = usocket.getaddrinfo(host, 1)[0][-1][0] # ip address
    except IndexError:
        not quiet and print("Could not determine the address of", host)
        return None
    sock.connect((addr, 1))
    not quiet and print("PING %s (%s): %u data bytes" % (host, addr, len(pkt)))

    seqs = list(range(1, count+1)) # [1,2,...,count]
    c = 1
    t = 0
    n_trans = 0
    n_recv = 0
    finish = False
    while t < timeout:
        if t==interval and c<=count:
            # send packet
            h.checksum = 0
            h.seq = c
            h.timestamp = utime.ticks_us()
            h.checksum = checksum(pkt)
            if sock.send(pkt) == size:
                n_trans += 1
                t = 0 # reset timeout
            else:
                seqs.remove(c)
            c += 1

        # recv packet
        while 1:
            socks, _, _ = uselect.select([sock], [], [], 0)
            if socks:
                resp = socks[0].recv(4096)
                resp_mv = memoryview(resp)
                h2 = uctypes.struct(uctypes.addressof(resp_mv[20:]), pkt_desc, uctypes.BIG_ENDIAN)
                # TODO: validate checksum (optional)
                seq = h2.seq
                if h2.type==0 and h2.id==h.id and (seq in seqs): # 0: ICMP_ECHO_REPLY
                    t_elasped = (utime.ticks_us()-h2.timestamp) / 1000
                    ttl = ustruct.unpack('!B', resp_mv[8:9])[0] # time-to-live
                    n_recv += 1
                    not quiet and print("%u bytes from %s: icmp_seq=%u, ttl=%u, time=%f ms" % (len(resp), addr, seq, ttl, t_elasped))
                    seqs.remove(seq)
                    if len(seqs) == 0:
                        finish = True
                        break
            else:
                break

        if finish:
            break

        utime.sleep_ms(1)
        t += 1

    # close
    sock.close()
    ret = (n_trans, n_recv)
    not quiet and print("%u packets transmitted, %u packets received" % (n_trans, n_recv))
    return (n_trans, n_recv)

最近利用“合宙ESP32C3-Core”做了个MicroPython的小项目,用到了HTTP请求的功能。

找到了以下文章介绍urllib.urequest。据说代码少、性能好,能满足80%的需求。能支持HTTPS的请求。但使用下来,发现功能不完善,最大问题是不支持超时(timeout)设置。

折腾一番后,改为使用urequests,相关功能都比较完善,可以设置timeout、请求头等。

源码没改,保存为文件名urequests.py即可:

"""
code from: https://github.com/micropython/micropython-lib/blob/master/python-ecosys/urequests/urequests.py
version: a3d6d29b1b9de2bb147e0751c08a39608ebe06c8
"""
import usocket


class Response:
    def __init__(self, f):
        self.raw = f
        self.encoding = "utf-8"
        self._cached = None

    def close(self):
        if self.raw:
            self.raw.close()
            self.raw = None
        self._cached = None

    @property
    def content(self):
        if self._cached is None:
            try:
                self._cached = self.raw.read()
            finally:
                self.raw.close()
                self.raw = None
        return self._cached

    @property
    def text(self):
        return str(self.content, self.encoding)

    def json(self):
        import ujson

        return ujson.loads(self.content)


def request(
    method,
    url,
    data=None,
    json=None,
    headers={},
    stream=None,
    auth=None,
    timeout=None,
    parse_headers=True,
):
    redirect = None  # redirection url, None means no redirection
    chunked_data = data and getattr(data, "__iter__", None) and not getattr(data, "__len__", None)

    if auth is not None:
        import ubinascii

        username, password = auth
        formated = b"{}:{}".format(username, password)
        formated = str(ubinascii.b2a_base64(formated)[:-1], "ascii")
        headers["Authorization"] = "Basic {}".format(formated)

    try:
        proto, dummy, host, path = url.split("/", 3)
    except ValueError:
        proto, dummy, host = url.split("/", 2)
        path = ""
    if proto == "http:":
        port = 80
    elif proto == "https:":
        import ussl

        port = 443
    else:
        raise ValueError("Unsupported protocol: " + proto)

    if ":" in host:
        host, port = host.split(":", 1)
        port = int(port)

    ai = usocket.getaddrinfo(host, port, 0, usocket.SOCK_STREAM)
    ai = ai[0]

    resp_d = None
    if parse_headers is not False:
        resp_d = {}

    s = usocket.socket(ai[0], usocket.SOCK_STREAM, ai[2])

    if timeout is not None:
        # Note: settimeout is not supported on all platforms, will raise
        # an AttributeError if not available.
        s.settimeout(timeout)

    try:
        s.connect(ai[-1])
        if proto == "https:":
            s = ussl.wrap_socket(s, server_hostname=host)
        s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
        if not "Host" in headers:
            s.write(b"Host: %s\r\n" % host)
        # Iterate over keys to avoid tuple alloc
        for k in headers:
            s.write(k)
            s.write(b": ")
            s.write(headers[k])
            s.write(b"\r\n")
        if json is not None:
            assert data is None
            import ujson

            data = ujson.dumps(json)
            s.write(b"Content-Type: application/json\r\n")
        if data:
            if chunked_data:
                s.write(b"Transfer-Encoding: chunked\r\n")
            else:
                s.write(b"Content-Length: %d\r\n" % len(data))
        s.write(b"Connection: close\r\n\r\n")
        if data:
            if chunked_data:
                for chunk in data:
                    s.write(b"%x\r\n" % len(chunk))
                    s.write(chunk)
                    s.write(b"\r\n")
                s.write("0\r\n\r\n")
            else:
                s.write(data)

        l = s.readline()
        # print(l)
        l = l.split(None, 2)
        if len(l) < 2:
            # Invalid response
            raise ValueError("HTTP error: BadStatusLine:\n%s" % l)
        status = int(l[1])
        reason = ""
        if len(l) > 2:
            reason = l[2].rstrip()
        while True:
            l = s.readline()
            if not l or l == b"\r\n":
                break
            # print(l)
            if l.startswith(b"Transfer-Encoding:"):
                if b"chunked" in l:
                    raise ValueError("Unsupported " + str(l, "utf-8"))
            elif l.startswith(b"Location:") and not 200 <= status <= 299:
                if status in [301, 302, 303, 307, 308]:
                    redirect = str(l[10:-2], "utf-8")
                else:
                    raise NotImplementedError("Redirect %d not yet supported" % status)
            if parse_headers is False:
                pass
            elif parse_headers is True:
                l = str(l, "utf-8")
                k, v = l.split(":", 1)
                resp_d[k] = v.strip()
            else:
                parse_headers(l, resp_d)
    except OSError:
        s.close()
        raise

    if redirect:
        s.close()
        if status in [301, 302, 303]:
            return request("GET", redirect, None, None, headers, stream)
        else:
            return request(method, redirect, data, json, headers, stream)
    else:
        resp = Response(s)
        resp.status_code = status
        resp.reason = reason
        if resp_d is not None:
            resp.headers = resp_d
        return resp


def head(url, **kw):
    return request("HEAD", url, **kw)


def get(url, **kw):
    return request("GET", url, **kw)


def post(url, **kw):
    return request("POST", url, **kw)


def put(url, **kw):
    return request("PUT", url, **kw)


def patch(url, **kw):
    return request("PATCH", url, **kw)


def delete(url, **kw):
    return request("DELETE", url, **kw)

最后,用的时候要注意:

  1. 响应结果需要手工关闭,避免再发起请求时会报错。
  2. POST的数据,需要进行URL编码。
  3. 除非POST的数据是JSON,否则最好加上请求头“Content-type”。

示例代码如下:

import urequests

r = urequests.post("https://abc.com/path", data='id=123&name=apple', timeout=10, headers={'User-Agent': 'Micropython(ESP32C3)', 'Content-type': 'application/x-www-form-urlencoded'})
print(r.status_code)  # 打印相应状态,整数,正常为200
print(r.content)  # 打印响应数据
r.close() #  关闭连接

使用Termux多年了。主要是可以安装N多Linux命令,一个应用即可实现N多功能。目前最新版是0.118.0,推荐从F-Droid安装。

这里记录一下相关的初始化动作。

1 相关资料

2 设置国内源

跟大多数Linux发行版类似,软件源设置为国内镜像,有效提高安装、更新软件的效率。官方收录了软件源的中国镜像网站:

Termux Packages - Mirrors Hosted in China

我选择了清华大学的镜像:

Termux | 镜像使用帮助 | 清华大学开源软件镜像站

跟Debian类似,直接修改apt的source.list文件即可。即修改$PREFIX/etc/apt/sources.list文件的内容如下:

# The termux repository mirror from TUNA:
deb https://mirrors.tuna.tsinghua.edu.cn/termux/apt/termux-main stable main
deb https://mirrors.tuna.tsinghua.edu.cn/termux/apt/termux-root root stable
deb https://mirrors.tuna.tsinghua.edu.cn/termux/apt/termux-x11 x11 main

保存后,执行apt update,更新一下本地的缓存。

3 安装Termux工具

执行以下命令,安装Termux的相关工具。如果默认安装了,则不用再装。

pkg install termux-tools

4 开启储存访问

执行termux-setup-storage即可。对应Android的sdcard目录为~/storage/shared

5 关闭震动

本人不喜欢以震动方式提示错误,习惯关闭它。编辑文件~/.termux/termux.properties,末尾添加:

bell-character = ignore

6 设置辅助键盘(the extra keys)

官方详细说明: Termux Wiki - Extra Keys Row

值得一提,可以设置按键上滑,让辅助键盘拥有更多的功能。参考配置,修改文件~/.termux/termux.properties,末尾添加:

extra-keys = [ \
   [{key: ESC, popup: {macro: "CTRL d", display: "exit"}}, \
   {key: '/', popup: '\'}, \
   {key: '-', popup: '_'}, \
   {key: '(', popup: '{'}, \
   {key: UP, popup: PGUP}, \
   {key: ')', popup: '}'}, \
   {key: QUOTE, popup: '*'}], \
   [{key: TAB, popup: ':'}, \
   {key: CTRL}, \
   {key: ALT}, \
   {key: LEFT, popup: HOME}, \
   {key: DOWN, popup: PGDN}, \
   {key: RIGHT, popup: END}, \
   {key: APOSTROPHE, popup: '`'}] \
]

7 开启root

  • 如果系统已root,安装tsu,可以切换root用户,或者使用sudo以root用户执行命令。
pkg install tsu
sudo 命令
  • 如果Android系统没有root,安装proot,即可执行需要root权限的命令。
pkg install proot
proot 命令

8 安装vim及配置

本人习惯vim,其它编辑程序,可以选择nanoemacs等。安装命令如下:

pkg install vim

重点配置,编辑文件~/.vimrc,末尾添加:

" 解决中文显示
set fileencodings=utf-8,gb2312,gb18030,gbk,ucs-bom,cp936,latin1
set enc=utf8
set fencs=utf8,gbk,gb2312,gb18030

" 显示行号
set nu

" 颜色主题
colorscheme desert

" 语法高亮
syntax on

最后使配置生效:

source .vimrc

9 Termux相关应用

  • Termux:API

    从Termux访问Android功能。可以实现脚本控制Android。
  • Termux:Boot

    允许程序在启动时运行的Termux扩展应用。
  • Termux:Float

    在浮动终端窗口中使用Termux。
  • Termux:Styling

    自定义Termux终端的样式。
  • Termux:Tasker

    从Tasker运行Termux脚本的Tasker插件。需要配合Tasker使用。
  • Termux:Widget

    从主屏幕启动Termux命令。实现通过点击桌面图标执行相关的sh脚本。

10 其它常用命令

  • OpenSSH

    • 说明:完整的SSH客户端。
    • 安装命令:pkg install openssh
  • ADB

    • 说明:可以使用ADB连接本机,无需root权限就能执行input命令等。
    • 安装命令:pkg install android-tools

1 背景

受“新冠肺炎疫情”影响,出现不能回办公室上班的问题,所以制定一套安全的远程办公方案。

由于办公室有外网IP,原来的方案就是利用路由器的端口映射功能,把各个台式机(操作系统是Windows)的“远程桌面”端口直接映射到外网。这方案缺点如下:

  • “远程桌面”如果存在漏洞,比如绕过登录,电脑上的资料就可能被随意访问。
  • 需要远程访问的台式机,要24小时开机,否则连不上。
  • 需要远程访问的台式机,起码占用路由器的一个端口。

2 解决方案

结合SSH服务、wake on lan、远程桌面,实现更安全和灵活的远程办公。

  • 部署Linux服务器,只映射其SSH服务端口到外网,作为安全入口。
  • SSH客户端几乎覆盖所有平台(包括移动平台),且其功能强大。
  • 使用SSH的端口转发(Port Forward)功能,连上办公室内网的指定IP的“远程桌面”端口。
  • 各个台式机开启wake on lan功能,实现按需开机,工作完关机。
  • 各种操作系统有对应的远程客户端。Windows,使用微软的“远程桌面”客户端,全平台支持;Linux,使用SSH客户端;Mac操作系统,使用VNC客户端。

但是此方案仍有缺点:

  • 需要用户理解SSH及其功能。
  • 使用Linux远程开机命令(wakeonlan),即使把命令简化为Shell脚本,也不是普通人会用。
  • M系列CPU的Mac电脑,不能使用wake on lan,目前只能长期开机。

3 办公室部署

3.1 路由器

路由器的网络需要可外网访问,并且支持端口映射功能。基本路由器都支持端口映射,具体配置参考路由器说明书。

配置路由器外网端口,映射到Linux服务器的SSH服务端口。

3.2 Linux服务器

  1. 安装wakeonlan命令。

Debian或Ubuntu,执行以下命令安装

sudo apt install wakeonlan
  1. 部署SSH服务,作为安全入口。需要SSH服务的安全配置,例如:
  • 仅使用SSHv2协议

    Protocol 2
  • 禁止root用户登录。

    PermitRootLogin no
  • 禁止用户空密码登录。

    PermitEmptyPasswords no
  • 指定白名单用户。

    AllowUsers user1 user2 user3
  • 指定禁止登录的用户(一般指定白名单即可)。

    DenyUsers root user4 user5
  • 限制身份验证最大重试次数。

    MaxAuthTries 3
  • 登录用户的密码,使用强密码,甚至配置使用“密钥”验证登录。
  • 显示最后一次登录的日期和时间。

    PrintLastLog yes
  • 防止特权升级(一般默认配置)

    UsePrivilegeSeparation sandbox
  • 禁用 GSSAPI 认证

    GSSAPIAuthentication no

更详细的设置,可以搜索“Secure SSH”或者“SSH安全加固”等内容。

另外,最好配置一下服务器保持TCP连接的选项,避免客户端自动断开:

  • 开启保持TCP连接

    TCPKeepAlive yes
  • 向客户端发送是否存活的消息的时间间隔,单位是秒,默认是0,不发送

    ClientAliveInterval 30
  • 请求后客户端无响应则自动断开的最大次数

    ClientAliveCountMax 3

3.3 台式机

  1. 主板开启wake on lan功能。具体BIOS设置,需要查询主板的说明书。一般注意以下几点:

    • 板载有线网卡设置启用。
    • wake on lan设置启用。
    • 启动项,允许PCIE设备启动。
    • 启动项,出现pxe rom可选。
  2. 操作系统开启wake on lan功能。即操作系统执行关机时,让主板不要完全断电,并允许网卡运行于可接收Magic Package的状态,用于网络启动电脑。

  3. 开启远程访问服务。各个操作系统配置如下:

    • windows,开启“远程桌面”服务。
    • Linux,开启SSH服务。一般默认开启的。
    • Mac OS,开启“远程访问”服务,可以SSH客户端访问,即字符界面。
    • Mac OS,开启“远程桌面”服务,可以VNC客户端访问,即图形界面。

4 客户端部署

主要就是SSH客户端 + 远程客户端。

4.1 SSH客户端

4.1.1 Linux

一般Linux操作系统默认安装SSH客户端,如果没有,安装“OpenSSH”或者“Dropbear SSH”的客户端即可。

4.1.2 Windows

Windows 10或11可以通过“WinGet”命令安装“OpenSSH”客户端。例如:

winget install opensssh

Windows 7可以使用“PuTTY”。Windows都可以安装这个。

4.1.3 Android

可以使用“Termux”,再安装“OpenSSH”。

pkg install openssh

或者使用其它SSH客户端App。

4.1.4 iOS

安装Termius。需要注册账号,免费版可以使用SSH客户端和端口转发功能。

4.2 远程桌面客户端

  • Windows,自带“远程桌面”客户端。
  • Linux,推荐安装“Remmina”。
  • Android,安装微软官方“远程桌面”App。
  • iOS,安装微软官方“远程桌面”App。

4.3 VNC客户端

  • Windows,使用开源的“TightVNC”。
  • 其它,待补充。

5 客户端使用

以Windows远程桌面为例,其默认端口为3389,并假设该台式机的IP为192.168.0.123。其它服务类似操作。

  1. 远程开机。

    启动SSH客户端并登录,使用wakeonlan命令 + MAC地址,启动对应的台式机。注意,需要记录该台式机有线网卡的MAC地址。
  2. 开启端口转发。

    启动SSH客户端,设置本地端口(例如 43389)转发到办公室内网指定电脑端口(例如 192.168.0.123:3389)。
  3. 连接远程桌面。

    远程桌面客户端连接到本机端口(例如 127.0.0.1:43389),即可访问。如果是管理员帐号登录,需勾选“管理员模式”。

5.2 远程开机

普通用户执行wakeonlan命令,参数是对应台式机网卡的MAC地址。然后使用ping命令,检查该台式机是否开机成功。

要注意,Windows操作系统,不要使用shutdown /s命令关机,会导致wakeonlan命令无法开机。

5.3 开启端口转发

假设,办公室的外网域名为remote.office.com,SSH映射外网端口为22222,SSH登录用户为r-user,需要通过访问192.168.0.123:3389的“远程桌面”服务,并且本机开启43389端口去访问。

5.3.1 SSH命令

使用SSH客户端(例如OpenSSH客户端)的,直接执行以下命令,然后输入密码,让其一直运行即可。

ssh -f -N -L 43389:192.168.0.123:3389 r-user@remote.office.com -p 22222 -o ServerAliveInterval=30

关键参数说明如下:

  • -f后台运行。
  • -N不执行命令。
  • -L 43389:192.168.0.123:3389是把本机43389端口转发到办公室内网的192.168.0.123:3389端口。
  • -o ServerAliveInterval=30是每30秒向服务器发生一条表示客户端存活的消息,用于保持连接。

关于客户端保持连接,可以修改/etc/ssh/ssh_config文件,在Host *的配置下,加入以下配置。然后运行ssh命令,不用加上-o ServerAliveInterval=30这个参数。

ServerAliveInterval 30
ServerAliveCountMax 3

5.3.2 PuTTY设置

  1. 点Category -> Session,在Host name填remote.office.com,Port填22222,Connection Type选SSH。
  2. 点Category -> Connection -> Data,在Auto-login username填r-user
  3. 点Category -> Connection -> SSH -> Tunnels,Add new forward port下,Source port填43389,Destination填192.168.0.123:3389,勾选下面的“Local”和“Auto”,再点“Add”。
  4. 点Category -> Connection,在Seconds between keepalives (0 to turn off)填10,并勾选Enable TCP keepalives (SO_KEEPALIVE option)选项。这一步是设置客户端保持连接。
  5. 点Category -> Session,在Saved Sessions填remote_office,再点“Save”保存配置。
  6. 连接时,点Category -> Session,选中remote_office,点“Open”。输入密码后让其保持运行即可。

5.3.3 iOS设置Termius

  1. 安装Termius,并注册账户。
  2. 设置保持后台运行。

    • 在Settings -> SESSIONS -> 开启”Active Connect Saver“和”Save Location Data“。
    • 据说是使用了“获取地理位置”权限,实现App保持后台运行。
  3. 新建Hosts。

    • 填写连接到办公室的域名remote.office.com和SSH端口22222,然后命名为remote_office
  4. 新建Port Forwarding。

    • 在Port Forwarding,点“+”新建。
    • -> 选Local,点Continue。
    • ->“Set the local port and binding address”的Port number填写映射到本机的端口,例如3389,点CONTINUE。
    • -> 点Select a host,并选sdoffice。
    • -> “Set the destination host”填写目标电脑的内网IP和远程桌面端口,例如address为192.168.0.123,port为3389,点CONTINUE。
    • -> 最后填写标签,例如101-rdp,点DONE
  5. 连接。

    • 在Port Forwarding,长按101-rdp,点Connect。

5.4 远程桌面客户端

添加电脑,电脑名称为127.0.0.1:43389。如果是使用管理员账号,记得开启“管理员模式”。

6 其它方案

6.1 前端安全替代

  1. 使用虚拟内网,即VPN。连上VPN就等于进入办公室内网。

    • Android和iOS原生支持L2TP、IPSec、IKEv2等协议,不用安装客户端。
    • 路由器同样只需映射VPN服务的端口。
  2. 使用堡垒机做入口。

    • JumpServer。未了解。
    • Next Terminal。了解过,当前版本安全方面考虑不足,手机访问“远程桌面”不支持触屏等。
  3. 其它商业解决方案

    • TeamViewer
    • 向日葵远程控制软件

6.2 网络启动功能替代

可以使用WiFi开关 + 电脑通电启动,实现替代,但需要购买WiFi开关硬件。

本来为了ESP32-C3刷上LVGL,才玩Arduino。但是Arduino确实没MicroPython好玩,而且,我真的需要LVGL吗?这里先记录一下相关操作。

1 刷Arduino固件

使用Arduino IDE操作,最简单。

参考教程:

1.1 设置开发板为ESP32-C3

安装好Arduino IDE(本文所用版本是1.8.19),运行。进入“File” -> “Preferences” -> “Settings”,在“Additional Boards Manager URLs”输入以下网址,并点“OK”。

https://raw.githubusercontent.com/espressif/arduino-esp32/gh-pages/package_esp32_dev_index.json

要注意,如果本机不能访问以上链接,可在“File” -> “Preferences” -> “Network”设置代理。

然后进入“Tools” -> “Board: xxx” -> “Boards Manager…”。在“Boards Manager”弹出框,搜“esp32”,并选择最高版本,点“Install”。

安装完成后,再次进入“Tools” -> “Board: xxx”,选中“ESP32C3 Dev Module”即可。可以看到“Tools”菜单显示“Board: ESP32C3 Dev Module”,并在下面显示硬件相关信息。

1.2 刷入固件

先在“Tools” -> “Flash Mode”要选“DIO”(这个很重要), 再点“Tools” -> “Burn Bootloader”,等待刷入成功即可。

“Tools”显示的硬件信息参考:

Board: "ESP32C3 Dev Module"
Upload Speed: "921600"
USB CDC on Boot: "Disabled"
CPU Frequency: "160MHz (WiFi)"
Flash Frequency: "80MHz"
Flash Mode: "DIO"
Flash Size: "4MB (32Mb)"
Partition Scheme: "Default 4MB with spiffs (1.2MB APP/1.5MB SPIFFS)"
Core Debug Level: "None"
Erase All Flash before Sketch Upload: "Disabled"
Port: "/dev/ttyACM0"

2 使用I2C OLED屏

这里使用的I2C OLED屏,SSD1315(可用SSD1306的驱动),0.96寸,4针。详细参考如下:

2.1 接线

OLED屏 ESP32-C3
GND<-->25, GND
VCC<-->26, 3.3V
SCL<-->27, GPIO05, I2C_SCL
SDA<-->28, GPIO04, I2C_SDA

2.2 示例代码

以下示例是在屏幕上显示一行文字”Hello, Fox!“。其最麻烦的地方,是找个合适的字体。上传程序前,记得“Tools” -> “Flash Mode”要选“DIO”。

#include <Arduino.h>
#include <U8g2lib.h>

#ifdef U8X8_HAVE_HW_SPI
#include <SPI.h>
#endif
#ifdef U8X8_HAVE_HW_I2C
#include <Wire.h>
#endif

U8G2_SSD1306_128X64_NONAME_F_HW_I2C u8g2(U8G2_R0, /* reset=*/ U8X8_PIN_NONE, /* clock=*/ 5, /* data=*/ 4);

void setup(void) {
  u8g2.begin();
}

void loop(void) {
  u8g2.clearBuffer();         // clear the internal memory
  u8g2.setFont(u8g2_font_chargen_92_mf); // choose a suitable font
  u8g2.drawStr(0,14,"Hello, Fox!");  // write something to the internal memory
  u8g2.sendBuffer();          // transfer internal memory to the display
  delay(1000);  
}

终于用上了Windows 11,版本是22h2。记录一下针对开发人员的相关优化。

1 安装时跳过TPM限制

安装过程,在提示“这台电脑无法安装Windows11”的界面,按Shift + F10,弹出CMD窗口输入“regedit”,打开注册表编辑器。

在注册表编辑器进入[HKEY_LOCAL_MACHINE\SYSTEM\Setup],新建“项”,名为“LabConfig”(注意大小写一致)。

在“LabConfig”下,新建两个“DWORD (32位)值”,如下(记得注意大小写一致):

  • 数值名称:BypassTPMCheck,数值数据:00000001,基数:十六进制(H)
  • 数值名称:BypassSecureBootCheck,数值数据:00000001,基数:十六进制(H)

完成后关闭“注册表编辑器”、“CMD窗口”,按返回上一步的按钮,再按下一页,就通过系统限制检测。

2 安装时避免强制登录账户

按Shift + F10,弹出CMD窗口输入“regedit”,打开注册表编辑器。

在注册表编辑器进入[HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows\CurrentVersion\OOBE],新建一个“DWORD (32位)值”,如下(记得注意大小写一致),如下。

  • 数值名称:BypassNRO,数值数据:00000001,基数:十六进制(H)

退出注册表编辑器后,输入命令“logoff”,即可即可跳过强制联网登录账号。

如果不想编辑注册表,在CMD窗口输入命令“oobe\BypassNRO.cmd”,系统重启后即可。

3 创建本地用户

安装成功后,需要创建本地用户。最好不要登录微软账户。

最简单的是创建和使用管理员账号(Administrator)登录,避免访问其它硬盘的已存在文件时,提示需要管理员账号。

如果使用自定义账号(即拥有管理员权限的非Administrator账号),需要修改电脑上已存在文件的权限,改为新增账号拥有“完全控制”的权限。如果存在大量零碎文件,这个过程超级漫长。

4 解锁任务栏

在Windows 11中,用户无法通过“任务栏”使用“快速启动工具栏”,通过安装“ExplorerPatcher”解锁相关功能。

ExplorerPatcher - GitHub
https://github.com/valinet/ExplorerPatcher

我整理了一堆快捷方式,大多是常用的,利用“快速启动工具栏”,实现快速点击,类似开始菜单的效果。

5 优化内存占用

1)关闭不必要的自启动

依次进入:设置 -> 应用 -> 启动。把不需要自启软件关闭。

2)关闭动画效果

依次进入:设置 -> 辅助功能 -> 视觉效果。“动画效果”设置关闭。这个优化,大概省了1GB内存。

6 恢复旧的右键菜单

新的右键菜单过于简单,隐藏了太多的功能。使用管理员运行CMD,输入以下命令。执行成功后重启,即可看到效果。

reg.exe add "HKCU\Software\Classes\CLSID\{86ca1aa0-34aa-4e8b-a509-50c905bae2a2}\InprocServer32" /f /ve

7 安装 WinMerge

开源的文件对比和合并工具,用于替代Beyond Compare。

启用“高级菜单”,选择文件对比时更方便。运行WinMerge,依次进入“编辑”->“选项”->左侧菜单点“系统集成”,在“资源管理器”下,勾选“添加到上下文菜单”、“启用高级菜单”。

8 显示多个时区的时间

依次进入:设置 -> 时间和语言 -> 日期和时间 -> 相关链接 -> 附加时钟。设置完毕后按“确定”。建议设置以下时间:

  • (UTC) 协调世界时。

设置完成后,点击任务栏的时间,即可看到新增的时区时间。

9 Office软件

目前使用开源的LibreOffice,代替微软Office,用着还行。

10 安装WSL2

Window上运行Linux软件,包括GUI软件。这样连SSH客户端都不用安装了。

主要过程是,开启Windows功能、安装内核、安装Linux发行版。相关文档如下:

11 解决不能自动关机

关机时,会提示有进程正在运行,需要点按钮才能关机。这问题导致使用shutdown命令和远程桌面都不能关机。

解决方案:打开注册表,进入“\HKEY_USERS.DEFAULT\Control Panel\Desktop”,新建“字符串值(S)”,名称为“AutoEndTasks”,值为“1”。

经过一段时间的爆米花机烘焙咖啡豆,对烘豆有了进一步的认识和感受,于是决心升级一下装备。

1 烘豆的原因

首先还是要来个灵魂敲问,为什么要烘豆?

通过亲自烘焙咖啡豆,对咖啡的味道有很多的把控,而不局限于手冲的过程。另外,烘焙出来的豆子更多的了解,可以更好地针对目标味道而配合不同的手冲方法。

如果想再进一步对咖啡出品的把控,就需要参与到咖啡树的种植和咖啡生豆的处理。显然,对大多咖啡爱好者来说,都是遥不可及的事情。烘焙咖啡豆有适合个人玩家的器具、方式,处理好的咖啡生豆也容易买到,所以适合去玩。

不过,重点是喜欢和享受烘焙咖啡豆这个事情。

2 烘豆操作

烘焙咖啡豆,就是通过对咖啡生豆进行加热,制作出具有独特香气和味道的咖啡熟豆的过程。

碍于个人的认识和篇幅所限,只能点出重点,详细内容需要参考相关资料,或者亲自体会。

2.1 整个操作

主要操作是挑豆、暖机、进豆、加热和翻炒、下豆冷却、养豆。

挑豆是把咖啡生豆中的杂质和瑕疵豆挑选出来,避免咖啡冲煮出杂味。一般买到的咖啡生豆,都已经去除了杂质,包括石头、沙子等。瑕疵豆,包括带壳豆、发霉豆、发酵豆、未成熟豆、虫蛀豆、贝壳豆、残缺豆、黑豆、死豆等。

暖机是进豆前,把机器加热到一定的温度。一般认为,热风烘豆机加热速度快,不需要暖机。

进豆就是把生豆倒入机器。此时烘焙过程开始,并进行计时。

加热和翻炒,目的是均匀地给咖啡豆加热,避免某些豆子或豆子的某部分过度加热而碳化(就是变得焦黑)。这过程就是烘焙过程。

下豆冷却,是为了降低咖啡豆的梅纳反应,锁住咖啡豆风味。这个时间越短越好。一般带风晒的“冷却盘”,还能实现去掉银皮的效果。

养豆,是由于咖啡豆本身的梅纳反应还在继续,会排出二氧化碳。一般养豆3~7天,让其味道稳定下来。拿刚烘焙和养豆后的咖啡豆,冲煮对比一下,就能明显感受到区别。

2.2 烘焙过程

主要分为脱水、转黄、一爆、二爆这几个阶段。比较专业的划分是:

  • 脱水期,从开始烘焙到转黄,决定豆子是否容易夹生。
  • 梅纳反应期,又叫美娜德反应,从转黄到一爆开始,产生大部分芳香物质。
  • 发展期,从一爆到下豆,决定豆子的风味。

爆米花机一般10分钟左右下豆(完成烘焙),具体时间要看豆子的状态,以及烘焙师想要达到的烘焙度。

DTR,即Development Time Ratio,是指“发展期”占烘豆总时间的百分比。《咖啡烘豆的科學》一书中,作者Scott Rao认为,DTR应于20~25%之间。

3 上一版的不足

升级,就是为了弥补上一版的不足。上一版的缺点包括:

  • 1)只能烘焙50~60g生豆,份量太少。
  • 2)想要提升风力和热量,需要更换配件,比较折腾。
  • 3)热风采用侧风出口,不仅噪音大,还翻动不均匀。
  • 4)组装的零件多,主要是玻璃和不锈钢材质,烘焙过程由于爆米花机震动导致噪音大,影响一爆的声音。

4 新版的解决方案

针对上一版的缺点,解决方案如下:

  • 1)爆米花机改为“北欧欧慕(Nathome)NBM001”。
    热风采用底部直风出口,功率达到1400w,能够翻动100g生豆且受热均匀。其改装过程跟上一个一样,只需把直流电机的电源线,从主板上分离,再接上直流变压器的母头(注意正负极)。
  • 2)采用透明玻璃酒瓶和过滤豆浆渣的布袋,作为爆米花机上盖,避免噪音。
    把酒瓶去底,直接插到爆米花机豆仓上。酒瓶出口就套上过滤豆浆渣的布袋,用于收集银皮。这个简化的装置,几乎没有噪音,而且能清晰观察豆子的烘焙程度。
  • 3)热风温度很高,能在冬天烘豆。
    发热管70%的电压,能吹出220°C热风。

5 总结

升级后的烘豆机,出品很均匀,能够实现不同程度的烘焙度。使用中深烘的云南廉价豆(练习豆),能逼出焦糖风味,甜感爆棚。也试过浅烘的“云南佐园经典日晒”,酸味很突出。

烘焙时,可以试试慢烘与快烘。慢烘是参照别人的烘焙曲线,通过调整发热管电压,实现爬温效果。快烘就是一直高温吹豆,大概3分钟出锅。