Add demo for uart-passthrough

This commit is contained in:
Wladimir J. van der Laan 2019-05-21 16:51:58 +02:00
parent 72a5434397
commit 8ca44a288f
4 changed files with 385 additions and 0 deletions

View File

@ -22,3 +22,5 @@ The connection from the K210 to the ESP can handle up to `115200*40=4608000` bau
however the connection to the host seems to get stuck at a lower number. Use
`AT+UART_CUR=` (not `UART_DEF` !) to set the baudrate at the ESP side so that
it is always possible to reset the MCU to get back to 115200 baud.
There's a demo in `demo/weather.py`.

1
rust/uart-passthrough/demo/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
__pycache__

View File

@ -0,0 +1,252 @@
# based on https://github.com/rogerdahl/python-esp8266/blob/master/esp8266.py
# https://github.com/espressif/esp8266_at/wiki/AT_Description
import logging
import re
logging.basicConfig(level=logging.DEBUG)
MAX_CIPSEND_BUFFER_SIZE = 2048
class ESP8266Exception(Exception):
pass
class ESP8266ExceptionUnresolvable(Exception):
pass
CLOSED = object()
class ESP8266(object):
_successResponse_list = (b'ok', b'ready', b'no change', b'send ok')
_unresolvableFailureResponse_list = (
b'error', b'type error', b'alreay connect')
def __init__(self, serial):
assert serial.isOpen(), "Need a connected pySerial object"
self._serial = serial
self.sendCmd(b'AT')
# General
def sendCmd(self, cmd, retries=3):
'''Send an AT command with automatic retries. If retries are exhausted, the final exception is
forwarded to the client. If successful, the response lines are returned in a list.'''
for i in range(retries):
try:
return self._sendCmd(cmd)
except ESP8266Exception:
if i == retries - 1:
raise
def closeCurrent(self):
currentStatus_str, currentProtocol_str, currentHost_str, currentPort_int = self.getCipStatus()
if currentStatus_str in (b'CONNECTED'):
self.closeCip()
def sendBuffer(self, buf):
while buf:
self._sendBuffer(buf[:MAX_CIPSEND_BUFFER_SIZE])
buf = buf[MAX_CIPSEND_BUFFER_SIZE:]
def recvBuffer(self):
'''
State machine for parsing incoming received data.
'''
STATE_IGNORE_TO_EOL = -2
STATE_RECV_DATA = -3
STATE_EXPECT_LF = -4
STATE_EXPECT_CR = -5
STATE_EXPECT = 0
STATE_NUMBER = 1
expect = [b'+IPD,', b'CLOSED'] # prefixes
ofs = 0
state = STATE_EXPECT
state_n = 3 # bit field of expect[N]
recv_size = 0
data = None
rv = None
line = b''
while True:
ch = self._serial.read(1)
line += ch # for debugging only
if ch == b'':
return None
if state != STATE_RECV_DATA and state != STATE_EXPECT_LF:
if ch == b'\r':
if state == STATE_IGNORE_TO_EOL:
print('Ignored unknown response: %s' % line)
line = b''
state = STATE_EXPECT_LF
continue
elif state == STATE_RECV_DATA:
data += ch
if len(data) == recv_size:
return data
if state == STATE_EXPECT_LF:
if ch == b'\n':
return rv
else:
raise IOError('Improperly terminated line')
if state == STATE_EXPECT_CR:
if ch == b'\r':
state = STATE_EXPECT_LF
else:
raise IOError('Improperly terminated line')
elif state == STATE_EXPECT:
new_state_n = 0
terminate = None
for n,e in enumerate(expect):
if (state_n & (1<<n)) and ord(ch) == e[ofs]:
if ofs == len(e) - 1:
terminate = n
else:
new_state_n |= (1<<n)
if terminate is not None:
if terminate == 0:
state = STATE_NUMBER
recv_size = 0
else: # Closed
state = STATE_EXPECT_CR
rv = CLOSED
elif new_state_n:
ofs = ofs + 1
state_n = new_state_n
else:
state = STATE_IGNORE_TO_EOL
elif state == STATE_NUMBER:
if ord(ch) >= 0x30 and ord(ch) <= 0x39:
recv_size = recv_size * 10 + ord(ch) - 0x30
elif ch == b':':
state = STATE_RECV_DATA
data = b''
else:
raise IOError('Length must end with :')
# Access Point
def scanForAccessPoints(self):
return self.sendCmd(b'AT+CWLAP')
def connectToAccessPoint(self, ssid_str, password_str):
'''Call is ignored if already connected to the given access point. If
already connected to another access point, the old access point is
automatically disconnected first.'''
current_ssid_str = self.getConnectedAccessPoint()
if current_ssid_str == ssid_str:
logging.info(
'Already connected to access point: {}'.format(ssid_str))
return
if current_ssid_str != b'<NOT CONNECTED>':
self.disconnectFromAccessPoint()
self.setDeviceMode(1)
self.sendCmd(b'AT+CWJAP_CUR="' + ssid_str +
b'","' + password_str + b'"')
def disconnectFromAccessPoint(self):
self.sendCmd(b'AT+CWQAP')
def getConnectedAccessPoint(self):
try:
responseLines_list = self.sendCmd(b'AT+CWJAP_CUR?')
except ESP8266ExceptionUnresolvable:
return b'<NOT CONNECTED>'
else:
if responseLines_list[0] == b'No AP':
return None
m = re.match(rb'\+CWJAP_CUR:"(.*?)"', responseLines_list[0])
return m.group(1)
# CIP
def startCip(self, protocol_str, host_str, port_int):
self.sendCmd(b'AT+CIPMUX=0') # can only one connection at a time
cmd = b'AT+CIPSTART="%s","%s",%d' % (protocol_str, host_str, port_int)
self.sendCmd(cmd)
def getCipStatus(self):
responseLines_list = self._sendCmd(b'AT+CIPSTATUS')
m = re.match(rb'STATUS:(\d)', responseLines_list[0])
status_int = int(m.group(1)) - 1
status_str = (b'<INVALID>', b'GOTIP', b'CONNECTED',
b'DISCONNECTED', b'<INVALID>')[status_int]
protocol_str, host_str, port_int = None, None, None
if len(responseLines_list) >= 2:
m = re.match(rb'\+CIPSTATUS:0,"(.+)","(.+)",(\d+),0', responseLines_list[1])
if m:
protocol_str, host_str, port_int = m.group(
1), m.group(2), int(m.group(3))
return status_str, protocol_str, host_str, port_int
def closeCip(self):
return self._sendCmd(b'AT+CIPCLOSE')
# Device Mode
def getDeviceMode(self):
responseLines_list = self.sendCmd(b'AT+CWMODE?')
m = re.match(rb'\+CWMODE:(\d)', responseLines_list[0])
return int(m.group(1))
def setDeviceMode(self, deviceMode_int):
'''1=client, 2=AP, 3=both'''
currentDeviceMode_int = self.getDeviceMode()
if currentDeviceMode_int != deviceMode_int:
self.sendCmd(b'AT+CWMODE=%d' % (deviceMode_int))
# Misc
def getIPAddress(self):
responseLines_list = self.sendCmd(b'AT+CIFSR')
return responseLines_list[0]
#
# Private.
#
def _sendCmd(self, cmd):
self._sendStr(cmd)
return self._getResponse()
def _sendStr(self, s):
logging.debug('> %s' % s)
self._serial.flushInput()
self._serial.write(s + b'\r\n')
# eat echo
self._serial.readline()
def _getResponse(self):
responseLines_list = []
while True:
r = self._serial.readline()
if not r:
raise ESP8266Exception('Timeout. Possible partial response: {}'.format(
' / '.format(responseLines_list)))
r = r.strip()
if r:
logging.debug('< {}'.format(r))
responseLines_list.append(r)
if (r.lower() in self._successResponse_list):
return responseLines_list
if (r.lower() in self._unresolvableFailureResponse_list):
raise ESP8266ExceptionUnresolvable(
'Failed with unresolvable response: {}'.format(' / '.format(responseLines_list)))
raise ESP8266Exception('Failed with unknown response: {}'.format(
' / '.format(responseLines_list)))
def _sendBuffer(self, s):
assert len(s) <= MAX_CIPSEND_BUFFER_SIZE
self._sendCmd(b'AT+CIPSEND=%d' % (len(s)))
# eat the send prompt ("> ")
#sendPrompt_str = self._serial.read(2)
#assert sendPrompt_str == b'> '
# send the buffer
self._serial.write(s)
return self._getResponse()

