Commit 6b9b5a28 authored by Christian Hergert's avatar Christian Hergert

threading: add IdeTask as basis for new tasking design

We are going to have to move away from GTask, at least in the short term
because it cannot provide some fundamental ownership and life-cycle
guarantees we require.

This is the start of a new IdeTask replacement which allows us to ensure
that task data, source objects, and results are finalized only within the
GMainContext attached to the task.

It also starts adding some new convenience API that we want for propagating
results between multiple tasks.
parent 3a82c080
......@@ -20,6 +20,7 @@
#include "sourceview/ide-source-view.h"
#include "symbols/ide-symbol.h"
#include "testing/ide-test.h"
#include "threading/ide-task.h"
#include "threading/ide-thread-pool.h"
#include "transfers/ide-transfer.h"
#include "vcs/ide-vcs-config.h"
......@@ -58,3 +59,4 @@ GType
/*** END file-tail ***/
......@@ -174,6 +174,7 @@ G_BEGIN_DECLS
#include "testing/ide-test.h"
#include "testing/ide-test-manager.h"
#include "testing/ide-test-provider.h"
#include "threading/ide-task.h"
#include "threading/ide-thread-pool.h"
#include "terminal/ide-terminal.h"
#include "terminal/ide-terminal-search.h"
......
/* ide-task.c
*
* Copyright 2018 Christian Hergert
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#define G_LOG_DOMAIN "ide-task"
#include <dazzle.h>
#include "threading/ide-task.h"
#include "threading/ide-thread-pool.h"
/**
* SECTION:ide-task
* @title: IdeTask
* @short_description: asynchronous task management
*
* #IdeTask is meant to be an improved form of #GTask. There are a few
* deficiencies in #GTask that have made it unsuitable for certain use cases.
*
* #GTask does not provide a way to guarnatee that the source object,
* task data, and unused results are freed with in a given #GMainContext.
* #IdeTask addresses this by having a more flexible result and object
* ownership control.
*
* Furthermore, #IdeTask allows consumers to force disposal from a given
* thread so that the data is released there.
*
* #IdeTask also supports chaining tasks together which makes it simpler
* to avoid doing duplicate work by instead simply chaining the tasks.
*
* There are some costs to this design. It uses the main context a bit
* more than #GTask may use it. Also, an equivalent to
* g_task_get_source_object() cannot be provided because we do not control
* the exact lifecycle of that object. g_async_result_get_source_object()
* may be used which provides a full reference instead of a borrowed
* reference.
*
* #IdeTask uses it's own #GThreadPool, however the threads are not exclusive
* to increase the chances of thread sharing with other pools.
*
* The number of threads in the pool is equivalent to N_CPU*2+1.
*
* Because #IdeTask needs more control over result life-cycles (for chaining
* results), additional return methods have been provided. Consumers should
* use ide_task_return_boxed() when working with boxed types as it allows us
* to copy the result to another task. Additionally, ide_task_return_object()
* provides a simplified API over ide_task_return_pointer() which also allows
* copying the result to chained tasks.
*
* Since: 3.28
*/
typedef struct
{
/*
* The pointer we were provided.
*/
gpointer data;
/*
* The destroy notify for @data. We should only call this from the
* main context associated with the task.
*/
GDestroyNotify data_destroy;
} IdeTaskData;
typedef enum
{
IDE_TASK_RESULT_NONE,
IDE_TASK_RESULT_CANCELLED,
IDE_TASK_RESULT_BOOLEAN,
IDE_TASK_RESULT_INT,
IDE_TASK_RESULT_ERROR,
IDE_TASK_RESULT_OBJECT,
IDE_TASK_RESULT_BOXED,
IDE_TASK_RESULT_POINTER,
} IdeTaskResultType;
typedef struct
{
/*
* The type of result stored in our union @u.
*/
IdeTaskResultType type;
/*
* To ensure that we can pass ownership back to the main context
* from our worker thread, we need to be able to stash the reference
* here in our result. It is also convenient as we need access to it
* from the main context callback anyway.
*/
IdeTask *task;
/*
* Additionally, we need to allow passing our main context reference
* back so that it cannot be finalized in our thread.
*/
GMainContext *main_context;
/*
* Priority for our GSource attached to @main_context.
*/
gint priority;
/*
* The actual result information, broken down by result @type.
*/
union {
gboolean v_bool;
gssize v_int;
GError *v_error;
GObject *v_object;
struct {
GType type;
gpointer pointer;
} v_boxed;
struct {
gpointer pointer;
GDestroyNotify destroy;
} v_pointer;
} u;
} IdeTaskResult;
typedef struct
{
IdeTask *task;
GMainContext *main_context;
gint priority;
} IdeTaskCancel;
typedef struct
{
/*
* Controls access to our private data. We only access structure
* data while holding this mutex to ensure that we have consistency
* between threads which could be accessing internals.
*/
GMutex mutex;
/*
* The source object for the GAsyncResult interface. If we have set
* release_on_propagate, this will be released when the task propagate
* function is called.
*/
gpointer source_object;
/*
* The cancellable that we're monitoring for task cancellation.
*/
GCancellable *cancellable;
/*
* If ide_task_set_return_on_cancel() has been set, then we might be
* listening for changes. Handling this will queue a completion
*/
gulong cancel_handler;
/*
* The callback to execute upon completion of the operation. It will
* be called from @main_contect after the operation completes.
*/
GAsyncReadyCallback callback;
gpointer user_data;
/*
* The name for the task. This string is interned so you should not
* use dynamic names. They are meant to simplify the process of
* debugging what task failed.
*/
const gchar *name;
/*
* The GMainContext that was the thread default when the task was
* created. Most operations are proxied back to this context so that
* the consumer does not need to worry about thread safety.
*/
GMainContext *main_context;
/*
* The task data that has been set for the task. Task data is released
* from a callback in the #GMainContext if changed outside the main
* context.
*/
IdeTaskData *task_data;
/*
* The result for the task. If release_on_propagate as set to %FALSE,
* then this may be kept around so that ide_task_chain() can be used to
* duplicate the result to another task. This is convenient when multiple
* async funcs race to do some work, allowing just a single winner with all
* the callers getting the same result.
*/
IdeTaskResult *result;
/*
* ide_task_chain() allows us to propagate the result of this task to
* another task (for a limited number of result types). This is the
* list of those tasks.
*/
GPtrArray *chained;
/*
* If ide_task_run_in_thread() is called, this will be set to the func
* that should be called from within the thread.
*/
IdeTaskThreadFunc thread_func;
/*
* If we're running in a thread, we'll stash the value here until we
* can complete things cleanly and pass ownership back as one operation.
*/
IdeTaskResult *thread_result;
/*
* The source tag for the task, which can be used to determine what
* the task is from a debugger as well as to verify correctness
* in async finish functions.
*/
gpointer source_tag;
/*
* Our priority for scheduling tasks in the particular workqueue.
*/
gint priority;
/*
* While we're waiting for our return callback, this is set to our
* source id. We use that to know we need to block on the main loop
* in case the user calls ide_task_propagate_*() synchronously without
* round-triping to the main loop.
*/
guint return_source;
/*
* Our kind of task, which is used to determine what thread pool we
* can use when running threaded work. This can be used to help choke
* lots of work down to a relatively small number of threads.
*/
IdeTaskKind kind : 8;
/*
* If the task has been completed, which is to say that the callback
* dispatch has occurred in @main_context.
*/
guint completed : 1;
/*
* If we should check @cancellable before returning the result. If set
* to true, and the cancellable was cancelled, an error will be returned
* even if the task completed successfully.
*/
guint check_cancellable : 1;
/*
* If we should synthesize completion from a GCancellable::cancelled
* event instead of waiting for the task to complete normally.
*/
guint return_on_cancel : 1;
/*
* If we should release the source object and task data after we've
* dispatched the callback (or the callback was NULL). This allows us
* to ensure that various dependent data are released in the main
* context. This is the default and helps ensure thread-safety.
*/
guint release_on_propagate : 1;
/*
* Protect against multiple return calls, and given the developer a good
* warning so they catch this early.
*/
guint return_called : 1;
/*
* If we got a result that was a cancellation, then we mark it here so
* that we can deal with it cleanly later.
*/
guint got_cancel : 1;
/*
* If we have dispatched to a thread already.
*/
guint thread_called : 1;
} IdeTaskPrivate;
static void async_result_init_iface (GAsyncResultIface *iface);
static void ide_task_data_free (IdeTaskData *task_data);
static void ide_task_result_free (IdeTaskResult *result);
static gboolean ide_task_return_cb (gpointer user_data);
static void ide_task_release (IdeTask *self,
gboolean force);
G_DEFINE_AUTOPTR_CLEANUP_FUNC (IdeTaskData, ide_task_data_free);
G_DEFINE_AUTOPTR_CLEANUP_FUNC (IdeTaskResult, ide_task_result_free);
DZL_DEFINE_COUNTER (instances, "Tasks", "Instances", "Number of active tasks")
G_DEFINE_TYPE_WITH_CODE (IdeTask, ide_task, G_TYPE_OBJECT,
G_ADD_PRIVATE (IdeTask)
G_IMPLEMENT_INTERFACE (G_TYPE_ASYNC_RESULT, async_result_init_iface))
enum {
PROP_0,
PROP_COMPLETED,
N_PROPS
};
static GParamSpec *properties [N_PROPS];
static void
ide_task_cancel_free (IdeTaskCancel *cancel)
{
g_clear_pointer (&cancel->main_context, g_main_context_unref);
g_clear_object (&cancel->task);
g_slice_free (IdeTaskCancel, cancel);
}
static const gchar *
result_type_name (IdeTaskResultType type)
{
switch (type)
{
case IDE_TASK_RESULT_NONE:
return "none";
case IDE_TASK_RESULT_CANCELLED:
return "cancelled";
case IDE_TASK_RESULT_INT:
return "int";
case IDE_TASK_RESULT_POINTER:
return "pointer";
case IDE_TASK_RESULT_OBJECT:
return "object";
case IDE_TASK_RESULT_BOXED:
return "boxed";
case IDE_TASK_RESULT_BOOLEAN:
return "boolean";
case IDE_TASK_RESULT_ERROR:
return "error";
default:
return NULL;
}
}
static void
ide_task_data_free (IdeTaskData *task_data)
{
if (task_data->data_destroy != NULL)
task_data->data_destroy (task_data->data);
g_slice_free (IdeTaskData, task_data);
}
static IdeTaskResult *
ide_task_result_copy (const IdeTaskResult *src)
{
IdeTaskResult *dst;
dst = g_slice_new0 (IdeTaskResult);
dst->type = src->type;
switch (src->type)
{
case IDE_TASK_RESULT_INT:
dst->u.v_int = src->u.v_int;
break;
case IDE_TASK_RESULT_BOOLEAN:
dst->u.v_bool = src->u.v_bool;
break;
case IDE_TASK_RESULT_ERROR:
dst->u.v_error = g_error_copy (src->u.v_error);
break;
case IDE_TASK_RESULT_OBJECT:
dst->u.v_object = src->u.v_object ? g_object_ref (src->u.v_object) : NULL;
break;
case IDE_TASK_RESULT_BOXED:
dst->u.v_boxed.type = src->u.v_boxed.type;
dst->u.v_boxed.pointer = g_boxed_copy (src->u.v_boxed.type, src->u.v_boxed.pointer);
break;
case IDE_TASK_RESULT_POINTER:
g_critical ("Cannot proxy raw pointers for task results");
break;
case IDE_TASK_RESULT_CANCELLED:
case IDE_TASK_RESULT_NONE:
default:
break;
}
return g_steal_pointer (&dst);
}
static void
ide_task_result_free (IdeTaskResult *result)
{
if (result == NULL)
return;
switch (result->type)
{
case IDE_TASK_RESULT_POINTER:
if (result->u.v_pointer.destroy)
result->u.v_pointer.destroy (result->u.v_pointer.pointer);
break;
case IDE_TASK_RESULT_ERROR:
g_error_free (result->u.v_error);
break;
case IDE_TASK_RESULT_BOXED:
if (result->u.v_boxed.pointer)
g_boxed_free (result->u.v_boxed.type, result->u.v_boxed.pointer);
break;
case IDE_TASK_RESULT_OBJECT:
g_clear_object (&result->u.v_object);
break;
case IDE_TASK_RESULT_BOOLEAN:
case IDE_TASK_RESULT_INT:
case IDE_TASK_RESULT_NONE:
case IDE_TASK_RESULT_CANCELLED:
default:
break;
}
g_clear_object (&result->task);
g_clear_pointer (&result->main_context, g_main_context_unref);
g_slice_free (IdeTaskResult, result);
}
/*
* ide_task_complete:
* @result: (transfer full): the result to complete
*
* queues the completion for the task. make sure that you've
* set the result->task, main_context, and priority first.
*
* This is designed to allow stealing the last reference from
* a worker thread and pass it back to the main context.
*
* Returns: a gsource identifier
*/
static guint
ide_task_complete (IdeTaskResult *result)
{
GSource *source;
guint ret;
g_assert (result != NULL);
g_assert (IDE_IS_TASK (result->task));
g_assert (result->main_context);
source = g_idle_source_new ();
g_source_set_name (source, "[ide-task] complete result");
g_source_set_ready_time (source, -1);
g_source_set_callback (source, ide_task_return_cb, result, NULL);
g_source_set_priority (source, result->priority);
ret = g_source_attach (source, result->main_context);
g_source_unref (source);
return ret;
}
static void
ide_task_thread_func (gpointer data)
{
g_autoptr(GObject) source_object = NULL;
g_autoptr(GCancellable) cancellable = NULL;
g_autoptr(IdeTask) task = data;
IdeTaskPrivate *priv = ide_task_get_instance_private (task);
gpointer task_data = NULL;
IdeTaskThreadFunc thread_func;
g_assert (IDE_IS_TASK (task));
g_mutex_lock (&priv->mutex);
source_object = priv->source_object ? g_object_ref (priv->source_object) : NULL;
cancellable = priv->cancellable ? g_object_ref (priv->cancellable) : NULL;
if (priv->task_data)
task_data = priv->task_data->data;
thread_func = priv->thread_func;
priv->thread_func = NULL;
g_mutex_unlock (&priv->mutex);
g_assert (thread_func != NULL);
thread_func (task, source_object, task_data, cancellable);
g_clear_object (&source_object);
g_clear_object (&cancellable);
g_mutex_lock (&priv->mutex);
/*
* We've delayed our ide_task_return() until we reach here, so now
* we can steal our object instance and complete the task along with
* ensuring the object wont be finalized from this thread.
*/
if (priv->thread_result)
{
IdeTaskResult *result = g_steal_pointer (&priv->thread_result);
g_assert (result->task == task);
g_clear_object (&result->task);
result->task = g_steal_pointer (&task);
priv->return_source = ide_task_complete (g_steal_pointer (&result));
g_assert (source_object == NULL);
g_assert (cancellable == NULL);
g_assert (task == NULL);
}
else
{
/* The task did not return a value while in the thread func! GTask
* doesn't support this, but its useful to us in a number of ways, so
* we'll begrudgingly support it but the best we can do is drop our
* reference from the thread.
*/
}
g_mutex_unlock (&priv->mutex);
g_assert (source_object == NULL);
g_assert (cancellable == NULL);
g_assert (task == NULL);
}
static void
ide_task_dispose (GObject *object)
{
IdeTask *self = (IdeTask *)object;
IdeTaskPrivate *priv = ide_task_get_instance_private (self);
g_assert (IDE_IS_TASK (self));
ide_task_release (self, TRUE);
g_mutex_lock (&priv->mutex);
g_clear_pointer (&priv->result, ide_task_result_free);
g_mutex_unlock (&priv->mutex);
G_OBJECT_CLASS (ide_task_parent_class)->dispose (object);
}
static void
ide_task_finalize (GObject *object)
{
IdeTask *self = (IdeTask *)object;
IdeTaskPrivate *priv = ide_task_get_instance_private (self);
if (!priv->return_called)
g_critical ("%s [%s] finalized before completing",
G_OBJECT_TYPE_NAME (self),
priv->name ?: "unnamed");
else if (priv->chained && priv->chained->len)
g_critical ("%s [%s] finalized before dependents were notified",
G_OBJECT_TYPE_NAME (self),
priv->name ?: "unnamed");
else if (priv->thread_func)
g_critical ("%s [%s] finalized while thread_func is active",
G_OBJECT_TYPE_NAME (self),
priv->name ?: "unnamed");
else if (!priv->completed)
g_critical ("%s [%s] finalized before completion",
G_OBJECT_TYPE_NAME (self),
priv->name ?: "unnamed");
g_assert (priv->return_source == 0);
g_assert (priv->result == NULL);
g_assert (priv->task_data == NULL);
g_assert (priv->source_object == NULL);
g_assert (priv->chained == NULL);
g_assert (priv->thread_result == NULL);
g_clear_pointer (&priv->main_context, g_main_context_unref);
g_clear_object (&priv->cancellable);
g_mutex_clear (&priv->mutex);
G_OBJECT_CLASS (ide_task_parent_class)->finalize (object);
DZL_COUNTER_DEC (instances);
}
static void
ide_task_get_property (GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec)
{
IdeTask *self = IDE_TASK (object);
switch (prop_id)
{
case PROP_COMPLETED:
g_value_set_boolean (value, ide_task_get_completed (self));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
}
}
static void
ide_task_class_init (IdeTaskClass *klass)
{
GObjectClass *object_class = G_OBJECT_CLASS (klass);
object_class->dispose = ide_task_dispose;
object_class->finalize = ide_task_finalize;
object_class->get_property = ide_task_get_property;
properties [PROP_COMPLETED] =
g_param_spec_boolean ("completed",
"Completed",
"If the task has completed",
FALSE,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
g_object_class_install_properties (object_class, N_PROPS, properties);
}
static void
ide_task_init (IdeTask *self)
{
IdeTaskPrivate *priv = ide_task_get_instance_private (self);
DZL_COUNTER_INC (instances);
g_mutex_init (&priv->mutex);
priv->check_cancellable = TRUE;
priv->release_on_propagate = TRUE;
priv->priority = G_PRIORITY_DEFAULT;
priv->main_context = g_main_context_ref_thread_default ();
}
/**
* ide_task_get_source_object: (skip)
* @self: a #IdeTask
*
* Gets the #GObject used when creating the source object.
*
* As this does not provide ownership transfer of the #GObject, it is a
* programmer error to call this function outside of a thread worker called
* from ide_task_run_in_thread() or outside the #GMainContext that is
* associated with the task.
*
* If you need to access the object in other scenarios, you must use the
* g_async_result_get_source_object() which provides a full reference to the
* source object, safely. You are responsible for ensuring that you do not
* release the object in a manner that is unsafe for the source object.
*
* Returns: (transfer none) (nullable) (type GObject.Object): a #GObject or %NULL
*
* Since: 3.28
*/
gpointer
ide_task_get_source_object (IdeTask *self)
{
IdeTaskPrivate *priv = ide_task_get_instance_private (self);
gpointer ret;
g_return_val_if_fail (IDE_IS_TASK (self), NULL);
g_mutex_lock (&priv->mutex);
ret = priv->source_object;
g_mutex_unlock (&priv->mutex);
return ret;
}
/**
* ide_task_new:
* @source_object: (type GObject.Object) (nullable): a #GObject or %NULL
* @cancellable: (nullable): a #GCancellable or %NULL
* @callback: (scope async) (nullable): a #GAsyncReadyCallback or %NULL
* @user_data: closure data for @callback
*
* Creates a new #IdeTask.
*
* #IdeTask is similar to #GTask but provides some additional guarantees
* such that by default, the source object, task data, and unused results
* are guaranteed to be finalized in the #GMainContext associated with
* the task itself.
*
* Returns: (transfer full): an #IdeTask
*
* Since: 3.28
*/
IdeTask *
(ide_task_new) (gpointer source_object,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
g_autoptr(IdeTask) self = NULL;
IdeTaskPrivate *priv;
g_return_val_if_fail (!source_object || G_IS_OBJECT (source_object</