##
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2023 Eric Anderson
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/.
##

import sigrokdecode as srd

# Annotation class indices. They must match the order of
# Decoder.annotations below.
ANN_PAYLOAD = 0
ANN_ERR = 1
ANN_FORMAT = 2
ANN_DESC = 3
ANN_BIT = 4
ANN_BITIRR = 5


class SamplerateError(Exception):
    pass


def crc16_ccitt_gen(crc):
    """Advance a 16-bit CRC register by eight zero input bits.

    The register is shifted left one bit at a time (MSB first); whenever
    the bit shifted out is set, polynomial 0x1021 is XORed in. Used to
    build the byte-wise lookup table below.
    """
    for _ in range(8):
        msb_set = crc & 0x8000
        crc = (crc & 0x7fff) << 1
        if msb_set:
            crc ^= 0x1021
    return crc


# Lookup table indexed by (CRC high byte XOR data byte).
crc16_ccitt_table = tuple(crc16_ccitt_gen(i << 8) for i in range(256))


class Format:
    """Base description of a sector layout.

    Class attributes:
      sector_anno: annotation class (ANN_*) for each byte of a sector.
      sector_desc: description strings (long to short) for each byte of a
          sector; consecutive bytes with equal descriptions are grouped
          into one section by the decoder.
      sync_pattern: 32 half-bit window values that mark a sector start.
      report_encoding_errors: whether the decoder reports 'Bad Clock'.
      fm / mfm: which clocking rule Decoder.check_clock applies.
    """
    sector_anno = ()
    sector_desc = ()
    sync_pattern = ()
    report_encoding_errors = True
    fm = False
    mfm = False

    def clear(self):
        """Reset per-sector state. The base format has none."""
        pass

    def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
        """Inspect one decoded byte. The base format does nothing."""
        pass


class IbmBase(Format):
    """Shared logic for the IBM FM and MFM layouts.

    A sector starts out with the class-level 'Unknown Mark' layout. Once
    the address mark byte has been seen, the per-instance sector_anno /
    sector_desc are replaced with the layout for that mark, and the
    decoder's remaining byte count is extended accordingly.
    """
    iam_anno = ()
    iam_desc = ()
    idam_anno = ()
    idam_desc = ()

    def __init__(self):
        # Data field length used for the next data address mark; updated
        # from the length byte of each ID field.
        self.ibm_sector_len = 128
        self.clear()

    def clear(self):
        """Return to the 'Unknown Mark' layout and reset the CRC."""
        self.sector_anno = self.__class__.sector_anno
        self.sector_desc = self.__class__.sector_desc
        self.crc = 0

    def dam_anno(self, data_len):
        """Return the annotation classes for a data field (subclass hook)."""
        raise AssertionError('Unimplemented')

    def dam_desc(self, data_len, am_desc):
        """Return the descriptions for a data field (subclass hook)."""
        raise AssertionError('Unimplemented')

    def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
        """Select the layout from the mark byte and maintain the CRC.

        decoder is the Decoder instance; its bytes_remaining is rewritten
        when the mark byte selects a longer layout, and a 'Bad CRC'
        annotation is emitted through it when the final byte leaves a
        non-zero CRC.
        """
        # The last byte of the still-unknown layout is the address mark.
        if (self.sector_desc == self.__class__.sector_desc and
                decoder.bytes_remaining == 1):
            if byte == 0xf8:
                self.sector_anno = self.dam_anno(self.ibm_sector_len)
                self.sector_desc = self.dam_desc(
                    self.ibm_sector_len,
                    ['Deleted Data Address Mark', 'Deleted DAM', 'DDAM'])
            elif byte == 0xf9 or byte == 0xfa:
                self.sector_anno = self.dam_anno(self.ibm_sector_len)
                self.sector_desc = self.dam_desc(
                    self.ibm_sector_len,
                    ['Custom Data Address Mark', 'Custom DAM', '? DAM'])
            elif byte == 0xfb:
                self.sector_anno = self.dam_anno(self.ibm_sector_len)
                self.sector_desc = self.dam_desc(
                    self.ibm_sector_len,
                    ['Data Address Mark', 'DAM'])
            elif byte == 0xfc:
                self.sector_anno = self.iam_anno
                self.sector_desc = self.iam_desc
            elif byte == 0xfe:
                self.sector_anno = self.idam_anno
                self.sector_desc = self.idam_desc
            # Extend the byte count to the end of the selected layout. For
            # an unrecognised mark the layout is unchanged and this stays 1.
            decoder.bytes_remaining = len(self.sector_anno)-byte_in_sector

        # In an ID field, the byte followed by the two CRC bytes is the
        # length code.
        # NOTE(review): the code is not range-checked, so a corrupted
        # length byte yields a huge data length for the next data field.
        if (self.sector_desc == self.__class__.idam_desc and
                decoder.bytes_remaining == 3):
            self.ibm_sector_len = 2**(byte+7)

        if byte_in_sector == 0:
            self.crc = 0xffff
        check = self.crc
        check = crc16_ccitt_table[(check >> 8) ^ byte] ^ ((check & 0xff) << 8)
        # Running the CRC over the data plus its stored CRC bytes leaves
        # zero when they agree. Layouts without a CRC are skipped.
        if (decoder.bytes_remaining == 1 and check != 0 and
                self.sector_anno not in (
                    self.__class__.sector_anno, self.iam_anno)):
            decoder.put(decoder.section_start, bit_end, decoder.out_ann,
                        [ANN_ERR, ['Bad CRC (0x%02x)' % check, 'Bad CRC']])
        self.crc = check


