Commit 8e69acda authored by Jonathan Matthew's avatar Jonathan Matthew 🥗
Browse files

Merge branch 'rhythmdb-barrier' into 'master'

rhythmdb: wait for changes to be processed before committing

Closes #1844 and #1782

See merge request GNOME/rhythmbox!106
parents 47982830 e4e831cf
......@@ -181,6 +181,10 @@ struct _RhythmDBPrivate
GList *deleted_entries_to_emit;
GHashTable *changed_entries_to_emit;
GList *barriers_done;
GMutex barrier_mutex;
GCond barrier_condition;
gboolean can_save;
gboolean saving;
gboolean dirty;
......@@ -206,7 +210,8 @@ typedef struct
RHYTHMDB_EVENT_THREAD_EXITED,
RHYTHMDB_EVENT_DB_SAVED,
RHYTHMDB_EVENT_QUERY_COMPLETE,
RHYTHMDB_EVENT_ENTRY_SET
RHYTHMDB_EVENT_ENTRY_SET,
RHYTHMDB_EVENT_BARRIER
} type;
RBRefString *uri;
RBRefString *real_uri; /* Target of a symlink, if any */
......
......@@ -1001,6 +1001,8 @@ rhythmdb_event_free (RhythmDB *db,
case RHYTHMDB_EVENT_METADATA_CACHE:
free_cached_metadata(&result->cached_metadata);
break;
case RHYTHMDB_EVENT_BARRIER:
break;
}
if (result->error)
g_error_free (result->error);
......@@ -1547,6 +1549,28 @@ rhythmdb_commit_internal (RhythmDB *db,
gboolean sync_changes,
GThread *thread)
{
/*
* during normal operation, if committing from a worker thread,
* wait for changes made on the thread to be processed by the main thread.
* this avoids races and ensures the signals emitted are correct.
*/
if (db->priv->action_thread_running && !rb_is_main_thread ()) {
RhythmDBEvent *event;
event = g_slice_new0 (RhythmDBEvent);
event->db = db;
event->type = RHYTHMDB_EVENT_BARRIER;
g_mutex_lock (&db->priv->barrier_mutex);
rhythmdb_push_event (db, event);
while (g_list_find (db->priv->barriers_done, event) == NULL)
g_cond_wait (&db->priv->barrier_condition, &db->priv->barrier_mutex);
db->priv->barriers_done = g_list_remove (db->priv->barriers_done, event);
g_mutex_unlock (&db->priv->barrier_mutex);
rhythmdb_event_free (db, event);
}
g_mutex_lock (&db->priv->change_mutex);
if (sync_changes) {
......@@ -2626,7 +2650,8 @@ rhythmdb_process_one_event (RhythmDBEvent *event, RhythmDB *db)
((event->type == RHYTHMDB_EVENT_STAT)
|| (event->type == RHYTHMDB_EVENT_METADATA_LOAD)
|| (event->type == RHYTHMDB_EVENT_METADATA_CACHE)
|| (event->type == RHYTHMDB_EVENT_ENTRY_SET))) {
|| (event->type == RHYTHMDB_EVENT_ENTRY_SET)
|| (event->type == RHYTHMDB_EVENT_BARRIER))) {
rb_debug ("Database is read-only, delaying event processing");
g_async_queue_push (db->priv->delayed_write_queue, event);
return;
......@@ -2674,6 +2699,16 @@ rhythmdb_process_one_event (RhythmDBEvent *event, RhythmDB *db)
rb_debug ("processing RHYTHMDB_EVENT_QUERY_COMPLETE");
rhythmdb_read_leave (db);
break;
case RHYTHMDB_EVENT_BARRIER:
rb_debug ("processing RHYTHMDB_EVENT_BARRIER");
g_mutex_lock (&db->priv->barrier_mutex);
db->priv->barriers_done = g_list_prepend (db->priv->barriers_done, event);
g_cond_broadcast (&db->priv->barrier_condition);
g_mutex_unlock (&db->priv->barrier_mutex);
/* freed by the thread waiting on the barrier */
free = FALSE;
break;
}
if (free)
rhythmdb_event_free (db, event);
......
......@@ -402,6 +402,102 @@ START_TEST (test_rhythmdb_deserialisation3)
}
END_TEST
#define BARRIER_TEST_THREADS 10
typedef struct {
RhythmDBEntry *entry;
int committed;
GThread *thread;
} BarrierTestData;
static gpointer
test_worker (gpointer xdata)
{
BarrierTestData *data = xdata;
GValue val = {0, };
const char *str;
rb_debug ("worker thread");
g_value_init (&val, G_TYPE_STRING);
g_value_set_static_string (&val, "Bbb");
rhythmdb_entry_set (db, data->entry, RHYTHMDB_PROP_TITLE, &val);
g_value_unset (&val);
rb_debug ("checking");
str = rhythmdb_entry_get_string (data->entry, RHYTHMDB_PROP_TITLE);
ck_assert_msg (str && (strcmp (str, "Aaa") == 0), "changes should not be visible on worker thread yet");
rb_debug ("committing");
rhythmdb_commit (db);
rb_debug ("checking again");
str = rhythmdb_entry_get_string (data->entry, RHYTHMDB_PROP_TITLE);
ck_assert_msg (str && (strcmp (str, "Bbb") == 0), "changes should be visible on worker thread now");
rb_debug ("done");
data->committed = 1;
return NULL;
}
START_TEST (test_rhythmdb_thread_barrier)
{
BarrierTestData data[BARRIER_TEST_THREADS];
const char *str;
GValue val = {0, };
int i;
for (i = 0; i < BARRIER_TEST_THREADS; i++) {
char *uri;
uri = g_strdup_printf ("file:///bar%d.ogg", i);
data[i].committed = 0;
data[i].entry = rhythmdb_entry_new (db, RHYTHMDB_ENTRY_TYPE_IGNORE, uri);
ck_assert_msg (data[i].entry != NULL, "failed to create entry");
g_value_init (&val, G_TYPE_STRING);
g_value_set_static_string (&val, "Aaa");
rhythmdb_entry_set (db, data[i].entry, RHYTHMDB_PROP_TITLE, &val);
g_value_unset (&val);
g_free (uri);
}
rhythmdb_commit (db);
end_step ();
for (i = 0; i < BARRIER_TEST_THREADS; i++) {
data[i].thread = g_thread_new ("test-worker", test_worker, &data[i]);
}
rb_debug ("letting worker threads run");
g_usleep (G_USEC_PER_SEC / 10);
for (i = 0; i < BARRIER_TEST_THREADS; i++) {
/* worker threads' commits shouldn't finish until we let the event queue run */
ck_assert_msg (data[i].committed == 0, "worker thread should not be able to commit");
str = rhythmdb_entry_get_string (data[i].entry, RHYTHMDB_PROP_TITLE);
ck_assert_msg (str && (strcmp (str, "Aaa") == 0), "worker thread changes should not be visible yet");
}
rb_debug ("processing events from worker threads");
end_step ();
rb_debug ("joining worker threads");
for (i = 0; i < BARRIER_TEST_THREADS; i++) {
g_thread_join (data[i].thread);
}
rb_debug ("checking final state");
for (i = 0; i < BARRIER_TEST_THREADS; i++) {
str = rhythmdb_entry_get_string (data[i].entry, RHYTHMDB_PROP_TITLE);
ck_assert_msg (str && (strcmp (str, "Bbb") == 0), "worker thread changes be visible now");
}
}
START_TEST (test_rhythmdb_podcast_upgrade)
{
RhythmDBEntry *entry;
......@@ -534,6 +630,9 @@ rhythmdb_suite (void)
tcase_add_test (tc_chain, test_rhythmdb_deserialisation3);
/*tcase_add_test (tc_chain, test_rhythmdb_serialisation);*/
/* tests for entry changes and commits from worker threads */
tcase_add_test (tc_chain, test_rhythmdb_thread_barrier);
/* tests for breakable bug fixes */
tcase_add_test (tc_chain, test_rhythmdb_podcast_upgrade);
tcase_add_test (tc_chain, test_rhythmdb_modify_after_delete);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment