summaryrefslogtreecommitdiff
path: root/decoders/floppy/pd.py
diff options
context:
space:
mode:
Diffstat (limited to 'decoders/floppy/pd.py')
-rw-r--r--decoders/floppy/pd.py497
1 files changed, 497 insertions, 0 deletions
diff --git a/decoders/floppy/pd.py b/decoders/floppy/pd.py
new file mode 100644
index 0000000..24229df
--- /dev/null
+++ b/decoders/floppy/pd.py
@@ -0,0 +1,497 @@
+##
+## This file is part of the libsigrokdecode project.
+##
+## Copyright (C) 2023 Eric Anderson <ejona86@gmail.com>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
+##
+
+import sigrokdecode as srd
+
+ANN_PAYLOAD = 0
+ANN_ERR = 1
+ANN_FORMAT = 2
+ANN_DESC = 3
+ANN_BIT = 4
+ANN_BITIRR = 5
+
+
+class SamplerateError(Exception):
+ pass
+
+
+def crc16_ccitt_gen(crc):
+ for i in range(8):
+ b = crc & 0x8000
+ crc = (crc & 0x7fff) << 1
+ if b:
+ crc ^= 0x1021
+ return crc
+
+
+crc16_ccitt_table = tuple(crc16_ccitt_gen(i << 8) for i in range(256))
+
+
+class Format:
+ sector_anno = ()
+ sector_desc = ()
+ sync_pattern = ()
+ report_encoding_errors = True
+ fm = False
+ mfm = False
+
+ def clear(self):
+ pass
+
+ def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
+ pass
+
+
+class IbmBase(Format):
+ iam_anno = ()
+ iam_desc = ()
+ idam_anno = ()
+ idam_desc = ()
+
+ def __init__(self):
+ self.ibm_sector_len = 128
+ self.clear()
+
+ def clear(self):
+ self.sector_anno = self.__class__.sector_anno
+ self.sector_desc = self.__class__.sector_desc
+ self.crc = 0
+
+ def dam_anno(self, data_len):
+ raise AssertionError('Unimplemented')
+
+ def dam_desc(self, data_len, am_desc):
+ raise AssertionError('Unimplemented')
+
+ def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
+ if (self.sector_desc == self.__class__.sector_desc and
+ decoder.bytes_remaining == 1):
+ if byte == 0xf8:
+ self.sector_anno = self.dam_anno(self.ibm_sector_len)
+ self.sector_desc = self.dam_desc(
+ self.ibm_sector_len,
+ ['Deleted Data Address Mark', 'Deleted DAM', 'DDAM'])
+ elif byte == 0xf9 or byte == 0xfa:
+ self.sector_anno = self.dam_anno(self.ibm_sector_len)
+ self.sector_desc = self.dam_desc(
+ self.ibm_sector_len,
+ ['Custom Data Address Mark', 'Custom DAM', '? DAM'])
+ elif byte == 0xfb:
+ self.sector_anno = self.dam_anno(self.ibm_sector_len)
+ self.sector_desc = self.dam_desc(
+ self.ibm_sector_len, ['Data Address Mark', 'DAM'])
+ elif byte == 0xfc:
+ self.sector_anno = self.iam_anno
+ self.sector_desc = self.iam_desc
+ elif byte == 0xfe:
+ self.sector_anno = self.idam_anno
+ self.sector_desc = self.idam_desc
+ decoder.bytes_remaining = len(self.sector_anno)-byte_in_sector
+ if (self.sector_desc == self.__class__.idam_desc and
+ decoder.bytes_remaining == 3):
+ self.ibm_sector_len = 2**(byte+7)
+
+ if byte_in_sector == 0:
+ self.crc = 0xffff
+ check = self.crc
+ check = crc16_ccitt_table[(check >> 8) ^ byte] ^ ((check & 0xff) << 8)
+ if (decoder.bytes_remaining == 1 and check != 0 and
+ self.sector_anno not in (
+ self.__class__.sector_anno, self.iam_anno)):
+ decoder.put(decoder.section_start, bit_end, decoder.out_ann,
+ [ANN_ERR, ['Bad CRC (0x%02x)' % check, 'Bad CRC']])
+ self.crc = check
+
+
+class IbmFm(IbmBase):
+ sector_anno = (ANN_FORMAT,)
+ sector_desc = (['Unknown Mark', '? Mark'],)
+ sync_pattern = (
+ 0xaaaaf56a, # data: f8, clock: c7
+ 0xaaaaf56b, # data: f9, clock: c7
+ 0xaaaaf56e, # data: fa, clock: c7
+ 0xaaaaf56f, # data: fb, clock: c7
+ 0xaaaaf57e, # data: fe, clock: c7
+
+ 0xaaaaf77a) # data: fc, clock: d7
+ fm = True
+
+ iam_anno = (ANN_FORMAT,)
+ iam_desc = (['Index Mark', 'IAM'],)
+ idam_anno = 7*(ANN_FORMAT,)
+ idam_desc = (
+ (['ID Address Mark', 'IDAM'],) +
+ (['Cylinder', 'Cyl'], ['Side'], ['Sector', 'Sect'],
+ ['Length', 'Len']) +
+ 2 * (['CRC'],))
+
+ def dam_anno(self, data_len):
+ return (ANN_FORMAT,) + data_len*(ANN_PAYLOAD,) + 2*(ANN_FORMAT,)
+
+ def dam_desc(self, data_len, am_desc):
+ return (
+ (am_desc,) +
+ data_len * (['Data'],) +
+ 2 * (['CRC'],))
+
+
+class IbmMfm(IbmBase):
+ sector_anno = 4*(ANN_FORMAT,)
+ sector_desc = 3 * (['Gap End'],) + (['Unknown Mark', '? Mark'],)
+ sync_pattern = (0xaaaa5224, 0xaaaa4489)
+
+ iam_anno = 4*(ANN_FORMAT,)
+ iam_desc = (
+ 3 * (['Gap End'],) + (['Index Mark', 'IAM'],))
+ idam_anno = 10*(ANN_FORMAT,)
+ idam_desc = (
+ 3 * (['Gap End'],) + (['ID Address Mark', 'IDAM'],) +
+ (['Cylinder', 'Cyl'], ['Side'], ['Sector', 'Sect'],
+ ['Length', 'Len']) +
+ 2 * (['CRC'],))
+ mfm = True
+
+ def clear(self):
+ # Don't mark last two sync bytes as having clock errors. Check them
+ # with a special-case.
+ self.report_encoding_errors = False
+ super().clear()
+
+ def dam_anno(self, data_len):
+ return 4*(ANN_FORMAT,) + data_len*(ANN_PAYLOAD,) + 2*(ANN_FORMAT,)
+
+ def dam_desc(self, data_len, am_desc):
+ return (
+ 3 * (['Gap End'],) + (am_desc,) +
+ data_len * (['Data'],) +
+ 2 * (['CRC'],))
+
+ def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
+ if byte_in_sector == 0 and byte == 0xc2:
+ self.sector_anno = self.iam_anno
+ self.sector_desc = self.iam_desc
+ if self.sector_desc == self.iam_desc:
+ if byte_in_sector == 1 or byte_in_sector == 2:
+ if decoder.window & 0xffff != 0x5224:
+ self.invalid_byte(decoder, bit_end)
+ return
+ elif byte_in_sector == 3:
+ if byte != 0xfc:
+ self.invalid_byte(decoder, bit_end)
+ return
+ else:
+ if byte_in_sector == 1 or byte_in_sector == 2:
+ if decoder.window & 0xffff != 0x4489:
+ self.invalid_byte(decoder, bit_end)
+ return
+ if byte_in_sector == 2:
+ # Past the sync bytes
+ self.report_encoding_errors = True
+ super().handle_byte(decoder, byte, byte_in_sector, bit_end)
+
+ def invalid_byte(self, decoder, bit_end):
+ decoder.put(
+ decoder.window_bit_start(15),
+ bit_end, decoder.out_ann,
+ [ANN_ERR, ['Invalid', 'Inv']])
+ decoder.bytes_remaining = 1
+
+
+class MicropolisFdd(Format):
+ sector_anno = 13*(ANN_FORMAT,) + 256*(ANN_PAYLOAD,) + 6*(ANN_FORMAT,)
+ sector_desc = (
+ (['Sync'], ['Track', 'Trk'], ['Sector', 'Sect', 'S']) +
+ 10*(['OS Extra', 'OS'],) +
+ 256*(['Data'],) +
+ ((['Checksum', 'Sum'],) +
+ 4*(['ECC (unverified)', 'ECC'],) + (['ECC Present?', 'ECC?'],)))
+ sync_pattern = (0xaaaa5555,)
+ mfm = True
+
+ def __init__(self):
+ self.clear()
+
+ def clear(self):
+ self.checksum = 0
+
+ def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
+ if byte_in_sector == 0:
+ self.checksum = 0
+ check = self.checksum
+ if byte_in_sector == 269 and byte != check & 0xFF:
+ decoder.put(decoder.window_bit_start(15), bit_end, decoder.out_ann,
+ [ANN_ERR,
+ ['Bad Checksum (0x%02x)' % check, 'Bad Sum']])
+ if check > 0xFF:
+ check -= 0x100 - 1
+ check += byte
+ self.checksum = check
+
+
+class MicropolisHdd(Format):
+ sector_anno = 4*(ANN_FORMAT,) + 256*(ANN_PAYLOAD,) + 4*(ANN_FORMAT,)
+ sector_desc = (
+ (['Sync'], ['Head', 'Hd'], ['Track', 'Trk'], ['Sector', 'Sect', 'S']) +
+ 256*(['Data'],) +
+ 4*(['ECC (unverified)', 'ECC'],))
+ sync_pattern = (0xaaaa5555,)
+ mfm = True
+
+
+class NorthstarBase(Format):
+ def __init__(self):
+ self.clear()
+
+ def clear(self):
+ self.checksum = 0
+
+ def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
+ if byte_in_sector == 0 or byte_in_sector == 1:
+ self.checksum = 0
+ return
+ check = self.checksum
+ check ^= byte
+ check = ((check & 0x7f) << 1) | (check >> 7)
+ if decoder.bytes_remaining == 1 and check != 0:
+ decoder.put(decoder.window_bit_start(15), bit_end, decoder.out_ann,
+ [ANN_ERR, ['Bad Check (0x%02x)' % check, 'Bad Check']])
+ self.checksum = check
+
+
+class NorthstarFm(NorthstarBase):
+ sector_anno = (ANN_FORMAT,) + 512*(ANN_PAYLOAD,) + (ANN_FORMAT,)
+ sector_desc = (
+ (['Sync'],) +
+ 512*(['Data'],) + ((['Checksum', 'Sum'],)))
+ sync_pattern = (0xaaaa5545,)
+ fm = True
+
+
+class NorthstarMfm(NorthstarBase):
+ sector_anno = 2*(ANN_FORMAT,) + 512*(ANN_PAYLOAD,) + (ANN_FORMAT,)
+ sector_desc = (
+ (['Sync'], ['Head, Track, Sector', 'Hd/Trk/Sect' 'H/T/S']) +
+ 512*(['Data'],) + ((['Checksum', 'Sum'],)))
+ sync_pattern = (0xaaaa5545,)
+ mfm = True
+
+
+class Decoder(srd.Decoder):
+ api_version = 3
+ id = 'floppy'
+ name = 'Floppy'
+ longname = 'Floppy Disk Sector Formating'
+ desc = 'Floppy sector decoder'
+ license = 'gplv2+'
+ inputs = ['mfm']
+ outputs = ['floppy']
+ tags = ['PC', 'Retro computing']
+ annotations = (
+ ('payload', 'Sector payload byte'),
+ ('err', 'Error'),
+ ('format', 'Sector format byte'),
+ ('desc', 'Byte description'),
+ ('data', 'Data bit'),
+ ('datairr', 'Irregular data bit'),
+ )
+ annotation_rows = (
+ ('bits', 'Bits', (ANN_BIT, ANN_BITIRR)),
+ ('bytes', 'Bytes', (ANN_PAYLOAD, ANN_FORMAT)),
+ ('descs', 'Descriptions', (ANN_DESC,)),
+ ('errs', 'Errors', (ANN_ERR,)),
+ )
+ binary = (
+ ('payload', 'Sector payload'),
+ ('sector', 'Full sector'),
+ )
+ options = (
+ {
+ 'id': 'format',
+ 'desc': 'Format',
+ 'default': 'ibm-mfm',
+ 'values': (
+ 'ibm-fm',
+ 'ibm-mfm',
+ 'micropolis-fdd',
+ 'micropolis-hdd',
+ 'northstar-fm',
+ 'northstar-mfm')},)
+
+ def __init__(self):
+ self.format = Format()
+ self.clear()
+
+ def reset(self):
+ self.__init__()
+
+ def clear(self):
+ self.format.clear()
+ self.window = 0
+ self.window_starts = [0]*16
+ self.window_starts_pos = 0
+ self.bits_remaining = 0
+ self.bytes_remaining = 0
+ self.bits_skip = 0
+ self.sector_start = 0
+ self.sector_bytes = None
+ self.section_name = None
+ self.section_start = None
+ self.section_bytes = None
+
+ def start(self):
+ if self.options['format'] == 'ibm-fm':
+ self.format = IbmFm()
+ elif self.options['format'] == 'ibm-mfm':
+ self.format = IbmMfm()
+ elif self.options['format'] == 'micropolis-fdd':
+ self.format = MicropolisFdd()
+ elif self.options['format'] == 'micropolis-hdd':
+ self.format = MicropolisHdd()
+ elif self.options['format'] == 'northstar-fm':
+ self.format = NorthstarFm()
+ elif self.options['format'] == 'northstar-mfm':
+ self.format = NorthstarMfm()
+ else:
+ raise AssertionError('Unsupported format')
+
+ self.out_ann = self.register(srd.OUTPUT_ANN)
+ self.out_bin = self.register(srd.OUTPUT_BINARY)
+
+ def window_bit_start(self, i):
+ """Gets the start sample for the ith most recent bit from the window"""
+ lim = len(self.window_starts)
+ assert i < lim and i >= 0, 'bit not in window'
+ i += 1
+ return self.window_starts[(self.window_starts_pos-i+lim) % lim]
+
+ def check_clock(self, pos):
+ window = self.window >> (pos - 1)
+ expclock = None
+ if self.format.mfm:
+ expclock = ((window & 1) or ((window >> 2) & 1)) ^ 1
+ elif self.format.fm:
+ expclock = 1
+ else:
+ return True
+ return ((window >> 1) & 1) == expclock
+
+ def decode(self, bit_start, bit_end, cmd):
+ if cmd[0] == 'DISCONT':
+ self.clear()
+ return
+ if cmd[0] == 'IDX' and self.bits_remaining == 0:
+ self.clear()
+ self.bits_skip = 500
+ return
+
+ if cmd[0] != 'HALFBIT':
+ return
+
+ if self.bits_skip:
+ self.bits_skip -= 1
+ return
+
+ b = cmd[1]
+ self.window = ((self.window & 0x7fffffff) << 1) | b
+ self.window_starts[self.window_starts_pos] = bit_start
+ self.window_starts_pos += 1
+ self.window_starts_pos %= len(self.window_starts)
+ if self.bits_remaining == 0:
+ if self.window in self.format.sync_pattern:
+ self.bits_remaining = 1
+ self.bytes_remaining = len(self.format.sector_anno)
+ self.sector_bytes = None
+ for pos in range(15, 2, -2):
+ b = (self.window >> (pos-1)) & 1
+ if not self.check_clock(pos):
+ ann = [ANN_BITIRR, ['%d (Irregular clock)' % b,
+ '%d (Irr)' % b,
+ str(b)]]
+ else:
+ ann = [ANN_BIT, [str(b)]]
+ self.put(self.window_bit_start(pos),
+ self.window_bit_start(pos-2),
+ self.out_ann, ann)
+ b = self.window & 1
+ else:
+ return
+
+ self.bits_remaining -= 1
+ if (self.bits_remaining & 1) == 0:
+ irregular = not self.check_clock(1)
+ if irregular:
+ ann = [ANN_BITIRR, ['%d (Irregular clock)' % b,
+ '%d (Irr)' % b,
+ str(b)]]
+ else:
+ ann = [ANN_BIT, [str(b)]]
+ self.put(self.window_bit_start(1), bit_end, self.out_ann, ann)
+ if self.format.report_encoding_errors and irregular:
+ self.put(self.window_bit_start(1), bit_end, self.out_ann,
+ [ANN_ERR, ['Bad Clock', 'clk']])
+ if self.bits_remaining:
+ return
+
+ byte_in_sector = len(self.format.sector_anno)-self.bytes_remaining
+ byte = ((self.window & 0x0001) >> 0 |
+ (self.window & 0x0004) >> 1 |
+ (self.window & 0x0010) >> 2 |
+ (self.window & 0x0040) >> 3 |
+ (self.window & 0x0100) >> 4 |
+ (self.window & 0x0400) >> 5 |
+ (self.window & 0x1000) >> 6 |
+ (self.window & 0x4000) >> 7)
+
+ self.format.handle_byte(self, byte, byte_in_sector, bit_end)
+ anno = self.format.sector_anno[byte_in_sector]
+ desc = self.format.sector_desc[byte_in_sector]
+ byte_start = self.window_bit_start(15)
+ if self.sector_bytes is None:
+ self.sector_start = byte_start
+ self.sector_bytes = bytearray()
+ if desc != self.section_name:
+ self.section_name = desc
+ self.section_start = byte_start
+ self.section_bytes = bytearray()
+
+ self.sector_bytes.append(byte)
+ self.section_bytes.append(byte)
+
+ plain = '%02x' % byte
+ if byte >= ord(' ') and byte < 127:
+ pretty = '%02x \'%c\'' % (byte, byte)
+ else:
+ pretty = plain
+ self.put(byte_start, bit_end, self.out_ann, [anno, [pretty, plain]])
+ if (self.bytes_remaining == 1 or
+ desc != self.format.sector_desc[byte_in_sector+1]):
+ self.put(self.section_start, bit_end, self.out_ann,
+ [ANN_DESC, desc])
+ if anno == ANN_PAYLOAD:
+ self.put(self.section_start, bit_end, self.out_bin,
+ [0, bytes(self.section_bytes)])
+
+ self.bytes_remaining -= 1
+ if self.bytes_remaining:
+ self.bits_remaining = 16
+ else:
+ # TODO: expose metadata for type of floppy
+ self.put(self.sector_start, bit_end, self.out_bin,
+ [1, bytes(self.sector_bytes)])
+ self.clear()