class IbmFm(IbmBase):
    """IBM FM layout: the sync pattern itself carries the mark byte."""
    sector_anno = (ANN_FORMAT,)
    sector_desc = (['Unknown Mark', '? Mark'],)
    sync_pattern = (
        0xaaaaf56a, # data: f8, clock: c7
        0xaaaaf56b, # data: f9, clock: c7
        0xaaaaf56e, # data: fa, clock: c7
        0xaaaaf56f, # data: fb, clock: c7
        0xaaaaf57e, # data: fe, clock: c7
        0xaaaaf77a) # data: fc, clock: d7
    fm = True
    iam_anno = (ANN_FORMAT,)
    iam_desc = (['Index Mark', 'IAM'],)
    idam_anno = 7*(ANN_FORMAT,)
    idam_desc = (
        (['ID Address Mark', 'IDAM'],) +
        (['Cylinder', 'Cyl'], ['Side'], ['Sector', 'Sect'], ['Length', 'Len']) +
        2 * (['CRC'],))

    def dam_anno(self, data_len):
        """Annotation classes: mark, data_len payload bytes, two CRC bytes."""
        return (ANN_FORMAT,) + data_len*(ANN_PAYLOAD,) + 2*(ANN_FORMAT,)

    def dam_desc(self, data_len, am_desc):
        """Descriptions: am_desc, data_len 'Data' entries, two 'CRC' entries."""
        return (
            (am_desc,) +
            data_len * (['Data'],) +
            2 * (['CRC'],))


class IbmMfm(IbmBase):
    """IBM MFM layout: three 'Gap End' sync bytes precede the mark byte."""
    sector_anno = 4*(ANN_FORMAT,)
    sector_desc = 3 * (['Gap End'],) + (['Unknown Mark', '? Mark'],)
    sync_pattern = (0xaaaa5224, 0xaaaa4489)
    iam_anno = 4*(ANN_FORMAT,)
    iam_desc = (
        3 * (['Gap End'],) +
        (['Index Mark', 'IAM'],))
    idam_anno = 10*(ANN_FORMAT,)
    idam_desc = (
        3 * (['Gap End'],) +
        (['ID Address Mark', 'IDAM'],) +
        (['Cylinder', 'Cyl'], ['Side'], ['Sector', 'Sect'], ['Length', 'Len']) +
        2 * (['CRC'],))
    mfm = True

    def clear(self):
        """Reset per-sector state and mute clock errors for the sync bytes."""
        # Don't mark last two sync bytes as having clock errors. Check them
        # with a special-case.
        self.report_encoding_errors = False
        super().clear()

    def dam_anno(self, data_len):
        """Annotation classes: 3 sync + mark, payload bytes, two CRC bytes."""
        return 4*(ANN_FORMAT,) + data_len*(ANN_PAYLOAD,) + 2*(ANN_FORMAT,)

    def dam_desc(self, data_len, am_desc):
        """Descriptions: 3 'Gap End', am_desc, 'Data' entries, two 'CRC'."""
        return (
            3 * (['Gap End'],) +
            (am_desc,) +
            data_len * (['Data'],) +
            2 * (['CRC'],))

    def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
        """Validate the sync bytes, then defer to IbmBase.handle_byte.

        A first byte of 0xc2 selects the index mark layout. The second and
        third sync bytes must repeat the raw 16 half-bit pattern for the
        layout in use; otherwise the sector is flagged invalid and cut
        short.
        """
        if byte_in_sector == 0 and byte == 0xc2:
            self.sector_anno = self.iam_anno
            self.sector_desc = self.iam_desc

        if self.sector_desc == self.iam_desc:
            if byte_in_sector == 1 or byte_in_sector == 2:
                if decoder.window & 0xffff != 0x5224:
                    self.invalid_byte(decoder, bit_end)
                    return
            elif byte_in_sector == 3:
                if byte != 0xfc:
                    self.invalid_byte(decoder, bit_end)
                    return
        else:
            if byte_in_sector == 1 or byte_in_sector == 2:
                if decoder.window & 0xffff != 0x4489:
                    self.invalid_byte(decoder, bit_end)
                    return

        if byte_in_sector == 2:
            # Past the sync bytes
            self.report_encoding_errors = True

        super().handle_byte(decoder, byte, byte_in_sector, bit_end)

    def invalid_byte(self, decoder, bit_end):
        """Flag the current byte as invalid and end the sector after it."""
        decoder.put(
            decoder.window_bit_start(15), bit_end, decoder.out_ann,
            [ANN_ERR, ['Invalid', 'Inv']])
        decoder.bytes_remaining = 1


