summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Anderson <ejona86@gmail.com>2023-08-13 07:37:17 -0700
committerEric Anderson <ejona86@gmail.com>2023-08-13 07:37:17 -0700
commita92839049b3de6f2cd8c1d95c583bd45ec027c73 (patch)
treea6d09528e01b2ba640c5e6e3e11e31003b8e5e6d
parentdf3a4a3bd1763324765f53932d878525a4a20102 (diff)
downloadlibsigrokdecode-floppy.tar.gz
libsigrokdecode-floppy.zip
Add mfm and floppy decodersfloppy
The "MFM" decoder can handle FM, MFM, MMFM, GCR, and could be extended to support RLL. I have only tested with FM and MFM. These are all related encodings but "MFM" is the most distinctive name so it is being used to describe the family. The family of encodings was used in magnetic tape storage, but the focus here is floppies with a side of hard drives. The MFM decoder is pretty simple, and doesn't attempt to separate clock/data or align bytes. The method of doing so varies per sector format so that responsibility is left to the consumer, which makes configuration for the user easier. The decoder also doesn't try to act as a PLL. Someone else can enhance it, but it currently seems to work fine even on some early-1980s floppy disks/drives. The Floppy decoder can also be used with ST506-style hard drives... which are not floppies. But the encoding was very similar when MFM was in use. I have not tested an ST506 using IBM-compatible MFM formatting, but I have tested a Micropolis-encoded HDD which is a slight variation of the MFM FDD format.
-rw-r--r--decoders/floppy/__init__.py28
-rw-r--r--decoders/floppy/pd.py497
-rw-r--r--decoders/mfm/__init__.py38
-rw-r--r--decoders/mfm/pd.py169
4 files changed, 732 insertions, 0 deletions
diff --git a/decoders/floppy/__init__.py b/decoders/floppy/__init__.py
new file mode 100644
index 0000000..669ed99
--- /dev/null
+++ b/decoders/floppy/__init__.py
@@ -0,0 +1,28 @@
+##
+## This file is part of the libsigrokdecode project.
+##
+## Copyright (C) 2023 Eric Anderson <ejona86@gmail.com>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
+##
+
+'''
+Decodes floppy disk sectors from MFM data stream. This decoder stacks on the
+MFM decoder. There are many format types and the decoder only supports some,
+but IBM-compatible is compatible and is by far the most common.
+
+The decoder verifies CRC/checksums (unless stated otherwise in its output).
+'''
+
+from .pd import Decoder
diff --git a/decoders/floppy/pd.py b/decoders/floppy/pd.py
new file mode 100644
index 0000000..24229df
--- /dev/null
+++ b/decoders/floppy/pd.py
@@ -0,0 +1,497 @@
+##
+## This file is part of the libsigrokdecode project.
+##
+## Copyright (C) 2023 Eric Anderson <ejona86@gmail.com>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
+##
+
+import sigrokdecode as srd
+
+ANN_PAYLOAD = 0
+ANN_ERR = 1
+ANN_FORMAT = 2
+ANN_DESC = 3
+ANN_BIT = 4
+ANN_BITIRR = 5
+
+
+class SamplerateError(Exception):
+ pass
+
+
+def crc16_ccitt_gen(crc):
+ for i in range(8):
+ b = crc & 0x8000
+ crc = (crc & 0x7fff) << 1
+ if b:
+ crc ^= 0x1021
+ return crc
+
+
+crc16_ccitt_table = tuple(crc16_ccitt_gen(i << 8) for i in range(256))
+
+
+class Format:
+ sector_anno = ()
+ sector_desc = ()
+ sync_pattern = ()
+ report_encoding_errors = True
+ fm = False
+ mfm = False
+
+ def clear(self):
+ pass
+
+ def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
+ pass
+
+
+class IbmBase(Format):
+ iam_anno = ()
+ iam_desc = ()
+ idam_anno = ()
+ idam_desc = ()
+
+ def __init__(self):
+ self.ibm_sector_len = 128
+ self.clear()
+
+ def clear(self):
+ self.sector_anno = self.__class__.sector_anno
+ self.sector_desc = self.__class__.sector_desc
+ self.crc = 0
+
+ def dam_anno(self, data_len):
+ raise AssertionError('Unimplemented')
+
+ def dam_desc(self, data_len, am_desc):
+ raise AssertionError('Unimplemented')
+
+ def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
+ if (self.sector_desc == self.__class__.sector_desc and
+ decoder.bytes_remaining == 1):
+ if byte == 0xf8:
+ self.sector_anno = self.dam_anno(self.ibm_sector_len)
+ self.sector_desc = self.dam_desc(
+ self.ibm_sector_len,
+ ['Deleted Data Address Mark', 'Deleted DAM', 'DDAM'])
+ elif byte == 0xf9 or byte == 0xfa:
+ self.sector_anno = self.dam_anno(self.ibm_sector_len)
+ self.sector_desc = self.dam_desc(
+ self.ibm_sector_len,
+ ['Custom Data Address Mark', 'Custom DAM', '? DAM'])
+ elif byte == 0xfb:
+ self.sector_anno = self.dam_anno(self.ibm_sector_len)
+ self.sector_desc = self.dam_desc(
+ self.ibm_sector_len, ['Data Address Mark', 'DAM'])
+ elif byte == 0xfc:
+ self.sector_anno = self.iam_anno
+ self.sector_desc = self.iam_desc
+ elif byte == 0xfe:
+ self.sector_anno = self.idam_anno
+ self.sector_desc = self.idam_desc
+ decoder.bytes_remaining = len(self.sector_anno)-byte_in_sector
+ if (self.sector_desc == self.__class__.idam_desc and
+ decoder.bytes_remaining == 3):
+ self.ibm_sector_len = 2**(byte+7)
+
+ if byte_in_sector == 0:
+ self.crc = 0xffff
+ check = self.crc
+ check = crc16_ccitt_table[(check >> 8) ^ byte] ^ ((check & 0xff) << 8)
+ if (decoder.bytes_remaining == 1 and check != 0 and
+ self.sector_anno not in (
+ self.__class__.sector_anno, self.iam_anno)):
+ decoder.put(decoder.section_start, bit_end, decoder.out_ann,
+ [ANN_ERR, ['Bad CRC (0x%02x)' % check, 'Bad CRC']])
+ self.crc = check
+
+
+class IbmFm(IbmBase):
+ sector_anno = (ANN_FORMAT,)
+ sector_desc = (['Unknown Mark', '? Mark'],)
+ sync_pattern = (
+ 0xaaaaf56a, # data: f8, clock: c7
+ 0xaaaaf56b, # data: f9, clock: c7
+ 0xaaaaf56e, # data: fa, clock: c7
+ 0xaaaaf56f, # data: fb, clock: c7
+ 0xaaaaf57e, # data: fe, clock: c7
+
+ 0xaaaaf77a) # data: fc, clock: d7
+ fm = True
+
+ iam_anno = (ANN_FORMAT,)
+ iam_desc = (['Index Mark', 'IAM'],)
+ idam_anno = 7*(ANN_FORMAT,)
+ idam_desc = (
+ (['ID Address Mark', 'IDAM'],) +
+ (['Cylinder', 'Cyl'], ['Side'], ['Sector', 'Sect'],
+ ['Length', 'Len']) +
+ 2 * (['CRC'],))
+
+ def dam_anno(self, data_len):
+ return (ANN_FORMAT,) + data_len*(ANN_PAYLOAD,) + 2*(ANN_FORMAT,)
+
+ def dam_desc(self, data_len, am_desc):
+ return (
+ (am_desc,) +
+ data_len * (['Data'],) +
+ 2 * (['CRC'],))
+
+
+class IbmMfm(IbmBase):
+ sector_anno = 4*(ANN_FORMAT,)
+ sector_desc = 3 * (['Gap End'],) + (['Unknown Mark', '? Mark'],)
+ sync_pattern = (0xaaaa5224, 0xaaaa4489)
+
+ iam_anno = 4*(ANN_FORMAT,)
+ iam_desc = (
+ 3 * (['Gap End'],) + (['Index Mark', 'IAM'],))
+ idam_anno = 10*(ANN_FORMAT,)
+ idam_desc = (
+ 3 * (['Gap End'],) + (['ID Address Mark', 'IDAM'],) +
+ (['Cylinder', 'Cyl'], ['Side'], ['Sector', 'Sect'],
+ ['Length', 'Len']) +
+ 2 * (['CRC'],))
+ mfm = True
+
+ def clear(self):
+ # Don't mark last two sync bytes as having clock errors. Check them
+ # with a special-case.
+ self.report_encoding_errors = False
+ super().clear()
+
+ def dam_anno(self, data_len):
+ return 4*(ANN_FORMAT,) + data_len*(ANN_PAYLOAD,) + 2*(ANN_FORMAT,)
+
+ def dam_desc(self, data_len, am_desc):
+ return (
+ 3 * (['Gap End'],) + (am_desc,) +
+ data_len * (['Data'],) +
+ 2 * (['CRC'],))
+
+ def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
+ if byte_in_sector == 0 and byte == 0xc2:
+ self.sector_anno = self.iam_anno
+ self.sector_desc = self.iam_desc
+ if self.sector_desc == self.iam_desc:
+ if byte_in_sector == 1 or byte_in_sector == 2:
+ if decoder.window & 0xffff != 0x5224:
+ self.invalid_byte(decoder, bit_end)
+ return
+ elif byte_in_sector == 3:
+ if byte != 0xfc:
+ self.invalid_byte(decoder, bit_end)
+ return
+ else:
+ if byte_in_sector == 1 or byte_in_sector == 2:
+ if decoder.window & 0xffff != 0x4489:
+ self.invalid_byte(decoder, bit_end)
+ return
+ if byte_in_sector == 2:
+ # Past the sync bytes
+ self.report_encoding_errors = True
+ super().handle_byte(decoder, byte, byte_in_sector, bit_end)
+
+ def invalid_byte(self, decoder, bit_end):
+ decoder.put(
+ decoder.window_bit_start(15),
+ bit_end, decoder.out_ann,
+ [ANN_ERR, ['Invalid', 'Inv']])
+ decoder.bytes_remaining = 1
+
+
+class MicropolisFdd(Format):
+ sector_anno = 13*(ANN_FORMAT,) + 256*(ANN_PAYLOAD,) + 6*(ANN_FORMAT,)
+ sector_desc = (
+ (['Sync'], ['Track', 'Trk'], ['Sector', 'Sect', 'S']) +
+ 10*(['OS Extra', 'OS'],) +
+ 256*(['Data'],) +
+ ((['Checksum', 'Sum'],) +
+ 4*(['ECC (unverified)', 'ECC'],) + (['ECC Present?', 'ECC?'],)))
+ sync_pattern = (0xaaaa5555,)
+ mfm = True
+
+ def __init__(self):
+ self.clear()
+
+ def clear(self):
+ self.checksum = 0
+
+ def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
+ if byte_in_sector == 0:
+ self.checksum = 0
+ check = self.checksum
+ if byte_in_sector == 269 and byte != check & 0xFF:
+ decoder.put(decoder.window_bit_start(15), bit_end, decoder.out_ann,
+ [ANN_ERR,
+ ['Bad Checksum (0x%02x)' % check, 'Bad Sum']])
+ if check > 0xFF:
+ check -= 0x100 - 1
+ check += byte
+ self.checksum = check
+
+
+class MicropolisHdd(Format):
+ sector_anno = 4*(ANN_FORMAT,) + 256*(ANN_PAYLOAD,) + 4*(ANN_FORMAT,)
+ sector_desc = (
+ (['Sync'], ['Head', 'Hd'], ['Track', 'Trk'], ['Sector', 'Sect', 'S']) +
+ 256*(['Data'],) +
+ 4*(['ECC (unverified)', 'ECC'],))
+ sync_pattern = (0xaaaa5555,)
+ mfm = True
+
+
+class NorthstarBase(Format):
+ def __init__(self):
+ self.clear()
+
+ def clear(self):
+ self.checksum = 0
+
+ def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
+ if byte_in_sector == 0 or byte_in_sector == 1:
+ self.checksum = 0
+ return
+ check = self.checksum
+ check ^= byte
+ check = ((check & 0x7f) << 1) | (check >> 7)
+ if decoder.bytes_remaining == 1 and check != 0:
+ decoder.put(decoder.window_bit_start(15), bit_end, decoder.out_ann,
+ [ANN_ERR, ['Bad Check (0x%02x)' % check, 'Bad Check']])
+ self.checksum = check
+
+
+class NorthstarFm(NorthstarBase):
+ sector_anno = (ANN_FORMAT,) + 512*(ANN_PAYLOAD,) + (ANN_FORMAT,)
+ sector_desc = (
+ (['Sync'],) +
+ 512*(['Data'],) + ((['Checksum', 'Sum'],)))
+ sync_pattern = (0xaaaa5545,)
+ fm = True
+
+
+class NorthstarMfm(NorthstarBase):
+ sector_anno = 2*(ANN_FORMAT,) + 512*(ANN_PAYLOAD,) + (ANN_FORMAT,)
+ sector_desc = (
+ (['Sync'], ['Head, Track, Sector', 'Hd/Trk/Sect' 'H/T/S']) +
+ 512*(['Data'],) + ((['Checksum', 'Sum'],)))
+ sync_pattern = (0xaaaa5545,)
+ mfm = True
+
+
+class Decoder(srd.Decoder):
+ api_version = 3
+ id = 'floppy'
+ name = 'Floppy'
+ longname = 'Floppy Disk Sector Formating'
+ desc = 'Floppy sector decoder'
+ license = 'gplv2+'
+ inputs = ['mfm']
+ outputs = ['floppy']
+ tags = ['PC', 'Retro computing']
+ annotations = (
+ ('payload', 'Sector payload byte'),
+ ('err', 'Error'),
+ ('format', 'Sector format byte'),
+ ('desc', 'Byte description'),
+ ('data', 'Data bit'),
+ ('datairr', 'Irregular data bit'),
+ )
+ annotation_rows = (
+ ('bits', 'Bits', (ANN_BIT, ANN_BITIRR)),
+ ('bytes', 'Bytes', (ANN_PAYLOAD, ANN_FORMAT)),
+ ('descs', 'Descriptions', (ANN_DESC,)),
+ ('errs', 'Errors', (ANN_ERR,)),
+ )
+ binary = (
+ ('payload', 'Sector payload'),
+ ('sector', 'Full sector'),
+ )
+ options = (
+ {
+ 'id': 'format',
+ 'desc': 'Format',
+ 'default': 'ibm-mfm',
+ 'values': (
+ 'ibm-fm',
+ 'ibm-mfm',
+ 'micropolis-fdd',
+ 'micropolis-hdd',
+ 'northstar-fm',
+ 'northstar-mfm')},)
+
+ def __init__(self):
+ self.format = Format()
+ self.clear()
+
+ def reset(self):
+ self.__init__()
+
+ def clear(self):
+ self.format.clear()
+ self.window = 0
+ self.window_starts = [0]*16
+ self.window_starts_pos = 0
+ self.bits_remaining = 0
+ self.bytes_remaining = 0
+ self.bits_skip = 0
+ self.sector_start = 0
+ self.sector_bytes = None
+ self.section_name = None
+ self.section_start = None
+ self.section_bytes = None
+
+ def start(self):
+ if self.options['format'] == 'ibm-fm':
+ self.format = IbmFm()
+ elif self.options['format'] == 'ibm-mfm':
+ self.format = IbmMfm()
+ elif self.options['format'] == 'micropolis-fdd':
+ self.format = MicropolisFdd()
+ elif self.options['format'] == 'micropolis-hdd':
+ self.format = MicropolisHdd()
+ elif self.options['format'] == 'northstar-fm':
+ self.format = NorthstarFm()
+ elif self.options['format'] == 'northstar-mfm':
+ self.format = NorthstarMfm()
+ else:
+ raise AssertionError('Unsupported format')
+
+ self.out_ann = self.register(srd.OUTPUT_ANN)
+ self.out_bin = self.register(srd.OUTPUT_BINARY)
+
+ def window_bit_start(self, i):
+ """Gets the start sample for the ith most recent bit from the window"""
+ lim = len(self.window_starts)
+ assert i < lim and i >= 0, 'bit not in window'
+ i += 1
+ return self.window_starts[(self.window_starts_pos-i+lim) % lim]
+
+ def check_clock(self, pos):
+ window = self.window >> (pos - 1)
+ expclock = None
+ if self.format.mfm:
+ expclock = ((window & 1) or ((window >> 2) & 1)) ^ 1
+ elif self.format.fm:
+ expclock = 1
+ else:
+ return True
+ return ((window >> 1) & 1) == expclock
+
+ def decode(self, bit_start, bit_end, cmd):
+ if cmd[0] == 'DISCONT':
+ self.clear()
+ return
+ if cmd[0] == 'IDX' and self.bits_remaining == 0:
+ self.clear()
+ self.bits_skip = 500
+ return
+
+ if cmd[0] != 'HALFBIT':
+ return
+
+ if self.bits_skip:
+ self.bits_skip -= 1
+ return
+
+ b = cmd[1]
+ self.window = ((self.window & 0x7fffffff) << 1) | b
+ self.window_starts[self.window_starts_pos] = bit_start
+ self.window_starts_pos += 1
+ self.window_starts_pos %= len(self.window_starts)
+ if self.bits_remaining == 0:
+ if self.window in self.format.sync_pattern:
+ self.bits_remaining = 1
+ self.bytes_remaining = len(self.format.sector_anno)
+ self.sector_bytes = None
+ for pos in range(15, 2, -2):
+ b = (self.window >> (pos-1)) & 1
+ if not self.check_clock(pos):
+ ann = [ANN_BITIRR, ['%d (Irregular clock)' % b,
+ '%d (Irr)' % b,
+ str(b)]]
+ else:
+ ann = [ANN_BIT, [str(b)]]
+ self.put(self.window_bit_start(pos),
+ self.window_bit_start(pos-2),
+ self.out_ann, ann)
+ b = self.window & 1
+ else:
+ return
+
+ self.bits_remaining -= 1
+ if (self.bits_remaining & 1) == 0:
+ irregular = not self.check_clock(1)
+ if irregular:
+ ann = [ANN_BITIRR, ['%d (Irregular clock)' % b,
+ '%d (Irr)' % b,
+ str(b)]]
+ else:
+ ann = [ANN_BIT, [str(b)]]
+ self.put(self.window_bit_start(1), bit_end, self.out_ann, ann)
+ if self.format.report_encoding_errors and irregular:
+ self.put(self.window_bit_start(1), bit_end, self.out_ann,
+ [ANN_ERR, ['Bad Clock', 'clk']])
+ if self.bits_remaining:
+ return
+
+ byte_in_sector = len(self.format.sector_anno)-self.bytes_remaining
+ byte = ((self.window & 0x0001) >> 0 |
+ (self.window & 0x0004) >> 1 |
+ (self.window & 0x0010) >> 2 |
+ (self.window & 0x0040) >> 3 |
+ (self.window & 0x0100) >> 4 |
+ (self.window & 0x0400) >> 5 |
+ (self.window & 0x1000) >> 6 |
+ (self.window & 0x4000) >> 7)
+
+ self.format.handle_byte(self, byte, byte_in_sector, bit_end)
+ anno = self.format.sector_anno[byte_in_sector]
+ desc = self.format.sector_desc[byte_in_sector]
+ byte_start = self.window_bit_start(15)
+ if self.sector_bytes is None:
+ self.sector_start = byte_start
+ self.sector_bytes = bytearray()
+ if desc != self.section_name:
+ self.section_name = desc
+ self.section_start = byte_start
+ self.section_bytes = bytearray()
+
+ self.sector_bytes.append(byte)
+ self.section_bytes.append(byte)
+
+ plain = '%02x' % byte
+ if byte >= ord(' ') and byte < 127:
+ pretty = '%02x \'%c\'' % (byte, byte)
+ else:
+ pretty = plain
+ self.put(byte_start, bit_end, self.out_ann, [anno, [pretty, plain]])
+ if (self.bytes_remaining == 1 or
+ desc != self.format.sector_desc[byte_in_sector+1]):
+ self.put(self.section_start, bit_end, self.out_ann,
+ [ANN_DESC, desc])
+ if anno == ANN_PAYLOAD:
+ self.put(self.section_start, bit_end, self.out_bin,
+ [0, bytes(self.section_bytes)])
+
+ self.bytes_remaining -= 1
+ if self.bytes_remaining:
+ self.bits_remaining = 16
+ else:
+ # TODO: expose metadata for type of floppy
+ self.put(self.sector_start, bit_end, self.out_bin,
+ [1, bytes(self.sector_bytes)])
+ self.clear()
diff --git a/decoders/mfm/__init__.py b/decoders/mfm/__init__.py
new file mode 100644
index 0000000..24dfe3b
--- /dev/null
+++ b/decoders/mfm/__init__.py
@@ -0,0 +1,38 @@
+##
+## This file is part of the libsigrokdecode project.
+##
+## Copyright (C) 2023 Eric Anderson <ejona86@gmail.com>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
+##
+
+'''
+FM and MFM is a combined clock/data encoding used on floppy disks. The bit
+stream requires a stacked decoder to detect a format-specific sync pattern to
+determine the byte boundaries and to separate the clock and data.
+
+The necessary sampling frequency is often 8x the data rate, so sampling at
+least at 16x the data rate is recommended.
+
+There are various interfaces with floppy disk drives, but most are very similar
+in principal to the Shugart interface. They use open-drain signals with events
+signaled with falling edges. Decoding the data is mostly done with the Read
+Data (RDATA, RD) and Write Data (WDATA, WD) lines, with optional help from
+Index (IDX), Write Gate (WG), and other lines. The "Enable low" and "Enable
+high" channels are most directly used by setting one of the channels to WG to
+enable decoding only when writing or reading. But they can also be used with
+other signals like Disk Select, Step, Track 0, and Side Select.
+'''
+
+from .pd import Decoder
diff --git a/decoders/mfm/pd.py b/decoders/mfm/pd.py
new file mode 100644
index 0000000..59ff62b
--- /dev/null
+++ b/decoders/mfm/pd.py
@@ -0,0 +1,169 @@
+##
+## This file is part of the libsigrokdecode project.
+##
+## Copyright (C) 2023 Eric Anderson <ejona86@gmail.com>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
+##
+
+import sigrokdecode as srd
+
+
+class SamplerateError(Exception):
+ pass
+
+
+class Decoder(srd.Decoder):
+ """
+ Python protocol: Each datum is a tuple, with the first entry being a string
+ denoting its type:
+ - HALFBIT: Contains a second entry of either 0 or 1, the value of one half
+ of a bitcell
+ - DISCONT: The data stream dropped and there is a discontinuity
+ - IDX: Index pulse
+ """
+ api_version = 3
+ id = 'mfm'
+ name = 'FM/MFM'
+ longname = '(Modified) Frequency Modulation'
+ desc = 'Floppy disk FM/MFM raw half-bits.'
+ license = 'gplv2+'
+ inputs = ['logic']
+ outputs = ['mfm']
+ tags = ['Encoding', 'PC', 'Retro computing']
+ channels = (
+ {'id': 'data', 'name': 'RD/WD', 'desc': 'Flux pulses'},
+ )
+ optional_channels = (
+ {'id': 'idx', 'name': 'IDX', 'desc': 'Index pulses'},
+ {'id': 'enablelo', 'name': 'Enable low', 'desc': 'Decode when low'},
+ {'id': 'enablehi', 'name': 'Enable high', 'desc': 'Decode when high'},
+ )
+ annotations = (
+ ('halfbit', 'Clock and data'),
+ ('err', 'Clock Error'),
+ )
+ annotation_rows = (
+ ('bit', 'Clock and data', (0,)),
+ ('errs', 'Errors', (1,)),
+ )
+ options = (
+ {
+ 'id': 'data_rate',
+ 'desc': 'Data rate (kbps)',
+ 'default': 250,
+ 'values': (
+ 125,
+ 150,
+ 250,
+ 300,
+ 500,
+ 1000,
+ 5000)},
+ {
+ 'id': 'leading_edge',
+ 'desc': 'Leading edge',
+ 'default': 'falling',
+ 'values': ('falling', 'rising')})
+
+ def __init__(self):
+ self.samplerate = None
+
+ self.wait_cond = 'f'
+ self.data_rate = 0.0 # Hz
+
+ def reset(self):
+ self.__init__()
+
+ def metadata(self, key, value):
+ if key == srd.SRD_CONF_SAMPLERATE:
+ self.samplerate = value
+
+ def start(self):
+ rising = self.options['leading_edge'] == 'rising'
+ self.wait_cond = 'r' if rising else 'f'
+ self.data_rate = float(self.options['data_rate']) * 1000
+
+ self.out_ann = self.register(srd.OUTPUT_ANN)
+ self.out_py = self.register(srd.OUTPUT_PYTHON)
+
+ def decode(self):
+ if not self.samplerate:
+ raise SamplerateError('Cannot decode without samplerate.')
+
+ window_size = int(self.samplerate / self.data_rate / 2)
+ window_size_half = int(self.samplerate / self.data_rate / 4)
+
+ b = 0
+ bit_start = self.samplenum
+ consecutive_zeros = 0
+ skip = window_size
+
+ while True:
+ last_samplenum = self.samplenum
+ _, _, enLo, enHi = self.wait([
+ {0: self.wait_cond},
+ {'skip': skip},
+ {1: self.wait_cond}])
+ if self.matched[2]:
+ self.put(self.samplenum, self.samplenum, self.out_py, ('IDX',))
+ if not (self.matched[0] or self.matched[1]):
+ skip -= self.samplenum - last_samplenum
+ continue
+ if enLo == 1:
+ self.put(self.samplenum, self.samplenum, self.out_py,
+ ('DISCONT',))
+ self.wait({2: 'l'})
+ bit_start = self.samplenum
+ consecutive_zeros = 0
+ continue
+ if enHi == 0:
+ self.put(self.samplenum, self.samplenum, self.out_py,
+ ('DISCONT',))
+ self.wait({3: 'h'})
+ bit_start = self.samplenum
+ consecutive_zeros = 0
+ continue
+
+ b = int(self.matched[0])
+ bit_end = self.samplenum
+ skip = window_size
+ if b:
+ # Wait returns with zeros aligned to the end of the
+ # half-bitcell and ones aligned to the center. Add half a
+ # window for ones to get past the previous half-bitcell.
+ bit_end += int(.5 * window_size)
+ skip += window_size_half
+ consecutive_zeros = 0
+ else:
+ consecutive_zeros += 1
+ if consecutive_zeros > 32:
+ self.put(bit_start, bit_end, self.out_py, ('DISCONT',))
+ # Stop decoding until there is another pulse
+ self.wait({0: self.wait_cond})
+ bit_start = self.samplenum
+ consecutive_zeros = 0
+ continue
+
+ if self.samplenum < bit_start:
+ # Two edges in one half-bitcell
+ self.put(bit_start, bit_end, self.out_ann,
+ [1, ['Spurious pulse', 'extra']])
+ elif consecutive_zeros > 4: # Allow MMFM
+ self.put(bit_start, bit_end, self.out_ann,
+ [1, ['No clock', 'clk']])
+
+ self.put(bit_start, bit_end, self.out_ann, [0, [str(b)]])
+ self.put(bit_start, bit_end, self.out_py, ('HALFBIT', b))
+ bit_start = bit_end