diff options
author | Gerhard Sittig <gerhard.sittig@gmx.net> | 2018-10-14 20:17:35 +0200 |
---|---|---|
committer | Uwe Hermann <uwe@hermann-uwe.de> | 2018-10-16 21:49:00 +0200 |
commit | 03a986ea61ec565b5e366b950759d4ad753d3aea (patch) | |
tree | 3d0f29b441b12f10c989d81d81f4a40d46ccde79 /decoders | |
parent | 2dfe09863649bcf85e7e0c977886c243ebab19ad (diff) | |
download | libsigrokdecode-03a986ea61ec565b5e366b950759d4ad753d3aea.tar.gz libsigrokdecode-03a986ea61ec565b5e366b950759d4ad753d3aea.zip |
uart: add support for break condition detection
There are the "traffic inspecting" wait() conditions, which check an
edge to find the start of START, then wait for sample points to grab the
bit values. Bit times are sampled in their respective center, potential
glitches around sample points get ignored.
Add another independent set of wait() conditions which check _all_ edges
regardless of any data communication. This results in the most reliable
and maintainable detection of break conditions, regardless of how they
align to data frames. Break is defined as a period of low input signal
which spans at least one frame's length. Run the edge inspection after
data inspection, which results in the most appropriate annotation output
like leading data bits (of incomplete frames), frame errors (violated
STOP bit expectations), then break conditions. This approach is most
robust in the presence of incomplete input streams.
Diffstat (limited to 'decoders')
-rw-r--r-- | decoders/uart/pd.py | 62 |
1 files changed, 57 insertions, 5 deletions
diff --git a/decoders/uart/pd.py b/decoders/uart/pd.py index 6c3d85c..ffacad9 100644 --- a/decoders/uart/pd.py +++ b/decoders/uart/pd.py @@ -124,14 +124,18 @@ class Decoder(srd.Decoder): ('tx-warnings', 'TX warnings'), ('rx-data-bits', 'RX data bits'), ('tx-data-bits', 'TX data bits'), + ('rx-break', 'RX break'), + ('tx-break', 'TX break'), ) annotation_rows = ( ('rx-data', 'RX', (0, 2, 4, 6, 8)), ('rx-data-bits', 'RX bits', (12,)), ('rx-warnings', 'RX warnings', (10,)), + ('rx-break', 'RX break', (14,)), ('tx-data', 'TX', (1, 3, 5, 7, 9)), ('tx-data-bits', 'TX bits', (13,)), ('tx-warnings', 'TX warnings', (11,)), + ('tx-break', 'TX break', (15,)), ) binary = ( ('rx', 'RX dump'), @@ -156,6 +160,12 @@ class Decoder(srd.Decoder): s, halfbit = self.samplenum, self.bit_width / 2.0 self.put(s - floor(halfbit), s + ceil(halfbit), self.out_python, data) + def putgse(self, ss, es, data): + self.put(ss, es, self.out_ann, data) + + def putpse(self, ss, es, data): + self.put(ss, es, self.out_python, data) + def putbin(self, rxtx, data): s, halfbit = self.startsample[rxtx], self.bit_width / 2.0 self.put(s - floor(halfbit), self.samplenum + ceil(halfbit), self.out_binary, data) @@ -175,6 +185,7 @@ class Decoder(srd.Decoder): self.startsample = [-1, -1] self.state = ['WAIT FOR START BIT', 'WAIT FOR START BIT'] self.databits = [[], []] + self.break_start = [None, None] def start(self): self.out_python = self.register(srd.OUTPUT_PYTHON) @@ -337,6 +348,13 @@ class Decoder(srd.Decoder): self.state[rxtx] = 'WAIT FOR START BIT' + def handle_break(self, rxtx): + self.putpse(self.frame_start[rxtx], self.samplenum, + ['BREAK', rxtx, 0]) + self.putgse(self.frame_start[rxtx], self.samplenum, + [rxtx + 14, ['Break condition', 'Break', 'Brk', 'B']]) + self.state[rxtx] = 'WAIT FOR START BIT' + def get_wait_cond(self, rxtx, inv): # Return condititions that are suitable for Decoder.wait(). Those # conditions either match the falling edge of the START bit, or @@ -373,6 +391,22 @@ class Decoder(srd.Decoder): elif state == 'GET STOP BITS': self.get_stop_bits(rxtx, signal) + def inspect_edge(self, rxtx, signal, inv): + # Inspect edges, independently from traffic, to detect break conditions. + if inv: + signal = not signal + if not signal: + # Signal went low. Start another interval. + self.break_start[rxtx] = self.samplenum + return + # Signal went high. Was there an extended period with low signal? + if self.break_start[rxtx] is None: + return + diff = self.samplenum - self.break_start[rxtx] + if diff >= self.break_min_sample_count: + self.handle_break(rxtx) + self.break_start[rxtx] = None + def decode(self): if not self.samplerate: raise SamplerateError('Cannot decode without samplerate.') @@ -383,18 +417,36 @@ class Decoder(srd.Decoder): opt = self.options inv = [opt['invert_rx'] == 'yes', opt['invert_tx'] == 'yes'] - cond_idx = [None] * len(has_pin) + cond_data_idx = [None] * len(has_pin) + + # Determine the number of samples for a complete frame's time span. + # A period of low signal (at least) that long is a break condition. + frame_samples = 1 # START + frame_samples += self.options['num_data_bits'] + frame_samples += 0 if self.options['parity_type'] == 'none' else 1 + frame_samples += self.options['num_stop_bits'] + frame_samples *= self.bit_width + self.break_min_sample_count = ceil(frame_samples) + cond_edge_idx = [None] * len(has_pin) while True: conds = [] if has_pin[RX]: - cond_idx[RX] = len(conds) + cond_data_idx[RX] = len(conds) conds.append(self.get_wait_cond(RX, inv[RX])) + cond_edge_idx[RX] = len(conds) + conds.append({RX: 'e'}) if has_pin[TX]: - cond_idx[TX] = len(conds) + cond_data_idx[TX] = len(conds) conds.append(self.get_wait_cond(TX, inv[TX])) + cond_edge_idx[TX] = len(conds) + conds.append({TX: 'e'}) (rx, tx) = self.wait(conds) - if cond_idx[RX] is not None and self.matched[cond_idx[RX]]: + if cond_data_idx[RX] is not None and self.matched[cond_data_idx[RX]]: self.inspect_sample(RX, rx, inv[RX]) - if cond_idx[TX] is not None and self.matched[cond_idx[TX]]: + if cond_edge_idx[RX] is not None and self.matched[cond_edge_idx[RX]]: + self.inspect_edge(RX, rx, inv[RX]) + if cond_data_idx[TX] is not None and self.matched[cond_data_idx[TX]]: self.inspect_sample(TX, tx, inv[TX]) + if cond_edge_idx[TX] is not None and self.matched[cond_edge_idx[TX]]: + self.inspect_edge(TX, tx, inv[TX]) |