class MicropolisFdd(Format):
    """Micropolis floppy layout: 13 header bytes, 256 data bytes, 6 trailer."""
    sector_anno = 13*(ANN_FORMAT,) + 256*(ANN_PAYLOAD,) + 6*(ANN_FORMAT,)
    sector_desc = (
        (['Sync'], ['Track', 'Trk'], ['Sector', 'Sect', 'S']) +
        10*(['OS Extra', 'OS'],) +
        256*(['Data'],) +
        ((['Checksum', 'Sum'],) +
         4*(['ECC (unverified)', 'ECC'],) +
         (['ECC Present?', 'ECC?'],)))
    sync_pattern = (0xaaaa5555,)
    mfm = True

    def __init__(self):
        self.clear()

    def clear(self):
        """Reset the running checksum."""
        self.checksum = 0

    def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
        """Accumulate the checksum and verify it at byte 269.

        Byte 269 is the 'Checksum' entry of sector_desc (13 header bytes
        plus 256 data bytes precede it).
        """
        if byte_in_sector == 0:
            self.checksum = 0
        check = self.checksum
        if byte_in_sector == 269 and byte != check & 0xFF:
            decoder.put(decoder.window_bit_start(15), bit_end, decoder.out_ann,
                        [ANN_ERR, ['Bad Checksum (0x%02x)' % check, 'Bad Sum']])
        # Fold the carry out of the previous addition back into bit 0
        # (drop 0x100, add 1) before adding the next byte.
        if check > 0xFF:
            check -= 0x100 - 1
        check += byte
        self.checksum = check


class MicropolisHdd(Format):
    """Micropolis hard disk layout: 4 header, 256 data, 4 ECC bytes."""
    sector_anno = 4*(ANN_FORMAT,) + 256*(ANN_PAYLOAD,) + 4*(ANN_FORMAT,)
    sector_desc = (
        (['Sync'], ['Head', 'Hd'], ['Track', 'Trk'], ['Sector', 'Sect', 'S']) +
        256*(['Data'],) +
        4*(['ECC (unverified)', 'ECC'],))
    sync_pattern = (0xaaaa5555,)
    mfm = True


class NorthstarBase(Format):
    """Shared checksum logic for the Northstar layouts."""

    def __init__(self):
        self.clear()

    def clear(self):
        """Reset the running checksum."""
        self.checksum = 0

    def handle_byte(self, decoder, byte, byte_in_sector, bit_end):
        """Accumulate the XOR/rotate checksum and verify it on the last byte.

        The first two bytes of the sector only reset the checksum and are
        not included in it.
        """
        if byte_in_sector == 0 or byte_in_sector == 1:
            self.checksum = 0
            return
        check = self.checksum
        check ^= byte
        # Rotate the 8-bit value left by one bit.
        check = ((check & 0x7f) << 1) | (check >> 7)
        if decoder.bytes_remaining == 1 and check != 0:
            decoder.put(decoder.window_bit_start(15), bit_end, decoder.out_ann,
                        [ANN_ERR, ['Bad Check (0x%02x)' % check, 'Bad Check']])
        self.checksum = check


class NorthstarFm(NorthstarBase):
    """Northstar FM layout: sync byte, 512 data bytes, checksum."""
    sector_anno = (ANN_FORMAT,) + 512*(ANN_PAYLOAD,) + (ANN_FORMAT,)
    sector_desc = (
        (['Sync'],) +
        512*(['Data'],) +
        ((['Checksum', 'Sum'],)))
    sync_pattern = (0xaaaa5545,)
    fm = True


