summaryrefslogtreecommitdiff
path: root/decoders/floppy/pd.py
diff options
context:
space:
mode:
authorEric Anderson <ejona86@gmail.com>2023-08-13 07:37:17 -0700
committerEric Anderson <ejona86@gmail.com>2023-08-13 07:37:17 -0700
commita92839049b3de6f2cd8c1d95c583bd45ec027c73 (patch)
treea6d09528e01b2ba640c5e6e3e11e31003b8e5e6d /decoders/floppy/pd.py
parentdf3a4a3bd1763324765f53932d878525a4a20102 (diff)
downloadlibsigrokdecode-a92839049b3de6f2cd8c1d95c583bd45ec027c73.tar.gz
libsigrokdecode-a92839049b3de6f2cd8c1d95c583bd45ec027c73.zip
Add mfm and floppy decodersfloppy
The "MFM" decoder can handle FM, MFM, MMFM, GCR, and could be extended to support RLL. I have only tested with FM and MFM. These are all related encodings but "MFM" is the most distinctive name so it is being used to describe the family. The family of encodings was used in magnetic tape storage, but the focus here is floppies with a side of hard drives. The MFM decoder is pretty simple, and doesn't attempt to separate clock/data or align bytes. The method of doing so varies per sector format so that responsibility is left to the consumer, which makes configuration for the user easier. The decoder also doesn't try to act as a PLL. Someone else can enhance it, but it currently seems to work fine even on some early-1980s floppy disks/drives. The Floppy decoder can also be used with ST506-style hard drives... which are not floppies. But the encoding was very similar when MFM was in use. I have not tested an ST506 using IBM-compatible MFM formatting, but I have tested a Micropolis-encoded HDD which is a slight variation of the MFM FDD format.
Diffstat (limited to 'decoders/floppy/pd.py')
-rw-r--r--decoders/floppy/pd.py497
1 files changed, 497 insertions, 0 deletions
diff --git a/decoders/floppy/pd.py b/decoders/floppy/pd.py
new file mode 100644
index 0000000..24229df
--- /dev/null
+++ b/decoders/floppy/pd.py
@@ -0,0 +1,497 @@
+##
+## This file is part of the libsigrokdecode project.
+##
+## Copyright (C) 2023 Eric Anderson <ejona86@gmail.com>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
+##
+
+import sigrokdecode as srd
+
+ANN_PAYLOAD = 0
+ANN_ERR = 1
+ANN_FORMAT = 2
+ANN_DESC = 3
+ANN_BIT = 4
+ANN_BITIRR = 5
+
+
+class SamplerateError(Exception):
+ pass
+
+
+def crc16_ccitt_gen(crc):
+ for i in range(8):
+ b = crc & 0x8000
+ crc = (crc & 0x7fff) << 1
+ if b:
+ crc ^= 0x1021
+ return crc
+
+
+crc16_ccitt_table = tuple(crc16_ccitt_gen(i << 8) for i in range(256))
+
+
+class Format:
+ sector_anno = ()
+ sector_desc = ()
+ sync_pattern = ()
+ report_encoding_errors = True
+ fm = False
+ mfm = False
+
+ def clear(self):
+ pass
+
+ def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
+ pass
+
+
+class IbmBase(Format):
+ iam_anno = ()
+ iam_desc = ()
+ idam_anno = ()
+ idam_desc = ()
+
+ def __init__(self):
+ self.ibm_sector_len = 128
+ self.clear()
+
+ def clear(self):
+ self.sector_anno = self.__class__.sector_anno
+ self.sector_desc = self.__class__.sector_desc
+ self.crc = 0
+
+ def dam_anno(self, data_len):
+ raise AssertionError('Unimplemented')
+
+ def dam_desc(self, data_len, am_desc):
+ raise AssertionError('Unimplemented')
+
+ def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
+ if (self.sector_desc == self.__class__.sector_desc and
+ decoder.bytes_remaining == 1):
+ if byte == 0xf8:
+ self.sector_anno = self.dam_anno(self.ibm_sector_len)
+ self.sector_desc = self.dam_desc(
+ self.ibm_sector_len,
+ ['Deleted Data Address Mark', 'Deleted DAM', 'DDAM'])
+ elif byte == 0xf9 or byte == 0xfa:
+ self.sector_anno = self.dam_anno(self.ibm_sector_len)
+ self.sector_desc = self.dam_desc(
+ self.ibm_sector_len,
+ ['Custom Data Address Mark', 'Custom DAM', '? DAM'])
+ elif byte == 0xfb:
+ self.sector_anno = self.dam_anno(self.ibm_sector_len)
+ self.sector_desc = self.dam_desc(
+ self.ibm_sector_len, ['Data Address Mark', 'DAM'])
+ elif byte == 0xfc:
+ self.sector_anno = self.iam_anno
+ self.sector_desc = self.iam_desc
+ elif byte == 0xfe:
+ self.sector_anno = self.idam_anno
+ self.sector_desc = self.idam_desc
+ decoder.bytes_remaining = len(self.sector_anno)-byte_in_sector
+ if (self.sector_desc == self.__class__.idam_desc and
+ decoder.bytes_remaining == 3):
+ self.ibm_sector_len = 2**(byte+7)
+
+ if byte_in_sector == 0:
+ self.crc = 0xffff
+ check = self.crc
+ check = crc16_ccitt_table[(check >> 8) ^ byte] ^ ((check & 0xff) << 8)
+ if (decoder.bytes_remaining == 1 and check != 0 and
+ self.sector_anno not in (
+ self.__class__.sector_anno, self.iam_anno)):
+ decoder.put(decoder.section_start, bit_end, decoder.out_ann,
+ [ANN_ERR, ['Bad CRC (0x%02x)' % check, 'Bad CRC']])
+ self.crc = check
+
+
+class IbmFm(IbmBase):
+ sector_anno = (ANN_FORMAT,)
+ sector_desc = (['Unknown Mark', '? Mark'],)
+ sync_pattern = (
+ 0xaaaaf56a, # data: f8, clock: c7
+ 0xaaaaf56b, # data: f9, clock: c7
+ 0xaaaaf56e, # data: fa, clock: c7
+ 0xaaaaf56f, # data: fb, clock: c7
+ 0xaaaaf57e, # data: fe, clock: c7
+
+ 0xaaaaf77a) # data: fc, clock: d7
+ fm = True
+
+ iam_anno = (ANN_FORMAT,)
+ iam_desc = (['Index Mark', 'IAM'],)
+ idam_anno = 7*(ANN_FORMAT,)
+ idam_desc = (
+ (['ID Address Mark', 'IDAM'],) +
+ (['Cylinder', 'Cyl'], ['Side'], ['Sector', 'Sect'],
+ ['Length', 'Len']) +
+ 2 * (['CRC'],))
+
+ def dam_anno(self, data_len):
+ return (ANN_FORMAT,) + data_len*(ANN_PAYLOAD,) + 2*(ANN_FORMAT,)
+
+ def dam_desc(self, data_len, am_desc):
+ return (
+ (am_desc,) +
+ data_len * (['Data'],) +
+ 2 * (['CRC'],))
+
+
+class IbmMfm(IbmBase):
+ sector_anno = 4*(ANN_FORMAT,)
+ sector_desc = 3 * (['Gap End'],) + (['Unknown Mark', '? Mark'],)
+ sync_pattern = (0xaaaa5224, 0xaaaa4489)
+
+ iam_anno = 4*(ANN_FORMAT,)
+ iam_desc = (
+ 3 * (['Gap End'],) + (['Index Mark', 'IAM'],))
+ idam_anno = 10*(ANN_FORMAT,)
+ idam_desc = (
+ 3 * (['Gap End'],) + (['ID Address Mark', 'IDAM'],) +
+ (['Cylinder', 'Cyl'], ['Side'], ['Sector', 'Sect'],
+ ['Length', 'Len']) +
+ 2 * (['CRC'],))
+ mfm = True
+
+ def clear(self):
+ # Don't mark last two sync bytes as having clock errors. Check them
+ # with a special-case.
+ self.report_encoding_errors = False
+ super().clear()
+
+ def dam_anno(self, data_len):
+ return 4*(ANN_FORMAT,) + data_len*(ANN_PAYLOAD,) + 2*(ANN_FORMAT,)
+
+ def dam_desc(self, data_len, am_desc):
+ return (
+ 3 * (['Gap End'],) + (am_desc,) +
+ data_len * (['Data'],) +
+ 2 * (['CRC'],))
+
+ def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
+ if byte_in_sector == 0 and byte == 0xc2:
+ self.sector_anno = self.iam_anno
+ self.sector_desc = self.iam_desc
+ if self.sector_desc == self.iam_desc:
+ if byte_in_sector == 1 or byte_in_sector == 2:
+ if decoder.window & 0xffff != 0x5224:
+ self.invalid_byte(decoder, bit_end)
+ return
+ elif byte_in_sector == 3:
+ if byte != 0xfc:
+ self.invalid_byte(decoder, bit_end)
+ return
+ else:
+ if byte_in_sector == 1 or byte_in_sector == 2:
+ if decoder.window & 0xffff != 0x4489:
+ self.invalid_byte(decoder, bit_end)
+ return
+ if byte_in_sector == 2:
+ # Past the sync bytes
+ self.report_encoding_errors = True
+ super().handle_byte(decoder, byte, byte_in_sector, bit_end)
+
+ def invalid_byte(self, decoder, bit_end):
+ decoder.put(
+ decoder.window_bit_start(15),
+ bit_end, decoder.out_ann,
+ [ANN_ERR, ['Invalid', 'Inv']])
+ decoder.bytes_remaining = 1
+
+
+class MicropolisFdd(Format):
+ sector_anno = 13*(ANN_FORMAT,) + 256*(ANN_PAYLOAD,) + 6*(ANN_FORMAT,)
+ sector_desc = (
+ (['Sync'], ['Track', 'Trk'], ['Sector', 'Sect', 'S']) +
+ 10*(['OS Extra', 'OS'],) +
+ 256*(['Data'],) +
+ ((['Checksum', 'Sum'],) +
+ 4*(['ECC (unverified)', 'ECC'],) + (['ECC Present?', 'ECC?'],)))
+ sync_pattern = (0xaaaa5555,)
+ mfm = True
+
+ def __init__(self):
+ self.clear()
+
+ def clear(self):
+ self.checksum = 0
+
+ def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
+ if byte_in_sector == 0:
+ self.checksum = 0
+ check = self.checksum
+ if byte_in_sector == 269 and byte != check & 0xFF:
+ decoder.put(decoder.window_bit_start(15), bit_end, decoder.out_ann,
+ [ANN_ERR,
+ ['Bad Checksum (0x%02x)' % check, 'Bad Sum']])
+ if check > 0xFF:
+ check -= 0x100 - 1
+ check += byte
+ self.checksum = check
+
+
+class MicropolisHdd(Format):
+ sector_anno = 4*(ANN_FORMAT,) + 256*(ANN_PAYLOAD,) + 4*(ANN_FORMAT,)
+ sector_desc = (
+ (['Sync'], ['Head', 'Hd'], ['Track', 'Trk'], ['Sector', 'Sect', 'S']) +
+ 256*(['Data'],) +
+ 4*(['ECC (unverified)', 'ECC'],))
+ sync_pattern = (0xaaaa5555,)
+ mfm = True
+
+
+class NorthstarBase(Format):
+ def __init__(self):
+ self.clear()
+
+ def clear(self):
+ self.checksum = 0
+
+ def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
+ if byte_in_sector == 0 or byte_in_sector == 1:
+ self.checksum = 0
+ return
+ check = self.checksum
+ check ^= byte
+ check = ((check & 0x7f) << 1) | (check >> 7)
+ if decoder.bytes_remaining == 1 and check != 0:
+ decoder.put(decoder.window_bit_start(15), bit_end, decoder.out_ann,
+ [ANN_ERR, ['Bad Check (0x%02x)' % check, 'Bad Check']])
+ self.checksum = check
+
+
+class NorthstarFm(NorthstarBase):
+ sector_anno = (ANN_FORMAT,) + 512*(ANN_PAYLOAD,) + (ANN_FORMAT,)
+ sector_desc = (
+ (['Sync'],) +
+ 512*(['Data'],) + ((['Checksum', 'Sum'],)))
+ sync_pattern = (0xaaaa5545,)
+ fm = True
+
+
+class NorthstarMfm(NorthstarBase):
+ sector_anno = 2*(ANN_FORMAT,) + 512*(ANN_PAYLOAD,) + (ANN_FORMAT,)
+ sector_desc = (
+ (['Sync'], ['Head, Track, Sector', 'Hd/Trk/Sect' 'H/T/S']) +
+ 512*(['Data'],) + ((['Checksum', 'Sum'],)))
+ sync_pattern = (0xaaaa5545,)
+ mfm = True
+
+
+class Decoder(srd.Decoder):
+ api_version = 3
+ id = 'floppy'
+ name = 'Floppy'
+ longname = 'Floppy Disk Sector Formating'
+ desc = 'Floppy sector decoder'
+ license = 'gplv2+'
+ inputs = ['mfm']
+ outputs = ['floppy']
+ tags = ['PC', 'Retro computing']
+ annotations = (
+ ('payload', 'Sector payload byte'),
+ ('err', 'Error'),
+ ('format', 'Sector format byte'),
+ ('desc', 'Byte description'),
+ ('data', 'Data bit'),
+ ('datairr', 'Irregular data bit'),
+ )
+ annotation_rows = (
+ ('bits', 'Bits', (ANN_BIT, ANN_BITIRR)),
+ ('bytes', 'Bytes', (ANN_PAYLOAD, ANN_FORMAT)),
+ ('descs', 'Descriptions', (ANN_DESC,)),
+ ('errs', 'Errors', (ANN_ERR,)),
+ )
+ binary = (
+ ('payload', 'Sector payload'),
+ ('sector', 'Full sector'),
+ )
+ options = (
+ {
+ 'id': 'format',
+ 'desc': 'Format',
+ 'default': 'ibm-mfm',
+ 'values': (
+ 'ibm-fm',
+ 'ibm-mfm',
+ 'micropolis-fdd',
+ 'micropolis-hdd',
+ 'northstar-fm',
+ 'northstar-mfm')},)
+
+ def __init__(self):
+ self.format = Format()
+ self.clear()
+
+ def reset(self):
+ self.__init__()
+
+ def clear(self):
+ self.format.clear()
+ self.window = 0
+ self.window_starts = [0]*16
+ self.window_starts_pos = 0
+ self.bits_remaining = 0
+ self.bytes_remaining = 0
+ self.bits_skip = 0
+ self.sector_start = 0
+ self.sector_bytes = None
+ self.section_name = None
+ self.section_start = None
+ self.section_bytes = None
+
+ def start(self):
+ if self.options['format'] == 'ibm-fm':
+ self.format = IbmFm()
+ elif self.options['format'] == 'ibm-mfm':
+ self.format = IbmMfm()
+ elif self.options['format'] == 'micropolis-fdd':
+ self.format = MicropolisFdd()
+ elif self.options['format'] == 'micropolis-hdd':
+ self.format = MicropolisHdd()
+ elif self.options['format'] == 'northstar-fm':
+ self.format = NorthstarFm()
+ elif self.options['format'] == 'northstar-mfm':
+ self.format = NorthstarMfm()
+ else:
+ raise AssertionError('Unsupported format')
+
+ self.out_ann = self.register(srd.OUTPUT_ANN)
+ self.out_bin = self.register(srd.OUTPUT_BINARY)
+
+ def window_bit_start(self, i):
+ """Gets the start sample for the ith most recent bit from the window"""
+ lim = len(self.window_starts)
+ assert i < lim and i >= 0, 'bit not in window'
+ i += 1
+ return self.window_starts[(self.window_starts_pos-i+lim) % lim]
+
+ def check_clock(self, pos):
+ window = self.window >> (pos - 1)
+ expclock = None
+ if self.format.mfm:
+ expclock = ((window & 1) or ((window >> 2) & 1)) ^ 1
+ elif self.format.fm:
+ expclock = 1
+ else:
+ return True
+ return ((window >> 1) & 1) == expclock
+
+ def decode(self, bit_start, bit_end, cmd):
+ if cmd[0] == 'DISCONT':
+ self.clear()
+ return
+ if cmd[0] == 'IDX' and self.bits_remaining == 0:
+ self.clear()
+ self.bits_skip = 500
+ return
+
+ if cmd[0] != 'HALFBIT':
+ return
+
+ if self.bits_skip:
+ self.bits_skip -= 1
+ return
+
+ b = cmd[1]
+ self.window = ((self.window & 0x7fffffff) << 1) | b
+ self.window_starts[self.window_starts_pos] = bit_start
+ self.window_starts_pos += 1
+ self.window_starts_pos %= len(self.window_starts)
+ if self.bits_remaining == 0:
+ if self.window in self.format.sync_pattern:
+ self.bits_remaining = 1
+ self.bytes_remaining = len(self.format.sector_anno)
+ self.sector_bytes = None
+ for pos in range(15, 2, -2):
+ b = (self.window >> (pos-1)) & 1
+ if not self.check_clock(pos):
+ ann = [ANN_BITIRR, ['%d (Irregular clock)' % b,
+ '%d (Irr)' % b,
+ str(b)]]
+ else:
+ ann = [ANN_BIT, [str(b)]]
+ self.put(self.window_bit_start(pos),
+ self.window_bit_start(pos-2),
+ self.out_ann, ann)
+ b = self.window & 1
+ else:
+ return
+
+ self.bits_remaining -= 1
+ if (self.bits_remaining & 1) == 0:
+ irregular = not self.check_clock(1)
+ if irregular:
+ ann = [ANN_BITIRR, ['%d (Irregular clock)' % b,
+ '%d (Irr)' % b,
+ str(b)]]
+ else:
+ ann = [ANN_BIT, [str(b)]]
+ self.put(self.window_bit_start(1), bit_end, self.out_ann, ann)
+ if self.format.report_encoding_errors and irregular:
+ self.put(self.window_bit_start(1), bit_end, self.out_ann,
+ [ANN_ERR, ['Bad Clock', 'clk']])
+ if self.bits_remaining:
+ return
+
+ byte_in_sector = len(self.format.sector_anno)-self.bytes_remaining
+ byte = ((self.window & 0x0001) >> 0 |
+ (self.window & 0x0004) >> 1 |
+ (self.window & 0x0010) >> 2 |
+ (self.window & 0x0040) >> 3 |
+ (self.window & 0x0100) >> 4 |
+ (self.window & 0x0400) >> 5 |
+ (self.window & 0x1000) >> 6 |
+ (self.window & 0x4000) >> 7)
+
+ self.format.handle_byte(self, byte, byte_in_sector, bit_end)
+ anno = self.format.sector_anno[byte_in_sector]
+ desc = self.format.sector_desc[byte_in_sector]
+ byte_start = self.window_bit_start(15)
+ if self.sector_bytes is None:
+ self.sector_start = byte_start
+ self.sector_bytes = bytearray()
+ if desc != self.section_name:
+ self.section_name = desc
+ self.section_start = byte_start
+ self.section_bytes = bytearray()
+
+ self.sector_bytes.append(byte)
+ self.section_bytes.append(byte)
+
+ plain = '%02x' % byte
+ if byte >= ord(' ') and byte < 127:
+ pretty = '%02x \'%c\'' % (byte, byte)
+ else:
+ pretty = plain
+ self.put(byte_start, bit_end, self.out_ann, [anno, [pretty, plain]])
+ if (self.bytes_remaining == 1 or
+ desc != self.format.sector_desc[byte_in_sector+1]):
+ self.put(self.section_start, bit_end, self.out_ann,
+ [ANN_DESC, desc])
+ if anno == ANN_PAYLOAD:
+ self.put(self.section_start, bit_end, self.out_bin,
+ [0, bytes(self.section_bytes)])
+
+ self.bytes_remaining -= 1
+ if self.bytes_remaining:
+ self.bits_remaining = 16
+ else:
+ # TODO: expose metadata for type of floppy
+ self.put(self.sector_start, bit_end, self.out_bin,
+ [1, bytes(self.sector_bytes)])
+ self.clear()