diff options
Diffstat (limited to 'tinyusb/hw/mcu/sony/cxd56/tools')
m--------- | tinyusb | 0 | ||||
-rwxr-xr-x | tinyusb/hw/mcu/sony/cxd56/tools/__pycache__/xmodem.cpython-36.pyc | bin | 15257 -> 0 bytes | |||
-rwxr-xr-x | tinyusb/hw/mcu/sony/cxd56/tools/flash_writer.py | 580 | ||||
-rwxr-xr-x | tinyusb/hw/mcu/sony/cxd56/tools/xmodem.py | 590 |
4 files changed, 0 insertions, 1170 deletions
diff --git a/tinyusb b/tinyusb new file mode 160000 +Subproject f8288be03f28ad7b944e6925f49422dfa39202c diff --git a/tinyusb/hw/mcu/sony/cxd56/tools/__pycache__/xmodem.cpython-36.pyc b/tinyusb/hw/mcu/sony/cxd56/tools/__pycache__/xmodem.cpython-36.pyc Binary files differdeleted file mode 100755 index cfa917f7..00000000 --- a/tinyusb/hw/mcu/sony/cxd56/tools/__pycache__/xmodem.cpython-36.pyc +++ /dev/null diff --git a/tinyusb/hw/mcu/sony/cxd56/tools/flash_writer.py b/tinyusb/hw/mcu/sony/cxd56/tools/flash_writer.py deleted file mode 100755 index 840f10c3..00000000 --- a/tinyusb/hw/mcu/sony/cxd56/tools/flash_writer.py +++ /dev/null @@ -1,580 +0,0 @@ -#! /usr/bin/env python3 - -# Copyright (C) 2018 Sony Semiconductor Solutions Corp. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in -# the documentation and/or other materials provided with the -# distribution. -# 3. Neither the name NuttX nor the names of its contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS -# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE -# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, -# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, -# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS -# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED -# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN -# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. -# - -import time -import sys -import os -import struct -import glob -import fnmatch -import errno -import telnetlib -import argparse -import shutil -import subprocess -import re -import xmodem - -import_serial_module = True - -# When SDK release, plase set SDK_RELEASE as True. -SDK_RELEASE = False - -if SDK_RELEASE : - PRINT_RAW_COMMAND = False - REBOOT_AT_END = True -else : - PRINT_RAW_COMMAND = True - REBOOT_AT_END = True - -try: - import serial -except: - import_serial_module = False - -# supported environment various -# CXD56_PORT -# CXD56_TELNETSRV_PORT -# CXD56_TELNETSRV_IP - -PROTOCOL_SERIAL = 0 -PROTOCOL_TELNET = 1 - -MAX_DOT_COUNT = 70 - -# configure parameters and default value -class ConfigArgs: - PROTOCOL_TYPE = None - SERIAL_PORT = "COM1" - SERVER_PORT = 4569 - SERVER_IP = "localhost" - EOL = bytes([10]) - WAIT_RESET = True - AUTO_RESET = False - DTR_RESET = False - XMODEM_BAUD = 0 - NO_SET_BOOTABLE = False - PACKAGE_NAME = [] - FILE_NAME = [] - ERASE_NAME = [] - PKGSYS_NAME = [] - PKGAPP_NAME = [] - PKGUPD_NAME = [] - -ROM_MSG = [b"Welcome to nash"] -XMDM_MSG = "Waiting for XMODEM (CRC or 1K) transfer. Ctrl-X to cancel." - -class ConfigArgsLoader(): - def __init__(self): - self.parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter) - self.parser.add_argument("package_name", help="the name of the package to install", nargs='*') - self.parser.add_argument("-f", "--file", dest="file_name", help="save file", action='append') - self.parser.add_argument("-e", "--erase", dest="erase_name", help="erase file", action='append') - - self.parser.add_argument("-S", "--sys", dest="pkgsys_name", help="the name of the system package to install", action='append') - self.parser.add_argument("-A", "--app", dest="pkgapp_name", help="the name of the application package to install", action='append') - self.parser.add_argument("-U", "--upd", dest="pkgupd_name", help="the name of the updater package to install", action='append') - - self.parser.add_argument("-a", "--auto-reset", dest="auto_reset", - action="store_true", default=None, - help="try to auto reset develop board if possible") - self.parser.add_argument("-d", "--dtr-reset", dest="dtr_reset", - action="store_true", default=None, - help="try to auto reset develop board if possible") - self.parser.add_argument("-n", "--no-set-bootable", dest="no_set_bootable", - action="store_true", default=None, - help="not to set bootable") - - group = self.parser.add_argument_group() - group.add_argument("-i", "--server-ip", dest="server_ip", - help="the ip address connected to the telnet server") - group.add_argument("-p", "--server-port", dest="server_port", type=int, - help="the port connected to the telnet server") - - group = self.parser.add_argument_group() - group.add_argument("-c", "--serial-port", dest="serial_port", help="the serial port") - group.add_argument("-b", "--xmodem-baudrate", dest="xmodem_baud", help="Use the faster baudrate in xmodem") - - mutually_group = self.parser.add_mutually_exclusive_group() - mutually_group.add_argument("-t", "--telnet-protocol", dest="telnet_protocol", - action="store_true", default=None, - help="use the telnet protocol for binary transmission") - mutually_group.add_argument("-s", "--serial-protocol", dest="serial_protocol", - action="store_true", default=None, - help="use the serial port for binary transmission, default options") - - mutually_group2 = self.parser.add_mutually_exclusive_group() - mutually_group2.add_argument("-F", "--force-wait-reset", dest="wait_reset", - action="store_true", default=None, - help="force wait for pressing RESET button") - mutually_group2.add_argument("-N", "--no-wait-reset", dest="wait_reset", - action="store_false", default=None, - help="if possible, skip to wait for pressing RESET button") - - def update_config(self): - args = self.parser.parse_args() - - ConfigArgs.PACKAGE_NAME = args.package_name - ConfigArgs.FILE_NAME = args.file_name - ConfigArgs.ERASE_NAME = args.erase_name - ConfigArgs.PKGSYS_NAME = args.pkgsys_name - ConfigArgs.PKGAPP_NAME = args.pkgapp_name - ConfigArgs.PKGUPD_NAME = args.pkgupd_name - - # Get serial port or telnet server ip etc - if args.serial_protocol == True: - ConfigArgs.PROTOCOL_TYPE = PROTOCOL_SERIAL - elif args.telnet_protocol == True: - ConfigArgs.PROTOCOL_TYPE = PROTOCOL_TELNET - - if ConfigArgs.PROTOCOL_TYPE == None: - proto = os.environ.get("CXD56_PROTOCOL") - if proto is not None: - if 's' in proto: - ConfigArgs.PROTOCOL_TYPE = PROTOCOL_SERIAL - elif 't' in proto: - ConfigArgs.PROTOCOL_TYPE = PROTOCOL_TELNET - - if ConfigArgs.PROTOCOL_TYPE == None: - ConfigArgs.PROTOCOL_TYPE = PROTOCOL_SERIAL - - if ConfigArgs.PROTOCOL_TYPE == PROTOCOL_SERIAL: - if args.serial_port is not None: - ConfigArgs.SERIAL_PORT = args.serial_port - else: - # Get serial port from the environment - port = os.environ.get("CXD56_PORT") - if port is not None: - ConfigArgs.SERIAL_PORT = port - else: - print("CXD56_PORT is not set, Use " + ConfigArgs.SERIAL_PORT + ".") - else: - ConfigArgs.PROTOCOL_TYPE = PROTOCOL_TELNET - if args.server_port is not None: - ConfigArgs.SERVER_PORT = args.server_port - else: - port = os.environ.get("CXD56_TELNETSRV_PORT") - if port is not None: - ConfigArgs.SERVER_PORT = port - else: - print("CXD56_TELNETSRV_PORT is not set, Use " + str(ConfigArgs.SERVER_PORT) + ".") - if args.server_ip is not None: - ConfigArgs.SERVER_IP = args.server_ip - else: - ip = os.environ.get("CXD56_TELNETSRV_IP") - if ip is not None: - ConfigArgs.SERVER_IP = ip - else: - print("CXD56_TELNETSRV_IP is not set, Use " + ConfigArgs.SERVER_IP + ".") - - if args.xmodem_baud is not None: - ConfigArgs.XMODEM_BAUD = args.xmodem_baud - - if args.auto_reset is not None: - ConfigArgs.AUTO_RESET = args.auto_reset - - if args.dtr_reset is not None: - ConfigArgs.DTR_RESET = args.dtr_reset - - if args.no_set_bootable is not None: - ConfigArgs.NO_SET_BOOTABLE = args.no_set_bootable - - if args.wait_reset is not None: - ConfigArgs.WAIT_RESET = args.wait_reset - -class TelnetDev: - def __init__(self): - srv_ipaddr = ConfigArgs.SERVER_IP - srv_port = ConfigArgs.SERVER_PORT - self.recvbuf = b''; - try: - self.telnet = telnetlib.Telnet(host=srv_ipaddr, port=srv_port, timeout=10) - # There is a ack to be sent after connecting to the telnet server. - self.telnet.write(b"\xff") - except Exception as e: - print("Cannot connect to the server %s:%d" % (srv_ipaddr, srv_port)) - sys.exit(e.args[0]) - - def readline(self, size=None): - res = b'' - ch = b'' - while ch != ConfigArgs.EOL: - ch = self.getc_raw(1, timeout=0.1) - if ch == b'': - return res - res += ch - return res - - def getc_raw(self, size, timeout=1): - res = b'' - tm = time.monotonic() - while size > 0: - while self.recvbuf == b'': - self.recvbuf = self.telnet.read_eager() - if self.recvbuf == b'': - if (time.monotonic() - tm) > timeout: - return res - time.sleep(0.1) - res += self.recvbuf[0:1] - self.recvbuf = self.recvbuf[1:] - size -= 1 - return res - - def write(self, buffer): - self.telnet.write(buffer) - - def discard_inputs(self, timeout=1.0): - while True: - ch = self.getc_raw(1, timeout=timeout) - if ch == b'': - break - - def getc(self, size, timeout=1): - c = self.getc_raw(size, timeout) - return c - - def putc(self, buffer, timeout=1): - self.telnet.write(buffer) - self.show_progress(len(buffer)) - - def reboot(self): - # no-op - pass - - def set_file_size(self, filesize): - self.bytes_transfered = 0 - self.filesize = filesize - self.count = 0 - - def show_progress(self, sendsize): - if PRINT_RAW_COMMAND: - if self.count < MAX_DOT_COUNT: - self.bytes_transfered = self.bytes_transfered + sendsize - cur_count = int(self.bytes_transfered * MAX_DOT_COUNT / self.filesize) - if MAX_DOT_COUNT < cur_count: - cur_count = MAX_DOT_COUNT - for idx in range(cur_count - self.count): - print('#',end='') - sys.stdout.flush() - self.count = cur_count - if self.count == MAX_DOT_COUNT: - print("\n") - -class SerialDev: - def __init__(self): - if import_serial_module is False: - print("Cannot import serial module, maybe it's not install yet.") - print("\n", end="") - print("Please install python-setuptool by Cygwin installer.") - print("After that use easy_intall command to install serial module") - print(" $ cd tool/") - print(" $ python3 -m easy_install pyserial-2.7.tar.gz") - quit() - else: - port = ConfigArgs.SERIAL_PORT - try: - self.serial = serial.Serial(port, baudrate=115200, - parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, - bytesize=serial.EIGHTBITS, timeout=0.1) - except Exception as e: - print("Cannot open port : " + port) - sys.exit(e.args[0]) - - def readline(self, size=None): - return self.serial.readline(size) - - def write(self, buffer): - self.serial.write(buffer) - self.serial.flush() - - def discard_inputs(self, timeout=1.0): - time.sleep(timeout) - self.serial.flushInput() - - def getc(self, size, timeout=1): - self.serial.timeout = timeout - c = self.serial.read(size) - self.serial.timeout = 0.1 - return c - - def putc(self, buffer, timeout=1): - self.serial.timeout = timeout - self.serial.write(buffer) - self.serial.flush() - self.serial.timeout = 0.1 - self.show_progress(len(buffer)) - - # Note: windows platform dependent code - def putc_win(self, buffer, timeout=1): - self.serial.write(buffer) - self.show_progress(len(buffer)) - while True: - if self.serial.out_waiting == 0: - break - - def setBaudrate(self, baudrate): -# self.serial.setBaudrate(baudrate) - self.serial.baudrate = baudrate - - def reboot(self): - # Target Reset by DTR - self.serial.setDTR(False) - self.serial.setDTR(True) - self.serial.setDTR(False) - - def set_file_size(self, filesize): - self.bytes_transfered = 0 - self.filesize = filesize - self.count = 0 - - def show_progress(self, sendsize): - if PRINT_RAW_COMMAND: - if self.count < MAX_DOT_COUNT: - self.bytes_transfered = self.bytes_transfered + sendsize - cur_count = int(self.bytes_transfered * MAX_DOT_COUNT / self.filesize) - if MAX_DOT_COUNT < cur_count: - cur_count = MAX_DOT_COUNT - for idx in range(cur_count - self.count): - print('#',end='') - sys.stdout.flush() - self.count = cur_count - if self.count == MAX_DOT_COUNT: - print("\n") - -class FlashWriter: - def __init__(self, protocol_sel=PROTOCOL_SERIAL): - if protocol_sel == PROTOCOL_TELNET: - self.serial = TelnetDev() - else: - self.serial = SerialDev() - - def cancel_autoboot(self) : - boot_msg = '' - self.serial.reboot() # Target reboot before send 'r' - while boot_msg == '' : - rx = self.serial.readline().strip() - self.serial.write(b"r") # Send "r" key to avoid auto boot - for msg in ROM_MSG : - if msg in rx : - boot_msg = msg - break - while True : - rx = self.serial.readline().decode(errors="replace").strip() - if "updater" in rx : - # Workaround : Sometime first character is dropped. - # Send line feed as air shot before actual command. - self.serial.write(b"\n") # Send line feed - self.serial.discard_inputs()# Clear input buffer to sync - return boot_msg.decode(errors="ignore") - - def recv(self): - rx = self.serial.readline() - if PRINT_RAW_COMMAND : - serial_line = rx.decode(errors="replace") - if serial_line.strip() != "" and not serial_line.startswith(XMDM_MSG): - print(serial_line, end="") - return rx - - def wait(self, string): - while True: - rx = self.recv() - if string.encode() in rx: - time.sleep(0.1) - break - - def wait_for_prompt(self): - prompt_pat = re.compile(b"updater") - while True: - rx = self.recv() - if prompt_pat.search(rx): - time.sleep(0.1) - break - - def send(self, string): - self.serial.write(str(string).encode() + b"\n") - rx = self.serial.readline() - if PRINT_RAW_COMMAND : - print(rx.decode(errors="replace"), end="") - - def read_output(self, prompt_text) : - output = [] - while True : - rx = self.serial.readline() - if prompt_text.encode() in rx : - time.sleep(0.1) - break - if rx != "" : - output.append(rx.decode(errors="ignore").rstrip()) - return output - - def install_files(self, files, command) : - if ConfigArgs.XMODEM_BAUD: - command += " -b " + ConfigArgs.XMODEM_BAUD - if os.name == 'nt': - modem = xmodem.XMODEM(self.serial.getc, self.serial.putc_win, 'xmodem1k') - else: - modem = xmodem.XMODEM(self.serial.getc, self.serial.putc, 'xmodem1k') - for file in files: - with open(file, "rb") as bin : - self.send(command) - print("Install " + file) - self.wait(XMDM_MSG) - print("|0%" + - "-" * (int(MAX_DOT_COUNT / 2) - 6) + - "50%" + - "-" * (MAX_DOT_COUNT - int(MAX_DOT_COUNT / 2) - 5) + - "100%|") - if ConfigArgs.XMODEM_BAUD: - self.serial.setBaudrate(ConfigArgs.XMODEM_BAUD) - self.serial.discard_inputs() # Clear input buffer to sync - self.serial.set_file_size(os.path.getsize(file)) - modem.send(bin) - if ConfigArgs.XMODEM_BAUD: - self.serial.setBaudrate(115200) - self.wait_for_prompt() - - def save_files(self, files) : - if ConfigArgs.XMODEM_BAUD: - command = "save_file -b " + ConfigArgs.XMODEM_BAUD + " -x " - else: - command = "save_file -x " - if os.name == 'nt': - modem = xmodem.XMODEM(self.serial.getc, self.serial.putc_win, 'xmodem1k') - else: - modem = xmodem.XMODEM(self.serial.getc, self.serial.putc, 'xmodem1k') - for file in files: - with open(file, "rb") as bin : - self.send(command + os.path.basename(file)) - print("Save " + file) - self.wait(XMDM_MSG) - if ConfigArgs.XMODEM_BAUD: - self.serial.setBaudrate(ConfigArgs.XMODEM_BAUD) - self.serial.discard_inputs() # Clear input buffer to sync - self.serial.set_file_size(os.path.getsize(file)) - modem.send(bin) - if ConfigArgs.XMODEM_BAUD: - self.serial.setBaudrate(115200) - self.wait_for_prompt() - self.send("chmod d+rw " + os.path.basename(file)) - self.wait_for_prompt() - - def delete_files(self, files) : - for file in files : - self.delete_binary(file) - - def delete_binary(self, bin_name) : - self.send("rm " + bin_name) - self.wait_for_prompt() - -def main(): - try: - config_loader = ConfigArgsLoader() - config_loader.update_config() - except: - return errno.EINVAL - - # Wait to reset the board - writer = FlashWriter(ConfigArgs.PROTOCOL_TYPE) - - do_wait_reset = True - if ConfigArgs.AUTO_RESET: - if subprocess.call("cd " + sys.path[0] + "; ./reset_board.sh", shell=True) == 0: - print("auto reset board sucess!!") - do_wait_reset = False - bootrom_msg = writer.cancel_autoboot() - - if ConfigArgs.DTR_RESET: - do_wait_reset = False - bootrom_msg = writer.cancel_autoboot() - - if ConfigArgs.WAIT_RESET == False and do_wait_reset == True: - rx = writer.recv() - time.sleep(1) - for i in range(3): - writer.send("") - rx = writer.recv() - if "updater".encode() in rx: - # No need to wait for reset - do_wait_reset = False - break - time.sleep(1) - - if do_wait_reset: - # Wait to reset the board - print('Please press RESET button on target board') - sys.stdout.flush() - bootrom_msg = writer.cancel_autoboot() - - # Remove files - if ConfigArgs.ERASE_NAME : - print(">>> Remove exisiting files ...") - writer.delete_files(ConfigArgs.ERASE_NAME) - - # Install files - if ConfigArgs.PACKAGE_NAME or ConfigArgs.PKGSYS_NAME or ConfigArgs.PKGAPP_NAME or ConfigArgs.PKGUPD_NAME: - print(">>> Install files ...") - if ConfigArgs.PACKAGE_NAME : - writer.install_files(ConfigArgs.PACKAGE_NAME, "install") - if ConfigArgs.PKGSYS_NAME : - writer.install_files(ConfigArgs.PKGSYS_NAME, "install") - if ConfigArgs.PKGAPP_NAME : - writer.install_files(ConfigArgs.PKGAPP_NAME, "install") - if ConfigArgs.PKGUPD_NAME : - writer.install_files(ConfigArgs.PKGUPD_NAME, "install -k updater.key") - - # Save files - if ConfigArgs.FILE_NAME : - print(">>> Save files ...") - writer.save_files(ConfigArgs.FILE_NAME) - - # Set auto boot - if not ConfigArgs.NO_SET_BOOTABLE: - print(">>> Save Configuration to FlashROM ...") - writer.send("set bootable M0P") - writer.wait_for_prompt() - - # Sync all cached data to flash - writer.send("sync") - writer.wait_for_prompt() - - if REBOOT_AT_END : - print("Restarting the board ...") - writer.send("reboot") - - return 0 - -if __name__ == "__main__": - try: - sys.exit(main()) - except KeyboardInterrupt: - print("Canceled by keyboard interrupt.") - pass diff --git a/tinyusb/hw/mcu/sony/cxd56/tools/xmodem.py b/tinyusb/hw/mcu/sony/cxd56/tools/xmodem.py deleted file mode 100755 index c9343005..00000000 --- a/tinyusb/hw/mcu/sony/cxd56/tools/xmodem.py +++ /dev/null @@ -1,590 +0,0 @@ -''' -=============================== - XMODEM file transfer protocol -=============================== - -.. $Id$ - -This is a literal implementation of XMODEM.TXT_, XMODEM1K.TXT_ and -XMODMCRC.TXT_, support for YMODEM and ZMODEM is pending. YMODEM should -be fairly easy to implement as it is a hack on top of the XMODEM -protocol using sequence bytes ``0x00`` for sending file names (and some -meta data). - -.. _XMODEM.TXT: doc/XMODEM.TXT -.. _XMODEM1K.TXT: doc/XMODEM1K.TXT -.. _XMODMCRC.TXT: doc/XMODMCRC.TXT - -Data flow example including error recovery -========================================== - -Here is a sample of the data flow, sending a 3-block message. -It includes the two most common line hits - a garbaged block, -and an ``ACK`` reply getting garbaged. ``CRC`` or ``CSUM`` represents -the checksum bytes. - -XMODEM 128 byte blocks ----------------------- - -:: - - SENDER RECEIVER - - <-- NAK - SOH 01 FE Data[128] CSUM --> - <-- ACK - SOH 02 FD Data[128] CSUM --> - <-- ACK - SOH 03 FC Data[128] CSUM --> - <-- ACK - SOH 04 FB Data[128] CSUM --> - <-- ACK - SOH 05 FA Data[100] CPMEOF[28] CSUM --> - <-- ACK - EOT --> - <-- ACK - -XMODEM-1k blocks, CRC mode --------------------------- - -:: - - SENDER RECEIVER - - <-- C - STX 01 FE Data[1024] CRC CRC --> - <-- ACK - STX 02 FD Data[1024] CRC CRC --> - <-- ACK - STX 03 FC Data[1000] CPMEOF[24] CRC CRC --> - <-- ACK - EOT --> - <-- ACK - -Mixed 1024 and 128 byte Blocks ------------------------------- - -:: - - SENDER RECEIVER - - <-- C - STX 01 FE Data[1024] CRC CRC --> - <-- ACK - STX 02 FD Data[1024] CRC CRC --> - <-- ACK - SOH 03 FC Data[128] CRC CRC --> - <-- ACK - SOH 04 FB Data[100] CPMEOF[28] CRC CRC --> - <-- ACK - EOT --> - <-- ACK - -YMODEM Batch Transmission Session (1 file) ------------------------------------------- - -:: - - SENDER RECEIVER - <-- C (command:rb) - SOH 00 FF foo.c NUL[123] CRC CRC --> - <-- ACK - <-- C - SOH 01 FE Data[128] CRC CRC --> - <-- ACK - SOH 02 FC Data[128] CRC CRC --> - <-- ACK - SOH 03 FB Data[100] CPMEOF[28] CRC CRC --> - <-- ACK - EOT --> - <-- NAK - EOT --> - <-- ACK - <-- C - SOH 00 FF NUL[128] CRC CRC --> - <-- ACK - - -''' - -__author__ = 'Wijnand Modderman <maze@pyth0n.org>' -__copyright__ = ['Copyright (c) 2010 Wijnand Modderman', - 'Copyright (c) 1981 Chuck Forsberg'] -__license__ = 'MIT' -__version__ = '0.3.2' - -import logging -import time -import sys -from functools import partial -import collections - -# Loggerr -log = logging.getLogger('xmodem') - -# Protocol bytes -SOH = bytes([0x01]) -STX = bytes([0x02]) -EOT = bytes([0x04]) -ACK = bytes([0x06]) -DLE = bytes([0x10]) -NAK = bytes([0x15]) -CAN = bytes([0x18]) -CRC = bytes([0x43]) # C - - -class XMODEM(object): - ''' - XMODEM Protocol handler, expects an object to read from and an object to - write to. - - >>> def getc(size, timeout=1): - ... return data or None - ... - >>> def putc(data, timeout=1): - ... return size or None - ... - >>> modem = XMODEM(getc, putc) - - - :param getc: Function to retreive bytes from a stream - :type getc: callable - :param putc: Function to transmit bytes to a stream - :type putc: callable - :param mode: XMODEM protocol mode - :type mode: string - :param pad: Padding character to make the packets match the packet size - :type pad: char - - ''' - - # crctab calculated by Mark G. Mendel, Network Systems Corporation - crctable = [ - 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, - 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, - 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6, - 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de, - 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485, - 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d, - 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4, - 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc, - 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823, - 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b, - 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12, - 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a, - 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41, - 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49, - 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70, - 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78, - 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f, - 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067, - 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e, - 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256, - 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d, - 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405, - 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c, - 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634, - 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab, - 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3, - 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a, - 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92, - 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9, - 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1, - 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8, - 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0, - ] - - def __init__(self, getc, putc, mode='xmodem', pad=b'\x1a'): - self.getc = getc - self.putc = putc - self.mode = mode - self.pad = pad - - def abort(self, count=2, timeout=60): - ''' - Send an abort sequence using CAN bytes. - ''' - for counter in range(0, count): - self.putc(CAN, timeout) - - def send(self, stream, retry=32, timeout=360, quiet=0, callback=None): - ''' - Send a stream via the XMODEM protocol. - - >>> stream = file('/etc/issue', 'rb') - >>> print modem.send(stream) - True - - Returns ``True`` upon succesful transmission or ``False`` in case of - failure. - - :param stream: The stream object to send data from. - :type stream: stream (file, etc.) - :param retry: The maximum number of times to try to resend a failed - packet before failing. - :type retry: int - :param timeout: The number of seconds to wait for a response before - timing out. - :type timeout: int - :param quiet: If 0, it prints info to stderr. If 1, it does not print any info. - :type quiet: int - :param callback: Reference to a callback function that has the - following signature. This is useful for - getting status updates while a xmodem - transfer is underway. - Expected callback signature: - def callback(total_packets, success_count, error_count) - :type callback: callable - ''' - - # initialize protocol - try: - packet_size = dict( - xmodem = 128, - xmodem1k = 1024, - )[self.mode] - except AttributeError: - raise ValueError("An invalid mode was supplied") - - error_count = 0 - crc_mode = 0 - cancel = 0 - while True: - char = self.getc(1) - if char: - if char == NAK: - crc_mode = 0 - break - elif char == CRC: - crc_mode = 1 - break - elif char == CAN: - if not quiet: - print('received CAN', file=sys.stderr) - if cancel: - return False - else: - cancel = 1 - else: - log.error('send ERROR expected NAK/CRC, got %s' % \ - (ord(char),)) - - error_count += 1 - if error_count >= retry: - self.abort(timeout=timeout) - return False - - # send data - error_count = 0 - success_count = 0 - total_packets = 0 - sequence = 1 - while True: - data = stream.read(packet_size) - if not data: - log.info('sending EOT') - # end of stream - break - total_packets += 1 - data = data.ljust(packet_size, self.pad) - if crc_mode: - crc = self.calc_crc(data) - else: - crc = self.calc_checksum(data) - - # emit packet - while True: - if packet_size == 128: - self.putc(SOH) - else: # packet_size == 1024 - self.putc(STX) - self.putc(bytes([sequence])) - self.putc(bytes([0xff - sequence])) - self.putc(data) - if crc_mode: - self.putc(bytes([crc >> 8])) - self.putc(bytes([crc & 0xff])) - else: - self.putc(bytes([crc])) - - char = self.getc(1, timeout) - if char == ACK: - success_count += 1 - if isinstance(callback, collections.Callable): - callback(total_packets, success_count, error_count) - break - if char == NAK: - error_count += 1 - if isinstance(callback, collections.Callable): - callback(total_packets, success_count, error_count) - if error_count >= retry: - # excessive amounts of retransmissions requested, - # abort transfer - self.abort(timeout=timeout) - log.warning('excessive NAKs, transfer aborted') - return False - - # return to loop and resend - continue - else: - log.error('Not ACK, Not NAK') - error_count += 1 - if isinstance(callback, collections.Callable): - callback(total_packets, success_count, error_count) - if error_count >= retry: - # excessive amounts of retransmissions requested, - # abort transfer - self.abort(timeout=timeout) - log.warning('excessive protocol errors, transfer aborted') - return False - - # return to loop and resend - continue - - # protocol error - self.abort(timeout=timeout) - log.error('protocol error') - return False - - # keep track of sequence - sequence = (sequence + 1) % 0x100 - - while True: - # end of transmission - self.putc(EOT) - - #An ACK should be returned - char = self.getc(1, timeout) - if char == ACK: - break - else: - error_count += 1 - if error_count >= retry: - self.abort(timeout=timeout) - log.warning('EOT was not ACKd, transfer aborted') - return False - - return True - - def recv(self, stream, crc_mode=1, retry=16, timeout=60, delay=1, quiet=0): - ''' - Receive a stream via the XMODEM protocol. - - >>> stream = file('/etc/issue', 'wb') - >>> print modem.recv(stream) - 2342 - - Returns the number of bytes received on success or ``None`` in case of - failure. - ''' - - # initiate protocol - error_count = 0 - char = 0 - cancel = 0 - while True: - # first try CRC mode, if this fails, - # fall back to checksum mode - if error_count >= retry: - self.abort(timeout=timeout) - return None - elif crc_mode and error_count < (retry / 2): - if not self.putc(CRC): - time.sleep(delay) - error_count += 1 - else: - crc_mode = 0 - if not self.putc(NAK): - time.sleep(delay) - error_count += 1 - - char = self.getc(1, timeout) - if not char: - error_count += 1 - continue - elif char == SOH: - #crc_mode = 0 - break - elif char == STX: - break - elif char == CAN: - if cancel: - return None - else: - cancel = 1 - else: - error_count += 1 - - # read data - error_count = 0 - income_size = 0 - packet_size = 128 - sequence = 1 - cancel = 0 - while True: - while True: - if char == SOH: - packet_size = 128 - break - elif char == STX: - packet_size = 1024 - break - elif char == EOT: - # We received an EOT, so send an ACK and return the received - # data length - self.putc(ACK) - return income_size - elif char == CAN: - # cancel at two consecutive cancels - if cancel: - return None - else: - cancel = 1 - else: - if not quiet: - print('recv ERROR expected SOH/EOT, got', ord(char), file=sys.stderr) - error_count += 1 - if error_count >= retry: - self.abort() - return None - # read sequence - error_count = 0 - cancel = 0 - seq1 = ord(self.getc(1)) - seq2 = 0xff - ord(self.getc(1)) - if seq1 == sequence and seq2 == sequence: - # sequence is ok, read packet - # packet_size + checksum - data = self.getc(packet_size + 1 + crc_mode, timeout) - if crc_mode: - csum = (ord(data[-2]) << 8) + ord(data[-1]) - data = data[:-2] - log.debug('CRC (%04x <> %04x)' % \ - (csum, self.calc_crc(data))) - valid = csum == self.calc_crc(data) - else: - csum = data[-1] - data = data[:-1] - log.debug('checksum (checksum(%02x <> %02x)' % \ - (ord(csum), self.calc_checksum(data))) - valid = ord(csum) == self.calc_checksum(data) - - # valid data, append chunk - if valid: - income_size += len(data) - stream.write(data) - self.putc(ACK) - sequence = (sequence + 1) % 0x100 - char = self.getc(1, timeout) - continue - else: - # consume data - self.getc(packet_size + 1 + crc_mode) - self.debug('expecting sequence %d, got %d/%d' % \ - (sequence, seq1, seq2)) - - # something went wrong, request retransmission - self.putc(NAK) - - def calc_checksum(self, data, checksum=0): - ''' - Calculate the checksum for a given block of data, can also be used to - update a checksum. - - >>> csum = modem.calc_checksum('hello') - >>> csum = modem.calc_checksum('world', csum) - >>> hex(csum) - '0x3c' - - ''' - return (sum(map(ord, data)) + checksum) % 256 - - def calc_crc(self, data, crc=0): - ''' - Calculate the Cyclic Redundancy Check for a given block of data, can - also be used to update a CRC. - - >>> crc = modem.calc_crc('hello') - >>> crc = modem.calc_crc('world', crc) - >>> hex(crc) - '0xd5e3' - - ''' - for char in data: - crc = (crc << 8) ^ self.crctable[((crc >> 8) ^ int(char)) & 0xff] - return crc & 0xffff - - -XMODEM1k = partial(XMODEM, mode='xmodem1k') - - -def run(): - import optparse - import subprocess - - parser = optparse.OptionParser(usage='%prog [<options>] <send|recv> filename filename') - parser.add_option('-m', '--mode', default='xmodem', - help='XMODEM mode (xmodem, xmodem1k)') - - options, args = parser.parse_args() - if len(args) != 3: - parser.error('invalid arguments') - return 1 - - elif args[0] not in ('send', 'recv'): - parser.error('invalid mode') - return 1 - - def _func(so, si): - import select - import subprocess - - print('si', si) - print('so', so) - - def getc(size, timeout=3): - w,t,f = select.select([so], [], [], timeout) - if w: - data = so.read(size) - else: - data = None - - print('getc(', repr(data), ')') - return data - - def putc(data, timeout=3): - w,t,f = select.select([], [si], [], timeout) - if t: - si.write(data) - si.flush() - size = len(data) - else: - size = None - - print('putc(', repr(data), repr(size), ')') - return size - - return getc, putc - - def _pipe(*command): - pipe = subprocess.Popen(command, - stdout=subprocess.PIPE, stdin=subprocess.PIPE) - return pipe.stdout, pipe.stdin - - if args[0] == 'recv': - import io - getc, putc = _func(*_pipe('sz', '--xmodem', args[2])) - stream = open(args[1], 'wb') - xmodem = XMODEM(getc, putc, mode=options.mode) - status = xmodem.recv(stream, retry=8) - stream.close() - - elif args[0] == 'send': - getc, putc = _func(*_pipe('rz', '--xmodem', args[2])) - stream = open(args[1], 'rb') - xmodem = XMODEM(getc, putc, mode=options.mode) - status = xmodem.send(stream, retry=8) - stream.close() - -if __name__ == '__main__': - sys.exit(run()) |