class NorthstarMfm(NorthstarBase):
    """Northstar MFM layout: sync, position byte, 512 data bytes, checksum."""
    sector_anno = 2*(ANN_FORMAT,) + 512*(ANN_PAYLOAD,) + (ANN_FORMAT,)
    # The comma between 'Hd/Trk/Sect' and 'H/T/S' matters: without it the
    # two adjacent literals are concatenated into a single label.
    sector_desc = (
        (['Sync'], ['Head, Track, Sector', 'Hd/Trk/Sect', 'H/T/S']) +
        512*(['Data'],) +
        ((['Checksum', 'Sum'],)))
    sync_pattern = (0xaaaa5545,)
    mfm = True


class Decoder(srd.Decoder):
    """Stacked decoder turning a half-bit stream into sector annotations.

    decode() is fed (bit_start, bit_end, cmd) tuples where cmd[0] is one
    of 'DISCONT', 'IDX' or 'HALFBIT'; for 'HALFBIT', cmd[1] is the bit
    value. Half-bits are shifted into a 32-bit window in which data bits
    occupy the even positions and clock bits the odd positions.
    """
    api_version = 3
    id = 'floppy'
    name = 'Floppy'
    longname = 'Floppy Disk Sector Formating'
    desc = 'Floppy sector decoder'
    license = 'gplv2+'
    inputs = ['mfm']
    outputs = ['floppy']
    tags = ['PC', 'Retro computing']
    annotations = (
        ('payload', 'Sector payload byte'),
        ('err', 'Error'),
        ('format', 'Sector format byte'),
        ('desc', 'Byte description'),
        ('data', 'Data bit'),
        ('datairr', 'Irregular data bit'),
    )
    annotation_rows = (
        ('bits', 'Bits', (ANN_BIT, ANN_BITIRR)),
        ('bytes', 'Bytes', (ANN_PAYLOAD, ANN_FORMAT)),
        ('descs', 'Descriptions', (ANN_DESC,)),
        ('errs', 'Errors', (ANN_ERR,)),
    )
    binary = (
        ('payload', 'Sector payload'),
        ('sector', 'Full sector'),
    )
    options = (
        {
            'id': 'format',
            'desc': 'Format',
            'default': 'ibm-mfm',
            'values': (
                'ibm-fm',
                'ibm-mfm',
                'micropolis-fdd',
                'micropolis-hdd',
                'northstar-fm',
                'northstar-mfm')},)

    def __init__(self):
        # Placeholder until start() installs the format chosen by the user.
        self.format = Format()
        self.clear()

    def reset(self):
        self.__init__()

    def clear(self):
        """Drop all in-progress sector state and go back to sync hunting."""
        self.format.clear()
        # Most recent 32 half-bits, newest in bit 0.
        self.window = 0
        # Ring buffer with the start sample of the 16 most recent half-bits.
        self.window_starts = [0]*16
        self.window_starts_pos = 0
        # Half-bits left in the current byte; 0 means hunting for sync.
        self.bits_remaining = 0
        self.bytes_remaining = 0
        # Half-bits still to ignore after an index pulse.
        self.bits_skip = 0
        self.sector_start = 0
        self.sector_bytes = None
        self.section_name = None
        self.section_start = None
        self.section_bytes = None

    def start(self):
        """Instantiate the selected format and register the outputs."""
        formats = {
            'ibm-fm': IbmFm,
            'ibm-mfm': IbmMfm,
            'micropolis-fdd': MicropolisFdd,
            'micropolis-hdd': MicropolisHdd,
            'northstar-fm': NorthstarFm,
            'northstar-mfm': NorthstarMfm,
        }
        format_class = formats.get(self.options['format'])
        if format_class is None:
            raise AssertionError('Unsupported format')
        self.format = format_class()
        self.out_ann = self.register(srd.OUTPUT_ANN)
        self.out_bin = self.register(srd.OUTPUT_BINARY)

    def window_bit_start(self, i):
        """Gets the start sample for the ith most recent bit from the window"""
        lim = len(self.window_starts)
        assert i < lim and i >= 0, 'bit not in window'
        # window_starts_pos already points one past the newest entry.
        i += 1
        return self.window_starts[(self.window_starts_pos-i+lim) % lim]

    def check_clock(self, pos):
        """Return True if the clock bit before window bit pos-1 is as expected.

        With the window shifted so the data bit of interest is bit 0, the
        clock bit is bit 1 and the previous data bit is bit 2. For MFM the
        clock must be 1 only when both neighbouring data bits are 0; for
        FM it must always be 1. Formats that are neither always pass.
        """
        window = self.window >> (pos - 1)
        if self.format.mfm:
            expclock = ((window & 1) or ((window >> 2) & 1)) ^ 1
        elif self.format.fm:
            expclock = 1
        else:
            return True
        return ((window >> 1) & 1) == expclock

    def _bit_annotation(self, b, irregular):
        """Build the annotation payload for one data bit."""
        if irregular:
            return [ANN_BITIRR,
                    ['%d (Irregular clock)' % b, '%d (Irr)' % b, str(b)]]
        return [ANN_BIT, [str(b)]]

    def _window_data_byte(self):
        """Collect the eight data bits (even positions) of the low 16 bits."""
        byte = 0
        for k in range(8):
            byte |= ((self.window >> (2*k)) & 1) << k
        return byte

    def decode(self, bit_start, bit_end, cmd):
        """Consume one upstream event and emit bit/byte/sector output.

        bit_start and bit_end are the sample range of the event. 'DISCONT'
        resets the decoder, 'IDX' resets it and skips the next 500
        half-bits (only while no byte is in progress), and 'HALFBIT'
        shifts cmd[1] into the window.
        """
        if cmd[0] == 'DISCONT':
            self.clear()
            return
        if cmd[0] == 'IDX' and self.bits_remaining == 0:
            self.clear()
            self.bits_skip = 500
            return
        if cmd[0] != 'HALFBIT':
            return
        if self.bits_skip:
            self.bits_skip -= 1
            return

        b = cmd[1]
        self.window = ((self.window & 0x7fffffff) << 1) | b
        self.window_starts[self.window_starts_pos] = bit_start
        self.window_starts_pos += 1
        self.window_starts_pos %= len(self.window_starts)

        if self.bits_remaining == 0:
            if self.window not in self.format.sync_pattern:
                return
            # The low 16 half-bits of the sync pattern are the first byte
            # of the sector. Annotate its first seven data bits here; the
            # eighth is handled by the common path below.
            self.bits_remaining = 1
            self.bytes_remaining = len(self.format.sector_anno)
            self.sector_bytes = None
            for pos in range(15, 2, -2):
                b = (self.window >> (pos-1)) & 1
                ann = self._bit_annotation(b, not self.check_clock(pos))
                self.put(self.window_bit_start(pos),
                         self.window_bit_start(pos-2),
                         self.out_ann, ann)
            b = self.window & 1

        self.bits_remaining -= 1
        if (self.bits_remaining & 1) == 0:
            # A clock/data pair is complete; b is the data half-bit.
            irregular = not self.check_clock(1)
            self.put(self.window_bit_start(1), bit_end, self.out_ann,
                     self._bit_annotation(b, irregular))
            if self.format.report_encoding_errors and irregular:
                self.put(self.window_bit_start(1), bit_end, self.out_ann,
                         [ANN_ERR, ['Bad Clock', 'clk']])
        if self.bits_remaining:
            return

        # A full byte is in the low 16 half-bits of the window.
        byte_in_sector = len(self.format.sector_anno)-self.bytes_remaining
        byte = self._window_data_byte()

        # The format may swap its layout and adjust bytes_remaining here,
        # so the tables are only read afterwards.
        self.format.handle_byte(self, byte, byte_in_sector, bit_end)
        anno = self.format.sector_anno[byte_in_sector]
        desc = self.format.sector_desc[byte_in_sector]

        byte_start = self.window_bit_start(15)
        if self.sector_bytes is None:
            self.sector_start = byte_start
            self.sector_bytes = bytearray()
        if desc != self.section_name:
            self.section_name = desc
            self.section_start = byte_start
            self.section_bytes = bytearray()
        self.sector_bytes.append(byte)
        self.section_bytes.append(byte)

        plain = '%02x' % byte
        if byte >= ord(' ') and byte < 127:
            pretty = '%02x \'%c\'' % (byte, byte)
        else:
            pretty = plain
        self.put(byte_start, bit_end, self.out_ann, [anno, [pretty, plain]])

        # Close the section on the sector's last byte or when the next
        # byte carries a different description.
        if (self.bytes_remaining == 1 or
                desc != self.format.sector_desc[byte_in_sector+1]):
            self.put(self.section_start, bit_end, self.out_ann,
                     [ANN_DESC, desc])
            if anno == ANN_PAYLOAD:
                self.put(self.section_start, bit_end, self.out_bin,
                         [0, bytes(self.section_bytes)])

        self.bytes_remaining -= 1
        if self.bytes_remaining:
            self.bits_remaining = 16
        else:
            # TODO: expose metadata for type of floppy
            self.put(self.sector_start, bit_end, self.out_bin,
                     [1, bytes(self.sector_bytes)])
            self.clear()