summaryrefslogtreecommitdiff
path: root/decoders/lfast/pd.py
blob: 5cf99b6ce48ab007b6ddc089c743596231447d33 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
##
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2020 Soeren Apel <soeren@apelpie.net>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##

import sigrokdecode as srd
from common.srdhelper import bitpack
import decimal

'''
OUTPUT_PYTHON format:

[<data>] where <data> is the payload contained between the LFAST header and
the LFAST stop bit. It's an array of bytes. 
'''

# See tc27xD_um_v2.2.pdf, Table 20-10
payload_sizes = {
    0b000: '8 bit',
    0b001: '32 bit',
    0b010: '64 bit',
    0b011: '96 bit',
    0b100: '128 bit',
    0b101: '256 bit',
    0b110: '512 bit',
    0b111: '288 bit'
}

# See tc27xD_um_v2.2.pdf, Table 20-10
payload_byte_sizes = {
    0b000: 1,
    0b001: 4,
    0b010: 8,
    0b011: 12,
    0b100: 16,
    0b101: 32,
    0b110: 64,
    0b111: 36
}

# See tc27xD_um_v2.2.pdf, Table 20-11
channel_types = {
    0b0000: 'Interface Control / PING (32 bit)',
    0b0001: 'Unsolicited Status (32 bit)',
    0b0010: 'Slave Interface Control / Read',
    0b0011: 'CTS Transfer',
    0b0100: 'Data Channel A',
    0b0101: 'Data Channel B',
    0b0110: 'Data Channel C',
    0b0111: 'Data Channel D',
    0b1000: 'Data Channel E',
    0b1001: 'Data Channel F',
    0b1010: 'Data Channel G',
    0b1011: 'Data Channel H',
    0b1100: 'Reserved',
    0b1101: 'Reserved',
    0b1110: 'Reserved',
    0b1111: 'Reserved',
}

# See tc27xD_um_v2.2.pdf, Table 20-12
control_payloads = {
    0x00: 'PING',
    0x01: 'Reserved',
    0x02: 'Slave interface clock multiplier start',
    0x04: 'Slave interface clock multiplier stop',
    0x08: 'Use 5 MBaud for M->S',
    0x10: 'Use 320 MBaud for M->S',
    0x20: 'Use 5 MBaud for S->M',
    0x40: 'Use 20 MBaud for S->M (needs 20 MHz SysClk)',
    0x80: 'Use 320 MBaud for S->M',
    0x31: 'Enable slave interface transmitter',
    0x32: 'Disable slave interface transmitter',
    0x34: 'Enable clock test mode',
    0x38: 'Disable clock test mode and payload loopback',
    0xFF: 'Enable payload loopback',
}


ann_bit, ann_sync, ann_header_pl_size, ann_header_ch_type, ann_header_cts, \
    ann_payload, ann_control_data, ann_sleepbit, ann_warning = range(9)
state_sync, state_header, state_payload, state_sleepbit = range(4)

