summaryrefslogtreecommitdiff
path: root/decoders/uart.py
diff options
context:
space:
mode:
Diffstat (limited to 'decoders/uart.py')
-rw-r--r--decoders/uart.py367
1 files changed, 367 insertions, 0 deletions
diff --git a/decoders/uart.py b/decoders/uart.py
new file mode 100644
index 0000000..ee39697
--- /dev/null
+++ b/decoders/uart.py
@@ -0,0 +1,367 @@
+##
+## This file is part of the sigrok project.
+##
+## Copyright (C) 2011 Uwe Hermann <uwe@hermann-uwe.de>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, write to the Free Software
+## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+##
+
+#
+# UART protocol decoder
+#
+
+import sigrok
+
+# States
+WAIT_FOR_START_BIT = 0
+GET_START_BIT = 1
+GET_DATA_BITS = 2
+GET_PARITY_BIT = 3
+GET_STOP_BITS = 4
+
+# Parity options
+PARITY_NONE = 0
+PARITY_ODD = 1
+PARITY_EVEN = 2
+PARITY_ZERO = 3
+PARITY_ONE = 4
+
+# Stop bit options
+STOP_BITS_0_5 = 0
+STOP_BITS_1 = 1
+STOP_BITS_1_5 = 2
+STOP_BITS_2 = 3
+
+# Bit order options
+LSB_FIRST = 0
+MSB_FIRST = 1
+
+# Output data formats
+DATA_FORMAT_ASCII = 0
+DATA_FORMAT_HEX = 1
+
+# TODO: Remove me later.
+quick_hack = 1
+
+class Sample():
+ def __init__(self, data):
+ self.data = data
+ def probe(self, probe):
+ s = ord(self.data[probe / 8]) & (1 << (probe % 8))
+ return True if s else False
+
+def sampleiter(data, unitsize):
+ for i in range(0, len(data), unitsize):
+ yield(Sample(data[i:i+unitsize]))
+
+# Given a parity type to check (odd, even, zero, one), the value of the
+# parity bit, the value of the data, and the length of the data (5-9 bits,
+# usually 8 bits) return True if the parity is correct, False otherwise.
+# PARITY_NONE is _not_ allowed as value for 'parity_type'.
+def parity_ok(parity_type, parity_bit, data, num_data_bits):
+
+ # Handle easy cases first (parity bit is always 1 or 0).
+ if parity_type == PARITY_ZERO:
+ return parity_bit == 0
+ elif parity_type == PARITY_ONE:
+ return parity_bit == 1
+
+ # Count number of 1 (high) bits in the data (and the parity bit itself!).
+ parity = bin(data).count('1') + parity_bit
+
+ # Check for odd/even parity.
+ if parity_type == PARITY_ODD:
+ return (parity % 2) == 1
+ elif parity_type == PARITY_EVEN:
+ return (parity % 2) == 0
+ else:
+ raise Exception('Invalid parity type: %d' % parity_type)
+
+class Decoder(sigrok.Decoder):
+ id = 'uart'
+ name = 'UART'
+ longname = 'Universal Asynchronous Receiver/Transmitter (UART)'
+ desc = 'Universal Asynchronous Receiver/Transmitter (UART)'
+ longdesc = 'TODO.'
+ author = 'Uwe Hermann'
+ email = 'uwe@hermann-uwe.de'
+ license = 'gplv2+'
+ inputs = ['logic']
+ outputs = ['uart']
+ probes = {
+ # Allow specifying only one of the signals, e.g. if only one data
+ # direction exists (or is relevant).
+ ## 'rx': {'ch': 0, 'name': 'RX', 'desc': 'UART receive line'},
+ ## 'tx': {'ch': 1, 'name': 'TX', 'desc': 'UART transmit line'},
+ 'rx': 0,
+ 'tx': 1,
+ }
+ options = {
+ 'baudrate': ['UART baud rate', 115200],
+ 'num_data_bits': ['Data bits', 8], # Valid: 5-9.
+ 'parity': ['Parity', PARITY_NONE],
+ 'parity_check': ['Check parity', True],
+ 'num_stop_bits': ['Stop bit(s)', STOP_BITS_1],
+ 'bit_order': ['Bit order', LSB_FIRST],
+ 'data_format': ['Output data format', DATA_FORMAT_ASCII],
+ # TODO: Options to invert the signal(s).
+ # ...
+ }
+
+ def __init__(self, **kwargs):
+ self.probes = Decoder.probes.copy()
+
+ # Set defaults, can be overridden in 'start'.
+ self.baudrate = 115200
+ self.num_data_bits = 8
+ self.parity = PARITY_NONE
+ self.check_parity = True
+ self.num_stop_bits = 1
+ self.bit_order = LSB_FIRST
+ self.data_format = DATA_FORMAT_ASCII
+
+ self.samplenum = 0
+ self.frame_start = -1
+ self.startbit = -1
+ self.cur_data_bit = 0
+ self.databyte = 0
+ self.stopbit1 = -1
+ self.startsample = -1
+
+ # Initial state.
+ self.staterx = WAIT_FOR_START_BIT
+
+ # Get the channel/probe number of the RX/TX signals.
+ ## self.rx_bit = self.probes['rx']['ch']
+ ## self.tx_bit = self.probes['tx']['ch']
+ self.rx_bit = self.probes['rx']
+ self.tx_bit = self.probes['tx']
+
+ self.oldrx = None
+ self.oldtx = None
+
+ def start(self, metadata):
+ self.unitsize = metadata['unitsize']
+ self.samplerate = metadata['samplerate']
+
+ # TODO
+ ### self.baudrate = metadata['baudrate']
+ ### self.num_data_bits = metadata['num_data_bits']
+ ### self.parity = metadata['parity']
+ ### self.parity_check = metadata['parity_check']
+ ### self.num_stop_bits = metadata['num_stop_bits']
+ ### self.bit_order = metadata['bit_order']
+ ### self.data_format = metadata['data_format']
+
+ # The width of one UART bit in number of samples.
+ self.bit_width = float(self.samplerate) / float(self.baudrate)
+
+ def report(self):
+ pass
+
+ # Return true if we reached the middle of the desired bit, false otherwise.
+ def reached_bit(self, bitnum):
+ # bitpos is the samplenumber which is in the middle of the
+ # specified UART bit (0 = start bit, 1..x = data, x+1 = parity bit
+ # (if used) or the first stop bit, and so on).
+ bitpos = self.frame_start + (self.bit_width / 2.0)
+ bitpos += bitnum * self.bit_width
+ if self.samplenum >= bitpos:
+ return True
+ return False
+
+ def reached_bit_last(self, bitnum):
+ bitpos = self.frame_start + ((bitnum + 1) * self.bit_width)
+ if self.samplenum >= bitpos:
+ return True
+ return False
+
+ def wait_for_start_bit(self, old_signal, signal):
+ # The start bit is always 0 (low). As the idle UART (and the stop bit)
+ # level is 1 (high), the beginning of a start bit is a falling edge.
+ if not (old_signal == 1 and signal == 0):
+ return
+
+ # Save the sample number where the start bit begins.
+ self.frame_start = self.samplenum
+
+ self.staterx = GET_START_BIT
+
+ def get_start_bit(self, signal):
+ # Skip samples until we're in the middle of the start bit.
+ if not self.reached_bit(0):
+ return []
+
+ self.startbit = signal
+
+ if self.startbit != 0:
+ # TODO: Startbit must be 0. If not, we report an error.
+ pass
+
+ self.cur_data_bit = 0
+ self.databyte = 0
+ self.startsample = -1
+
+ self.staterx = GET_DATA_BITS
+
+ if quick_hack: # TODO
+ return []
+
+ o = [{'type': 'S', 'range': (self.frame_start, self.samplenum),
+ 'data': None, 'ann': 'Start bit'}]
+ return o
+
+ def get_data_bits(self, signal):
+ # Skip samples until we're in the middle of the desired data bit.
+ if not self.reached_bit(self.cur_data_bit + 1):
+ return []
+
+ # Save the sample number where the data byte starts.
+ if self.startsample == -1:
+ self.startsample = self.samplenum
+
+ # Get the next data bit in LSB-first or MSB-first fashion.
+ if self.bit_order == LSB_FIRST:
+ self.databyte >>= 1
+ self.databyte |= (signal << (self.num_data_bits - 1))
+ elif self.bit_order == MSB_FIRST:
+ self.databyte <<= 1
+ self.databyte |= (signal << 0)
+ else:
+ raise Exception('Invalid bit order value: %d', self.bit_order)
+
+ # Return here, unless we already received all data bits.
+ if self.cur_data_bit < self.num_data_bits - 1: # TODO? Off-by-one?
+ self.cur_data_bit += 1
+ return []
+
+ # Convert the data byte into the configured format.
+ if self.data_format == DATA_FORMAT_ASCII:
+ d = chr(self.databyte)
+ elif self.data_format == DATA_FORMAT_HEX:
+ d = '0x%02x' % self.databyte
+ else:
+ raise Exception('Invalid data format value: %d', self.data_format)
+
+ self.staterx = GET_PARITY_BIT
+
+ if quick_hack: # TODO
+ return [d]
+
+ o = [{'type': 'D', 'range': (self.startsample, self.samplenum - 1),
+ 'data': d, 'ann': None}]
+
+ return o
+
+ def get_parity_bit(self, signal):
+ # If no parity is used/configured, skip to the next state immediately.
+ if self.parity == PARITY_NONE:
+ self.staterx = GET_STOP_BITS
+ return []
+
+ # Skip samples until we're in the middle of the parity bit.
+ if not self.reached_bit(self.num_data_bits + 1):
+ return []
+
+ self.paritybit = signal
+
+ self.staterx = GET_STOP_BITS
+
+ if parity_ok(self.parity, self.paritybit, self.databyte,
+ self.num_data_bits):
+ if quick_hack: # TODO
+ # return ['P']
+ return []
+ # TODO: Fix range.
+ o = [{'type': 'P', 'range': (self.samplenum, self.samplenum),
+ 'data': self.paritybit, 'ann': 'Parity bit'}]
+ else:
+ if quick_hack: # TODO
+ return ['PE']
+ o = [{'type': 'PE', 'range': (self.samplenum, self.samplenum),
+ 'data': self.paritybit, 'ann': 'Parity error'}]
+
+ return o
+
+ # TODO: Currently only supports 1 stop bit.
+ def get_stop_bits(self, signal):
+ # Skip samples until we're in the middle of the stop bit(s).
+ skip_parity = 0
+ if self.parity != PARITY_NONE:
+ skip_parity = 1
+ if not self.reached_bit(self.num_data_bits + 1 + skip_parity):
+ return []
+
+ self.stopbit1 = signal
+
+ if self.stopbit1 != 1:
+ # TODO: Stop bits must be 1. If not, we report an error.
+ pass
+
+ self.staterx = WAIT_FOR_START_BIT
+
+ if quick_hack: # TODO
+ return []
+
+ # TODO: Fix range.
+ o = [{'type': 'P', 'range': (self.samplenum, self.samplenum),
+ 'data': None, 'ann': 'Stop bit'}]
+ return o
+
+ def decode(self, data):
+ """UART protocol decoder"""
+
+ out = []
+
+ for sample in sampleiter(data["data"], self.unitsize):
+
+ # TODO: Eliminate the need for ord().
+ s = ord(sample.data)
+
+ # TODO: Start counting at 0 or 1? Increase before or after?
+ self.samplenum += 1
+
+ # First sample: Save RX/TX value.
+ if self.oldrx == None:
+ # Get RX/TX bit values (0/1 for low/high) of the first sample.
+ self.oldrx = (s & (1 << self.rx_bit)) >> self.rx_bit
+ # self.oldtx = (s & (1 << self.tx_bit)) >> self.tx_bit
+ continue
+
+ # Get RX/TX bit values (0/1 for low/high).
+ rx = (s & (1 << self.rx_bit)) >> self.rx_bit
+ # tx = (s & (1 << self.tx_bit)) >> self.tx_bit
+
+ # State machine.
+ if self.staterx == WAIT_FOR_START_BIT:
+ self.wait_for_start_bit(self.oldrx, rx)
+ elif self.staterx == GET_START_BIT:
+ out += self.get_start_bit(rx)
+ elif self.staterx == GET_DATA_BITS:
+ out += self.get_data_bits(rx)
+ elif self.staterx == GET_PARITY_BIT:
+ out += self.get_parity_bit(rx)
+ elif self.staterx == GET_STOP_BITS:
+ out += self.get_stop_bits(rx)
+ else:
+ raise Exception('Invalid state: %s' % self.staterx)
+
+ # Save current RX/TX values for the next round.
+ self.oldrx = rx
+ # self.oldtx = tx
+
+ if out != []:
+ self.put(out)
+