View File

@ -0,0 +1,130 @@
#!/usr/bin/env python3
import argparse
import os
import serial
import struct
import sys
import time
from esp8266 import ESP8266, CLOSED
def expect(a, b):
if a == b:
return
print(f'Mismatch: {a} versus {b}', file=sys.stderr)
assert(0)
def sanity_check(ser):
'''Check sanity of current rate'''
retries = 0
ser.write(b'AT\r\n')
while retries < 5:
line1 = ser.readline()
if line1 == b'AT\r\n':
expect(ser.readline(), b'\r\n')
expect(ser.readline(), b'OK\r\n')
return
retries += 1
raise IOError('Basic connection check failed')
def send_oob(ser, buf):
ser.baudrate = 115200
# Toggle DTR for OoB command
ser.dtr = False
time.sleep(0.01)
ser.dtr = True
ser.write(buf)
time.sleep(0.01)
def reset_esp8266(ser):
send_oob(ser, b'\x42') # reset
retries = 0
while retries < 5:
line = ser.readline()
# print('Reset: ', line)
if line == b'ready\r\n':
return
retries += 1
raise IOError('Reset failed')
def set_rate(ser, newbaud):
'''Set baudrate'''
# Need to do three things here:
# - Set WIFI module baudrate
# - Set K210<->WIFI baudrate
# - Set Host<->K210 baudrate
sanity_check(ser)
# Set WIFI module to new rate
cmd = b'AT+UART_CUR=%d,8,1,0,0\r\n' % newbaud
ser.write(cmd)
try:
expect(ser.readline(), cmd)
expect(ser.readline(), b'\r\n')
expect(ser.readline(), b'OK\r\n')
except IOError as e:
reset_esp8266(ser)
raise
send_oob(ser, b'\x23' + struct.pack('<I', newbaud))
ser.baudrate = newbaud
sanity_check(ser)
def parse_args():
parser = argparse.ArgumentParser(description='Fetch the current weather from wttr.in through ESP8266 passthrough')
parser.add_argument('--skip-reset', help='Skip reset and baudrate setting', action='store_true')
parser.add_argument('--baud', help='Set baudrate (default 1382400)', type=int, default=12*115200)
return parser.parse_args()
ser = serial.Serial('/dev/ttyUSB1', 115200, timeout=1)
try:
args = parse_args()
TARGET_BAUD = args.baud
wifi_ap = os.getenv('WIFI_AP')
wifi_pass = os.getenv('WIFI_PASS')
if wifi_ap is None or wifi_pass is None:
print('Set environment variables WIFI_AP and (optionally) WIFI_PASS')
exit(1)
if not args.skip_reset:
print('\x1b[95mNote: if reset fails, the only way to reset the ESP8285 is to power off and on the board\x1b[0m')
print('\x1b[35mReset ESP8266\x1b[0m')
# first, try resetting the chip
reset_esp8266(ser)
sanity_check(ser)
set_rate(ser, TARGET_BAUD)
print('\x1b[35mSwitching to rate %d was successful\x1b[0m' % TARGET_BAUD)
else:
ser.baud = TARGET_BAUD
ser.timeout = 5 # longer timeout while using the device, allow some time to actually connect to AP
esp = ESP8266(ser)
#print(esp.scanForAccessPoints())
esp.connectToAccessPoint(wifi_ap.encode(), wifi_pass.encode())
print('\x1b[35mIP address: {}\x1b[0m'.format(esp.getIPAddress()))
esp.closeCurrent()
#esp.startCip(b'TCP', b'192.168.1.110', 8000)
esp.startCip(b'TCP', b'wttr.in', 80)
esp.sendBuffer(b'GET /?0qA HTTP/1.1\r\nHost: wttr.in\r\nConnection: close\r\nUser-Agent: Weather-Spy\r\n\r\n')
data = b''
while True:
rv = esp.recvBuffer()
if rv is CLOSED:
break
elif rv is not None:
data += rv
print('Received %d bytes' % len(data))
else:
print('(timeout or empty)')
idx = data.find(b'\r\n\r\n')
print()
print(data[idx+4:].decode())
finally:
# always try to switch back before exiting so next invocation won't be messed up
set_rate(ser, 115200)