Commit 0d761e35 authored by Carlos Garnacho's avatar Carlos Garnacho Committed by Carlos Garnacho

tracker-miner-fs: Move "retry on extraction failed" code to TrackerMinerFiles

Currently the simplest approach of retrying just once
every failed file is taken, after that, the sparql without
embedded metadata is added.

Conflicts:

	src/libtracker-miner/tracker-miner-fs.c
	src/miners/fs/tracker-miner-files.c
parent a14567fb
......@@ -213,7 +213,6 @@ struct _TrackerMinerFSPrivate {
/* Extraction tasks */
TrackerTaskPool *task_pool;
GList *extraction_tasks;
/* Writeback tasks */
TrackerTaskPool *writeback_pool;
......@@ -903,7 +902,6 @@ fs_finalize (GObject *object)
task_pool_cancel_foreach,
NULL);
g_object_unref (priv->task_pool);
g_list_free (priv->extraction_tasks);
g_object_unref (priv->writeback_pool);
......@@ -1879,7 +1877,6 @@ do_process_file (TrackerMinerFS *fs,
"implementation error", G_OBJECT_TYPE_NAME (fs), uri);
} else {
tracker_task_pool_remove (priv->task_pool, task);
priv->extraction_tasks = g_list_remove (priv->extraction_tasks, task);
tracker_task_unref (task);
}
}
......@@ -1903,148 +1900,77 @@ item_add_or_update_cb (TrackerMinerFS *fs,
task_file = tracker_task_get_file (extraction_task);
uri = g_file_get_uri (task_file);
if (error) {
TrackerTask *first_item_task = NULL;
GList *first_task;
first_task = g_list_last (fs->priv->extraction_tasks);
if (first_task) {
first_item_task = first_task->data;
}
/* Perhaps this is too specific to TrackerMinerFiles, if the extractor
* is choking on some file, the miner will get a timeout for all files
* being currently processed, but the one that is actually causing it
* is the first one that was added to the processing pool, so we retry
* the others.
*/
if (extraction_task != first_item_task &&
(error->code == G_DBUS_ERROR_NO_REPLY ||
error->code == G_DBUS_ERROR_TIMEOUT ||
error->code == G_DBUS_ERROR_TIMED_OUT)) {
g_debug (" Got DBus timeout error on '%s', but it could not be caused by it. Retrying file.", uri);
/* Reset the TrackerSparqlBuilder */
g_object_unref (ctxt->builder);
ctxt->builder = tracker_sparql_builder_new_update ();
do_process_file (fs, extraction_task);
g_free (uri);
return;
} else {
fs->priv->total_files_notified_error++;
g_message ("Could not process '%s': %s", uri, error->message);
if (error->code == G_IO_ERROR_CANCELLED) {
/* Cancelled is cancelled, just move along in this case */
tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
fs->priv->extraction_tasks = g_list_remove (fs->priv->extraction_tasks,
extraction_task);
tracker_task_unref (extraction_task);
item_queue_handlers_set_up (fs);
g_free (uri);
return;
} else if (error->code == G_IO_ERROR_NOT_FOUND) {
tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
fs->priv->extraction_tasks = g_list_remove (fs->priv->extraction_tasks,
extraction_task);
/* File was not found, remove it
* if it was in the store
*/
if (ctxt->urn) {
item_remove (fs, task_file);
} else {
item_queue_handlers_set_up (fs);
}
tracker_task_unref (extraction_task);
g_free (uri);
return;
}
}
}
tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
fs->priv->extraction_tasks = g_list_remove (fs->priv->extraction_tasks,
extraction_task);
if (ctxt->urn) {
gboolean attribute_update_only;
attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (task_file),
fs->priv->quark_attribute_updated));
g_debug ("Updating item%s%s%s '%s' with urn '%s'%s",
error != NULL ? " (which had extractor error '" : "",
error != NULL ? (error->message ? error->message : "No error given") : "",
error != NULL ? "')" : "",
uri,
ctxt->urn,
attribute_update_only ? " (attributes only)" : "");
if (!attribute_update_only) {
gchar *full_sparql;
if (error) {
g_message ("Could not process '%s': %s", uri, error->message);
/* Update, delete all statements inserted by miner except:
* - rdf:type statements as they could cause implicit deletion of user data
* - nie:contentCreated so it persists across updates
*
* Additionally, delete also nie:url as it might have been set by 3rd parties,
* and it's used to know whether a file is known to tracker or not.
*/
full_sparql = g_strdup_printf ("DELETE {"
" GRAPH <%s> {"
" <%s> ?p ?o"
" } "
"} "
"WHERE {"
" GRAPH <%s> {"
" <%s> ?p ?o"
" FILTER (?p != rdf:type && ?p != nie:contentCreated)"
" } "
"} "
"DELETE {"
" <%s> nie:url ?o"
"} WHERE {"
" <%s> nie:url ?o"
"}"
"%s",
TRACKER_MINER_FS_GRAPH_URN, ctxt->urn,
TRACKER_MINER_FS_GRAPH_URN, ctxt->urn,
ctxt->urn, ctxt->urn,
tracker_sparql_builder_get_result (ctxt->builder));
sparql_task = tracker_sparql_task_new_take_sparql_str (task_file, full_sparql);
} else {
/* Do not drop graph if only updating attributes, the SPARQL builder
* will already contain the necessary DELETE statements for the properties
* being updated */
sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
}
fs->priv->total_files_notified_error++;
item_queue_handlers_set_up (fs);
} else {
if (error != NULL) {
g_debug ("Creating minimal info for new item '%s' which had error: '%s'",
if (ctxt->urn) {
gboolean attribute_update_only;
attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (task_file),
fs->priv->quark_attribute_updated));
g_debug ("Updating item '%s' with urn '%s'%s",
uri,
error->message ? error->message : "No error given");
ctxt->urn,
attribute_update_only ? " (attributes only)" : "");
if (!attribute_update_only) {
gchar *full_sparql;
/* Update, delete all statements inserted by miner except:
* - rdf:type statements as they could cause implicit deletion of user data
* - nie:contentCreated so it persists across updates
*
* Additionally, delete also nie:url as it might have been set by 3rd parties,
* and it's used to know whether a file is known to tracker or not.
*/
full_sparql = g_strdup_printf ("DELETE {"
" GRAPH <%s> {"
" <%s> ?p ?o"
" } "
"} "
"WHERE {"
" GRAPH <%s> {"
" <%s> ?p ?o"
" FILTER (?p != rdf:type && ?p != nie:contentCreated)"
" } "
"} "
"DELETE {"
" <%s> nie:url ?o"
"} WHERE {"
" <%s> nie:url ?o"
"}"
"%s",
TRACKER_MINER_FS_GRAPH_URN, ctxt->urn,
TRACKER_MINER_FS_GRAPH_URN, ctxt->urn,
ctxt->urn, ctxt->urn,
tracker_sparql_builder_get_result (ctxt->builder));
sparql_task = tracker_sparql_task_new_take_sparql_str (task_file, full_sparql);
} else {
/* Do not drop graph if only updating attributes, the SPARQL builder
* will already contain the necessary DELETE statements for the properties
* being updated */
sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
}
} else {
g_debug ("Creating new item '%s'", uri);
sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
}
sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
}
tracker_sparql_buffer_push (fs->priv->sparql_buffer,
sparql_task,
ctxt->priority,
sparql_buffer_task_finished_cb,
fs);
tracker_sparql_buffer_push (fs->priv->sparql_buffer,
sparql_task,
ctxt->priority,
sparql_buffer_task_finished_cb,
fs);
if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
item_queue_handlers_set_up (fs);
if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
item_queue_handlers_set_up (fs);
}
}
tracker_task_unref (extraction_task);
......@@ -2088,7 +2014,6 @@ item_add_or_update (TrackerMinerFS *fs,
task = tracker_task_new (file, ctxt,
(GDestroyNotify) update_processing_task_context_free);
tracker_task_pool_add (priv->task_pool, task);
priv->extraction_tasks = g_list_prepend (priv->extraction_tasks, task);
if (do_process_file (fs, task)) {
fs->priv->total_files_processed++;
......
......@@ -71,6 +71,8 @@ struct ProcessFileData {
TrackerSparqlBuilder *sparql;
GCancellable *cancellable;
GFile *file;
gchar *mime_type;
guint retried : 1;
};
typedef void (*fast_async_cb) (const gchar *preupdate,
......@@ -241,6 +243,9 @@ static void miner_files_in_removable_media_remove_by_date (TrackerMinerF
static void miner_files_add_removable_or_optical_directory (TrackerMinerFiles *mf,
const gchar *mount_path,
const gchar *uuid);
static void extractor_get_embedded_metadata (ProcessFileData *data,
const gchar *uri,
const gchar *mime_type);
static void extractor_cancel_tasks (GDBusConnection *connection,
GFile *prefix);
......@@ -1954,28 +1959,17 @@ process_file_data_free (ProcessFileData *data)
g_object_unref (data->sparql);
g_object_unref (data->cancellable);
g_object_unref (data->file);
g_free (data->mime_type);
g_slice_free (ProcessFileData, data);
}
static void
extractor_get_embedded_metadata_cb (const gchar *preupdate,
const gchar *sparql,
GError *error,
gpointer user_data)
sparql_builder_finish (ProcessFileData *data,
const gchar *preupdate,
const gchar *sparql)
{
ProcessFileData *data = user_data;
const gchar *uuid;
if (error) {
tracker_sparql_builder_graph_close (data->sparql);
tracker_sparql_builder_insert_close (data->sparql);
/* Something bad happened, notify about the error */
tracker_miner_fs_file_notify (TRACKER_MINER_FS (data->miner), data->file, error);
process_file_data_free (data);
return;
}
if (sparql && *sparql) {
gboolean is_iri;
const gchar *urn;
......@@ -2039,6 +2033,54 @@ extractor_get_embedded_metadata_cb (const gchar *preupdate,
g_free (removable_device_urn);
g_free (uri);
}
}
static void
extractor_get_embedded_metadata_cb (const gchar *preupdate,
const gchar *sparql,
GError *error,
gpointer user_data)
{
ProcessFileData *data = user_data;
if (error) {
if (error->code == G_DBUS_ERROR_NO_REPLY ||
error->code == G_DBUS_ERROR_TIMEOUT ||
error->code == G_DBUS_ERROR_TIMED_OUT) {
gchar *uri;
uri = g_file_get_uri (data->file);
if (!data->retried) {
data->retried = TRUE;
g_debug (" Got extraction DBus error on '%s'. Retrying file.", uri);
/* Try again extraction */
extractor_get_embedded_metadata (data, uri, data->mime_type);
} else {
g_warning (" Got second extraction DBus error on '%s'. "
"Adding only non-embedded metadata to the SparQL, "
"the error was: %s",
uri, error->message);
sparql_builder_finish (data, NULL, NULL);
tracker_miner_fs_file_notify (TRACKER_MINER_FS (data->miner), data->file, NULL);
process_file_data_free (data);
}
g_free (uri);
} else {
/* Something bad happened, notify about the error */
tracker_miner_fs_file_notify (TRACKER_MINER_FS (data->miner), data->file, error);
process_file_data_free (data);
}
g_error_free (error);
return;
}
sparql_builder_finish (data, preupdate, sparql);
/* Notify about the success */
tracker_miner_fs_file_notify (TRACKER_MINER_FS (data->miner), data->file, NULL);
......@@ -2400,6 +2442,8 @@ process_file_cb (GObject *object,
mime_type = g_file_info_get_content_type (file_info);
urn = miner_files_get_file_urn (TRACKER_MINER_FILES (data->miner), file, &is_iri);
data->mime_type = g_strdup (mime_type);
tracker_sparql_builder_insert_silent_open (sparql, NULL);
tracker_sparql_builder_graph_open (sparql, TRACKER_MINER_FS_GRAPH_URN);
......@@ -2461,10 +2505,10 @@ process_file_cb (GObject *object,
/* Next step, if NOT a directory, get embedded metadata */
extractor_get_embedded_metadata (data, uri, mime_type);
} else {
/* For directories, don't request embedded metadata extraction.
* We setup an idle so that we keep the previous behavior. */
/* Otherwise, don't request embedded metadata extraction. */
g_debug ("Avoiding embedded metadata request for directory '%s'", uri);
extractor_get_embedded_metadata_cb (NULL, NULL, NULL, user_data);
sparql_builder_finish (data, NULL, NULL);
tracker_miner_fs_file_notify (TRACKER_MINER_FS (data->miner), data->file, NULL);
}
g_object_unref (file_info);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment