diff options
author | Gerhard Sittig <gerhard.sittig@gmx.net> | 2017-03-05 18:09:59 +0100 |
---|---|---|
committer | Uwe Hermann <uwe@hermann-uwe.de> | 2017-03-12 14:59:28 +0100 |
commit | 04383ea876df72b7bd5c19854c30c81abdc0aa0c (patch) | |
tree | 560cdd0b49b9931fd8bc996db39561a5c231a68a /instance.c | |
parent | 1ec8b3acd2ef4e857110e536b40377e6f645b89f (diff) | |
download | libsigrokdecode-04383ea876df72b7bd5c19854c30c81abdc0aa0c.tar.gz libsigrokdecode-04383ea876df72b7bd5c19854c30c81abdc0aa0c.zip |
decoder: terminate .wait() and .decode(), join threads for v3 instances
Add support to terminate blocking .wait() and .decode() method calls of
v3 decoder instances. This terminates the decoder thread's main routine
and allows to release associated resources. Cope with requested as well
as unexpected termination of decode() calls. Add debug messages to
thread related code paths.
Make sure to unblock the main thread which feeds the decoder thread.
This unbreaks situations where decoders e.g. throw "need samplerate to
decode" exceptions.
Drain Python errors which might remain from the most recent .decode()
execution, to not affect other code paths. This avoids an issue where
the creation of a new decoder instance fails in the presence of errors
from a previous run.
This fixes bug #902.
Diffstat (limited to 'instance.c')
-rw-r--r-- | instance.c | 160 |
1 files changed, 156 insertions, 4 deletions
@@ -33,6 +33,10 @@ extern SRD_PRIV GSList *sessions; /* module_sigrokdecode.c */ extern SRD_PRIV PyObject *srd_logic_type; +static void srd_inst_join_decode_thread(struct srd_decoder_inst *di); +static void srd_inst_reset_state(struct srd_decoder_inst *di); +SRD_PRIV void oldpins_array_free(struct srd_decoder_inst *di); + /** @endcond */ /** @@ -376,6 +380,17 @@ SRD_API struct srd_decoder_inst *srd_inst_new(struct srd_session *sess, di->thread_handle = NULL; di->got_new_samples = FALSE; di->handled_all_samples = FALSE; + di->want_wait_terminate = FALSE; + + /* + * Strictly speaking initialization of statically allocated + * condition and mutex variables (or variables allocated on the + * stack) is not required, but won't harm either. Explicitly + * running init() will better match subsequent clear() calls. + */ + g_cond_init(&di->got_new_samples_cond); + g_cond_init(&di->handled_all_samples_cond); + g_mutex_init(&di->data_mutex); /* Instance takes input from a frontend by default. */ sess->di_list = g_slist_append(sess->di_list, di); @@ -384,6 +399,66 @@ SRD_API struct srd_decoder_inst *srd_inst_new(struct srd_session *sess, return di; } +static void srd_inst_join_decode_thread(struct srd_decoder_inst *di) +{ + if (!di) + return; + if (!di->thread_handle) + return; + + srd_dbg("%s: Joining decoder thread.", di->inst_id); + + /* + * Terminate potentially running threads which still + * execute the decoder instance's decode() method. + */ + srd_dbg("%s: Raising want_term, sending got_new.", di->inst_id); + g_mutex_lock(&di->data_mutex); + di->want_wait_terminate = TRUE; + g_cond_signal(&di->got_new_samples_cond); + g_mutex_unlock(&di->data_mutex); + + srd_dbg("%s: Running join().", di->inst_id); + (void)g_thread_join(di->thread_handle); + srd_dbg("%s: Call to join() done.", di->inst_id); + di->thread_handle = NULL; + + /* + * Reset condition and mutex variables, such that next + * operations on them will find them in a clean state. + */ + g_cond_clear(&di->got_new_samples_cond); + g_cond_init(&di->got_new_samples_cond); + g_cond_clear(&di->handled_all_samples_cond); + g_cond_init(&di->handled_all_samples_cond); + g_mutex_clear(&di->data_mutex); + g_mutex_init(&di->data_mutex); +} + +static void srd_inst_reset_state(struct srd_decoder_inst *di) +{ + if (!di) + return; + + srd_dbg("%s: Resetting decoder state.", di->inst_id); + + /* + * Reset internal state of the decoder. + */ + condition_list_free(di); + match_array_free(di); + di->abs_start_samplenum = 0; + di->abs_end_samplenum = 0; + di->inbuf = NULL; + di->inbuflen = 0; + di->abs_cur_samplenum = 0; + oldpins_array_free(di); + di->got_new_samples = FALSE; + di->handled_all_samples = FALSE; + di->want_wait_terminate = FALSE; + /* Conditions and mutex got reset after joining the thread. */ +} + /** * Stack a decoder instance on top of another. * @@ -597,6 +672,19 @@ static void set_initial_pin_values(struct srd_decoder_inst *di) g_string_free(s, TRUE); } +SRD_PRIV void oldpins_array_free(struct srd_decoder_inst *di) +{ + if (!di) + return; + if (!di->old_pins_array) + return; + + srd_dbg("%s: Releasing initial pin state.", di->inst_id); + + g_array_free(di->old_pins_array, TRUE); + di->old_pins_array = NULL; +} + /** @private */ SRD_PRIV int srd_inst_start(struct srd_decoder_inst *di) { @@ -865,6 +953,8 @@ static gboolean find_match(struct srd_decoder_inst *di) * * This function returns if there is an error, or when a match is found, or * when all samples have been processed (whether a match was found or not). + * This function immediately terminates when the decoder's wait() method + * invocation shall get terminated. * * @param di The decoder instance to use. Must not be NULL. * @param found_match Will be set to TRUE if at least one condition matched, @@ -880,6 +970,10 @@ SRD_PRIV int process_samples_until_condition_match(struct srd_decoder_inst *di, if (!di || !found_match) return SRD_ERR_ARG; + *found_match = FALSE; + if (di->want_wait_terminate) + return SRD_OK; + /* Check if any of the current condition(s) match. */ while (TRUE) { /* Feed the (next chunk of the) buffer to find_match(). */ @@ -916,20 +1010,74 @@ static gpointer di_thread(gpointer data) { PyObject *py_res; struct srd_decoder_inst *di; + int wanted_term; if (!data) return NULL; di = data; - /* Call self.decode(). Only returns if the PD throws an exception. */ + srd_dbg("%s: Starting thread routine for decoder.", di->inst_id); + + /* + * Call self.decode(). Only returns if the PD throws an exception. + * "Regular" termination of the decode() method is not expected. + */ Py_IncRef(di->py_inst); - if (!(py_res = PyObject_CallMethod(di->py_inst, "decode", NULL))) { + srd_dbg("%s: Calling decode() method.", di->inst_id); + py_res = PyObject_CallMethod(di->py_inst, "decode", NULL); + srd_dbg("%s: decode() method terminated.", di->inst_id); + + /* + * Make sure to unblock potentially pending srd_inst_decode() + * calls in application threads after the decode() method might + * have terminated, while it neither has processed sample data + * nor has terminated upon request. This happens e.g. when "need + * a samplerate to decode" exception is thrown. + */ + g_mutex_lock(&di->data_mutex); + wanted_term = di->want_wait_terminate; + di->want_wait_terminate = TRUE; + di->handled_all_samples = TRUE; + g_cond_signal(&di->handled_all_samples_cond); + g_mutex_unlock(&di->data_mutex); + + /* + * Check for the termination cause of the decode() method. + * Though this is mostly for information. + */ + if (!py_res && wanted_term) { + /* + * Silently ignore errors upon return from decode() calls + * when termination was requested. Terminate the thread + * which executed this instance's decode() logic. + */ + srd_dbg("%s: Thread done (!res, want_term).", di->inst_id); + PyErr_Clear(); + return NULL; + } + if (!py_res) { + /* + * The decode() invocation terminated unexpectedly. Have + * the back trace printed, and terminate the thread which + * executed the decode() method. + */ + srd_dbg("%s: decode() terminated unrequested.", di->inst_id); srd_exception_catch("Protocol decoder instance %s: ", di->inst_id); - exit(1); /* TODO: Proper shutdown. This is a hack. */ + srd_dbg("%s: Thread done (!res, !want_term).", di->inst_id); return NULL; } + + /* + * TODO: By design the decode() method is not supposed to terminate. + * Nevertheless we have the thread joined, and srd backend calls to + * decode() will re-start another thread transparently. + */ + srd_dbg("%s: decode() terminated (req %d).", di->inst_id, wanted_term); Py_DecRef(py_res); + PyErr_Clear(); + + srd_dbg("%s: Thread done (with res).", di->inst_id); return NULL; } @@ -1069,6 +1217,7 @@ SRD_PRIV int srd_inst_decode(struct srd_decoder_inst *di, di->inbuflen = inbuflen; di->got_new_samples = TRUE; di->handled_all_samples = FALSE; + di->want_wait_terminate = FALSE; /* Signal the thread that we have new data. */ g_cond_signal(&di->got_new_samples_cond); @@ -1076,7 +1225,7 @@ SRD_PRIV int srd_inst_decode(struct srd_decoder_inst *di, /* When all samples in this chunk were handled, return. */ g_mutex_lock(&di->data_mutex); - while (!di->handled_all_samples) + while (!di->handled_all_samples && !di->want_wait_terminate) g_cond_wait(&di->handled_all_samples_cond, &di->data_mutex); g_mutex_unlock(&di->data_mutex); } @@ -1092,6 +1241,9 @@ SRD_PRIV void srd_inst_free(struct srd_decoder_inst *di) srd_dbg("Freeing instance %s", di->inst_id); + srd_inst_join_decode_thread(di); + srd_inst_reset_state(di); + Py_DecRef(di->py_inst); g_free(di->inst_id); g_free(di->dec_channelmap); |