| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797 |
- #!/usr/bin/env python
- #
- # This file is part of the MicroPython project, http://micropython.org/
- #
- # The MIT License (MIT)
- #
- # Copyright (c) 2014-2021 Damien P. George
- # Copyright (c) 2017 Paul Sokolovsky
- #
- # Permission is hereby granted, free of charge, to any person obtaining a copy
- # of this software and associated documentation files (the "Software"), to deal
- # in the Software without restriction, including without limitation the rights
- # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- # copies of the Software, and to permit persons to whom the Software is
- # furnished to do so, subject to the following conditions:
- #
- # The above copyright notice and this permission notice shall be included in
- # all copies or substantial portions of the Software.
- #
- # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- # THE SOFTWARE.
- """
- pyboard interface
- This module provides the Pyboard class, used to communicate with and
- control a MicroPython device over a communication channel. Both real
- boards and emulated devices (e.g. running in QEMU) are supported.
- Various communication channels are supported, including a serial
- connection, telnet-style network connection, external process
- connection.
- Example usage:
- import pyboard
- pyb = pyboard.Pyboard('/dev/ttyACM0')
- Or:
- pyb = pyboard.Pyboard('192.168.1.1')
- Then:
- pyb.enter_raw_repl()
- pyb.exec('import pyb')
- pyb.exec('pyb.LED(1).on()')
- pyb.exit_raw_repl()
- Note: if using Python2 then pyb.exec must be written as pyb.exec_.
- To run a script from the local machine on the board and print out the results:
- import pyboard
- pyboard.execfile('test.py', device='/dev/ttyACM0')
- This script can also be run directly. To execute a local script, use:
- ./pyboard.py test.py
- Or:
- python pyboard.py test.py
- """
- import sys
- import time
- import os
- import ast
- try:
- stdout = sys.stdout.buffer
- except AttributeError:
- # Python2 doesn't have buffer attr
- stdout = sys.stdout
- def stdout_write_bytes(b):
- b = b.replace(b"\x04", b"")
- stdout.write(b)
- stdout.flush()
- class PyboardError(Exception):
- pass
- class TelnetToSerial:
- def __init__(self, ip, user, password, read_timeout=None):
- self.tn = None
- import telnetlib
- self.tn = telnetlib.Telnet(ip, timeout=15)
- self.read_timeout = read_timeout
- if b"Login as:" in self.tn.read_until(b"Login as:", timeout=read_timeout):
- self.tn.write(bytes(user, "ascii") + b"\r\n")
- if b"Password:" in self.tn.read_until(b"Password:", timeout=read_timeout):
- # needed because of internal implementation details of the telnet server
- time.sleep(0.2)
- self.tn.write(bytes(password, "ascii") + b"\r\n")
- if b"for more information." in self.tn.read_until(
- b'Type "help()" for more information.', timeout=read_timeout
- ):
- # login successful
- from collections import deque
- self.fifo = deque()
- return
- raise PyboardError("Failed to establish a telnet connection with the board")
- def __del__(self):
- self.close()
- def close(self):
- if self.tn:
- self.tn.close()
- def read(self, size=1):
- while len(self.fifo) < size:
- timeout_count = 0
- data = self.tn.read_eager()
- if len(data):
- self.fifo.extend(data)
- timeout_count = 0
- else:
- time.sleep(0.25)
- if self.read_timeout is not None and timeout_count > 4 * self.read_timeout:
- break
- timeout_count += 1
- data = b""
- while len(data) < size and len(self.fifo) > 0:
- data += bytes([self.fifo.popleft()])
- return data
- def write(self, data):
- self.tn.write(data)
- return len(data)
- def inWaiting(self):
- n_waiting = len(self.fifo)
- if not n_waiting:
- data = self.tn.read_eager()
- self.fifo.extend(data)
- return len(data)
- else:
- return n_waiting
- class ProcessToSerial:
- "Execute a process and emulate serial connection using its stdin/stdout."
- def __init__(self, cmd):
- import subprocess
- self.subp = subprocess.Popen(
- cmd,
- bufsize=0,
- shell=True,
- preexec_fn=os.setsid,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- )
- # Initially was implemented with selectors, but that adds Python3
- # dependency. However, there can be race conditions communicating
- # with a particular child process (like QEMU), and selectors may
- # still work better in that case, so left inplace for now.
- #
- # import selectors
- # self.sel = selectors.DefaultSelector()
- # self.sel.register(self.subp.stdout, selectors.EVENT_READ)
- import select
- self.poll = select.poll()
- self.poll.register(self.subp.stdout.fileno())
- def close(self):
- import signal
- os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM)
- def read(self, size=1):
- data = b""
- while len(data) < size:
- data += self.subp.stdout.read(size - len(data))
- return data
- def write(self, data):
- self.subp.stdin.write(data)
- return len(data)
- def inWaiting(self):
- # res = self.sel.select(0)
- res = self.poll.poll(0)
- if res:
- return 1
- return 0
- class ProcessPtyToTerminal:
- """Execute a process which creates a PTY and prints slave PTY as
- first line of its output, and emulate serial connection using
- this PTY."""
- def __init__(self, cmd):
- import subprocess
- import re
- import serial
- self.subp = subprocess.Popen(
- cmd.split(),
- bufsize=0,
- shell=False,
- preexec_fn=os.setsid,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- )
- pty_line = self.subp.stderr.readline().decode("utf-8")
- m = re.search(r"/dev/pts/[0-9]+", pty_line)
- if not m:
- print("Error: unable to find PTY device in startup line:", pty_line)
- self.close()
- sys.exit(1)
- pty = m.group()
- # rtscts, dsrdtr params are to workaround pyserial bug:
- # http://stackoverflow.com/questions/34831131/pyserial-does-not-play-well-with-virtual-port
- self.ser = serial.Serial(pty, interCharTimeout=1, rtscts=True, dsrdtr=True)
- def close(self):
- import signal
- os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM)
- def read(self, size=1):
- return self.ser.read(size)
- def write(self, data):
- return self.ser.write(data)
- def inWaiting(self):
- return self.ser.inWaiting()
- class Pyboard:
- def __init__(
- self, device, baudrate=115200, user="micro", password="python", wait=0, exclusive=True
- ):
- self.in_raw_repl = False
- self.use_raw_paste = True
- if device.startswith("exec:"):
- self.serial = ProcessToSerial(device[len("exec:") :])
- elif device.startswith("execpty:"):
- self.serial = ProcessPtyToTerminal(device[len("qemupty:") :])
- elif device and device[0].isdigit() and device[-1].isdigit() and device.count(".") == 3:
- # device looks like an IP address
- self.serial = TelnetToSerial(device, user, password, read_timeout=10)
- else:
- import serial
- # Set options, and exclusive if pyserial supports it
- serial_kwargs = {"baudrate": baudrate, "interCharTimeout": 1}
- if serial.__version__ >= "3.3":
- serial_kwargs["exclusive"] = exclusive
- delayed = False
- for attempt in range(wait + 1):
- try:
- self.serial = serial.Serial(device, **serial_kwargs)
- break
- except (OSError, IOError): # Py2 and Py3 have different errors
- if wait == 0:
- continue
- if attempt == 0:
- sys.stdout.write("Waiting {} seconds for pyboard ".format(wait))
- delayed = True
- time.sleep(1)
- sys.stdout.write(".")
- sys.stdout.flush()
- else:
- if delayed:
- print("")
- raise PyboardError("failed to access " + device)
- if delayed:
- print("")
- def close(self):
- self.serial.close()
- def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None):
- # if data_consumer is used then data is not accumulated and the ending must be 1 byte long
- assert data_consumer is None or len(ending) == 1
- data = self.serial.read(min_num_bytes)
- if data_consumer:
- data_consumer(data)
- timeout_count = 0
- while True:
- if data.endswith(ending):
- break
- elif self.serial.inWaiting() > 0:
- new_data = self.serial.read(1)
- if data_consumer:
- data_consumer(new_data)
- data = new_data
- else:
- data = data + new_data
- timeout_count = 0
- else:
- timeout_count += 1
- if timeout is not None and timeout_count >= 100 * timeout:
- break
- time.sleep(0.01)
- return data
- def enter_raw_repl(self, soft_reset=True):
- self.serial.write(b"\r\x03\x03") # ctrl-C twice: interrupt any running program
- # flush input (without relying on serial.flushInput())
- n = self.serial.inWaiting()
- while n > 0:
- self.serial.read(n)
- n = self.serial.inWaiting()
- self.serial.write(b"\r\x01") # ctrl-A: enter raw REPL
- if soft_reset:
- data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n>")
- if not data.endswith(b"raw REPL; CTRL-B to exit\r\n>"):
- print(data)
- raise PyboardError("could not enter raw repl")
- self.serial.write(b"\x04") # ctrl-D: soft reset
- # Waiting for "soft reboot" independently to "raw REPL" (done below)
- # allows boot.py to print, which will show up after "soft reboot"
- # and before "raw REPL".
- data = self.read_until(1, b"soft reboot\r\n")
- if not data.endswith(b"soft reboot\r\n"):
- print(data)
- raise PyboardError("could not enter raw repl")
- data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n")
- if not data.endswith(b"raw REPL; CTRL-B to exit\r\n"):
- print(data)
- raise PyboardError("could not enter raw repl")
- self.in_raw_repl = True
- def exit_raw_repl(self):
- self.serial.write(b"\r\x02") # ctrl-B: enter friendly REPL
- self.in_raw_repl = False
- def follow(self, timeout, data_consumer=None):
- # wait for normal output
- data = self.read_until(1, b"\x04", timeout=timeout, data_consumer=data_consumer)
- if not data.endswith(b"\x04"):
- raise PyboardError("timeout waiting for first EOF reception")
- data = data[:-1]
- # wait for error output
- data_err = self.read_until(1, b"\x04", timeout=timeout)
- if not data_err.endswith(b"\x04"):
- raise PyboardError("timeout waiting for second EOF reception")
- data_err = data_err[:-1]
- # return normal and error output
- return data, data_err
- def raw_paste_write(self, command_bytes):
- # Read initial header, with window size.
- data = self.serial.read(2)
- window_size = data[0] | data[1] << 8
- window_remain = window_size
- # Write out the command_bytes data.
- i = 0
- while i < len(command_bytes):
- while window_remain == 0 or self.serial.inWaiting():
- data = self.serial.read(1)
- if data == b"\x01":
- # Device indicated that a new window of data can be sent.
- window_remain += window_size
- elif data == b"\x04":
- # Device indicated abrupt end. Acknowledge it and finish.
- self.serial.write(b"\x04")
- return
- else:
- # Unexpected data from device.
- raise PyboardError("unexpected read during raw paste: {}".format(data))
- # Send out as much data as possible that fits within the allowed window.
- b = command_bytes[i : min(i + window_remain, len(command_bytes))]
- self.serial.write(b)
- window_remain -= len(b)
- i += len(b)
- # Indicate end of data.
- self.serial.write(b"\x04")
- # Wait for device to acknowledge end of data.
- data = self.read_until(1, b"\x04")
- if not data.endswith(b"\x04"):
- raise PyboardError("could not complete raw paste: {}".format(data))
- def exec_raw_no_follow(self, command):
- if isinstance(command, bytes):
- command_bytes = command
- else:
- command_bytes = bytes(command, encoding="utf8")
- # check we have a prompt
- data = self.read_until(1, b">")
- if not data.endswith(b">"):
- raise PyboardError("could not enter raw repl")
- if self.use_raw_paste:
- # Try to enter raw-paste mode.
- self.serial.write(b"\x05A\x01")
- data = self.serial.read(2)
- if data == b"R\x00":
- # Device understood raw-paste command but doesn't support it.
- pass
- elif data == b"R\x01":
- # Device supports raw-paste mode, write out the command using this mode.
- return self.raw_paste_write(command_bytes)
- else:
- # Device doesn't support raw-paste, fall back to normal raw REPL.
- data = self.read_until(1, b"w REPL; CTRL-B to exit\r\n>")
- if not data.endswith(b"w REPL; CTRL-B to exit\r\n>"):
- print(data)
- raise PyboardError("could not enter raw repl")
- # Don't try to use raw-paste mode again for this connection.
- self.use_raw_paste = False
- # Write command using standard raw REPL, 256 bytes every 10ms.
- for i in range(0, len(command_bytes), 256):
- self.serial.write(command_bytes[i : min(i + 256, len(command_bytes))])
- time.sleep(0.01)
- self.serial.write(b"\x04")
- # check if we could exec command
- data = self.serial.read(2)
- if data != b"OK":
- raise PyboardError("could not exec command (response: %r)" % data)
- def exec_raw(self, command, timeout=10, data_consumer=None):
- self.exec_raw_no_follow(command)
- return self.follow(timeout, data_consumer)
- def eval(self, expression):
- ret = self.exec_("print({})".format(expression))
- ret = ret.strip()
- return ret
- def exec_(self, command, data_consumer=None):
- ret, ret_err = self.exec_raw(command, data_consumer=data_consumer)
- if ret_err:
- raise PyboardError("exception", ret, ret_err)
- return ret
- def execfile(self, filename):
- with open(filename, "rb") as f:
- pyfile = f.read()
- return self.exec_(pyfile)
- def get_time(self):
- t = str(self.eval("pyb.RTC().datetime()"), encoding="utf8")[1:-1].split(", ")
- return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6])
- def fs_ls(self, src):
- cmd = (
- "import uos\nfor f in uos.ilistdir(%s):\n"
- " print('{:12} {}{}'.format(f[3]if len(f)>3 else 0,f[0],'/'if f[1]&0x4000 else ''))"
- % (("'%s'" % src) if src else "")
- )
- self.exec_(cmd, data_consumer=stdout_write_bytes)
- def fs_cat(self, src, chunk_size=256):
- cmd = (
- "with open('%s') as f:\n while 1:\n"
- " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size)
- )
- self.exec_(cmd, data_consumer=stdout_write_bytes)
- def fs_get(self, src, dest, chunk_size=256, progress_callback=None):
- if progress_callback:
- src_size = int(self.exec_("import os\nprint(os.stat('%s')[6])" % src))
- written = 0
- self.exec_("f=open('%s','rb')\nr=f.read" % src)
- with open(dest, "wb") as f:
- while True:
- data = bytearray()
- self.exec_("print(r(%u))" % chunk_size, data_consumer=lambda d: data.extend(d))
- assert data.endswith(b"\r\n\x04")
- try:
- data = ast.literal_eval(str(data[:-3], "ascii"))
- if not isinstance(data, bytes):
- raise ValueError("Not bytes")
- except (UnicodeError, ValueError) as e:
- raise PyboardError("fs_get: Could not interpret received data: %s" % str(e))
- if not data:
- break
- f.write(data)
- if progress_callback:
- written += len(data)
- progress_callback(written, src_size)
- self.exec_("f.close()")
- def fs_put(self, src, dest, chunk_size=256, progress_callback=None):
- if progress_callback:
- src_size = os.path.getsize(src)
- written = 0
- self.exec_("f=open('%s','wb')\nw=f.write" % dest)
- with open(src, "rb") as f:
- while True:
- data = f.read(chunk_size)
- if not data:
- break
- if sys.version_info < (3,):
- self.exec_("w(b" + repr(data) + ")")
- else:
- self.exec_("w(" + repr(data) + ")")
- if progress_callback:
- written += len(data)
- progress_callback(written, src_size)
- self.exec_("f.close()")
- def fs_mkdir(self, dir):
- self.exec_("import uos\nuos.mkdir('%s')" % dir)
- def fs_rmdir(self, dir):
- self.exec_("import uos\nuos.rmdir('%s')" % dir)
- def fs_rm(self, src):
- self.exec_("import uos\nuos.remove('%s')" % src)
- # in Python2 exec is a keyword so one must use "exec_"
- # but for Python3 we want to provide the nicer version "exec"
- setattr(Pyboard, "exec", Pyboard.exec_)
- def execfile(filename, device="/dev/ttyACM0", baudrate=115200, user="micro", password="python"):
- pyb = Pyboard(device, baudrate, user, password)
- pyb.enter_raw_repl()
- output = pyb.execfile(filename)
- stdout_write_bytes(output)
- pyb.exit_raw_repl()
- pyb.close()
- def filesystem_command(pyb, args, progress_callback=None):
- def fname_remote(src):
- if src.startswith(":"):
- src = src[1:]
- return src
- def fname_cp_dest(src, dest):
- src = src.rsplit("/", 1)[-1]
- if dest is None or dest == "":
- dest = src
- elif dest == ".":
- dest = "./" + src
- elif dest.endswith("/"):
- dest += src
- return dest
- cmd = args[0]
- args = args[1:]
- try:
- if cmd == "cp":
- srcs = args[:-1]
- dest = args[-1]
- if srcs[0].startswith("./") or dest.startswith(":"):
- op = pyb.fs_put
- fmt = "cp %s :%s"
- dest = fname_remote(dest)
- else:
- op = pyb.fs_get
- fmt = "cp :%s %s"
- for src in srcs:
- src = fname_remote(src)
- dest2 = fname_cp_dest(src, dest)
- print(fmt % (src, dest2))
- op(src, dest2, progress_callback=progress_callback)
- else:
- op = {
- "ls": pyb.fs_ls,
- "cat": pyb.fs_cat,
- "mkdir": pyb.fs_mkdir,
- "rmdir": pyb.fs_rmdir,
- "rm": pyb.fs_rm,
- }[cmd]
- if cmd == "ls" and not args:
- args = [""]
- for src in args:
- src = fname_remote(src)
- print("%s :%s" % (cmd, src))
- op(src)
- except PyboardError as er:
- print(str(er.args[2], "ascii"))
- pyb.exit_raw_repl()
- pyb.close()
- sys.exit(1)
- _injected_import_hook_code = """\
- import uos, uio
- class _FS:
- class File(uio.IOBase):
- def __init__(self):
- self.off = 0
- def ioctl(self, request, arg):
- return 0
- def readinto(self, buf):
- buf[:] = memoryview(_injected_buf)[self.off:self.off + len(buf)]
- self.off += len(buf)
- return len(buf)
- mount = umount = chdir = lambda *args: None
- def stat(self, path):
- if path == '_injected.mpy':
- return tuple(0 for _ in range(10))
- else:
- raise OSError(-2) # ENOENT
- def open(self, path, mode):
- return self.File()
- uos.mount(_FS(), '/_')
- uos.chdir('/_')
- from _injected import *
- uos.umount('/_')
- del _injected_buf, _FS
- """
- def main():
- import argparse
- cmd_parser = argparse.ArgumentParser(description="Run scripts on the pyboard.")
- cmd_parser.add_argument(
- "-d",
- "--device",
- default=os.environ.get("PYBOARD_DEVICE", "/dev/ttyACM0"),
- help="the serial device or the IP address of the pyboard",
- )
- cmd_parser.add_argument(
- "-b",
- "--baudrate",
- default=os.environ.get("PYBOARD_BAUDRATE", "115200"),
- help="the baud rate of the serial device",
- )
- cmd_parser.add_argument("-u", "--user", default="micro", help="the telnet login username")
- cmd_parser.add_argument("-p", "--password", default="python", help="the telnet login password")
- cmd_parser.add_argument("-c", "--command", help="program passed in as string")
- cmd_parser.add_argument(
- "-w",
- "--wait",
- default=0,
- type=int,
- help="seconds to wait for USB connected board to become available",
- )
- group = cmd_parser.add_mutually_exclusive_group()
- group.add_argument(
- "--soft-reset",
- default=True,
- action="store_true",
- help="Whether to perform a soft reset when connecting to the board [default]",
- )
- group.add_argument(
- "--no-soft-reset",
- action="store_false",
- dest="soft_reset",
- )
- group = cmd_parser.add_mutually_exclusive_group()
- group.add_argument(
- "--follow",
- action="store_true",
- default=None,
- help="follow the output after running the scripts [default if no scripts given]",
- )
- group.add_argument(
- "--no-follow",
- action="store_false",
- dest="follow",
- )
- group = cmd_parser.add_mutually_exclusive_group()
- group.add_argument(
- "--exclusive",
- action="store_true",
- default=True,
- help="Open the serial device for exclusive access [default]",
- )
- group.add_argument(
- "--no-exclusive",
- action="store_false",
- dest="exclusive",
- )
- cmd_parser.add_argument(
- "-f",
- "--filesystem",
- action="store_true",
- help="perform a filesystem action: "
- "cp local :device | cp :device local | cat path | ls [path] | rm path | mkdir path | rmdir path",
- )
- cmd_parser.add_argument("files", nargs="*", help="input files")
- args = cmd_parser.parse_args()
- # open the connection to the pyboard
- try:
- pyb = Pyboard(
- args.device, args.baudrate, args.user, args.password, args.wait, args.exclusive
- )
- except PyboardError as er:
- print(er)
- sys.exit(1)
- # run any command or file(s)
- if args.command is not None or args.filesystem or len(args.files):
- # we must enter raw-REPL mode to execute commands
- # this will do a soft-reset of the board
- try:
- pyb.enter_raw_repl(args.soft_reset)
- except PyboardError as er:
- print(er)
- pyb.close()
- sys.exit(1)
- def execbuffer(buf):
- try:
- if args.follow is None or args.follow:
- ret, ret_err = pyb.exec_raw(
- buf, timeout=None, data_consumer=stdout_write_bytes
- )
- else:
- pyb.exec_raw_no_follow(buf)
- ret_err = None
- except PyboardError as er:
- print(er)
- pyb.close()
- sys.exit(1)
- except KeyboardInterrupt:
- sys.exit(1)
- if ret_err:
- pyb.exit_raw_repl()
- pyb.close()
- stdout_write_bytes(ret_err)
- sys.exit(1)
- # do filesystem commands, if given
- if args.filesystem:
- filesystem_command(pyb, args.files)
- del args.files[:]
- # run the command, if given
- if args.command is not None:
- execbuffer(args.command.encode("utf-8"))
- # run any files
- for filename in args.files:
- with open(filename, "rb") as f:
- pyfile = f.read()
- if filename.endswith(".mpy") and pyfile[0] == ord("M"):
- pyb.exec_("_injected_buf=" + repr(pyfile))
- pyfile = _injected_import_hook_code
- execbuffer(pyfile)
- # exiting raw-REPL just drops to friendly-REPL mode
- pyb.exit_raw_repl()
- # if asked explicitly, or no files given, then follow the output
- if args.follow or (args.command is None and not args.filesystem and len(args.files) == 0):
- try:
- ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes)
- except PyboardError as er:
- print(er)
- sys.exit(1)
- except KeyboardInterrupt:
- sys.exit(1)
- if ret_err:
- pyb.close()
- stdout_write_bytes(ret_err)
- sys.exit(1)
- # close the connection to the pyboard
- pyb.close()
- if __name__ == "__main__":
- main()
|