diff options
Diffstat (limited to 'decoders/dmx512')
-rw-r--r-- | decoders/dmx512/__init__.py | 25 | ||||
-rw-r--r-- | decoders/dmx512/pd.py | 170 |
2 files changed, 0 insertions, 195 deletions
diff --git a/decoders/dmx512/__init__.py b/decoders/dmx512/__init__.py deleted file mode 100644 index b5e5783..0000000 --- a/decoders/dmx512/__init__.py +++ /dev/null @@ -1,25 +0,0 @@ -## -## This file is part of the libsigrokdecode project. -## -## Copyright (C) 2016 Fabian J. Stumpf <sigrok@fabianstumpf.de> -## -## This program is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published by -## the Free Software Foundation; either version 2 of the License, or -## (at your option) any later version. -## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. -## -## You should have received a copy of the GNU General Public License -## along with this program; if not, see <http://www.gnu.org/licenses/>. -## - -''' -DMX512 (Digital MultipleX 512) is a protocol based on RS485, used to control -professional lighting fixtures. -''' - -from .pd import Decoder diff --git a/decoders/dmx512/pd.py b/decoders/dmx512/pd.py deleted file mode 100644 index 9ef82d7..0000000 --- a/decoders/dmx512/pd.py +++ /dev/null @@ -1,170 +0,0 @@ -## -## This file is part of the libsigrokdecode project. -## -## Copyright (C) 2016 Fabian J. Stumpf <sigrok@fabianstumpf.de> -## -## This program is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published by -## the Free Software Foundation; either version 2 of the License, or -## (at your option) any later version. -## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. -## -## You should have received a copy of the GNU General Public License -## along with this program; if not, see <http://www.gnu.org/licenses/>. -## - -import sigrokdecode as srd - -class Decoder(srd.Decoder): - api_version = 3 - id = 'dmx512' - name = 'DMX512' - longname = 'Digital MultipleX 512' - desc = 'Digital MultipleX 512 (DMX512) lighting protocol.' - license = 'gplv2+' - inputs = ['logic'] - outputs = [] - tags = ['Embedded/industrial', 'Lighting'] - channels = ( - {'id': 'dmx', 'name': 'DMX data', 'desc': 'Any DMX data line'}, - ) - annotations = ( - ('bit', 'Bit'), - ('break', 'Break'), - ('mab', 'Mark after break'), - ('startbit', 'Start bit'), - ('stopbit', 'Stop bit'), - ('startcode', 'Start code'), - ('channel', 'Channel'), - ('interframe', 'Interframe'), - ('interpacket', 'Interpacket'), - ('data', 'Data'), - ('error', 'Error'), - ) - annotation_rows = ( - ('bits', 'Bits', (0, 3, 4)), - ('data-vals', 'Data', (9,)), - ('logical-vals', 'Logical', (1, 2, 5, 6, 7, 8)), - ('errors', 'Errors', (10,)), - ) - - def __init__(self): - self.reset() - - def reset(self): - self.samplerate = None - self.sample_usec = None - self.run_start = -1 - self.run_bit = 0 - self.state = 'FIND BREAK' - - def start(self): - self.out_ann = self.register(srd.OUTPUT_ANN) - - def metadata(self, key, value): - if key == srd.SRD_CONF_SAMPLERATE: - self.samplerate = value - self.sample_usec = 1 / value * 1000000 - self.skip_per_bit = int(4 / self.sample_usec) - - def putr(self, data): - self.put(self.run_start, self.samplenum, self.out_ann, data) - - def decode(self): - if not self.samplerate: - raise SamplerateError('Cannot decode without samplerate.') - while True: - # Seek for an interval with no state change with a length between - # 88 and 1000000 us (BREAK). - if self.state == 'FIND BREAK': - (dmx,) = self.wait({0: 'h' if self.run_bit == 0 else 'l'}) - runlen = (self.samplenum - self.run_start) * self.sample_usec - if runlen > 88 and runlen < 1000000: - self.putr([1, ['Break']]) - self.bit_break = self.run_bit - self.state = 'MARK MAB' - self.channel = 0 - elif runlen >= 1000000: - # Error condition. - self.putr([10, ['Invalid break length']]) - self.run_bit = dmx - self.run_start = self.samplenum - # Directly following the BREAK is the MARK AFTER BREAK. - elif self.state == 'MARK MAB': - (dmx,) = self.wait({0: 'h' if self.run_bit == 0 else 'l'}) - self.putr([2, ['MAB']]) - self.state = 'READ BYTE' - self.channel = 0 - self.bit = 0 - self.aggreg = dmx - self.run_start = self.samplenum - # Mark and read a single transmitted byte - # (start bit, 8 data bits, 2 stop bits). - elif self.state == 'READ BYTE': - (dmx,) = self.wait() - self.next_sample = self.run_start + (self.bit + 1) * self.skip_per_bit - self.aggreg += dmx - if self.samplenum != self.next_sample: - continue - bit_value = 0 if round(self.aggreg/self.skip_per_bit) == self.bit_break else 1 - - if self.bit == 0: - self.byte = 0 - self.putr([3, ['Start bit']]) - if bit_value != 0: - # (Possibly) invalid start bit, mark but don't fail. - self.put(self.samplenum, self.samplenum, - self.out_ann, [10, ['Invalid start bit']]) - elif self.bit >= 9: - self.put(self.samplenum - self.skip_per_bit, - self.samplenum, self.out_ann, [4, ['Stop bit']]) - if bit_value != 1: - # Invalid stop bit, mark. - self.put(self.samplenum, self.samplenum, - self.out_ann, [10, ['Invalid stop bit']]) - if self.bit == 10: - # On invalid 2nd stop bit, search for new break. - self.run_bit = dmx - self.state = 'FIND BREAK' - else: - # Label and process one bit. - self.put(self.samplenum - self.skip_per_bit, - self.samplenum, self.out_ann, [0, [str(bit_value)]]) - self.byte |= bit_value << (self.bit - 1) - - # Label a complete byte. - if self.bit == 10: - if self.channel == 0: - d = [5, ['Start code']] - else: - d = [6, ['Channel ' + str(self.channel)]] - self.put(self.run_start, self.next_sample, self.out_ann, d) - self.put(self.run_start + self.skip_per_bit, - self.next_sample - 2 * self.skip_per_bit, - self.out_ann, [9, [str(self.byte) + ' / ' + \ - str(hex(self.byte))]]) - # Continue by scanning the IFT. - self.channel += 1 - self.run_start = self.samplenum - self.run_bit = dmx - self.state = 'MARK IFT' - - self.aggreg = dmx - self.bit += 1 - # Mark the INTERFRAME-TIME between bytes / INTERPACKET-TIME between packets. - elif self.state == 'MARK IFT': - (dmx,) = self.wait({0: 'h' if self.run_bit == 0 else 'l'}) - if self.channel > 512: - self.putr([8, ['Interpacket']]) - self.state = 'FIND BREAK' - self.run_bit = dmx - self.run_start = self.samplenum - else: - self.putr([7, ['Interframe']]) - self.state = 'READ BYTE' - self.bit = 0 - self.run_start = self.samplenum |