class Decoder(srd.Decoder):
    api_version = 3
    id = 'lfast'
    name = 'LFAST'
    longname = 'NXP LFAST interface'
    desc = 'Differential high-speed P2P interface'
    license = 'gplv2+'
    inputs = ['logic']
    outputs = ['lfast']
    tags = ['Embedded/industrial']
    channels = (
        {'id': 'data', 'name': 'Data', 'desc': 'TXP or RXP'},
    )
    annotations = (
        ('bit', 'Bits'),
        ('sync', 'Sync Pattern'),
        ('header_pl_size', 'Payload Size'),
        ('header_ch_type', 'Logical Channel Type'),
        ('header_cts', 'Clear To Send'),
        ('payload', 'Payload'),
        ('ctrl_data', 'Control Data'),
        ('sleep', 'Sleep Bit'),
        ('warning', 'Warning'),
    )
    annotation_rows = (
        ('bits', 'Bits', (ann_bit,)),
        ('fields', 'Fields', (ann_sync, ann_header_pl_size, ann_header_ch_type,
            ann_header_cts, ann_payload, ann_control_data, ann_sleepbit,)),
        ('warnings', 'Warnings', (ann_warning,)),
    )

    def __init__(self):
        decimal.getcontext().rounding = decimal.ROUND_HALF_UP
        self.reset()

    def reset(self):
        self.ss = self.es = 0
        self.ss_payload = self.es_payload = 0
        self.bits = []
        self.payload = []
        self.payload_size = 0  # Expected number of bytes, as read from header
        self.bit_len = 0       # Length of one bit time, in samples
        self.timeout = 0       # desired timeout for next edge, in samples
        self.ch_type_id = 0    # ID of channel type
        self.state = state_sync

    def metadata(self, key, value):
        pass

    def start(self):
        self.out_python = self.register(srd.OUTPUT_PYTHON)
        self.out_ann = self.register(srd.OUTPUT_ANN)

    def put_ann(self, ss, es, ann_class, value):
        self.put(ss, es, self.out_ann, [ann_class, value])

    def put_payload(self):
        self.put(self.ss_payload, self.es_payload, self.out_python, self.payload)

    def handle_sync(self):
        if len(self.bits) == 1:
            self.ss_sync = self.ss_bit

        if len(self.bits) == 16:
            value = bitpack(self.bits)
            if value == 0xA84B:
                self.put_ann(self.ss_sync, self.es_bit, ann_sync, ['Sync OK'])
            else:
                self.put_ann(self.ss_sync, self.es_bit, ann_warning, ['Wrong Sync Value: {:02X}'.format(value)])

            self.bits = []
            self.state = state_header

    def handle_header(self):
        if len(self.bits) == 1:
            self.ss_header = self.ss_bit

        if len(self.bits) == 8:
            # See tc27xD_um_v2.2.pdf, Figure 20-47, for the header structure
            bit_len = (self.es_bit - self.ss_header) / 8
            value = bitpack(self.bits)

            ss = self.ss_header
            es = ss + 3 * bit_len 
            size_id = (value & 0xE0) >> 5
            size = payload_sizes.get(size_id)
            self.payload_size = payload_byte_sizes.get(size_id) 
            self.put_ann(int(ss), int(es), ann_header_pl_size, [size])

            ss = es
            es = ss + 4 * bit_len 
            self.ch_type_id = (value & 0x1E) >> 1
            ch_type = channel_types.get(self.ch_type_id)
            self.put_ann(int(ss), int(es), ann_header_ch_type, [ch_type])

            ss = es
            es = ss + bit_len 
            cts = value & 0x01 
            self.put_ann(int(ss), int(es), ann_header_cts, ['{}'.format(cts)])

            self.bits = []
            self.state = state_payload

    def handle_payload(self):
        if len(self.bits) == 1:
            self.ss_byte = self.ss_bit
            if self.ss_payload == 0:
                self.ss_payload = self.ss_bit

        if len(self.bits) == 8:
            value = bitpack(self.bits)
            value_hex = '{:02X}'.format(value)
            
            # Control transfers have no SIPI payload, show them as control transfers
            # Check the channel_types list for the meaning of the magic values 
            if (self.ch_type_id >= 0b0100) and (self.ch_type_id <= 0b1011):
                self.put_ann(self.ss_byte, self.es_bit, ann_payload, [value_hex])
            else:
                # Control transfers are 8-bit transfers, so only evaluate the first byte
                if len(self.payload) == 0:
                    ctrl_data = control_payloads.get(value, value_hex)
                    self.put_ann(self.ss_byte, self.es_bit, ann_control_data, [ctrl_data])
                else:
                    self.put_ann(self.ss_byte, self.es_bit, ann_control_data, [value_hex])

            self.bits = []
            self.payload.append(value)
            self.es_payload = self.es_bit
            
        self.timeout = int((self.payload_size - len(self.payload)) * 8 * self.bit_len)
        
        if (len(self.payload) == self.payload_size):
            self.timeout = int(1.4 * self.bit_len)
            self.bits = []
            self.state = state_sleepbit

    def handle_sleepbit(self):
        if len(self.bits) == 0:
            self.put_ann(self.ss_bit, self.es_bit, ann_sleepbit, ['No LVDS sleep mode request', 'No sleep', 'N'])
        elif len(self.bits) > 1:
            self.put_ann(self.ss_bit, self.es_bit, ann_warning, ['Expected only the sleep bit, got {} bits instead'.format(len(self.bits))])
        else:
            if self.bits[0] == 1:
                self.put_ann(self.ss_bit, self.es_bit, ann_sleepbit, ['LVDS sleep mode request', 'Sleep', 'Y'])
            else:
                self.put_ann(self.ss_bit, self.es_bit, ann_sleepbit, ['No LVDS sleep mode request', 'No sleep', 'N'])

        # We only send the payload out if this is an actual data transfer;
        # check the channel_types list for the meaning of the magic values 
        if (self.ch_type_id >= 0b0100) and (self.ch_type_id <= 0b1011):
            if len(self.payload) > 0:
                self.put_payload()

        self.reset()

    def decode(self):
        while True:
            if self.timeout == 0:
                rising_edge, = self.wait({0: 'e'})
            else:
                rising_edge, = self.wait([{0: 'e'}, {'skip': self.timeout}])

            # If this is the first edge, we only update ss
            if self.ss == 0:
                self.ss = self.samplenum
                continue
            
            self.es = self.samplenum

            if int(self.es - self.ss) == 0:
                continue 
            
            # Check for the sleep bit if this is a timeout condition
            if (self.timeout > 0) and (self.es - self.ss == self.timeout):
                rising_edge = ~rising_edge
                if self.state == state_sleepbit:
                    self.handle_sleepbit()

            # We use the first bit to deduce the bit length
            if self.bit_len == 0:
                self.bit_len = self.es - self.ss

            # Determine number of bits covered by this edge
            bit_count = (self.es - self.ss) / self.bit_len
            bit_count = int(decimal.Decimal(bit_count).to_integral_value())
            
            if bit_count == 0:
                self.put_ann(self.ss, self.es, ann_warning, ['Bit time too short'])
                self.reset()
                continue
            
            bit_value = '0' if rising_edge else '1'

            divided_len = (self.es - self.ss) / bit_count
            for i in range(bit_count):
                self.ss_bit = int(self.ss + i * divided_len)
                self.es_bit = int(self.ss_bit + divided_len)
                self.put_ann(self.ss_bit, self.es_bit, ann_bit, [bit_value])

                # Place the new bit at the front of the bit list
                self.bits.insert(0, (0 if rising_edge else 1))

                if self.state == state_sync:
                    self.handle_sync()
                elif self.state == state_header:
                    self.handle_header()
                elif self.state == state_payload:
                    self.handle_payload()
                elif self.state == state_sleepbit:
                    self.handle_sleepbit()
                    break

            # Only update ss if we didn't just perform a reset
            if self.ss > 0:
                self.ss = self.samplenum