Commit 43519620 authored by Ell's avatar Ell

gegl: add gegl-parallel

gegl-parallel provides various parallel algorithms.  Currently,
this is limited to the gegl_parallel_distribute() family of
functions, which distribute work across multiple threads, migrated
from GIMP.

The following commits use these functions to replace the various
thread-pools we use to auto-parallelize operations with simpler
code, fixing potential dealocks as a result of nested operation
processing, as in bug #790810, along the way.

Since gegl-parallel is public API, it also eases manual
parallelization of operations, inside and outside of GEGL.
parent c5317a7e
......@@ -55,6 +55,7 @@ GEGL_introspectable_headers = \
gegl-matrix.h \
gegl-lookup.h \
gegl-random.h \
gegl-parallel.h \
gegl-init.h \
gegl-version.h \
buffer/gegl-buffer.h \
......@@ -100,6 +101,7 @@ GEGL_sources = \
gegl-xml.c \
gegl-gio.c \
gegl-random.c \
gegl-parallel.c \
gegl-serialize.c \
gegl-stats.c \
gegl-matrix.c \
......@@ -118,6 +120,7 @@ GEGL_sources = \
gegl-op.h \
gegl-plugin.h \
gegl-random-private.h \
gegl-parallel-private.h \
gegl-stats.h \
gegl-gio-private.h \
gegl-types-internal.h \
......
......@@ -105,6 +105,7 @@ guint gegl_debug_flags = 0;
#include "gegl-stats.h"
#include "graph/gegl-node-private.h"
#include "gegl-random-private.h"
#include "gegl-parallel-private.h"
static gboolean gegl_post_parse_hook (GOptionContext *context,
GOptionGroup *group,
......@@ -489,6 +490,7 @@ gegl_exit (void)
gegl_operation_gtype_cleanup ();
gegl_operation_handlers_cleanup ();
gegl_random_cleanup ();
gegl_parallel_cleanup ();
gegl_cl_cleanup ();
gegl_temp_buffer_free ();
......@@ -702,6 +704,7 @@ gegl_post_parse_hook (GOptionContext *context,
GEGL_INSTRUMENT_START();
gegl_parallel_init ();
gegl_operation_gtype_init ();
gegl_tile_cache_init ();
......
/* This file is part of GEGL.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with GEGL; if not, see <https://www.gnu.org/licenses/>.
*
* Copyright 2018 Ell
*/
#ifndef __GEGL_PARALLEL_PRIVATE_H__
#define __GEGL_PARALLEL_PRIVATE_H__
G_BEGIN_DECLS
void gegl_parallel_init (void);
void gegl_parallel_cleanup (void);
G_END_DECLS
#endif /* __GEGL_PARALLEL_PRIVATE_H__ */
/* This file is part of GEGL.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with GEGL; if not, see <https://www.gnu.org/licenses/>.
*
* Copyright 2018 Ell
*/
#include "config.h"
#include <glib.h>
#include "gegl.h"
#include "gegl-config.h"
#include "gegl-parallel.h"
#include "gegl-parallel-private.h"
#define GEGL_PARALLEL_DISTRIBUTE_MAX_THREADS GEGL_MAX_THREADS
typedef struct
{
GeglParallelDistributeFunc func;
gint n;
gpointer user_data;
} GeglParallelDistributeTask;
typedef struct
{
GThread *thread;
GMutex mutex;
GCond cond;
gboolean quit;
GeglParallelDistributeTask *volatile task;
volatile gint i;
} GeglParallelDistributeThread;
/* local function prototypes */
static void gegl_parallel_notify_threads (GeglConfig *config);
static void gegl_parallel_set_n_threads (gint n_threads,
gboolean finish_tasks);
static void gegl_parallel_distribute_set_n_threads (gint n_threads);
static gpointer gegl_parallel_distribute_thread_func (GeglParallelDistributeThread *thread);
/* local variables */
static gint gegl_parallel_distribute_n_threads = 1;
static GeglParallelDistributeThread gegl_parallel_distribute_threads[GEGL_PARALLEL_DISTRIBUTE_MAX_THREADS - 1];
static GMutex gegl_parallel_distribute_completion_mutex;
static GCond gegl_parallel_distribute_completion_cond;
static volatile gint gegl_parallel_distribute_completion_counter;
static volatile gint gegl_parallel_distribute_busy;
/* public functions */
void
gegl_parallel_init (void)
{
g_signal_connect (gegl_config (), "notify::threads",
G_CALLBACK (gegl_parallel_notify_threads),
NULL);
gegl_parallel_notify_threads (gegl_config ());
}
void
gegl_parallel_cleanup (void)
{
g_signal_handlers_disconnect_by_func (gegl_config (),
gegl_parallel_notify_threads,
NULL);
/* stop all threads */
gegl_parallel_set_n_threads (0, /* finish_tasks = */ FALSE);
}
void
gegl_parallel_distribute (gint max_n,
GeglParallelDistributeFunc func,
gpointer user_data)
{
GeglParallelDistributeTask task;
gint i;
g_return_if_fail (func != NULL);
if (max_n == 0)
return;
if (max_n < 0)
max_n = gegl_parallel_distribute_n_threads;
else
max_n = MIN (max_n, gegl_parallel_distribute_n_threads);
if (max_n == 1 ||
! g_atomic_int_compare_and_exchange (&gegl_parallel_distribute_busy,
0, 1))
{
func (0, 1, user_data);
return;
}
task.n = max_n;
task.func = func;
task.user_data = user_data;
g_atomic_int_set (&gegl_parallel_distribute_completion_counter, task.n - 1);
for (i = 0; i < task.n - 1; i++)
{
GeglParallelDistributeThread *thread =
&gegl_parallel_distribute_threads[i];
g_mutex_lock (&thread->mutex);
thread->task = &task;
thread->i = i;
g_cond_signal (&thread->cond);
g_mutex_unlock (&thread->mutex);
}
func (i, task.n, user_data);
if (g_atomic_int_get (&gegl_parallel_distribute_completion_counter))
{
g_mutex_lock (&gegl_parallel_distribute_completion_mutex);
while (g_atomic_int_get (&gegl_parallel_distribute_completion_counter))
{
g_cond_wait (&gegl_parallel_distribute_completion_cond,
&gegl_parallel_distribute_completion_mutex);
}
g_mutex_unlock (&gegl_parallel_distribute_completion_mutex);
}
g_atomic_int_set (&gegl_parallel_distribute_busy, 0);
}
typedef struct
{
gsize size;
GeglParallelDistributeRangeFunc func;
gpointer user_data;
} GeglParallelDistributeRangeData;
static void
gegl_parallel_distribute_range_func (gint i,
gint n,
GeglParallelDistributeRangeData *data)
{
gsize offset;
gsize sub_size;
offset = (2 * i * data->size + n) / (2 * n);
sub_size = (2 * (i + 1) * data->size + n) / (2 * n) - offset;
data->func (offset, sub_size, data->user_data);
}
void
gegl_parallel_distribute_range (gsize size,
gsize min_sub_size,
GeglParallelDistributeRangeFunc func,
gpointer user_data)
{
GeglParallelDistributeRangeData data;
gsize n = size;
g_return_if_fail (func != NULL);
if (size == 0)
return;
if (min_sub_size > 1)
n /= min_sub_size;
n = CLAMP (n, 1, gegl_parallel_distribute_n_threads);
data.size = size;
data.func = func;
data.user_data = user_data;
gegl_parallel_distribute (
n,
(GeglParallelDistributeFunc) gegl_parallel_distribute_range_func,
&data);
}
typedef struct
{
const GeglRectangle *area;
GeglSplitStrategy split_strategy;
GeglParallelDistributeAreaFunc func;
gpointer user_data;
} GeglParallelDistributeAreaData;
static void
gegl_parallel_distribute_area_func (gint i,
gint n,
GeglParallelDistributeAreaData *data)
{
GeglRectangle sub_area;
switch (data->split_strategy)
{
case GEGL_SPLIT_STRATEGY_HORIZONTAL:
sub_area.x = data->area->x;
sub_area.width = data->area->width;
sub_area.y = (2 * i * data->area->height + n) / (2 * n);
sub_area.height = (2 * (i + 1) * data->area->height + n) / (2 * n);
sub_area.height -= sub_area.y;
sub_area.y += data->area->y;
break;
case GEGL_SPLIT_STRATEGY_VERTICAL:
sub_area.y = data->area->y;
sub_area.height = data->area->height;
sub_area.x = (2 * i * data->area->width + n) / (2 * n);
sub_area.width = (2 * (i + 1) * data->area->width + n) / (2 * n);
sub_area.width -= sub_area.x;
sub_area.x += data->area->x;
break;
default:
g_return_if_reached ();
}
data->func (&sub_area, data->user_data);
}
void
gegl_parallel_distribute_area (const GeglRectangle *area,
gsize min_sub_area,
GeglSplitStrategy split_strategy,
GeglParallelDistributeAreaFunc func,
gpointer user_data)
{
GeglParallelDistributeAreaData data;
gsize n;
g_return_if_fail (area != NULL);
g_return_if_fail (func != NULL);
if (area->width <= 0 || area->height <= 0)
return;
if (split_strategy == GEGL_SPLIT_STRATEGY_AUTO)
{
if (area->width > area->height)
split_strategy = GEGL_SPLIT_STRATEGY_VERTICAL;
else
split_strategy = GEGL_SPLIT_STRATEGY_HORIZONTAL;
}
n = (gsize) area->width * (gsize) area->height;
if (min_sub_area > 1)
n /= min_sub_area;
n = CLAMP (n, 1, gegl_parallel_distribute_n_threads);
data.area = area;
data.split_strategy = split_strategy;
data.func = func;
data.user_data = user_data;
gegl_parallel_distribute (
n,
(GeglParallelDistributeFunc) gegl_parallel_distribute_area_func,
&data);
}
/* private functions */
static void
gegl_parallel_notify_threads (GeglConfig *config)
{
gint n_threads;
g_object_get (config,
"threads", &n_threads,
NULL);
gegl_parallel_set_n_threads (n_threads,
/* finish_tasks = */ TRUE);
}
static void
gegl_parallel_set_n_threads (gint n_threads,
gboolean finish_tasks)
{
gegl_parallel_distribute_set_n_threads (n_threads);
}
static void
gegl_parallel_distribute_set_n_threads (gint n_threads)
{
gint i;
while (! g_atomic_int_compare_and_exchange (&gegl_parallel_distribute_busy,
0, 1));
n_threads = CLAMP (n_threads, 1, GEGL_PARALLEL_DISTRIBUTE_MAX_THREADS);
if (n_threads > gegl_parallel_distribute_n_threads) /* need more threads */
{
for (i = gegl_parallel_distribute_n_threads - 1; i < n_threads - 1; i++)
{
GeglParallelDistributeThread *thread =
&gegl_parallel_distribute_threads[i];
thread->quit = FALSE;
thread->task = NULL;
thread->thread = g_thread_new (
"worker",
(GThreadFunc) gegl_parallel_distribute_thread_func,
thread);
}
}
else if (n_threads < gegl_parallel_distribute_n_threads) /* need less threads */
{
for (i = n_threads - 1; i < gegl_parallel_distribute_n_threads - 1; i++)
{
GeglParallelDistributeThread *thread =
&gegl_parallel_distribute_threads[i];
g_mutex_lock (&thread->mutex);
thread->quit = TRUE;
g_cond_signal (&thread->cond);
g_mutex_unlock (&thread->mutex);
}
for (i = n_threads - 1; i < gegl_parallel_distribute_n_threads - 1; i++)
{
GeglParallelDistributeThread *thread =
&gegl_parallel_distribute_threads[i];
g_thread_join (thread->thread);
}
}
gegl_parallel_distribute_n_threads = n_threads;
g_atomic_int_set (&gegl_parallel_distribute_busy, 0);
}
static gpointer
gegl_parallel_distribute_thread_func (GeglParallelDistributeThread *thread)
{
g_mutex_lock (&thread->mutex);
while (TRUE)
{
if (thread->quit)
{
break;
}
else if (thread->task)
{
thread->task->func (thread->i, thread->task->n,
thread->task->user_data);
if (g_atomic_int_dec_and_test (
&gegl_parallel_distribute_completion_counter))
{
g_mutex_lock (&gegl_parallel_distribute_completion_mutex);
g_cond_signal (&gegl_parallel_distribute_completion_cond);
g_mutex_unlock (&gegl_parallel_distribute_completion_mutex);
}
thread->task = NULL;
}
g_cond_wait (&thread->cond, &thread->mutex);
}
g_mutex_unlock (&thread->mutex);
return NULL;
}
/* This file is part of GEGL.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with GEGL; if not, see <https://www.gnu.org/licenses/>.
*
* Copyright 2018 Ell
*/
#ifndef __GEGL_PARALLEL_H__
#define __GEGL_PARALLEL_H__
G_BEGIN_DECLS
/**
* GeglParallelDistributeFunc:
* @i: the current thread index, in the range [0,@n)
* @n: the number of threads execution is distributed across
* @user_data: user data pointer
*
* Specifies the type of function passed to gegl_parallel_distribute().
*
* The function should process the @i-th part of the data, out of @n
* equal parts. @n may be less-than or equal-to the @max_n argument
* passed to gegl_parallel_distribute().
*/
typedef void (* GeglParallelDistributeFunc) (gint i,
gint n,
gpointer user_data);
/**
* GeglParallelDistributeRangeFunc:
* @offset: the current data offset
* @size: the current data size
* @user_data: user data pointer
*
* Specifies the type of function passed to gegl_parallel_distribute_range().
*
* The function should process @size elements of the data, starting
* at @offset. @size may be greater-than or equal-to the @min_sub_size
* argument passed to gegl_parallel_distribute_range().
*/
typedef void (* GeglParallelDistributeRangeFunc) (gsize offset,
gsize size,
gpointer user_data);
/**
* GeglParallelDistributeAreaFunc:
* @area: the current sub-region
* @user_data: user data pointer
*
* Specifies the type of function passed to gegl_parallel_distribute_area().
*
* The function should process the sub-region specified by @area, whose
* area may be greater-than or equal-to the @min_sub_area argument passed
* to gegl_parallel_distribute_area().
*
*/
typedef void (* GeglParallelDistributeAreaFunc) (const GeglRectangle *area,
gpointer user_data);
/**
* gegl_parallel_distribute:
* @max_n: the maximal number of threads to use
* @func: (closure user_data) (scope call): the function to call
* @user_data: user data to pass to the function
*
* Distributes the execution of a function across multiple threads,
* by calling it with a different index on each thread.
*/
void gegl_parallel_distribute (gint max_n,
GeglParallelDistributeFunc func,
gpointer user_data);
/**
* gegl_parallel_distribute_range:
* @size: the total size of the data
* @min_sub_size: the minimal data size to be processed by each thread
* @func: (closure user_data) (scope call): the function to call
* @user_data: user data to pass to the function
*
* Distributes the processing of a linear data-structure across
* multiple threads, by calling the given function with different
* sub-ranges on different threads.
*/
void gegl_parallel_distribute_range (gsize size,
gsize min_sub_size,
GeglParallelDistributeRangeFunc func,
gpointer user_data);
/**
* gegl_parallel_distribute_area:
* @area: the region to process
* @min_sub_area: the minimal area to be processed by each thread
* @split_strategy: the strategy to use for dividing the region
* @func: (closure user_data) (scope call): the function to call
* @user_data: user data to pass to the function
*
* Distributes the processing of a planar data-structure across
* multiple threads, by calling the given function with different
* sub-regions on different threads.
*/
void gegl_parallel_distribute_area (const GeglRectangle *area,
gsize min_sub_area,
GeglSplitStrategy split_strategy,
GeglParallelDistributeAreaFunc func,
gpointer user_data);
#ifdef __cplusplus
extern "C++"
{
template <class ParallelDistributeFunc>
inline void
gegl_parallel_distribute (gint max_n,
ParallelDistributeFunc func)
{
gegl_parallel_distribute (max_n,
[] (gint i,
gint n,
gpointer user_data)
{
ParallelDistributeFunc func_copy (
*(const ParallelDistributeFunc *) user_data);
func_copy (i, n);
},
&func);
}
template <class ParallelDistributeRangeFunc>
inline void
gegl_parallel_distribute_range (gsize size,
gsize min_sub_size,
ParallelDistributeRangeFunc func)
{
gegl_parallel_distribute_range (size, min_sub_size,
[] (gsize offset,
gsize size,
gpointer user_data)
{
ParallelDistributeRangeFunc func_copy (
*(const ParallelDistributeRangeFunc *) user_data);
func_copy (offset, size);
},
&func);
}
template <class ParallelDistributeAreaFunc>
inline void
gegl_parallel_distribute_area (const GeglRectangle *area,
gsize min_sub_area,
GeglSplitStrategy split_strategy,
ParallelDistributeAreaFunc func)
{
gegl_parallel_distribute_area (area, min_sub_area, split_strategy,
[] (const GeglRectangle *area,
gpointer user_data)
{
ParallelDistributeAreaFunc func_copy (
*(const ParallelDistributeAreaFunc *) user_data);
func_copy (area);
},
&func);
}
}
#endif /* __cplusplus */
G_END_DECLS
#endif /* __GEGL_PARALLEL_H__ */
......@@ -37,6 +37,7 @@
#include <gegl-init.h>
#include <gegl-version.h>
#include <gegl-random.h>
#include <gegl-parallel.h>
#include <gegl-node.h>
#include <gegl-processor.h>
#include <gegl-apply.h>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment