summaryrefslogtreecommitdiff
path: root/decoders/mfm/pd.py
diff options
context:
space:
mode:
Diffstat (limited to 'decoders/mfm/pd.py')
-rw-r--r--decoders/mfm/pd.py169
1 files changed, 169 insertions, 0 deletions
diff --git a/decoders/mfm/pd.py b/decoders/mfm/pd.py
new file mode 100644
index 0000000..59ff62b
--- /dev/null
+++ b/decoders/mfm/pd.py
@@ -0,0 +1,169 @@
+##
+## This file is part of the libsigrokdecode project.
+##
+## Copyright (C) 2023 Eric Anderson <ejona86@gmail.com>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
+##
+
+import sigrokdecode as srd
+
+
+class SamplerateError(Exception):
+ pass
+
+
+class Decoder(srd.Decoder):
+ """
+ Python protocol: Each datum is a tuple, with the first entry being a string
+ denoting its type:
+ - HALFBIT: Contains a second entry of either 0 or 1, the value of one half
+ of a bitcell
+ - DISCONT: The data stream dropped and there is a discontinuity
+ - IDX: Index pulse
+ """
+ api_version = 3
+ id = 'mfm'
+ name = 'FM/MFM'
+ longname = '(Modified) Frequency Modulation'
+ desc = 'Floppy disk FM/MFM raw half-bits.'
+ license = 'gplv2+'
+ inputs = ['logic']
+ outputs = ['mfm']
+ tags = ['Encoding', 'PC', 'Retro computing']
+ channels = (
+ {'id': 'data', 'name': 'RD/WD', 'desc': 'Flux pulses'},
+ )
+ optional_channels = (
+ {'id': 'idx', 'name': 'IDX', 'desc': 'Index pulses'},
+ {'id': 'enablelo', 'name': 'Enable low', 'desc': 'Decode when low'},
+ {'id': 'enablehi', 'name': 'Enable high', 'desc': 'Decode when high'},
+ )
+ annotations = (
+ ('halfbit', 'Clock and data'),
+ ('err', 'Clock Error'),
+ )
+ annotation_rows = (
+ ('bit', 'Clock and data', (0,)),
+ ('errs', 'Errors', (1,)),
+ )
+ options = (
+ {
+ 'id': 'data_rate',
+ 'desc': 'Data rate (kbps)',
+ 'default': 250,
+ 'values': (
+ 125,
+ 150,
+ 250,
+ 300,
+ 500,
+ 1000,
+ 5000)},
+ {
+ 'id': 'leading_edge',
+ 'desc': 'Leading edge',
+ 'default': 'falling',
+ 'values': ('falling', 'rising')})
+
+ def __init__(self):
+ self.samplerate = None
+
+ self.wait_cond = 'f'
+ self.data_rate = 0.0 # Hz
+
+ def reset(self):
+ self.__init__()
+
+ def metadata(self, key, value):
+ if key == srd.SRD_CONF_SAMPLERATE:
+ self.samplerate = value
+
+ def start(self):
+ rising = self.options['leading_edge'] == 'rising'
+ self.wait_cond = 'r' if rising else 'f'
+ self.data_rate = float(self.options['data_rate']) * 1000
+
+ self.out_ann = self.register(srd.OUTPUT_ANN)
+ self.out_py = self.register(srd.OUTPUT_PYTHON)
+
+ def decode(self):
+ if not self.samplerate:
+ raise SamplerateError('Cannot decode without samplerate.')
+
+ window_size = int(self.samplerate / self.data_rate / 2)
+ window_size_half = int(self.samplerate / self.data_rate / 4)
+
+ b = 0
+ bit_start = self.samplenum
+ consecutive_zeros = 0
+ skip = window_size
+
+ while True:
+ last_samplenum = self.samplenum
+ _, _, enLo, enHi = self.wait([
+ {0: self.wait_cond},
+ {'skip': skip},
+ {1: self.wait_cond}])
+ if self.matched[2]:
+ self.put(self.samplenum, self.samplenum, self.out_py, ('IDX',))
+ if not (self.matched[0] or self.matched[1]):
+ skip -= self.samplenum - last_samplenum
+ continue
+ if enLo == 1:
+ self.put(self.samplenum, self.samplenum, self.out_py,
+ ('DISCONT',))
+ self.wait({2: 'l'})
+ bit_start = self.samplenum
+ consecutive_zeros = 0
+ continue
+ if enHi == 0:
+ self.put(self.samplenum, self.samplenum, self.out_py,
+ ('DISCONT',))
+ self.wait({3: 'h'})
+ bit_start = self.samplenum
+ consecutive_zeros = 0
+ continue
+
+ b = int(self.matched[0])
+ bit_end = self.samplenum
+ skip = window_size
+ if b:
+ # Wait returns with zeros aligned to the end of the
+ # half-bitcell and ones aligned to the center. Add half a
+ # window for ones to get past the previous half-bitcell.
+ bit_end += int(.5 * window_size)
+ skip += window_size_half
+ consecutive_zeros = 0
+ else:
+ consecutive_zeros += 1
+ if consecutive_zeros > 32:
+ self.put(bit_start, bit_end, self.out_py, ('DISCONT',))
+ # Stop decoding until there is another pulse
+ self.wait({0: self.wait_cond})
+ bit_start = self.samplenum
+ consecutive_zeros = 0
+ continue
+
+ if self.samplenum < bit_start:
+ # Two edges in one half-bitcell
+ self.put(bit_start, bit_end, self.out_ann,
+ [1, ['Spurious pulse', 'extra']])
+ elif consecutive_zeros > 4: # Allow MMFM
+ self.put(bit_start, bit_end, self.out_ann,
+ [1, ['No clock', 'clk']])
+
+ self.put(bit_start, bit_end, self.out_ann, [0, [str(b)]])
+ self.put(bit_start, bit_end, self.out_py, ('HALFBIT', b))
+ bit_start = bit_end