1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
|
##
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2022 Gerhard Sittig <gerhard.sittig@gmx.net>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
"""
OUTPUT_PYTHON format:
Packet:
(<ptype>, <pdata>)
This is the list of <ptype> codes and their respective <pdata> values:
- 'HEADER': The data is the header byte's value.
- 'PROPORTIONAL': The data is a tuple of the channel number (1-based)
and the channel's value.
- 'DIGITAL': The data is a tuple of the channel number (1-based)
and the channel's value.
- 'FLAG': The data is a tuple of the flag's name, and the flag's value.
- 'FOOTER': The data is the footer byte's value.
"""
import sigrokdecode as srd
from common.srdhelper import bitpack_lsb
class Ann:
HEADER, PROPORTIONAL, DIGITAL, FRAME_LOST, FAILSAFE, FOOTER, \
WARN = range(7)
FLAG_LSB = FRAME_LOST
class Decoder(srd.Decoder):
api_version = 3
id = 'sbus_futaba'
name = 'SBUS (Futaba)'
longname = 'Futaba SBUS (Serial bus)'
desc = 'Serial bus for hobby remote control by Futaba'
license = 'gplv2+'
inputs = ['uart']
outputs = ['sbus_futaba']
tags = ['Remote Control']
options = (
{'id': 'prop_val_min', 'desc': 'Proportional value lower boundary', 'default': 0},
{'id': 'prop_val_max', 'desc': 'Proportional value upper boundary', 'default': 2047},
)
annotations = (
('header', 'Header'),
('proportional', 'Proportional'),
('digital', 'Digital'),
('framelost', 'Frame Lost'),
('failsafe', 'Failsafe'),
('footer', 'Footer'),
('warning', 'Warning'),
)
annotation_rows = (
('framing', 'Framing', (Ann.HEADER, Ann.FOOTER,
Ann.FRAME_LOST, Ann.FAILSAFE)),
('channels', 'Channels', (Ann.PROPORTIONAL, Ann.DIGITAL)),
('warnings', 'Warnings', (Ann.WARN,)),
)
def __init__(self):
self.bits_accum = []
self.sent_fields = None
self.msg_complete = None
self.failed = None
self.reset()
def reset(self):
self.bits_accum.clear()
self.sent_fields = 0
self.msg_complete = False
self.failed = None
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
self.out_py = self.register(srd.OUTPUT_PYTHON)
def putg(self, ss, es, data):
# Put a graphical annotation.
self.put(ss, es, self.out_ann, data)
def putpy(self, ss, es, data):
# Pass Python to upper layers.
self.put(ss, es, self.out_py, data)
def get_ss_es_bits(self, bitcount):
# Get start/end times, and bit values of given length.
# Gets all remaining data when 'bitcount' is None.
if bitcount is None:
bitcount = len(self.bits_accum)
if len(self.bits_accum) < bitcount:
return None, None, None
bits = self.bits_accum[:bitcount]
self.bits_accum = self.bits_accum[bitcount:]
ss, es = bits[0][1], bits[-1][2]
bits = [b[0] for b in bits]
return ss, es, bits
def flush_accum_bits(self):
# Valid data was queued. See if we got full SBUS fields so far.
# Annotate them early, cease inspection of failed messages. The
# implementation is phrased to reduce the potential for clipboard
# errors: 'upto' is the next supported field count, 'want' is one
# field's bit count. Grab as many as we find in an invocation.
upto = 0
if self.failed:
return
# Annotate the header byte. Not seeing the expected bit pattern
# emits a warning annotation, but by design won't fail the SBUS
# message. It's considered more useful to present the channels'
# values instead. The warning still raises awareness.
upto += 1
want = 8
while self.sent_fields < upto:
if len(self.bits_accum) < want:
return
ss, es, bits = self.get_ss_es_bits(want)
value = bitpack_lsb(bits)
text = ['0x{:02x}'.format(value)]
self.putg(ss, es, [Ann.HEADER, text])
if value != 0x0f:
text = ['Unexpected header', 'Header']
self.putg(ss, es, [Ann.WARN, text])
self.putpy(ss, es, ['HEADER', value])
self.sent_fields += 1
# Annotate the proportional channels' data. Check for user
# provided value range violations. Channel numbers are in
# the 1..18 range (1-based).
upto += 16
want = 11
while self.sent_fields < upto:
if len(self.bits_accum) < want:
return
ss, es, bits = self.get_ss_es_bits(want)
value = bitpack_lsb(bits)
text = ['{:d}'.format(value)]
self.putg(ss, es, [Ann.PROPORTIONAL, text])
if value < self.options['prop_val_min']:
text = ['Low proportional value', 'Low value', 'Low']
self.putg(ss, es, [Ann.WARN, text])
if value > self.options['prop_val_max']:
text = ['High proportional value', 'High value', 'High']
self.putg(ss, es, [Ann.WARN, text])
idx = self.sent_fields - (upto - 16)
ch_nr = 1 + idx
self.putpy(ss, es, ['PROPORTIONAL', (ch_nr, value)])
self.sent_fields += 1
# Annotate the digital channels' data.
upto += 2
want = 1
while self.sent_fields < upto:
if len(self.bits_accum) < want:
return
ss, es, bits = self.get_ss_es_bits(want)
value = bitpack_lsb(bits)
text = ['{:d}'.format(value)]
self.putg(ss, es, [Ann.DIGITAL, text])
idx = self.sent_fields - (upto - 2)
ch_nr = 17 + idx
self.putpy(ss, es, ['DIGITAL', (ch_nr, value)])
self.sent_fields += 1
# Annotate the flags' state. Index starts from LSB.
flag_names = ['framelost', 'failsafe', 'msb']
upto += 2
want = 1
while self.sent_fields < upto:
if len(self.bits_accum) < want:
return
ss, es, bits = self.get_ss_es_bits(want)
value = bitpack_lsb(bits)
text = ['{:d}'.format(value)]
idx = self.sent_fields - (upto - 2)
cls = Ann.FLAG_LSB + idx
self.putg(ss, es, [cls, text])
flg_name = flag_names[idx]
self.putpy(ss, es, ['FLAG', (flg_name, value)])
self.sent_fields += 1
# Warn when flags' padding (bits [7:4]) is unexpexted.
upto += 1
want = 4
while self.sent_fields < upto:
if len(self.bits_accum) < want:
return
ss, es, bits = self.get_ss_es_bits(want)
value = bitpack_lsb(bits)
if value != 0x0:
text = ['Unexpected MSB flags', 'Flags']
self.putg(ss, es, [Ann.WARN, text])
flg_name = flag_names[-1]
self.putpy(ss, es, ['FLAG', (flg_name, value)])
self.sent_fields += 1
# Annotate the footer byte. Warn when unexpected.
upto += 1
want = 8
while self.sent_fields < upto:
if len(self.bits_accum) < want:
return
ss, es, bits = self.get_ss_es_bits(want)
value = bitpack_lsb(bits)
text = ['0x{:02x}'.format(value)]
self.putg(ss, es, [Ann.FOOTER, text])
if value != 0x00:
text = ['Unexpected footer', 'Footer']
self.putg(ss, es, [Ann.WARN, text])
self.putpy(ss, es, ['FOOTER', value])
self.sent_fields += 1
# Check for the completion of an SBUS message. Warn when more
# UART data is seen after the message. Defer the warning until
# more bits were collected, flush at next IDLE or BREAK, which
# spans all unprocessed data, and improves perception.
if self.sent_fields >= upto:
self.msg_complete = True
if self.msg_complete and self.bits_accum:
self.failed = ['Excess data bits', 'Excess']
def handle_bits(self, ss, es, bits):
# UART data bits were seen. Store them, validity is yet unknown.
self.bits_accum.extend(bits)
def handle_frame(self, ss, es, value, valid):
# A UART frame became complete. Get its validity. Process its bits.
if not valid:
self.failed = ['Invalid data', 'Invalid']
self.flush_accum_bits()
def handle_idle(self, ss, es):
# An IDLE period was seen in the UART level. Flush, reset state.
if self.bits_accum and not self.failed:
self.failed = ['Unprocessed data bits', 'Unprocessed']
if self.bits_accum and self.failed:
ss, es, _ = self.get_ss_es_bits(None)
self.putg(ss, es, [Ann.WARN, self.failed])
self.reset()
def handle_break(self, ss, es):
# A BREAK period was seen in the UART level. Warn, reset state.
break_ss, break_es = ss, es
if not self.failed:
self.failed = ['BREAK condition', 'Break']
# Re-use logic for "annotated bits warning".
self.handle_idle(None, None)
# Unconditionally annotate BREAK as warning.
text = ['BREAK condition', 'Break']
self.putg(ss, es, [Ann.WARN, text])
self.reset()
def decode(self, ss, es, data):
# Implementor's note: Expects DATA bits to arrive before FRAME
# validity. Either of IDLE or BREAK terminates an SBUS message.
ptype, rxtx, pdata = data
if ptype == 'DATA':
_, bits = pdata
self.handle_bits(ss, es, bits)
elif ptype == 'FRAME':
value, valid = pdata
self.handle_frame(ss, es, value, valid)
elif ptype == 'IDLE':
self.handle_idle(ss, es)
elif ptype == 'BREAK':
self.handle_break(ss, es)
|