## ## This file is part of the libsigrokdecode project. ## ## Copyright (C) 2023 Eric Anderson ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version. ## ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with this program; if not, see . ## import sigrokdecode as srd class SamplerateError(Exception): pass class Decoder(srd.Decoder): """ Python protocol: Each datum is a tuple, with the first entry being a string denoting its type: - HALFBIT: Contains a second entry of either 0 or 1, the value of one half of a bitcell - DISCONT: The data stream dropped and there is a discontinuity - IDX: Index pulse """ api_version = 3 id = 'mfm' name = 'FM/MFM' longname = '(Modified) Frequency Modulation' desc = 'Floppy disk FM/MFM raw half-bits.' license = 'gplv2+' inputs = ['logic'] outputs = ['mfm'] tags = ['Encoding', 'PC', 'Retro computing'] channels = ( {'id': 'data', 'name': 'RD/WD', 'desc': 'Flux pulses'}, ) optional_channels = ( {'id': 'idx', 'name': 'IDX', 'desc': 'Index pulses'}, {'id': 'enablelo', 'name': 'Enable low', 'desc': 'Decode when low'}, {'id': 'enablehi', 'name': 'Enable high', 'desc': 'Decode when high'}, ) annotations = ( ('halfbit', 'Clock and data'), ('err', 'Clock Error'), ) annotation_rows = ( ('bit', 'Clock and data', (0,)), ('errs', 'Errors', (1,)), ) options = ( { 'id': 'data_rate', 'desc': 'Data rate (kbps)', 'default': 250, 'values': ( 125, 150, 250, 300, 500, 1000, 5000)}, { 'id': 'leading_edge', 'desc': 'Leading edge', 'default': 'falling', 'values': ('falling', 'rising')}) def __init__(self): self.samplerate = None self.wait_cond = 'f' self.data_rate = 0.0 # Hz def reset(self): self.__init__() def metadata(self, key, value): if key == srd.SRD_CONF_SAMPLERATE: self.samplerate = value def start(self): rising = self.options['leading_edge'] == 'rising' self.wait_cond = 'r' if rising else 'f' self.data_rate = float(self.options['data_rate']) * 1000 self.out_ann = self.register(srd.OUTPUT_ANN) self.out_py = self.register(srd.OUTPUT_PYTHON) def decode(self): if not self.samplerate: raise SamplerateError('Cannot decode without samplerate.') window_size = int(self.samplerate / self.data_rate / 2) window_size_half = int(self.samplerate / self.data_rate / 4) b = 0 bit_start = self.samplenum consecutive_zeros = 0 skip = window_size while True: last_samplenum = self.samplenum _, _, enLo, enHi = self.wait([ {0: self.wait_cond}, {'skip': skip}, {1: self.wait_cond}]) if self.matched[2]: self.put(self.samplenum, self.samplenum, self.out_py, ('IDX',)) if not (self.matched[0] or self.matched[1]): skip -= self.samplenum - last_samplenum continue if enLo == 1: self.put(self.samplenum, self.samplenum, self.out_py, ('DISCONT',)) self.wait({2: 'l'}) bit_start = self.samplenum consecutive_zeros = 0 continue if enHi == 0: self.put(self.samplenum, self.samplenum, self.out_py, ('DISCONT',)) self.wait({3: 'h'}) bit_start = self.samplenum consecutive_zeros = 0 continue b = int(self.matched[0]) bit_end = self.samplenum skip = window_size if b: # Wait returns with zeros aligned to the end of the # half-bitcell and ones aligned to the center. Add half a # window for ones to get past the previous half-bitcell. bit_end += int(.5 * window_size) skip += window_size_half consecutive_zeros = 0 else: consecutive_zeros += 1 if consecutive_zeros > 32: self.put(bit_start, bit_end, self.out_py, ('DISCONT',)) # Stop decoding until there is another pulse self.wait({0: self.wait_cond}) bit_start = self.samplenum consecutive_zeros = 0 continue if self.samplenum < bit_start: # Two edges in one half-bitcell self.put(bit_start, bit_end, self.out_ann, [1, ['Spurious pulse', 'extra']]) elif consecutive_zeros > 4: # Allow MMFM self.put(bit_start, bit_end, self.out_ann, [1, ['No clock', 'clk']]) self.put(bit_start, bit_end, self.out_ann, [0, [str(b)]]) self.put(bit_start, bit_end, self.out_py, ('HALFBIT', b)) bit_start = bit_end