Commit 8097e2de authored by Simon McVittie's avatar Simon McVittie Committed by David Zeuthen

GDBusConnection: delegate to the worker to close the stream

We can't safely close the output part of the I/O stream until any
pending write or flush has been completed. In the worst case, this could
lead to an assertion failure in the worker (when the close wins the
race) or not closing the stream at all (when the write wins the race).

Bug: https://bugzilla.gnome.org/show_bug.cgi?id=651268
Bug-NB: NB#271520
Signed-off-by: Simon McVittie's avatarSimon McVittie <simon.mcvittie@collabora.co.uk>
Signed-off-by: default avatarDavid Zeuthen <davidz@redhat.com>
parent a8f75f21
...@@ -513,12 +513,6 @@ g_dbus_connection_finalize (GObject *object) ...@@ -513,12 +513,6 @@ g_dbus_connection_finalize (GObject *object)
if (connection->stream != NULL) if (connection->stream != NULL)
{ {
/* We don't really care if closing the stream succeeds or not */
g_io_stream_close_async (connection->stream,
G_PRIORITY_DEFAULT,
NULL, /* GCancellable */
NULL, /* GAsyncReadyCallback */
NULL); /* userdata */
g_object_unref (connection->stream); g_object_unref (connection->stream);
connection->stream = NULL; connection->stream = NULL;
} }
...@@ -1225,20 +1219,6 @@ set_closed_unlocked (GDBusConnection *connection, ...@@ -1225,20 +1219,6 @@ set_closed_unlocked (GDBusConnection *connection,
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
static void
close_in_thread_func (GSimpleAsyncResult *res,
GObject *object,
GCancellable *cancellable)
{
GError *error;
error = NULL;
if (!g_dbus_connection_close_sync (G_DBUS_CONNECTION (object),
cancellable,
&error))
g_simple_async_result_take_error (res, error);
}
/** /**
* g_dbus_connection_close: * g_dbus_connection_close:
* @connection: A #GDBusConnection. * @connection: A #GDBusConnection.
...@@ -1288,10 +1268,7 @@ g_dbus_connection_close (GDBusConnection *connection, ...@@ -1288,10 +1268,7 @@ g_dbus_connection_close (GDBusConnection *connection,
callback, callback,
user_data, user_data,
g_dbus_connection_close); g_dbus_connection_close);
g_simple_async_result_run_in_thread (simple, _g_dbus_worker_close (connection->worker, cancellable, simple);
close_in_thread_func,
G_PRIORITY_DEFAULT,
cancellable);
g_object_unref (simple); g_object_unref (simple);
} }
...@@ -1332,6 +1309,22 @@ g_dbus_connection_close_finish (GDBusConnection *connection, ...@@ -1332,6 +1309,22 @@ g_dbus_connection_close_finish (GDBusConnection *connection,
return ret; return ret;
} }
typedef struct {
GMainLoop *loop;
GAsyncResult *result;
} SyncCloseData;
static void
sync_close_cb (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
SyncCloseData *data = user_data;
data->result = g_object_ref (res);
g_main_loop_quit (data->loop);
}
/** /**
* g_dbus_connection_close_sync: * g_dbus_connection_close_sync:
* @connection: A #GDBusConnection. * @connection: A #GDBusConnection.
...@@ -1362,11 +1355,24 @@ g_dbus_connection_close_sync (GDBusConnection *connection, ...@@ -1362,11 +1355,24 @@ g_dbus_connection_close_sync (GDBusConnection *connection,
CONNECTION_LOCK (connection); CONNECTION_LOCK (connection);
if (!connection->closed) if (!connection->closed)
{ {
ret = g_io_stream_close (connection->stream, GMainContext *context;
cancellable, SyncCloseData data;
error);
if (ret) context = g_main_context_new ();
set_closed_unlocked (connection, FALSE, NULL); g_main_context_push_thread_default (context);
data.loop = g_main_loop_new (context, TRUE);
data.result = NULL;
CONNECTION_UNLOCK (connection);
g_dbus_connection_close (connection, cancellable, sync_close_cb, &data);
g_main_loop_run (data.loop);
ret = g_dbus_connection_close_finish (connection, data.result, error);
CONNECTION_LOCK (connection);
g_object_unref (data.result);
g_main_loop_unref (data.loop);
g_main_context_pop_thread_default (context);
g_main_context_unref (context);
} }
else else
{ {
......
...@@ -370,7 +370,7 @@ struct GDBusWorker ...@@ -370,7 +370,7 @@ struct GDBusWorker
GSocketControlMessage **read_ancillary_messages; GSocketControlMessage **read_ancillary_messages;
gint read_num_ancillary_messages; gint read_num_ancillary_messages;
/* TRUE if an async write or flush is pending. /* TRUE if an async write, flush or close is pending.
* Only the worker thread may change its value, and only with the write_lock. * Only the worker thread may change its value, and only with the write_lock.
* Other threads may read its value when holding the write_lock. * Other threads may read its value when holding the write_lock.
* The worker thread may read its value at any time. * The worker thread may read its value at any time.
...@@ -381,8 +381,12 @@ struct GDBusWorker ...@@ -381,8 +381,12 @@ struct GDBusWorker
GQueue *write_queue; GQueue *write_queue;
guint64 write_num_messages_written; guint64 write_num_messages_written;
GList *write_pending_flushes; GList *write_pending_flushes;
/* list of CloseData */
GList *pending_close_attempts;
}; };
static void _g_dbus_worker_unref (GDBusWorker *worker);
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
typedef struct typedef struct
...@@ -404,6 +408,24 @@ static void read_message_print_transport_debug (gssize bytes_read, ...@@ -404,6 +408,24 @@ static void read_message_print_transport_debug (gssize bytes_read,
static void write_message_print_transport_debug (gssize bytes_written, static void write_message_print_transport_debug (gssize bytes_written,
MessageToWriteData *data); MessageToWriteData *data);
typedef struct {
GDBusWorker *worker;
GCancellable *cancellable;
GSimpleAsyncResult *result;
} CloseData;
static void close_data_free (CloseData *close_data)
{
if (close_data->cancellable != NULL)
g_object_unref (close_data->cancellable);
if (close_data->result != NULL)
g_object_unref (close_data->result);
_g_dbus_worker_unref (close_data->worker);
g_slice_free (CloseData, close_data);
}
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
static GDBusWorker * static GDBusWorker *
...@@ -1064,6 +1086,24 @@ typedef struct ...@@ -1064,6 +1086,24 @@ typedef struct
GList *flushers; GList *flushers;
} FlushAsyncData; } FlushAsyncData;
static void
flush_data_list_complete (const GList *flushers,
const GError *error)
{
const GList *l;
for (l = flushers; l != NULL; l = l->next)
{
FlushData *f = l->data;
f->error = error != NULL ? g_error_copy (error) : NULL;
g_mutex_lock (f->mutex);
g_cond_signal (f->cond);
g_mutex_unlock (f->mutex);
}
}
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
static void static void
ostream_flush_cb (GObject *source_object, ostream_flush_cb (GObject *source_object,
...@@ -1072,7 +1112,6 @@ ostream_flush_cb (GObject *source_object, ...@@ -1072,7 +1112,6 @@ ostream_flush_cb (GObject *source_object,
{ {
FlushAsyncData *data = user_data; FlushAsyncData *data = user_data;
GError *error; GError *error;
GList *l;
error = NULL; error = NULL;
g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object), g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
...@@ -1093,16 +1132,7 @@ ostream_flush_cb (GObject *source_object, ...@@ -1093,16 +1132,7 @@ ostream_flush_cb (GObject *source_object,
} }
g_assert (data->flushers != NULL); g_assert (data->flushers != NULL);
for (l = data->flushers; l != NULL; l = l->next) flush_data_list_complete (data->flushers, error);
{
FlushData *f = l->data;
f->error = error != NULL ? g_error_copy (error) : NULL;
g_mutex_lock (f->mutex);
g_cond_signal (f->cond);
g_mutex_unlock (f->mutex);
}
g_list_free (data->flushers); g_list_free (data->flushers);
if (error != NULL) if (error != NULL)
...@@ -1225,6 +1255,76 @@ write_message_cb (GObject *source_object, ...@@ -1225,6 +1255,76 @@ write_message_cb (GObject *source_object,
message_to_write_data_free (data); message_to_write_data_free (data);
} }
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
* output_pending is true on entry
*/
static void
iostream_close_cb (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
GDBusWorker *worker = user_data;
GError *error = NULL;
GList *pending_close_attempts, *pending_flush_attempts;
GQueue *send_queue;
g_io_stream_close_finish (worker->stream, res, &error);
g_mutex_lock (worker->write_lock);
pending_close_attempts = worker->pending_close_attempts;
worker->pending_close_attempts = NULL;
pending_flush_attempts = worker->write_pending_flushes;
worker->write_pending_flushes = NULL;
send_queue = worker->write_queue;
worker->write_queue = g_queue_new ();
g_assert (worker->output_pending);
worker->output_pending = FALSE;
g_mutex_unlock (worker->write_lock);
while (pending_close_attempts != NULL)
{
CloseData *close_data = pending_close_attempts->data;
pending_close_attempts = g_list_delete_link (pending_close_attempts,
pending_close_attempts);
if (close_data->result != NULL)
{
if (error != NULL)
g_simple_async_result_set_from_error (close_data->result, error);
/* this must be in an idle because the result is likely to be
* intended for another thread
*/
g_simple_async_result_complete_in_idle (close_data->result);
}
close_data_free (close_data);
}
g_clear_error (&error);
/* all messages queued for sending are discarded */
g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL);
g_queue_free (send_queue);
/* all queued flushes fail */
error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
_("Operation was cancelled"));
flush_data_list_complete (pending_flush_attempts, error);
g_list_free (pending_flush_attempts);
g_clear_error (&error);
_g_dbus_worker_unref (worker);
}
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
static void static void
maybe_write_next_message (GDBusWorker *worker) maybe_write_next_message (GDBusWorker *worker)
...@@ -1236,9 +1336,25 @@ maybe_write_next_message (GDBusWorker *worker) ...@@ -1236,9 +1336,25 @@ maybe_write_next_message (GDBusWorker *worker)
g_assert (!worker->output_pending); g_assert (!worker->output_pending);
g_mutex_lock (worker->write_lock); g_mutex_lock (worker->write_lock);
data = g_queue_pop_head (worker->write_queue);
if (data != NULL) /* if we want to close the connection, that takes precedence */
worker->output_pending = TRUE; if (worker->pending_close_attempts != NULL)
{
worker->output_pending = TRUE;
g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
NULL, iostream_close_cb,
_g_dbus_worker_ref (worker));
data = NULL;
}
else
{
data = g_queue_pop_head (worker->write_queue);
if (data != NULL)
worker->output_pending = TRUE;
}
g_mutex_unlock (worker->write_lock); g_mutex_unlock (worker->write_lock);
/* Note that write_lock is only used for protecting the @write_queue /* Note that write_lock is only used for protecting the @write_queue
...@@ -1319,6 +1435,45 @@ write_message_in_idle_cb (gpointer user_data) ...@@ -1319,6 +1435,45 @@ write_message_in_idle_cb (gpointer user_data)
return FALSE; return FALSE;
} }
/*
* @write_data: (transfer full) (allow-none):
* @close_data: (transfer full) (allow-none):
*
* Can be called from any thread
*
* write_lock is not held on entry
* output_pending may be true or false
*/
static void
schedule_write_in_worker_thread (GDBusWorker *worker,
MessageToWriteData *write_data,
CloseData *close_data)
{
g_mutex_lock (worker->write_lock);
if (write_data != NULL)
g_queue_push_tail (worker->write_queue, write_data);
if (close_data != NULL)
worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
close_data);
if (!worker->output_pending)
{
GSource *idle_source;
idle_source = g_idle_source_new ();
g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
g_source_set_callback (idle_source,
write_message_in_idle_cb,
_g_dbus_worker_ref (worker),
(GDestroyNotify) _g_dbus_worker_unref);
g_source_attach (idle_source, worker->shared_thread_data->context);
g_source_unref (idle_source);
}
g_mutex_unlock (worker->write_lock);
}
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
/* can be called from any thread - steals blob */ /* can be called from any thread - steals blob */
...@@ -1340,21 +1495,7 @@ _g_dbus_worker_send_message (GDBusWorker *worker, ...@@ -1340,21 +1495,7 @@ _g_dbus_worker_send_message (GDBusWorker *worker,
data->blob = blob; /* steal! */ data->blob = blob; /* steal! */
data->blob_size = blob_len; data->blob_size = blob_len;
g_mutex_lock (worker->write_lock); schedule_write_in_worker_thread (worker, data, NULL);
g_queue_push_tail (worker->write_queue, data);
if (!worker->output_pending)
{
GSource *idle_source;
idle_source = g_idle_source_new ();
g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
g_source_set_callback (idle_source,
write_message_in_idle_cb,
_g_dbus_worker_ref (worker),
(GDestroyNotify) _g_dbus_worker_unref);
g_source_attach (idle_source, worker->shared_thread_data->context);
g_source_unref (idle_source);
}
g_mutex_unlock (worker->write_lock);
} }
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
...@@ -1415,13 +1556,26 @@ _g_dbus_worker_new (GIOStream *stream, ...@@ -1415,13 +1556,26 @@ _g_dbus_worker_new (GIOStream *stream,
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
/* called in private thread shared by all GDBusConnection instances (without read-lock held) */ /* can be called from any thread
static gboolean *
unref_in_idle_cb (gpointer user_data) * write_lock is not held on entry
* output_pending may be true or false
*/
void
_g_dbus_worker_close (GDBusWorker *worker,
GCancellable *cancellable,
GSimpleAsyncResult *result)
{ {
GDBusWorker *worker = user_data; CloseData *close_data;
_g_dbus_worker_unref (worker);
return FALSE; close_data = g_slice_new0 (CloseData);
close_data->worker = _g_dbus_worker_ref (worker);
close_data->cancellable =
(cancellable == NULL ? NULL : g_object_ref (cancellable));
close_data->result = (result == NULL ? NULL : g_object_ref (result));
g_cancellable_cancel (worker->cancellable);
schedule_write_in_worker_thread (worker, NULL, close_data);
} }
/* This can be called from any thread - frees worker. Note that /* This can be called from any thread - frees worker. Note that
...@@ -1431,19 +1585,18 @@ unref_in_idle_cb (gpointer user_data) ...@@ -1431,19 +1585,18 @@ unref_in_idle_cb (gpointer user_data)
void void
_g_dbus_worker_stop (GDBusWorker *worker) _g_dbus_worker_stop (GDBusWorker *worker)
{ {
GSource *idle_source;
worker->stopped = TRUE; worker->stopped = TRUE;
g_cancellable_cancel (worker->cancellable);
idle_source = g_idle_source_new (); /* Cancel any pending operations and schedule a close of the underlying I/O
g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); * stream in the worker thread
g_source_set_callback (idle_source, */
unref_in_idle_cb, _g_dbus_worker_close (worker, NULL, NULL);
_g_dbus_worker_ref (worker),
(GDestroyNotify) _g_dbus_worker_unref); /* _g_dbus_worker_close holds a ref until after an idle in the the worker
g_source_attach (idle_source, worker->shared_thread_data->context); * thread has run, so we no longer need to unref in an idle like in
g_source_unref (idle_source); * commit 322e25b535
*/
_g_dbus_worker_unref (worker);
} }
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
......
...@@ -76,6 +76,11 @@ gboolean _g_dbus_worker_flush_sync (GDBusWorker *worker, ...@@ -76,6 +76,11 @@ gboolean _g_dbus_worker_flush_sync (GDBusWorker *worker,
GCancellable *cancellable, GCancellable *cancellable,
GError **error); GError **error);
/* can be called from any thread */
void _g_dbus_worker_close (GDBusWorker *worker,
GCancellable *cancellable,
GSimpleAsyncResult *result);
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
void _g_dbus_initialize (void); void _g_dbus_initialize (void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment