diff options
-rw-r--r-- | decoders/sbus_futaba/__init__.py | 35 | ||||
-rw-r--r-- | decoders/sbus_futaba/pd.py | 273 |
2 files changed, 308 insertions, 0 deletions
diff --git a/decoders/sbus_futaba/__init__.py b/decoders/sbus_futaba/__init__.py new file mode 100644 index 0000000..9404f4f --- /dev/null +++ b/decoders/sbus_futaba/__init__.py @@ -0,0 +1,35 @@ +## +## This file is part of the libsigrokdecode project. +## +## Copyright (C) 2022 Gerhard Sittig <gerhard.sittig@gmx.net> +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program; if not, see <http://www.gnu.org/licenses/>. +## + +''' +SBUS by Futaba, a hobby remote control protocol on top of UART. +Sometimes referred to as "Serial BUS" or S-BUS. + +UART communication typically runs at 100kbps with 8e2 frame format and +inverted signals (high voltage level is logic low). + +SBUS messages take 3ms to transfer, and typically repeat in intervals +of 7ms or 14ms. An SBUS message consists of 25 UART bytes, and carries +16 proportional channels with 11 bits each, and 2 digital channels +(boolean, 1 bit), and flags which represent current communication state. +Proportional channel values typically are in the 192..1792 range, but +individual implementations may differ. +''' + +from .pd import Decoder diff --git a/decoders/sbus_futaba/pd.py b/decoders/sbus_futaba/pd.py new file mode 100644 index 0000000..75c2cfb --- /dev/null +++ b/decoders/sbus_futaba/pd.py @@ -0,0 +1,273 @@ +## +## This file is part of the libsigrokdecode project. +## +## Copyright (C) 2022 Gerhard Sittig <gerhard.sittig@gmx.net> +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program; if not, see <http://www.gnu.org/licenses/>. +## + +""" +OUTPUT_PYTHON format: + +Packet: +(<ptype>, <pdata>) + +This is the list of <ptype> codes and their respective <pdata> values: + - 'HEADER': The data is the header byte's value. + - 'PROPORTIONAL': The data is a tuple of the channel number (1-based) + and the channel's value. + - 'DIGITAL': The data is a tuple of the channel number (1-based) + and the channel's value. + - 'FLAG': The data is a tuple of the flag's name, and the flag's value. + - 'FOOTER': The data is the footer byte's value. +""" + +import sigrokdecode as srd +from common.srdhelper import bitpack_lsb + +class Ann: + HEADER, PROPORTIONAL, DIGITAL, FRAME_LOST, FAILSAFE, FOOTER, \ + WARN = range(7) + FLAG_LSB = FRAME_LOST + +class Decoder(srd.Decoder): + api_version = 3 + id = 'sbus_futaba' + name = 'SBUS (Futaba)' + longname = 'Futaba SBUS (Serial bus)' + desc = 'Serial bus for hobby remote control by Futaba' + license = 'gplv2+' + inputs = ['uart'] + outputs = ['sbus_futaba'] + tags = ['Remote Control'] + options = ( + {'id': 'prop_val_min', 'desc': 'Proportional value lower boundary', 'default': 0}, + {'id': 'prop_val_max', 'desc': 'Proportional value upper boundary', 'default': 2047}, + ) + annotations = ( + ('header', 'Header'), + ('proportional', 'Proportional'), + ('digital', 'Digital'), + ('framelost', 'Frame Lost'), + ('failsafe', 'Failsafe'), + ('footer', 'Footer'), + ('warning', 'Warning'), + ) + annotation_rows = ( + ('framing', 'Framing', (Ann.HEADER, Ann.FOOTER, + Ann.FRAME_LOST, Ann.FAILSAFE)), + ('channels', 'Channels', (Ann.PROPORTIONAL, Ann.DIGITAL)), + ('warnings', 'Warnings', (Ann.WARN,)), + ) + + def __init__(self): + self.bits_accum = [] + self.sent_fields = None + self.msg_complete = None + self.failed = None + self.reset() + + def reset(self): + self.bits_accum.clear() + self.sent_fields = 0 + self.msg_complete = False + self.failed = None + + def start(self): + self.out_ann = self.register(srd.OUTPUT_ANN) + self.out_py = self.register(srd.OUTPUT_PYTHON) + + def putg(self, ss, es, data): + # Put a graphical annotation. + self.put(ss, es, self.out_ann, data) + + def putpy(self, ss, es, data): + # Pass Python to upper layers. + self.put(ss, es, self.out_py, data) + + def get_ss_es_bits(self, bitcount): + # Get start/end times, and bit values of given length. + # Gets all remaining data when 'bitcount' is None. + if bitcount is None: + bitcount = len(self.bits_accum) + if len(self.bits_accum) < bitcount: + return None, None, None + bits = self.bits_accum[:bitcount] + self.bits_accum = self.bits_accum[bitcount:] + ss, es = bits[0][1], bits[-1][2] + bits = [b[0] for b in bits] + return ss, es, bits + + def flush_accum_bits(self): + # Valid data was queued. See if we got full SBUS fields so far. + # Annotate them early, cease inspection of failed messages. The + # implementation is phrased to reduce the potential for clipboard + # errors: 'upto' is the next supported field count, 'want' is one + # field's bit count. Grab as many as we find in an invocation. + upto = 0 + if self.failed: + return + # Annotate the header byte. Not seeing the expected bit pattern + # emits a warning annotation, but by design won't fail the SBUS + # message. It's considered more useful to present the channels' + # values instead. The warning still raises awareness. + upto += 1 + want = 8 + while self.sent_fields < upto: + if len(self.bits_accum) < want: + return + ss, es, bits = self.get_ss_es_bits(want) + value = bitpack_lsb(bits) + text = ['0x{:02x}'.format(value)] + self.putg(ss, es, [Ann.HEADER, text]) + if value != 0x0f: + text = ['Unexpected header', 'Header'] + self.putg(ss, es, [Ann.WARN, text]) + self.putpy(ss, es, ['HEADER', value]) + self.sent_fields += 1 + # Annotate the proportional channels' data. Check for user + # provided value range violations. Channel numbers are in + # the 1..18 range (1-based). + upto += 16 + want = 11 + while self.sent_fields < upto: + if len(self.bits_accum) < want: + return + ss, es, bits = self.get_ss_es_bits(want) + value = bitpack_lsb(bits) + text = ['{:d}'.format(value)] + self.putg(ss, es, [Ann.PROPORTIONAL, text]) + if value < self.options['prop_val_min']: + text = ['Low proportional value', 'Low value', 'Low'] + self.putg(ss, es, [Ann.WARN, text]) + if value > self.options['prop_val_max']: + text = ['High proportional value', 'High value', 'High'] + self.putg(ss, es, [Ann.WARN, text]) + idx = self.sent_fields - (upto - 16) + ch_nr = 1 + idx + self.putpy(ss, es, ['PROPORTIONAL', (ch_nr, value)]) + self.sent_fields += 1 + # Annotate the digital channels' data. + upto += 2 + want = 1 + while self.sent_fields < upto: + if len(self.bits_accum) < want: + return + ss, es, bits = self.get_ss_es_bits(want) + value = bitpack_lsb(bits) + text = ['{:d}'.format(value)] + self.putg(ss, es, [Ann.DIGITAL, text]) + idx = self.sent_fields - (upto - 2) + ch_nr = 17 + idx + self.putpy(ss, es, ['DIGITAL', (ch_nr, value)]) + self.sent_fields += 1 + # Annotate the flags' state. Index starts from LSB. + flag_names = ['framelost', 'failsafe', 'msb'] + upto += 2 + want = 1 + while self.sent_fields < upto: + if len(self.bits_accum) < want: + return + ss, es, bits = self.get_ss_es_bits(want) + value = bitpack_lsb(bits) + text = ['{:d}'.format(value)] + idx = self.sent_fields - (upto - 2) + cls = Ann.FLAG_LSB + idx + self.putg(ss, es, [cls, text]) + flg_name = flag_names[idx] + self.putpy(ss, es, ['FLAG', (flg_name, value)]) + self.sent_fields += 1 + # Warn when flags' padding (bits [7:4]) is unexpexted. + upto += 1 + want = 4 + while self.sent_fields < upto: + if len(self.bits_accum) < want: + return + ss, es, bits = self.get_ss_es_bits(want) + value = bitpack_lsb(bits) + if value != 0x0: + text = ['Unexpected MSB flags', 'Flags'] + self.putg(ss, es, [Ann.WARN, text]) + flg_name = flag_names[-1] + self.putpy(ss, es, ['FLAG', (flg_name, value)]) + self.sent_fields += 1 + # Annotate the footer byte. Warn when unexpected. + upto += 1 + want = 8 + while self.sent_fields < upto: + if len(self.bits_accum) < want: + return + ss, es, bits = self.get_ss_es_bits(want) + value = bitpack_lsb(bits) + text = ['0x{:02x}'.format(value)] + self.putg(ss, es, [Ann.FOOTER, text]) + if value != 0x00: + text = ['Unexpected footer', 'Footer'] + self.putg(ss, es, [Ann.WARN, text]) + self.putpy(ss, es, ['FOOTER', value]) + self.sent_fields += 1 + # Check for the completion of an SBUS message. Warn when more + # UART data is seen after the message. Defer the warning until + # more bits were collected, flush at next IDLE or BREAK, which + # spans all unprocessed data, and improves perception. + if self.sent_fields >= upto: + self.msg_complete = True + if self.msg_complete and self.bits_accum: + self.failed = ['Excess data bits', 'Excess'] + + def handle_bits(self, ss, es, bits): + # UART data bits were seen. Store them, validity is yet unknown. + self.bits_accum.extend(bits) + + def handle_frame(self, ss, es, value, valid): + # A UART frame became complete. Get its validity. Process its bits. + if not valid: + self.failed = ['Invalid data', 'Invalid'] + self.flush_accum_bits() + + def handle_idle(self, ss, es): + # An IDLE period was seen in the UART level. Flush, reset state. + if self.bits_accum and not self.failed: + self.failed = ['Unprocessed data bits', 'Unprocessed'] + if self.bits_accum and self.failed: + ss, es, _ = self.get_ss_es_bits(None) + self.putg(ss, es, [Ann.WARN, self.failed]) + self.reset() + + def handle_break(self, ss, es): + # A BREAK period was seen in the UART level. Warn, reset state. + break_ss, break_es = ss, es + if not self.failed: + self.failed = ['BREAK condition', 'Break'] + # Re-use logic for "annotated bits warning". + self.handle_idle(None, None) + # Unconditionally annotate BREAK as warning. + text = ['BREAK condition', 'Break'] + self.putg(ss, es, [Ann.WARN, text]) + self.reset() + + def decode(self, ss, es, data): + # Implementor's note: Expects DATA bits to arrive before FRAME + # validity. Either of IDLE or BREAK terminates an SBUS message. + ptype, rxtx, pdata = data + if ptype == 'DATA': + _, bits = pdata + self.handle_bits(ss, es, bits) + elif ptype == 'FRAME': + value, valid = pdata + self.handle_frame(ss, es, value, valid) + elif ptype == 'IDLE': + self.handle_idle(ss, es) + elif ptype == 'BREAK': + self.handle_break(ss, es) |