summaryrefslogtreecommitdiff
path: root/decoders
diff options
context:
space:
mode:
Diffstat (limited to 'decoders')
-rw-r--r--decoders/ieee488/__init__.py26
-rw-r--r--decoders/ieee488/pd.py721
2 files changed, 747 insertions, 0 deletions
diff --git a/decoders/ieee488/__init__.py b/decoders/ieee488/__init__.py
new file mode 100644
index 0000000..4240114
--- /dev/null
+++ b/decoders/ieee488/__init__.py
@@ -0,0 +1,26 @@
+##
+## This file is part of the libsigrokdecode project.
+##
+## Copyright (C) 2019 Gerhard Sittig <gerhard.sittig@gmx.net>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
+##
+
+'''
+This protocol decoder can decode the GPIB (IEEE-488) protocol. Both variants
+of (up to) 16 parallel lines as well as serial communication (IEC bus) are
+supported in this implementation.
+'''
+
+from .pd import Decoder
diff --git a/decoders/ieee488/pd.py b/decoders/ieee488/pd.py
new file mode 100644
index 0000000..5c25c66
--- /dev/null
+++ b/decoders/ieee488/pd.py
@@ -0,0 +1,721 @@
+##
+## This file is part of the libsigrokdecode project.
+##
+## Copyright (C) 2016 Rudolf Reuter <reuterru@arcor.de>
+## Copyright (C) 2017 Marcus Comstedt <marcus@mc.pp.se>
+## Copyright (C) 2019 Gerhard Sittig <gerhard.sittig@gmx.net>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
+##
+
+# This file was created from earlier implementations of the 'gpib' and
+# the 'iec' protocol decoders. It combines the parallel and the serial
+# transmission variants in a single instance with optional inputs for
+# maximum code re-use.
+
+# TODO
+# - Extend annotations for improved usability.
+# - Keep talkers' data streams on separate annotation rows? Is this useful
+# here at the GPIB level, or shall stacked decoders dispatch these? May
+# depend on how often captures get inspected which involve multiple peers.
+# - Make serial bit annotations optional? Could slow down interactive
+# exploration for long captures (see USB).
+# - Move the inlined Commodore IEC peripherals support to a stacked decoder
+# when more peripherals get added.
+# - SCPI over GPIB may "represent somewhat naturally" here already when
+# text lines are a single run of data at the GPIB layer (each line between
+# the address spec and either EOI or ATN). So a stacked SCPI decoder may
+# only become necessary when the text lines' content shall get inspected.
+
+import sigrokdecode as srd
+from common.srdhelper import bitpack
+
+'''
+OUTPUT_PYTHON format for stacked decoders:
+
+General packet format:
+[<ptype>, <addr>, <pdata>]
+
+This is the list of <ptype>s and their respective <pdata> values:
+
+Raw bits and bytes at the physical transport level:
+ - 'IEC_BIT': <addr> is not applicable, <pdata> is the transport's bit value.
+ - 'GPIB_RAW': <addr> is not applicable, <pdata> is the transport's
+ byte value. Data bytes are in the 0x00-0xff range, command/address
+ bytes are in the 0x100-0x1ff range.
+
+GPIB level byte fields (commands, addresses, pieces of data):
+ - 'COMMAND': <addr> is not applicable, <pdata> is the command's byte value.
+ - 'LISTEN': <addr> is the listener address (0-30), <pdata> is the raw
+ byte value (including the 0x20 offset).
+ - 'TALK': <addr> is the talker address (0-30), <pdata> is the raw byte
+ value (including the 0x40 offset).
+ - 'SECONDARY': <addr> is the secondary address (0-31), <pdata> is the
+ raw byte value (including the 0x60 offset).
+ - 'MSB_SET': <addr> as well as <pdata> are the raw byte value (including
+ the 0x80 offset). This usually does not happen for GPIB bytes with ATN
+ active, but was observed with the IEC bus and Commodore floppy drives,
+ when addressing channels within the device.
+ - 'DATA_BYTE': <addr> is the talker address (when available), <pdata>
+ is the raw data byte (transport layer, ATN inactive).
+
+Extracted payload information (peers and their communicated data):
+ - 'TALK_LISTEN': <addr> is the current talker, <pdata> is the list of
+ current listeners. These updates for the current "connected peers"
+ are sent when the set of peers changes, i.e. after talkers/listeners
+ got selected or deselected. Of course the data only covers what could
+ be gathered from the input data. Some controllers may not explicitly
+ address themselves, or captures may not include an early setup phase.
+ - 'TALKER_BYTES': <addr> is the talker address (when available), <pdata>
+ is the accumulated byte sequence between addressing a talker and EOI,
+ or the next command/address.
+ - 'TALKER_TEXT': <addr> is the talker address (when available), <pdata>
+ is the accumulated text sequence between addressing a talker and EOI,
+ or the next command/address.
+'''
+
+class ChannelError(Exception):
+ pass
+
+def _format_ann_texts(fmts, **args):
+ if not fmts:
+ return None
+ return [fmt.format(**args) for fmt in fmts]
+
+_cmd_table = {
+ # Command codes in the 0x00-0x1f range.
+ 0x01: ['Go To Local', 'GTL'],
+ 0x04: ['Selected Device Clear', 'SDC'],
+ 0x05: ['Parallel Poll Configure', 'PPC'],
+ 0x08: ['Global Execute Trigger', 'GET'],
+ 0x09: ['Take Control', 'TCT'],
+ 0x11: ['Local Lock Out', 'LLO'],
+ 0x14: ['Device Clear', 'DCL'],
+ 0x15: ['Parallel Poll Unconfigure', 'PPU'],
+ 0x18: ['Serial Poll Enable', 'SPE'],
+ 0x19: ['Serial Poll Disable', 'SPD'],
+ # Unknown type of command.
+ None: ['Unknown command 0x{cmd:02x}', 'command 0x{cmd:02x}', 'cmd {cmd:02x}', 'C{cmd_ord:c}'],
+ # Special listener/talker "addresses" (deselecting previous peers).
+ 0x3f: ['Unlisten', 'UNL'],
+ 0x5f: ['Untalk', 'UNT'],
+}
+
+def _is_command(b):
+ # Returns a tuple of booleans (or None when not applicable) whether
+ # the raw GPIB byte is: a command, an un-listen, an un-talk command.
+ if b in range(0x00, 0x20):
+ return True, None, None
+ if b in range(0x20, 0x40) and (b & 0x1f) == 31:
+ return True, True, False
+ if b in range(0x40, 0x60) and (b & 0x1f) == 31:
+ return True, False, True
+ return False, None, None
+
+def _is_listen_addr(b):
+ if b in range(0x20, 0x40):
+ return b & 0x1f
+ return None
+
+def _is_talk_addr(b):
+ if b in range(0x40, 0x60):
+ return b & 0x1f
+ return None
+
+def _is_secondary_addr(b):
+ if b in range(0x60, 0x80):
+ return b & 0x1f
+ return None
+
+def _is_msb_set(b):
+ if b & 0x80:
+ return b
+ return None
+
+def _get_raw_byte(b, atn):
+ # "Decorate" raw byte values for stacked decoders.
+ raw_byte = b
+ if atn:
+ raw_byte |= 0x100
+ return raw_byte
+
+def _get_raw_text(b, atn):
+ return ['{leader}{data:02x}'.format(leader = '/' if atn else '', data = b)]
+
+def _get_command_texts(b):
+ fmts = _cmd_table.get(b, None)
+ known = fmts is not None
+ if not fmts:
+ fmts = _cmd_table.get(None, None)
+ if not fmts:
+ return known, None
+ return known, _format_ann_texts(fmts, cmd = b, cmd_ord = ord('0') + b)
+
+def _get_address_texts(b):
+ laddr = _is_listen_addr(b)
+ taddr = _is_talk_addr(b)
+ saddr = _is_secondary_addr(b)
+ msb = _is_msb_set(b)
+ fmts = None
+ if laddr is not None:
+ fmts = ['Listen {addr:d}', 'L {addr:d}', 'L{addr_ord:c}']
+ addr = laddr
+ elif taddr is not None:
+ fmts = ['Talk {addr:d}', 'T {addr:d}', 'T{addr_ord:c}']
+ addr = taddr
+ elif saddr is not None:
+ fmts = ['Secondary {addr:d}', 'S {addr:d}', 'S{addr_ord:c}']
+ addr = saddr
+ elif msb is not None: # For IEC bus compat.
+ fmts = ['Secondary {addr:d}', 'S {addr:d}', 'S{addr_ord:c}']
+ addr = msb
+ return _format_ann_texts(fmts, addr = addr, addr_ord = ord('0') + addr)
+
+def _get_data_text(b):
+ # TODO Move the table of ASCII control characters to a common location?
+ # TODO Move the "printable with escapes" logic to a common helper?
+ _control_codes = {
+ 0x00: 'NUL',
+ 0x01: 'SOH',
+ 0x02: 'STX',
+ 0x03: 'ETX',
+ 0x04: 'EOT',
+ 0x05: 'ENQ',
+ 0x06: 'ACK',
+ 0x07: 'BEL',
+ 0x08: 'BS',
+ 0x09: 'TAB',
+ 0x0a: 'LF',
+ 0x0b: 'VT',
+ 0x0c: 'FF',
+ 0x0d: 'CR',
+ 0x0e: 'SO',
+ 0x0f: 'SI',
+ 0x10: 'DLE',
+ 0x11: 'DC1',
+ 0x12: 'DC2',
+ 0x13: 'DC3',
+ 0x14: 'DC4',
+ 0x15: 'NAK',
+ 0x16: 'SYN',
+ 0x17: 'ETB',
+ 0x18: 'CAN',
+ 0x19: 'EM',
+ 0x1a: 'SUB',
+ 0x1b: 'ESC',
+ 0x1c: 'FS',
+ 0x1d: 'GS',
+ 0x1e: 'RS',
+ 0x1f: 'US',
+ }
+ # Yes, exclude 0x7f (DEL) here. It's considered non-printable.
+ if b in range(0x20, 0x7f) and b not in ('[', ']'):
+ return '{:s}'.format(chr(b))
+ elif b in _control_codes:
+ return '[{:s}]'.format(_control_codes[b])
+ # Use a compact yet readable and unambigous presentation for bytes
+ # which contain non-printables. The format that is used here is
+ # compatible with 93xx EEPROM and UART decoders.
+ return '[{:02x}]'.format(b)
+
+(
+ PIN_DIO1, PIN_DIO2, PIN_DIO3, PIN_DIO4,
+ PIN_DIO5, PIN_DIO6, PIN_DIO7, PIN_DIO8,
+ PIN_EOI, PIN_DAV, PIN_NRFD, PIN_NDAC,
+ PIN_IFC, PIN_SRQ, PIN_ATN, PIN_REN,
+ PIN_CLK,
+) = range(17)
+PIN_DATA = PIN_DIO1
+
+(
+ ANN_RAW_BIT, ANN_RAW_BYTE,
+ ANN_CMD, ANN_LADDR, ANN_TADDR, ANN_SADDR, ANN_DATA,
+ ANN_EOI,
+ ANN_TEXT,
+ # TODO Want to provide one annotation class per talker address (0-30)?
+ ANN_IEC_PERIPH,
+ ANN_WARN,
+) = range(11)
+
+(
+ BIN_RAW,
+ BIN_DATA,
+ # TODO Want to provide one binary annotation class per talker address (0-30)?
+) = range(2)
+
+class Decoder(srd.Decoder):
+ api_version = 3
+ id = 'ieee488'
+ name = 'IEEE-488'
+ longname = 'General Purpose Interface Bus'
+ desc = 'IEEE-488 General Purpose Interface Bus (GPIB/HPIB or IEC).'
+ license = 'gplv2+'
+ inputs = ['logic']
+ outputs = ['ieee488']
+ tags = ['PC', 'Retro computing']
+ channels = (
+ {'id': 'dio1' , 'name': 'DIO1/DATA',
+ 'desc': 'Data I/O bit 1, or serial data'},
+ )
+ optional_channels = (
+ {'id': 'dio2' , 'name': 'DIO2', 'desc': 'Data I/O bit 2'},
+ {'id': 'dio3' , 'name': 'DIO3', 'desc': 'Data I/O bit 3'},
+ {'id': 'dio4' , 'name': 'DIO4', 'desc': 'Data I/O bit 4'},
+ {'id': 'dio5' , 'name': 'DIO5', 'desc': 'Data I/O bit 5'},
+ {'id': 'dio6' , 'name': 'DIO6', 'desc': 'Data I/O bit 6'},
+ {'id': 'dio7' , 'name': 'DIO7', 'desc': 'Data I/O bit 7'},
+ {'id': 'dio8' , 'name': 'DIO8', 'desc': 'Data I/O bit 8'},
+ {'id': 'eoi', 'name': 'EOI', 'desc': 'End or identify'},
+ {'id': 'dav', 'name': 'DAV', 'desc': 'Data valid'},
+ {'id': 'nrfd', 'name': 'NRFD', 'desc': 'Not ready for data'},
+ {'id': 'ndac', 'name': 'NDAC', 'desc': 'Not data accepted'},
+ {'id': 'ifc', 'name': 'IFC', 'desc': 'Interface clear'},
+ {'id': 'srq', 'name': 'SRQ', 'desc': 'Service request'},
+ {'id': 'atn', 'name': 'ATN', 'desc': 'Attention'},
+ {'id': 'ren', 'name': 'REN', 'desc': 'Remote enable'},
+ {'id': 'clk', 'name': 'CLK', 'desc': 'Serial clock'},
+ )
+ options = (
+ {'id': 'iec_periph', 'desc': 'Decode Commodore IEC bus peripherals details',
+ 'default': 'no', 'values': ('no', 'yes')},
+ )
+ annotations = (
+ ('bit', 'IEC bit'),
+ ('raw', 'Raw byte'),
+ ('cmd', 'Command'),
+ ('laddr', 'Listener address'),
+ ('taddr', 'Talker address'),
+ ('saddr', 'Secondary address'),
+ ('data', 'Data byte'),
+ ('eoi', 'EOI'),
+ ('text', 'Talker text'),
+ ('periph', 'IEC bus peripherals'),
+ ('warn', 'Warning'),
+ )
+ annotation_rows = (
+ ('bits', 'IEC bits', (ANN_RAW_BIT,)),
+ ('raws', 'Raw bytes', (ANN_RAW_BYTE,)),
+ ('gpib', 'Commands/data', (ANN_CMD, ANN_LADDR, ANN_TADDR, ANN_SADDR, ANN_DATA,)),
+ ('eois', 'EOI', (ANN_EOI,)),
+ ('texts', 'Talker texts', (ANN_TEXT,)),
+ ('periphs', 'IEC peripherals', (ANN_IEC_PERIPH,)),
+ ('warns', 'Warnings', (ANN_WARN,)),
+ )
+ binary = (
+ ('raw', 'Raw bytes'),
+ ('data', 'Talker bytes'),
+ )
+
+ def __init__(self):
+ self.reset()
+
+ def reset(self):
+ self.curr_raw = None
+ self.curr_atn = None
+ self.curr_eoi = None
+ self.latch_atn = None
+ self.latch_eoi = None
+ self.accu_bytes = []
+ self.accu_text = []
+ self.ss_raw = None
+ self.es_raw = None
+ self.ss_eoi = None
+ self.es_eoi = None
+ self.ss_text = None
+ self.es_text = None
+ self.last_talker = None
+ self.last_listener = []
+ self.last_iec_addr = None
+ self.last_iec_sec = None
+
+ def start(self):
+ self.out_ann = self.register(srd.OUTPUT_ANN)
+ self.out_bin = self.register(srd.OUTPUT_BINARY)
+ self.out_python = self.register(srd.OUTPUT_PYTHON)
+
+ def putg(self, ss, es, data):
+ self.put(ss, es, self.out_ann, data)
+
+ def putbin(self, ss, es, data):
+ self.put(ss, es, self.out_bin, data)
+
+ def putpy(self, ss, es, ptype, addr, pdata):
+ self.put(ss, es, self.out_python, [ptype, addr, pdata])
+
+ def emit_eoi_ann(self, ss, es):
+ self.putg(ss, es, [ANN_EOI, ['EOI']])
+
+ def emit_bin_ann(self, ss, es, ann_cls, data):
+ self.putbin(ss, es, [ann_cls, bytes(data)])
+
+ def emit_data_ann(self, ss, es, ann_cls, data):
+ self.putg(ss, es, [ann_cls, data])
+
+ def emit_warn_ann(self, ss, es, data):
+ self.putg(ss, es, [ANN_WARN, data])
+
+ def flush_bytes_text_accu(self):
+ if self.accu_bytes and self.ss_text is not None and self.es_text is not None:
+ self.emit_bin_ann(self.ss_text, self.es_text, BIN_DATA, bytearray(self.accu_bytes))
+ self.putpy(self.ss_text, self.es_text, 'TALKER_BYTES', self.last_talker, bytearray(self.accu_bytes))
+ self.accu_bytes = []
+ if self.accu_text and self.ss_text is not None and self.es_text is not None:
+ text = ''.join(self.accu_text)
+ self.emit_data_ann(self.ss_text, self.es_text, ANN_TEXT, [text])
+ self.putpy(self.ss_text, self.es_text, 'TALKER_TEXT', self.last_talker, text)
+ self.accu_text = []
+ self.ss_text = self.es_text = None
+
+ def handle_ifc_change(self, ifc):
+ # Track IFC line for parallel input.
+ # Assertion of IFC de-selects all talkers and listeners.
+ if ifc:
+ self.last_talker = None
+ self.last_listener = []
+
+ def handle_eoi_change(self, eoi):
+ # Track EOI line for parallel and serial input.
+ if eoi:
+ self.ss_eoi = self.samplenum
+ self.curr_eoi = eoi
+ else:
+ self.es_eoi = self.samplenum
+ if self.ss_eoi and self.latch_eoi:
+ self.emit_eoi_ann(self.ss_eoi, self.es_eoi)
+ self.es_text = self.es_eoi
+ self.flush_bytes_text_accu()
+ self.ss_eoi = self.es_eoi = None
+ self.curr_eoi = None
+
+ def handle_atn_change(self, atn):
+ # Track ATN line for parallel and serial input.
+ self.curr_atn = atn
+ if atn:
+ self.flush_bytes_text_accu()
+
+ def handle_iec_periph(self, ss, es, addr, sec, data):
+ # The annotation is optional.
+ if self.options['iec_periph'] != 'yes':
+ return
+ # Void internal state.
+ if addr is None and sec is None and data is None:
+ self.last_iec_addr = None
+ self.last_iec_sec = None
+ return
+ # Grab and evaluate new input.
+ _iec_addr_names = {
+ # TODO Add more items here. See the "Device numbering" section
+ # of the https://en.wikipedia.org/wiki/Commodore_bus page.
+ 8: 'Disk 0',
+ 9: 'Disk 1',
+ }
+ _iec_disk_range = range(8, 16)
+ if addr is not None:
+ self.last_iec_addr = addr
+ name = _iec_addr_names.get(addr, None)
+ if name:
+ self.emit_data_ann(ss, es, ANN_IEC_PERIPH, [name])
+ addr = self.last_iec_addr # Simplify subsequent logic.
+ if sec is not None:
+ # BEWARE! The secondary address is a full byte and includes
+ # the 0x60 offset, to also work when the MSB was set.
+ self.last_iec_sec = sec
+ subcmd, channel = sec & 0xf0, sec & 0x0f
+ channel_ord = ord('0') + channel
+ if addr is not None and addr in _iec_disk_range:
+ subcmd_fmts = {
+ 0x60: ['Reopen {ch:d}', 'Re {ch:d}', 'R{ch_ord:c}'],
+ 0xe0: ['Close {ch:d}', 'Cl {ch:d}', 'C{ch_ord:c}'],
+ 0xf0: ['Open {ch:d}', 'Op {ch:d}', 'O{ch_ord:c}'],
+ }.get(subcmd, None)
+ if subcmd_fmts:
+ texts = _format_ann_texts(subcmd_fmts, ch = channel, ch_ord = channel_ord)
+ self.emit_data_ann(ss, es, ANN_IEC_PERIPH, texts)
+ sec = self.last_iec_sec # Simplify subsequent logic.
+ if data is not None:
+ if addr is None or sec is None:
+ return
+ # TODO Process data depending on peripheral type and channel?
+
+ def handle_data_byte(self):
+ b = self.curr_raw
+ texts = _get_raw_text(b, self.curr_atn)
+ self.emit_data_ann(self.ss_raw, self.es_raw, ANN_RAW_BYTE, texts)
+ self.emit_bin_ann(self.ss_raw, self.es_raw, BIN_RAW, b.to_bytes(1, byteorder='big'))
+ self.putpy(self.ss_raw, self.es_raw, 'GPIB_RAW', None, _get_raw_byte(b, self.curr_atn))
+ if self.curr_atn:
+ ann_cls = None
+ upd_iec = False,
+ py_type = None
+ py_peers = False
+ is_cmd, is_unl, is_unt = _is_command(b)
+ laddr = _is_listen_addr(b)
+ taddr = _is_talk_addr(b)
+ saddr = _is_secondary_addr(b)
+ msb = _is_msb_set(b)
+ if is_cmd:
+ known, texts = _get_command_texts(b)
+ if not known:
+ warn_texts = ['Unknown GPIB command', 'unknown', 'UNK']
+ self.emit_warn_ann(self.ss_raw, self.es_raw, warn_texts)
+ ann_cls = ANN_CMD
+ py_type, py_addr = 'COMMAND', None
+ if is_unl:
+ self.last_listener = []
+ py_peers = True
+ if is_unt:
+ self.last_talker = None
+ py_peers = True
+ if is_unl or is_unt:
+ upd_iec = True, None, None, None
+ elif laddr is not None:
+ addr = laddr
+ texts = _get_address_texts(b)
+ ann_cls = ANN_LADDR
+ py_type, py_addr = 'LISTEN', addr
+ if addr == self.last_talker:
+ self.last_talker = None
+ self.last_listener.append(addr)
+ upd_iec = True, addr, None, None
+ py_peers = True
+ elif taddr is not None:
+ addr = taddr
+ texts = _get_address_texts(b)
+ ann_cls = ANN_TADDR
+ py_type, py_addr = 'TALK', addr
+ if addr in self.last_listener:
+ self.last_listener.remove(addr)
+ self.last_talker = addr
+ upd_iec = True, addr, None, None
+ py_peers = True
+ elif saddr is not None:
+ addr = saddr
+ texts = _get_address_texts(b)
+ ann_cls = ANN_SADDR
+ upd_iec = True, None, b, None
+ py_type, py_addr = 'SECONDARY', addr
+ elif msb is not None:
+ # These are not really "secondary addresses", but they
+ # are used by the Commodore IEC bus (floppy channels).
+ texts = _get_address_texts(b)
+ ann_cls = ANN_SADDR
+ upd_iec = True, None, b, None
+ py_type, py_addr = 'MSB_SET', b
+ if ann_cls is not None and texts is not None:
+ self.emit_data_ann(self.ss_raw, self.es_raw, ann_cls, texts)
+ if upd_iec[0]:
+ self.handle_iec_periph(self.ss_raw, self.es_raw, upd_iec[1], upd_iec[2], upd_iec[3])
+ if py_type:
+ self.putpy(self.ss_raw, self.es_raw, py_type, py_addr, b)
+ if py_peers:
+ self.last_listener.sort()
+ self.putpy(self.ss_raw, self.es_raw, 'TALK_LISTEN', self.last_talker, self.last_listener)
+ else:
+ self.accu_bytes.append(b)
+ text = _get_data_text(b)
+ if not self.accu_text:
+ self.ss_text = self.ss_raw
+ self.accu_text.append(text)
+ self.es_text = self.es_raw
+ self.emit_data_ann(self.ss_raw, self.es_raw, ANN_DATA, [text])
+ self.handle_iec_periph(self.ss_raw, self.es_raw, None, None, b)
+ self.putpy(self.ss_raw, self.es_raw, 'DATA_BYTE', self.last_talker, b)
+
+ def handle_dav_change(self, dav, data):
+ if dav:
+ # Data availability starts when the flag goes active.
+ self.ss_raw = self.samplenum
+ self.curr_raw = bitpack(data)
+ self.latch_atn = self.curr_atn
+ self.latch_eoi = self.curr_eoi
+ return
+ # Data availability ends when the flag goes inactive. Handle the
+ # previously captured data byte according to associated flags.
+ self.es_raw = self.samplenum
+ self.handle_data_byte()
+ self.ss_raw = self.es_raw = None
+ self.curr_raw = None
+
+ def inject_dav_phase(self, ss, es, data):
+ # Inspection of serial input has resulted in one raw byte which
+ # spans a given period of time. Pretend we had seen a DAV active
+ # phase, to re-use code for the parallel transmission.
+ self.ss_raw = ss
+ self.curr_raw = bitpack(data)
+ self.latch_atn = self.curr_atn
+ self.latch_eoi = self.curr_eoi
+ self.es_raw = es
+ self.handle_data_byte()
+ self.ss_raw = self.es_raw = None
+ self.curr_raw = None
+
+ def invert_pins(self, pins):
+ # All lines (including data bits!) are low active and thus need
+ # to get inverted to receive their logical state (high active,
+ # regular data bit values). Cope with inputs being optional.
+ return [1 - p if p in (0, 1) else p for p in pins]
+
+ def decode_serial(self, has_clk, has_data_1, has_atn, has_srq):
+ if not has_clk or not has_data_1 or not has_atn:
+ raise ChannelError('IEC bus needs at least ATN and serial CLK and DATA.')
+
+ # This is a rephrased version of decoders/iec/pd.py:decode().
+ # SRQ was not used there either. Magic numbers were eliminated.
+ (
+ STEP_WAIT_READY_TO_SEND,
+ STEP_WAIT_READY_FOR_DATA,
+ STEP_PREP_DATA_TEST_EOI,
+ STEP_CLOCK_DATA_BITS,
+ ) = range(4)
+ step_wait_conds = (
+ [{PIN_ATN: 'f'}, {PIN_DATA: 'l', PIN_CLK: 'h'}],
+ [{PIN_ATN: 'f'}, {PIN_DATA: 'h', PIN_CLK: 'h'}, {PIN_CLK: 'l'}],
+ [{PIN_ATN: 'f'}, {PIN_DATA: 'f'}, {PIN_CLK: 'l'}],
+ [{PIN_ATN: 'f'}, {PIN_CLK: 'e'}],
+ )
+ step = STEP_WAIT_READY_TO_SEND
+ bits = []
+
+ while True:
+
+ # Sample input pin values. Keep DATA/CLK in verbatim form to
+ # re-use 'iec' decoder logic. Turn ATN to positive logic for
+ # easier processing. The data bits get handled during byte
+ # accumulation.
+ pins = self.wait(step_wait_conds[step])
+ data, clk = pins[PIN_DATA], pins[PIN_CLK]
+ atn, = self.invert_pins([pins[PIN_ATN]])
+
+ if self.matched[0]:
+ # Falling edge on ATN, reset step.
+ step = STEP_WAIT_READY_TO_SEND
+
+ if step == STEP_WAIT_READY_TO_SEND:
+ # Don't use self.matched[1] here since we might come from
+ # a step with different conds due to the code above.
+ if data == 0 and clk == 1:
+ # Rising edge on CLK while DATA is low: Ready to send.
+ step = STEP_WAIT_READY_FOR_DATA
+ elif step == STEP_WAIT_READY_FOR_DATA:
+ if data == 1 and clk == 1:
+ # Rising edge on DATA while CLK is high: Ready for data.
+ ss_byte = self.samplenum
+ self.handle_atn_change(atn)
+ if self.curr_eoi:
+ self.handle_eoi_change(False)
+ bits = []
+ step = STEP_PREP_DATA_TEST_EOI
+ elif clk == 0:
+ # CLK low again, transfer aborted.
+ step = STEP_WAIT_READY_TO_SEND
+ elif step == STEP_PREP_DATA_TEST_EOI:
+ if data == 0 and clk == 1:
+ # DATA goes low while CLK is still high, EOI confirmed.
+ self.handle_eoi_change(True)
+ elif clk == 0:
+ step = STEP_CLOCK_DATA_BITS
+ ss_bit = self.samplenum
+ elif step == STEP_CLOCK_DATA_BITS:
+ if self.matched[1]:
+ if clk == 1:
+ # Rising edge on CLK; latch DATA.
+ bits.append(data)
+ elif clk == 0:
+ # Falling edge on CLK; end of bit.
+ es_bit = self.samplenum
+ self.emit_data_ann(ss_bit, es_bit, ANN_RAW_BIT, ['{:d}'.format(bits[-1])])
+ self.putpy(ss_bit, es_bit, 'IEC_BIT', None, bits[-1])
+ ss_bit = self.samplenum
+ if len(bits) == 8:
+ es_byte = self.samplenum
+ self.inject_dav_phase(ss_byte, es_byte, bits)
+ if self.curr_eoi:
+ self.handle_eoi_change(False)
+ step = STEP_WAIT_READY_TO_SEND
+
+ def decode_parallel(self, has_data_n, has_dav, has_atn, has_eoi, has_srq):
+
+ if False in has_data_n or not has_dav or not has_atn:
+ raise ChannelError('IEEE-488 needs at least ATN and DAV and eight DIO lines.')
+ has_ifc = self.has_channel(PIN_IFC)
+
+ # Capture data lines at the falling edge of DAV, process their
+ # values at rising DAV edge (when data validity ends). Also make
+ # sure to start inspection when the capture happens to start with
+ # low signal levels, i.e. won't include the initial falling edge.
+ # Scan for ATN/EOI edges as well (including the trick which works
+ # around initial pin state).
+ # Map low-active physical transport lines to positive logic here,
+ # to simplify logical inspection/decoding of communicated data,
+ # and to avoid redundancy and inconsistency in later code paths.
+ waitcond = []
+ idx_dav = len(waitcond)
+ waitcond.append({PIN_DAV: 'l'})
+ idx_atn = len(waitcond)
+ waitcond.append({PIN_ATN: 'l'})
+ idx_eoi = None
+ if has_eoi:
+ idx_eoi = len(waitcond)
+ waitcond.append({PIN_EOI: 'l'})
+ idx_ifc = None
+ if has_ifc:
+ idx_ifc = len(waitcond)
+ waitcond.append({PIN_IFC: 'l'})
+ while True:
+ pins = self.wait(waitcond)
+ pins = self.invert_pins(pins)
+
+ # BEWARE! Order of evaluation does matter. For low samplerate
+ # captures, many edges fall onto the same sample number. So
+ # we process active edges of flags early (before processing
+ # data bits), and inactive edges late (after data got processed).
+ if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 1:
+ self.handle_ifc_change(pins[PIN_IFC])
+ if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 1:
+ self.handle_eoi_change(pins[PIN_EOI])
+ if self.matched[idx_atn] and pins[PIN_ATN] == 1:
+ self.handle_atn_change(pins[PIN_ATN])
+ if self.matched[idx_dav]:
+ self.handle_dav_change(pins[PIN_DAV], pins[PIN_DIO1:PIN_DIO8 + 1])
+ if self.matched[idx_atn] and pins[PIN_ATN] == 0:
+ self.handle_atn_change(pins[PIN_ATN])
+ if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 0:
+ self.handle_eoi_change(pins[PIN_EOI])
+ if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 0:
+ self.handle_ifc_change(pins[PIN_IFC])
+
+ waitcond[idx_dav][PIN_DAV] = 'e'
+ waitcond[idx_atn][PIN_ATN] = 'e'
+ if has_eoi:
+ waitcond[idx_eoi][PIN_EOI] = 'e'
+ if has_ifc:
+ waitcond[idx_ifc][PIN_IFC] = 'e'
+
+ def decode(self):
+ # The decoder's boilerplate declares some of the input signals as
+ # optional, but only to support both serial and parallel variants.
+ # The CLK signal discriminates the two. For either variant some
+ # of the "optional" signals are not really optional for proper
+ # operation of the decoder. Check these conditions here.
+ has_clk = self.has_channel(PIN_CLK)
+ has_data_1 = self.has_channel(PIN_DIO1)
+ has_data_n = [bool(self.has_channel(pin) for pin in range(PIN_DIO1, PIN_DIO8 + 1))]
+ has_dav = self.has_channel(PIN_DAV)
+ has_atn = self.has_channel(PIN_ATN)
+ has_eoi = self.has_channel(PIN_EOI)
+ has_srq = self.has_channel(PIN_SRQ)
+ if has_clk:
+ self.decode_serial(has_clk, has_data_1, has_atn, has_srq)
+ else:
+ self.decode_parallel(has_data_n, has_dav, has_atn, has_eoi, has_srq)