summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--instance.c71
-rw-r--r--libsigrokdecode-internal.h1
-rw-r--r--libsigrokdecode.h4
-rw-r--r--session.c26
-rw-r--r--type_decoder.c16
5 files changed, 118 insertions, 0 deletions
diff --git a/instance.c b/instance.c
index c30b46b..067e98d 100644
--- a/instance.c
+++ b/instance.c
@@ -426,6 +426,7 @@ SRD_API struct srd_decoder_inst *srd_inst_new(struct srd_session *sess,
di->got_new_samples = FALSE;
di->handled_all_samples = FALSE;
di->want_wait_terminate = FALSE;
+ di->communicate_eof = FALSE;
di->decoder_state = SRD_OK;
/*
@@ -500,6 +501,7 @@ static void srd_inst_reset_state(struct srd_decoder_inst *di)
di->got_new_samples = FALSE;
di->handled_all_samples = FALSE;
di->want_wait_terminate = FALSE;
+ di->communicate_eof = FALSE;
di->decoder_state = SRD_OK;
/* Conditions and mutex got reset after joining the thread. */
}
@@ -1058,6 +1060,17 @@ static gpointer di_thread(gpointer data)
py_res = PyObject_CallMethod(di->py_inst, "decode", NULL);
srd_dbg("%s: decode() terminated.", di->inst_id);
+ /*
+ * Termination with an EOFError exception is accepted to simplify
+ * the implementation of decoders and for backwards compatibility.
+ */
+ if (PyErr_Occurred() && PyErr_ExceptionMatches(PyExc_EOFError)) {
+ srd_dbg("%s: ignoring EOFError during decode() execution.",
+ di->inst_id);
+ PyErr_Clear();
+ if (!py_res)
+ py_res = Py_None;
+ }
if (!py_res)
di->decoder_state = SRD_ERR;
@@ -1282,6 +1295,64 @@ SRD_PRIV int srd_inst_flush(struct srd_decoder_inst *di)
}
/**
+ * Communicate the end of the stream of sample data to a decoder instance.
+ *
+ * @param[in] di The decoder instance to call. Must not be NULL.
+ *
+ * @return SRD_OK upon success, a (negative) error code otherwise.
+ *
+ * @private
+ */
+SRD_PRIV int srd_inst_send_eof(struct srd_decoder_inst *di)
+{
+ GSList *l;
+ int ret;
+
+ if (!di)
+ return SRD_ERR_ARG;
+
+ /*
+ * Send EOF to the caller specified decoder instance. Only
+ * communicate EOF to currently executing decoders. Never
+ * started or previously finished is perfectly acceptable.
+ */
+ srd_dbg("End of sample data: instance %s.", di->inst_id);
+ if (!di->thread_handle) {
+ srd_dbg("No worker thread, nothing to do.");
+ return SRD_OK;
+ }
+
+ /* Signal the thread about the EOF condition. */
+ g_mutex_lock(&di->data_mutex);
+ di->inbuf = NULL;
+ di->inbuflen = 0;
+ di->got_new_samples = TRUE;
+ di->handled_all_samples = FALSE;
+ di->want_wait_terminate = TRUE;
+ di->communicate_eof = TRUE;
+ g_cond_signal(&di->got_new_samples_cond);
+ g_mutex_unlock(&di->data_mutex);
+
+ /* Only return from here when the condition was handled. */
+ g_mutex_lock(&di->data_mutex);
+ while (!di->handled_all_samples && !di->want_wait_terminate)
+ g_cond_wait(&di->handled_all_samples_cond, &di->data_mutex);
+ g_mutex_unlock(&di->data_mutex);
+
+ /* Flush the decoder instance which handled EOF. */
+ srd_inst_flush(di);
+
+ /* Pass EOF to all stacked decoders. */
+ for (l = di->next_di; l; l = l->next) {
+ ret = srd_inst_send_eof(l->data);
+ if (ret != SRD_OK)
+ return ret;
+ }
+
+ return SRD_OK;
+}
+
+/**
* Terminate current decoder work, prepare for re-use on new input data.
*
* Terminates all decoder operations in the specified decoder instance
diff --git a/libsigrokdecode-internal.h b/libsigrokdecode-internal.h
index 453e14b..0e3cb64 100644
--- a/libsigrokdecode-internal.h
+++ b/libsigrokdecode-internal.h
@@ -94,6 +94,7 @@ SRD_PRIV int srd_inst_decode(struct srd_decoder_inst *di,
const uint8_t *inbuf, uint64_t inbuflen, uint64_t unitsize);
SRD_PRIV int process_samples_until_condition_match(struct srd_decoder_inst *di, gboolean *found_match);
SRD_PRIV int srd_inst_flush(struct srd_decoder_inst *di);
+SRD_PRIV int srd_inst_send_eof(struct srd_decoder_inst *di);
SRD_PRIV int srd_inst_terminate_reset(struct srd_decoder_inst *di);
SRD_PRIV void srd_inst_free(struct srd_decoder_inst *di);
SRD_PRIV void srd_inst_free_all(struct srd_session *sess);
diff --git a/libsigrokdecode.h b/libsigrokdecode.h
index 3ce6ae5..cf6479c 100644
--- a/libsigrokdecode.h
+++ b/libsigrokdecode.h
@@ -291,6 +291,9 @@ struct srd_decoder_inst {
/** Requests termination of wait() and decode(). */
gboolean want_wait_terminate;
+ /** Requests that .wait() terminates a Python iteration. */
+ gboolean communicate_eof;
+
/** Indicates the current state of the decoder stack. */
int decoder_state;
@@ -353,6 +356,7 @@ SRD_API int srd_session_metadata_set(struct srd_session *sess, int key,
SRD_API int srd_session_send(struct srd_session *sess,
uint64_t abs_start_samplenum, uint64_t abs_end_samplenum,
const uint8_t *inbuf, uint64_t inbuflen, uint64_t unitsize);
+SRD_API int srd_session_send_eof(struct srd_session *sess);
SRD_API int srd_session_terminate_reset(struct srd_session *sess);
SRD_API int srd_session_destroy(struct srd_session *sess);
SRD_API int srd_pd_output_callback_add(struct srd_session *sess,
diff --git a/session.c b/session.c
index 386fb71..ad08407 100644
--- a/session.c
+++ b/session.c
@@ -279,6 +279,32 @@ SRD_API int srd_session_send(struct srd_session *sess,
}
/**
+ * Communicate the end of the stream of sample data to the session.
+ *
+ * @param[in] sess The session. Must not be NULL.
+ *
+ * @return SRD_OK upon success. A (negative) error code otherwise.
+ *
+ * @since 0.6.0
+ */
+SRD_API int srd_session_send_eof(struct srd_session *sess)
+{
+ GSList *d;
+ int ret;
+
+ if (!sess)
+ return SRD_ERR_ARG;
+
+ for (d = sess->di_list; d; d = d->next) {
+ ret = srd_inst_send_eof(d->data);
+ if (ret != SRD_OK)
+ return ret;
+ }
+
+ return SRD_OK;
+}
+
+/**
* Terminate currently executing decoders in a session, reset internal state.
*
* All decoder instances have their .wait() method terminated, which
diff --git a/type_decoder.c b/type_decoder.c
index 0a92a45..6c6eab6 100644
--- a/type_decoder.c
+++ b/type_decoder.c
@@ -1090,6 +1090,22 @@ static PyObject *Decoder_wait(PyObject *self, PyObject *args)
g_cond_signal(&di->handled_all_samples_cond);
/*
+ * When EOF was provided externally, communicate the
+ * Python EOFError exception to .decode() and return
+ * from the .wait() method call. This is motivated by
+ * the use of Python context managers, so that .decode()
+ * methods can "close" incompletely accumulated data
+ * when the sample data is exhausted.
+ */
+ if (di->communicate_eof) {
+ srd_dbg("%s: %s: Raising EOF from wait().",
+ di->inst_id, __func__);
+ g_mutex_unlock(&di->data_mutex);
+ PyErr_SetString(PyExc_EOFError, "samples exhausted");
+ goto err;
+ }
+
+ /*
* When termination of wait() and decode() was requested,
* then exit the loop after releasing the mutex.
*/