Commit f10b6550 authored by Dan Winship's avatar Dan Winship

gio: (belatedly) port gdbus from GSimpleAsyncResult to GTask

https://bugzilla.gnome.org/show_bug.cgi?id=661767
parent e2655cd4
......@@ -34,7 +34,7 @@
#include "gsocketclient.h"
#include "giostream.h"
#include "gasyncresult.h"
#include "gsimpleasyncresult.h"
#include "gtask.h"
#include "glib-private.h"
#include "gdbusprivate.h"
#include "giomodule-priv.h"
......@@ -794,7 +794,6 @@ out:
typedef struct {
gchar *address;
GIOStream *stream;
gchar *guid;
} GetStreamData;
......@@ -802,29 +801,28 @@ static void
get_stream_data_free (GetStreamData *data)
{
g_free (data->address);
if (data->stream != NULL)
g_object_unref (data->stream);
g_free (data->guid);
g_free (data);
}
static void
get_stream_thread_func (GSimpleAsyncResult *res,
GObject *object,
GCancellable *cancellable)
get_stream_thread_func (GTask *task,
gpointer source_object,
gpointer task_data,
GCancellable *cancellable)
{
GetStreamData *data;
GError *error;
data = g_simple_async_result_get_op_res_gpointer (res);
GetStreamData *data = task_data;
GIOStream *stream;
GError *error = NULL;
error = NULL;
data->stream = g_dbus_address_get_stream_sync (data->address,
&data->guid,
cancellable,
&error);
if (data->stream == NULL)
g_simple_async_result_take_error (res, error);
stream = g_dbus_address_get_stream_sync (data->address,
&data->guid,
cancellable,
&error);
if (stream)
g_task_return_pointer (task, stream, g_object_unref);
else
g_task_return_error (task, error);
}
/**
......@@ -853,26 +851,18 @@ g_dbus_address_get_stream (const gchar *address,
GAsyncReadyCallback callback,
gpointer user_data)
{
GSimpleAsyncResult *res;
GTask *task;
GetStreamData *data;
g_return_if_fail (address != NULL);
res = g_simple_async_result_new (NULL,
callback,
user_data,
g_dbus_address_get_stream);
g_simple_async_result_set_check_cancellable (res, cancellable);
data = g_new0 (GetStreamData, 1);
data->address = g_strdup (address);
g_simple_async_result_set_op_res_gpointer (res,
data,
(GDestroyNotify) get_stream_data_free);
g_simple_async_result_run_in_thread (res,
get_stream_thread_func,
G_PRIORITY_DEFAULT,
cancellable);
g_object_unref (res);
task = g_task_new (NULL, cancellable, callback, user_data);
g_task_set_task_data (task, data, (GDestroyNotify) get_stream_data_free);
g_task_run_in_thread (task, get_stream_thread_func);
g_object_unref (task);
}
/**
......@@ -892,26 +882,23 @@ g_dbus_address_get_stream_finish (GAsyncResult *res,
gchar **out_guid,
GError **error)
{
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (res);
GTask *task;
GetStreamData *data;
GIOStream *ret;
g_return_val_if_fail (G_IS_ASYNC_RESULT (res), NULL);
g_return_val_if_fail (g_task_is_valid (res, NULL), NULL);
g_return_val_if_fail (error == NULL || *error == NULL, NULL);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_dbus_address_get_stream);
task = G_TASK (res);
ret = g_task_propagate_pointer (task, error);
ret = NULL;
data = g_simple_async_result_get_op_res_gpointer (simple);
if (g_simple_async_result_propagate_error (simple, error))
goto out;
ret = g_object_ref (data->stream);
if (out_guid != NULL)
*out_guid = g_strdup (data->guid);
if (ret != NULL && out_guid != NULL)
{
data = g_task_get_task_data (task);
*out_guid = data->guid;
data->guid = NULL;
}
out:
return ret;
}
......
This diff is collapsed.
......@@ -29,8 +29,7 @@
#include "gdbusmessage.h"
#include "gdbuserror.h"
#include "gdbusintrospection.h"
#include "gasyncresult.h"
#include "gsimpleasyncresult.h"
#include "gtask.h"
#include "ginputstream.h"
#include "gmemoryinputstream.h"
#include "giostream.h"
......@@ -99,28 +98,17 @@ _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
typedef struct
{
GSocket *socket;
GCancellable *cancellable;
void *buffer;
gsize count;
GSocketControlMessage ***messages;
gint *num_messages;
GSimpleAsyncResult *simple;
gboolean from_mainloop;
} ReadWithControlData;
static void
read_with_control_data_free (ReadWithControlData *data)
{
g_object_unref (data->socket);
if (data->cancellable != NULL)
g_object_unref (data->cancellable);
g_object_unref (data->simple);
g_free (data);
g_slice_free (ReadWithControlData, data);
}
static gboolean
......@@ -128,7 +116,8 @@ _g_socket_read_with_control_messages_ready (GSocket *socket,
GIOCondition condition,
gpointer user_data)
{
ReadWithControlData *data = user_data;
GTask *task = user_data;
ReadWithControlData *data = g_task_get_task_data (task);
GError *error;
gssize result;
GInputVector vector;
......@@ -136,29 +125,28 @@ _g_socket_read_with_control_messages_ready (GSocket *socket,
error = NULL;
vector.buffer = data->buffer;
vector.size = data->count;
result = g_socket_receive_message (data->socket,
result = g_socket_receive_message (socket,
NULL, /* address */
&vector,
1,
data->messages,
data->num_messages,
NULL,
data->cancellable,
g_task_get_cancellable (task),
&error);
if (result >= 0)
{
g_simple_async_result_set_op_res_gssize (data->simple, result);
}
else
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
{
g_assert (error != NULL);
g_simple_async_result_take_error (data->simple, error);
g_error_free (error);
return TRUE;
}
if (data->from_mainloop)
g_simple_async_result_complete (data->simple);
g_assert (result >= 0 || error != NULL);
if (result >= 0)
g_task_return_int (task, result);
else
g_simple_async_result_complete_in_idle (data->simple);
g_task_return_error (task, error);
g_object_unref (task);
return FALSE;
}
......@@ -174,41 +162,30 @@ _g_socket_read_with_control_messages (GSocket *socket,
GAsyncReadyCallback callback,
gpointer user_data)
{
GTask *task;
ReadWithControlData *data;
GSource *source;
data = g_new0 (ReadWithControlData, 1);
data->socket = g_object_ref (socket);
data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
data = g_slice_new0 (ReadWithControlData);
data->buffer = buffer;
data->count = count;
data->messages = messages;
data->num_messages = num_messages;
data->simple = g_simple_async_result_new (G_OBJECT (socket),
callback,
user_data,
_g_socket_read_with_control_messages);
g_simple_async_result_set_check_cancellable (data->simple, cancellable);
task = g_task_new (socket, cancellable, callback, user_data);
g_task_set_task_data (task, data, (GDestroyNotify) read_with_control_data_free);
if (!g_socket_condition_check (socket, G_IO_IN))
if (g_socket_condition_check (socket, G_IO_IN))
{
GSource *source;
data->from_mainloop = TRUE;
source = g_socket_create_source (data->socket,
G_IO_IN | G_IO_HUP | G_IO_ERR,
cancellable);
g_source_set_callback (source,
(GSourceFunc) _g_socket_read_with_control_messages_ready,
data,
(GDestroyNotify) read_with_control_data_free);
g_source_attach (source, g_main_context_get_thread_default ());
g_source_unref (source);
}
else
{
_g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data);
read_with_control_data_free (data);
if (!_g_socket_read_with_control_messages_ready (socket, G_IO_IN, task))
return;
}
source = g_socket_create_source (socket,
G_IO_IN | G_IO_HUP | G_IO_ERR,
cancellable);
g_task_attach_source (task, source, (GSourceFunc) _g_socket_read_with_control_messages_ready);
g_source_unref (source);
}
static gssize
......@@ -216,15 +193,10 @@ _g_socket_read_with_control_messages_finish (GSocket *socket,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
g_return_val_if_fail (G_IS_SOCKET (socket), -1);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages);
g_return_val_if_fail (g_task_is_valid (result, socket), -1);
if (g_simple_async_result_propagate_error (simple, error))
return -1;
else
return g_simple_async_result_get_op_res_gssize (simple);
return g_task_propagate_int (G_TASK (result), error);
}
/* ---------------------------------------------------------------------------------------------------- */
......@@ -252,7 +224,7 @@ ensure_required_types (void)
{
g_assert (ensured_classes == NULL);
ensured_classes = g_ptr_array_new ();
ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
ensure_type (G_TYPE_TASK);
ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
}
/* ---------------------------------------------------------------------------------------------------- */
......@@ -422,17 +394,12 @@ static void write_message_print_transport_debug (gssize bytes_written,
typedef struct {
GDBusWorker *worker;
GCancellable *cancellable;
GSimpleAsyncResult *result;
GTask *task;
} CloseData;
static void close_data_free (CloseData *close_data)
{
if (close_data->cancellable != NULL)
g_object_unref (close_data->cancellable);
if (close_data->result != NULL)
g_object_unref (close_data->result);
g_clear_object (&close_data->task);
_g_dbus_worker_unref (close_data->worker);
g_slice_free (CloseData, close_data);
......@@ -890,9 +857,8 @@ struct _MessageToWriteData
gchar *blob;
gsize blob_size;
gsize total_written;
GSimpleAsyncResult *simple;
gsize total_written;
GTask *task;
};
static void
......@@ -902,7 +868,7 @@ message_to_write_data_free (MessageToWriteData *data)
if (data->message)
g_object_unref (data->message);
g_free (data->blob);
g_free (data);
g_slice_free (MessageToWriteData, data);
}
/* ---------------------------------------------------------------------------------------------------- */
......@@ -920,14 +886,14 @@ write_message_async_cb (GObject *source_object,
gpointer user_data)
{
MessageToWriteData *data = user_data;
GSimpleAsyncResult *simple;
GTask *task;
gssize bytes_written;
GError *error;
/* Note: we can't access data->simple after calling g_async_result_complete () because the
/* Note: we can't access data->task after calling g_task_return_* () because the
* callback can free @data and we're not completing in idle. So use a copy of the pointer.
*/
simple = data->simple;
task = data->task;
error = NULL;
bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
......@@ -935,9 +901,8 @@ write_message_async_cb (GObject *source_object,
&error);
if (bytes_written == -1)
{
g_simple_async_result_take_error (simple, error);
g_simple_async_result_complete (simple);
g_object_unref (simple);
g_task_return_error (task, error);
g_object_unref (task);
goto out;
}
g_assert (bytes_written > 0); /* zero is never returned */
......@@ -948,8 +913,8 @@ write_message_async_cb (GObject *source_object,
g_assert (data->total_written <= data->blob_size);
if (data->total_written == data->blob_size)
{
g_simple_async_result_complete (simple);
g_object_unref (simple);
g_task_return_boolean (task, TRUE);
g_object_unref (task);
goto out;
}
......@@ -986,15 +951,15 @@ write_message_continue_writing (MessageToWriteData *data)
{
GOutputStream *ostream;
#ifdef G_OS_UNIX
GSimpleAsyncResult *simple;
GTask *task;
GUnixFDList *fd_list;
#endif
#ifdef G_OS_UNIX
/* Note: we can't access data->simple after calling g_async_result_complete () because the
/* Note: we can't access data->task after calling g_task_return_* () because the
* callback can free @data and we're not completing in idle. So use a copy of the pointer.
*/
simple = data->simple;
task = data->task;
#endif
ostream = g_io_stream_get_output_stream (data->worker->stream);
......@@ -1024,12 +989,11 @@ write_message_continue_writing (MessageToWriteData *data)
{
if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
{
g_simple_async_result_set_error (simple,
G_IO_ERROR,
G_IO_ERROR_FAILED,
"Tried sending a file descriptor but remote peer does not support this capability");
g_simple_async_result_complete (simple);
g_object_unref (simple);
g_task_return_new_error (task,
G_IO_ERROR,
G_IO_ERROR_FAILED,
"Tried sending a file descriptor but remote peer does not support this capability");
g_object_unref (task);
goto out;
}
control_message = g_unix_fd_message_new_with_fd_list (fd_list);
......@@ -1066,9 +1030,8 @@ write_message_continue_writing (MessageToWriteData *data)
g_error_free (error);
goto out;
}
g_simple_async_result_take_error (simple, error);
g_simple_async_result_complete (simple);
g_object_unref (simple);
g_task_return_error (task, error);
g_object_unref (task);
goto out;
}
g_assert (bytes_written > 0); /* zero is never returned */
......@@ -1079,8 +1042,8 @@ write_message_continue_writing (MessageToWriteData *data)
g_assert (data->total_written <= data->blob_size);
if (data->total_written == data->blob_size)
{
g_simple_async_result_complete (simple);
g_object_unref (simple);
g_task_return_boolean (task, TRUE);
g_object_unref (task);
goto out;
}
......@@ -1092,13 +1055,12 @@ write_message_continue_writing (MessageToWriteData *data)
#ifdef G_OS_UNIX
if (fd_list != NULL)
{
g_simple_async_result_set_error (simple,
G_IO_ERROR,
G_IO_ERROR_FAILED,
"Tried sending a file descriptor on unsupported stream of type %s",
g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
g_simple_async_result_complete (simple);
g_object_unref (simple);
g_task_return_new_error (task,
G_IO_ERROR,
G_IO_ERROR_FAILED,
"Tried sending a file descriptor on unsupported stream of type %s",
g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
g_object_unref (task);
goto out;
}
#endif
......@@ -1128,10 +1090,7 @@ write_message_async (GDBusWorker *worker,
GAsyncReadyCallback callback,
gpointer user_data)
{
data->simple = g_simple_async_result_new (NULL,
callback,
user_data,
write_message_async);
data->task = g_task_new (NULL, NULL, callback, user_data);
data->total_written = 0;
write_message_continue_writing (data);
}
......@@ -1141,11 +1100,9 @@ static gboolean
write_message_finish (GAsyncResult *res,
GError **error)
{
g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
return FALSE;
else
return TRUE;
g_return_val_if_fail (g_task_is_valid (res, NULL), FALSE);
return g_task_propagate_boolean (G_TASK (res), error);
}
/* ---------------------------------------------------------------------------------------------------- */
......@@ -1398,15 +1355,12 @@ iostream_close_cb (GObject *source_object,
pending_close_attempts = g_list_delete_link (pending_close_attempts,
pending_close_attempts);
if (close_data->result != NULL)
if (close_data->task != NULL)
{
if (error != NULL)
g_simple_async_result_set_from_error (close_data->result, error);
/* this must be in an idle because the result is likely to be
* intended for another thread
*/
g_simple_async_result_complete_in_idle (close_data->result);
g_task_return_error (close_data->task, g_error_copy (error));
else
g_task_return_boolean (close_data->task, TRUE);
}
close_data_free (close_data);
......@@ -1640,7 +1594,7 @@ _g_dbus_worker_send_message (GDBusWorker *worker,
g_return_if_fail (blob != NULL);
g_return_if_fail (blob_len > 16);
data = g_new0 (MessageToWriteData, 1);
data = g_slice_new0 (MessageToWriteData);
data->worker = _g_dbus_worker_ref (worker);
data->message = g_object_ref (message);
data->blob = blob; /* steal! */
......@@ -1717,16 +1671,13 @@ _g_dbus_worker_new (GIOStream *stream,
*/
void
_g_dbus_worker_close (GDBusWorker *worker,
GCancellable *cancellable,
GSimpleAsyncResult *result)
GTask *task)
{
CloseData *close_data;
close_data = g_slice_new0 (CloseData);
close_data->worker = _g_dbus_worker_ref (worker);
close_data->cancellable =
(cancellable == NULL ? NULL : g_object_ref (cancellable));
close_data->result = (result == NULL ? NULL : g_object_ref (result));
close_data->task = (task == NULL ? NULL : g_object_ref (task));
/* Don't set worker->close_expected here - we're in the wrong thread.
* It'll be set before the actual close happens.
......@@ -1752,7 +1703,7 @@ _g_dbus_worker_stop (GDBusWorker *worker)
/* Cancel any pending operations and schedule a close of the underlying I/O
* stream in the worker thread
*/
_g_dbus_worker_close (worker, NULL, NULL);
_g_dbus_worker_close (worker, NULL);
/* _g_dbus_worker_close holds a ref until after an idle in the worker
* thread has run, so we no longer need to unref in an idle like in
......
......@@ -76,8 +76,7 @@ gboolean _g_dbus_worker_flush_sync (GDBusWorker *worker,
/* can be called from any thread */
void _g_dbus_worker_close (GDBusWorker *worker,
GCancellable *cancellable,
GSimpleAsyncResult *result);
GTask *task);
/* ---------------------------------------------------------------------------------------------------- */
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment