Commit 05450979 authored by Martyn Russell

libtracker-miner: Fix reference leak with TrackerTaskPool

The leak occurred because the TrackerTask* returned by
tracker_sparql_task_new_with_sparql() was never unreferenced anywhere, even
though tracker_sparql_buffer_push() takes its own references internally when
it is handed the new task.

Took this opportunity to make the code here easier to follow:
- do_process_file() is now merged into item_add_or_update()
- item_add_or_update_cb() is renamed to item_add_or_update_continue() so it's
  obvious it is called from tracker_miner_fs_file_notify().
- renamed various variables to make the code easier to follow.
parent 566fc496
......@@ -1338,80 +1338,19 @@ update_processing_task_context_free (UpdateProcessingTaskContext *ctxt)
g_slice_free (UpdateProcessingTaskContext, ctxt);
}
/*
 * do_process_file:
 * @fs: the miner on which the processing signal is emitted
 * @task: task carrying the file to process; its data is read as an
 *        UpdateProcessingTaskContext
 *
 * Emits either ::process-file or ::process-file-attributes for the file
 * attached to @task, depending on whether the file carries the
 * "attribute updated" qdata flag, and collects the handler's answer.
 *
 * When the handler answers FALSE, the task is looked up again in the
 * miner's task pool; if it is still there it is removed from the pool and
 * unreferenced, otherwise a critical warning is logged.
 *
 * Returns: the boolean value produced by the signal emission.
 */
static gboolean
do_process_file (TrackerMinerFS *fs,
TrackerTask *task)
{
TrackerMinerFSPrivate *priv;
gboolean processing;
gboolean attribute_update_only;
gchar *uri;
GFile *task_file;
UpdateProcessingTaskContext *ctxt;
ctxt = tracker_task_get_data (task);
task_file = tracker_task_get_file (task);
/* The URI is only used for log messages below; freed before returning. */
uri = g_file_get_uri (task_file);
priv = fs->priv;
/* Read (without clearing) the per-file flag that selects which signal
 * is emitted.
 */
attribute_update_only = GPOINTER_TO_INT (g_object_get_qdata (G_OBJECT (task_file),
priv->quark_attribute_updated));
if (!attribute_update_only) {
g_debug ("Processing file '%s'...", uri);
/* The emission's return value is written into 'processing'. */
g_signal_emit (fs, signals[PROCESS_FILE], 0,
task_file,
ctxt->builder,
ctxt->cancellable,
&processing);
} else {
g_debug ("Processing attributes in file '%s'...", uri);
/* Same arguments as above, different signal. */
g_signal_emit (fs, signals[PROCESS_FILE_ATTRIBUTES], 0,
task_file,
ctxt->builder,
ctxt->cancellable,
&processing);
}
if (!processing) {
/* Re-fetch data, since it might have been
* removed in broken implementations
*/
task = tracker_task_pool_find (priv->task_pool, task_file);
g_message ("%s refused to process '%s'", G_OBJECT_TYPE_NAME (fs), uri);
if (!task) {
/* The task is no longer in the pool even though the handler
 * declined the file.
 */
g_critical ("%s has returned FALSE in ::process-file for '%s', "
"but it seems that this file has been processed through "
"tracker_miner_fs_file_notify(), this is an "
"implementation error", G_OBJECT_TYPE_NAME (fs), uri);
} else {
/* Take the task out of the pool and release a reference on it. */
tracker_task_pool_remove (priv->task_pool, task);
tracker_task_unref (task);
}
}
g_free (uri);
return processing;
}
static void
item_add_or_update_cb (TrackerMinerFS *fs,
TrackerTask *extraction_task,
const GError *error)
item_add_or_update_continue (TrackerMinerFS *fs,
TrackerTask *task,
const GError *error)
{
UpdateProcessingTaskContext *ctxt;
TrackerTask *sparql_task = NULL;
GFile *task_file;
GFile *file;
gchar *uri;
ctxt = tracker_task_get_data (extraction_task);
task_file = tracker_task_get_file (extraction_task);
uri = g_file_get_uri (task_file);
tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
ctxt = tracker_task_get_data (task);
file = tracker_task_get_file (task);
uri = g_file_get_uri (file);
if (error) {
g_message ("Could not process '%s': %s", uri, error->message);
......@@ -1420,15 +1359,13 @@ item_add_or_update_cb (TrackerMinerFS *fs,
if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND) &&
!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
sparql_task = tracker_sparql_task_new_with_sparql (task_file,
ctxt->builder);
sparql_task = tracker_sparql_task_new_with_sparql (file, ctxt->builder);
}
} else {
if (ctxt->urn) {
gboolean attribute_update_only;
attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (task_file),
fs->priv->quark_attribute_updated));
attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (file), fs->priv->quark_attribute_updated));
g_debug ("Updating item '%s' with urn '%s'%s",
uri,
ctxt->urn,
......@@ -1466,16 +1403,16 @@ item_add_or_update_cb (TrackerMinerFS *fs,
ctxt->urn, ctxt->urn,
tracker_sparql_builder_get_result (ctxt->builder));
sparql_task = tracker_sparql_task_new_take_sparql_str (task_file, full_sparql);
sparql_task = tracker_sparql_task_new_take_sparql_str (file, full_sparql);
} else {
/* Do not drop graph if only updating attributes, the SPARQL builder
* will already contain the necessary DELETE statements for the properties
* being updated */
sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
sparql_task = tracker_sparql_task_new_with_sparql (file, ctxt->builder);
}
} else {
g_debug ("Creating new item '%s'", uri);
sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
sparql_task = tracker_sparql_task_new_with_sparql (file, ctxt->builder);
}
}
......@@ -1486,14 +1423,20 @@ item_add_or_update_cb (TrackerMinerFS *fs,
sparql_buffer_task_finished_cb,
fs);
if (item_queue_is_blocked_by_file (fs, task_file)) {
if (item_queue_is_blocked_by_file (fs, file)) {
tracker_sparql_buffer_flush (fs->priv->sparql_buffer, "Current file is blocking item queue");
/* Check if we've finished inserting for given prefixes ... */
notify_roots_finished (fs, TRUE);
}
/* We can let go of our reference here because the
* sparql buffer takes its own reference when adding
* it to the task pool.
*/
tracker_task_unref (sparql_task);
} else {
if (item_queue_is_blocked_by_file (fs, task_file)) {
if (item_queue_is_blocked_by_file (fs, file)) {
/* Make sure that we don't stall the item queue, although we could
* expect the file to be reenqueued until the loop detector makes
* us drop it since we were specifically waiting for it to complete.
......@@ -1510,7 +1453,14 @@ item_add_or_update_cb (TrackerMinerFS *fs,
item_queue_handlers_set_up (fs);
}
tracker_task_unref (extraction_task);
/* Last reference is kept by the pool, removing the task from
* the pool cleans up the task too!
*
* NOTE that calling this any earlier actually causes invalid
* reads because the task frees up the
* UpdateProcessingTaskContext and GFile.
*/
tracker_task_pool_remove (fs->priv->task_pool, task);
g_free (uri);
}
......@@ -1541,15 +1491,17 @@ item_add_or_update (TrackerMinerFS *fs,
{
TrackerMinerFSPrivate *priv;
TrackerSparqlBuilder *sparql;
UpdateProcessingTaskContext *ctxt;
GCancellable *cancellable;
gboolean retval;
gboolean processing;
gboolean keep_processing;
gboolean attribute_update_only;
TrackerTask *task;
const gchar *parent_urn, *urn = NULL;
UpdateProcessingTaskContext *ctxt;
const gchar *parent_urn, *urn;
gchar *uri;
GFile *parent;
priv = fs->priv;
retval = TRUE;
cancellable = g_cancellable_new ();
sparql = tracker_sparql_builder_new_update ();
......@@ -1581,20 +1533,61 @@ item_add_or_update (TrackerMinerFS *fs,
(GDestroyNotify) update_processing_task_context_free);
tracker_task_pool_add (priv->task_pool, task);
tracker_task_unref (task);
/* Call ::process-file to see if we handle this resource or not */
uri = g_file_get_uri (file);
attribute_update_only = GPOINTER_TO_INT (g_object_get_qdata (G_OBJECT (file), priv->quark_attribute_updated));
if (!attribute_update_only) {
g_debug ("Processing file '%s'...", uri);
g_signal_emit (fs, signals[PROCESS_FILE], 0,
file,
ctxt->builder,
ctxt->cancellable,
&processing);
} else {
g_debug ("Processing attributes in file '%s'...", uri);
g_signal_emit (fs, signals[PROCESS_FILE_ATTRIBUTES], 0,
file,
ctxt->builder,
ctxt->cancellable,
&processing);
}
keep_processing = TRUE;
if (!processing) {
/* Re-fetch data, since it might have been
* removed in broken implementations
*/
task = tracker_task_pool_find (priv->task_pool, file);
if (do_process_file (fs, task)) {
g_message ("%s refused to process '%s'", G_OBJECT_TYPE_NAME (fs), uri);
if (!task) {
g_critical ("%s has returned FALSE in ::process-file for '%s', "
"but it seems that this file has been processed through "
"tracker_miner_fs_file_notify(), this is an "
"implementation error", G_OBJECT_TYPE_NAME (fs), uri);
} else {
tracker_task_pool_remove (priv->task_pool, task);
}
} else {
fs->priv->total_files_processed++;
if (tracker_task_pool_limit_reached (priv->task_pool)) {
retval = FALSE;
keep_processing = FALSE;
}
}
g_free (uri);
g_object_unref (file);
g_object_unref (cancellable);
g_object_unref (sparql);
return retval;
return keep_processing;
}
static gboolean
......@@ -3740,7 +3733,7 @@ tracker_miner_fs_file_notify (TrackerMinerFS *fs,
return;
}
item_add_or_update_cb (fs, task, error);
item_add_or_update_continue (fs, task, error);
}
/**
......
......@@ -693,14 +693,16 @@ sparql_buffer_push_to_pool (TrackerSparqlBuffer *buffer,
reset_flush_timeout (buffer);
}
/* Task pool addition adds a reference (below) */
/* Task pool addition increments reference */
tracker_task_pool_add (TRACKER_TASK_POOL (buffer), task);
if (!priv->tasks) {
priv->tasks = g_ptr_array_new_with_free_func ((GDestroyNotify) tracker_task_unref);
}
g_ptr_array_add (priv->tasks, task);
/* We add a reference here because we unref when removed from
* the GPtrArray. */
g_ptr_array_add (priv->tasks, tracker_task_ref (task));
if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (buffer))) {
tracker_sparql_buffer_flush (buffer, "SPARQL buffer limit reached");
......@@ -722,6 +724,10 @@ tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer,
g_return_if_fail (TRACKER_IS_SPARQL_BUFFER (buffer));
g_return_if_fail (task != NULL);
/* NOTE: We don't own the task and if we want it we have to
* reference it, each function below references task in
* different ways.
*/
data = tracker_task_get_data (task);
if (!data->result) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment