Commit 28cf45a3 authored by Philip Van Hoof

Merge branch 'failsafe-extraction-0.10' into tracker-0.10

parents c2dc99bf ea914fbf
......@@ -213,7 +213,6 @@ struct _TrackerMinerFSPrivate {
/* Extraction tasks */
TrackerTaskPool *task_pool;
GList *extraction_tasks;
/* Writeback tasks */
TrackerTaskPool *writeback_pool;
......@@ -903,7 +902,6 @@ fs_finalize (GObject *object)
task_pool_cancel_foreach,
NULL);
g_object_unref (priv->task_pool);
g_list_free (priv->extraction_tasks);
g_object_unref (priv->writeback_pool);
......@@ -1887,7 +1885,6 @@ do_process_file (TrackerMinerFS *fs,
"implementation error", G_OBJECT_TYPE_NAME (fs), uri);
} else {
tracker_task_pool_remove (priv->task_pool, task);
priv->extraction_tasks = g_list_remove (priv->extraction_tasks, task);
tracker_task_unref (task);
}
}
......@@ -1903,7 +1900,7 @@ item_add_or_update_cb (TrackerMinerFS *fs,
const GError *error)
{
UpdateProcessingTaskContext *ctxt;
TrackerTask *sparql_task;
TrackerTask *sparql_task = NULL;
GFile *task_file;
gchar *uri;
......@@ -1911,145 +1908,81 @@ item_add_or_update_cb (TrackerMinerFS *fs,
task_file = tracker_task_get_file (extraction_task);
uri = g_file_get_uri (task_file);
tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
if (error) {
TrackerTask *first_item_task = NULL;
GList *first_task;
g_message ("Could not process '%s': %s", uri, error->message);
first_task = g_list_last (fs->priv->extraction_tasks);
fs->priv->total_files_notified_error++;
if (first_task) {
first_item_task = first_task->data;
if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND) &&
!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
sparql_task = tracker_sparql_task_new_with_sparql (task_file,
ctxt->builder);
}
} else {
if (ctxt->urn) {
gboolean attribute_update_only;
/* Perhaps this is too specific to TrackerMinerFiles, if the extractor
* is choking on some file, the miner will get a timeout for all files
* being currently processed, but the one that is actually causing it
* is the first one that was added to the processing pool, so we retry
* the others.
*/
if (extraction_task != first_item_task &&
(error->code == G_DBUS_ERROR_NO_REPLY ||
error->code == G_DBUS_ERROR_TIMEOUT ||
error->code == G_DBUS_ERROR_TIMED_OUT)) {
g_debug (" Got DBus timeout error on '%s', but it could not be caused by it. Retrying file.", uri);
/* Reset the TrackerSparqlBuilder */
g_object_unref (ctxt->builder);
ctxt->builder = tracker_sparql_builder_new_update ();
do_process_file (fs, extraction_task);
g_free (uri);
return;
} else {
fs->priv->total_files_notified_error++;
g_message ("Could not process '%s': %s", uri, error->message);
if (error->code == G_IO_ERROR_CANCELLED) {
/* Cancelled is cancelled, just move along in this case */
tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
fs->priv->extraction_tasks = g_list_remove (fs->priv->extraction_tasks,
extraction_task);
tracker_task_unref (extraction_task);
item_queue_handlers_set_up (fs);
g_free (uri);
return;
} else if (error->code == G_IO_ERROR_NOT_FOUND) {
tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
fs->priv->extraction_tasks = g_list_remove (fs->priv->extraction_tasks,
extraction_task);
/* File was not found, remove it
* if it was in the store
attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (task_file),
fs->priv->quark_attribute_updated));
g_debug ("Updating item '%s' with urn '%s'%s",
uri,
ctxt->urn,
attribute_update_only ? " (attributes only)" : "");
if (!attribute_update_only) {
gchar *full_sparql;
/* Update, delete all statements inserted by miner except:
* - rdf:type statements as they could cause implicit deletion of user data
* - nie:contentCreated so it persists across updates
*
* Additionally, delete also nie:url as it might have been set by 3rd parties,
* and it's used to know whether a file is known to tracker or not.
*/
if (ctxt->urn) {
item_remove (fs, task_file);
} else {
item_queue_handlers_set_up (fs);
}
tracker_task_unref (extraction_task);
g_free (uri);
return;
full_sparql = g_strdup_printf ("DELETE {"
" GRAPH <%s> {"
" <%s> ?p ?o"
" } "
"} "
"WHERE {"
" GRAPH <%s> {"
" <%s> ?p ?o"
" FILTER (?p != rdf:type && ?p != nie:contentCreated)"
" } "
"} "
"DELETE {"
" <%s> nie:url ?o"
"} WHERE {"
" <%s> nie:url ?o"
"}"
"%s",
TRACKER_MINER_FS_GRAPH_URN, ctxt->urn,
TRACKER_MINER_FS_GRAPH_URN, ctxt->urn,
ctxt->urn, ctxt->urn,
tracker_sparql_builder_get_result (ctxt->builder));
sparql_task = tracker_sparql_task_new_take_sparql_str (task_file, full_sparql);
} else {
/* Do not drop graph if only updating attributes, the SPARQL builder
* will already contain the necessary DELETE statements for the properties
* being updated */
sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
}
}
}
tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
fs->priv->extraction_tasks = g_list_remove (fs->priv->extraction_tasks,
extraction_task);
if (ctxt->urn) {
gboolean attribute_update_only;
attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (task_file),
fs->priv->quark_attribute_updated));
g_debug ("Updating item%s%s%s '%s' with urn '%s'%s",
error != NULL ? " (which had extractor error '" : "",
error != NULL ? (error->message ? error->message : "No error given") : "",
error != NULL ? "')" : "",
uri,
ctxt->urn,
attribute_update_only ? " (attributes only)" : "");
if (!attribute_update_only) {
gchar *full_sparql;
/* Update, delete all statements inserted by miner except:
* - rdf:type statements as they could cause implicit deletion of user data
* - nie:contentCreated so it persists across updates
*
* Additionally, delete also nie:url as it might have been set by 3rd parties,
* and it's used to know whether a file is known to tracker or not.
*/
full_sparql = g_strdup_printf ("DELETE {"
" GRAPH <%s> {"
" <%s> ?p ?o"
" } "
"} "
"WHERE {"
" GRAPH <%s> {"
" <%s> ?p ?o"
" FILTER (?p != rdf:type && ?p != nie:contentCreated)"
" } "
"} "
"DELETE {"
" <%s> nie:url ?o"
"} WHERE {"
" <%s> nie:url ?o"
"}"
"%s",
TRACKER_MINER_FS_GRAPH_URN, ctxt->urn,
TRACKER_MINER_FS_GRAPH_URN, ctxt->urn,
ctxt->urn, ctxt->urn,
tracker_sparql_builder_get_result (ctxt->builder));
sparql_task = tracker_sparql_task_new_take_sparql_str (task_file, full_sparql);
} else {
/* Do not drop graph if only updating attributes, the SPARQL builder
* will already contain the necessary DELETE statements for the properties
* being updated */
sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
}
} else {
if (error != NULL) {
g_debug ("Creating minimal info for new item '%s' which had error: '%s'",
uri,
error->message ? error->message : "No error given");
} else {
g_debug ("Creating new item '%s'", uri);
sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
}
sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
}
tracker_sparql_buffer_push (fs->priv->sparql_buffer,
sparql_task,
ctxt->priority,
sparql_buffer_task_finished_cb,
fs);
if (sparql_task) {
tracker_sparql_buffer_push (fs->priv->sparql_buffer,
sparql_task,
ctxt->priority,
sparql_buffer_task_finished_cb,
fs);
}
if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
item_queue_handlers_set_up (fs);
......@@ -2096,7 +2029,6 @@ item_add_or_update (TrackerMinerFS *fs,
task = tracker_task_new (file, ctxt,
(GDestroyNotify) update_processing_task_context_free);
tracker_task_pool_add (priv->task_pool, task);
priv->extraction_tasks = g_list_prepend (priv->extraction_tasks, task);
if (do_process_file (fs, task)) {
fs->priv->total_files_processed++;
......
......@@ -71,6 +71,7 @@ struct ProcessFileData {
TrackerSparqlBuilder *sparql;
GCancellable *cancellable;
GFile *file;
gchar *mime_type;
};
typedef void (*fast_async_cb) (const gchar *preupdate,
......@@ -133,6 +134,12 @@ struct TrackerMinerFilesPrivate {
gboolean mount_points_initialized;
guint stale_volumes_check_id;
guint failed_extraction_pause_cookie;
GList *extraction_queue;
GList *failed_extraction_queue;
gboolean failsafe_extraction;
};
enum {
......@@ -241,9 +248,18 @@ static void miner_files_in_removable_media_remove_by_date (TrackerMinerF
static void miner_files_add_removable_or_optical_directory (TrackerMinerFiles *mf,
const gchar *mount_path,
const gchar *uuid);
static void get_metadata_fast_async (GDBusConnection *connection,
const gchar *uri,
const gchar *mime_type,
GCancellable *cancellable,
fast_async_cb callback,
ProcessFileData *user_data);
static void extractor_cancel_tasks (GDBusConnection *connection,
GFile *prefix);
static void extractor_process_failsafe (TrackerMinerFiles *miner);
static GInitableIface* miner_files_initable_parent_iface;
G_DEFINE_TYPE_WITH_CODE (TrackerMinerFiles, tracker_miner_files, TRACKER_TYPE_MINER_FS,
......@@ -669,6 +685,9 @@ miner_files_finalize (GObject *object)
priv->stale_volumes_check_id = 0;
}
g_list_free (priv->extraction_queue);
g_list_free (priv->failed_extraction_queue);
G_OBJECT_CLASS (tracker_miner_files_parent_class)->finalize (object);
}
......@@ -1954,28 +1973,17 @@ process_file_data_free (ProcessFileData *data)
g_object_unref (data->sparql);
g_object_unref (data->cancellable);
g_object_unref (data->file);
g_free (data->mime_type);
g_slice_free (ProcessFileData, data);
}
static void
extractor_get_embedded_metadata_cb (const gchar *preupdate,
const gchar *sparql,
GError *error,
gpointer user_data)
sparql_builder_finish (ProcessFileData *data,
const gchar *preupdate,
const gchar *sparql)
{
ProcessFileData *data = user_data;
const gchar *uuid;
if (error) {
tracker_sparql_builder_graph_close (data->sparql);
tracker_sparql_builder_insert_close (data->sparql);
/* Something bad happened, notify about the error */
tracker_miner_fs_file_notify (TRACKER_MINER_FS (data->miner), data->file, error);
process_file_data_free (data);
return;
}
if (sparql && *sparql) {
gboolean is_iri;
const gchar *urn;
......@@ -2039,11 +2047,161 @@ extractor_get_embedded_metadata_cb (const gchar *preupdate,
g_free (removable_device_urn);
g_free (uri);
}
}
/* Notify about the success */
tracker_miner_fs_file_notify (TRACKER_MINER_FS (data->miner), data->file, NULL);
/* Completion callback for a failsafe (second attempt) extraction request
 * dispatched by extractor_process_failsafe().
 *
 * @preupdate, @sparql: extractor results, forwarded to sparql_builder_finish()
 *                      only when the request succeeded.
 * @error:              set when the second request failed as well.
 * @user_data:          the ProcessFileData of the file being retried; it is
 *                      freed here.
 *
 * The file is always notified as successfully processed (NULL error), after
 * which the next queued failed extraction (if any) is started.
 */
static void
extractor_get_failsafe_metadata_cb (const gchar *preupdate,
                                    const gchar *sparql,
                                    GError      *error,
                                    gpointer     user_data)
{
	ProcessFileData *data = user_data;
	TrackerMinerFiles *miner = data->miner;
	TrackerMinerFilesPrivate *priv = miner->private;

	if (!error) {
		g_debug (" Extraction succeeded the second time");
		sparql_builder_finish (data, preupdate, sparql);
	} else {
		gchar *uri = g_file_get_uri (data->file);

		g_warning (" Got second extraction DBus error on '%s'. "
		           "Adding only non-embedded metadata to the SparQL, "
		           "the error was: %s",
		           uri, error->message);

		/* No extractor output to merge, finish with what we have */
		sparql_builder_finish (data, NULL, NULL);
		g_free (uri);
	}

	/* Notify success even if the extraction failed
	 * again, so we get the essential data in the store.
	 */
	tracker_miner_fs_file_notify (TRACKER_MINER_FS (miner), data->file, NULL);

	/* Drop any reference to data from the failed queue before freeing it */
	priv->failed_extraction_queue = g_list_remove (priv->failed_extraction_queue,
	                                               data);
	process_file_data_free (data);

	/* Get on to the next failed extraction, or resume miner */
	extractor_process_failsafe (miner);
}
/* This function processes failed files one by one,
 * the function will be called after each operation
 * is finished, so elements are processed linearly.
 *
 * If the failed extraction queue is not empty, its head element is
 * unlinked from the queue and a new extraction request is issued for it,
 * with extractor_get_failsafe_metadata_cb() as completion callback.
 *
 * If the queue is empty, the miner is resumed (when it was paused with
 * failed_extraction_pause_cookie) and failsafe mode is left.
 */
static void
extractor_process_failsafe (TrackerMinerFiles *miner)
{
	TrackerMinerFilesPrivate *priv;
	ProcessFileData *data;

	priv = miner->private;

	if (priv->failed_extraction_queue) {
		gchar *uri;

		data = priv->failed_extraction_queue->data;
		priv->failed_extraction_queue = g_list_remove (priv->failed_extraction_queue, data);

		uri = g_file_get_uri (data->file);
		g_message ("Performing failsafe extraction on '%s'", uri);

		get_metadata_fast_async (data->miner->private->connection,
		                         uri,
		                         data->mime_type,
		                         data->cancellable,
		                         extractor_get_failsafe_metadata_cb,
		                         data);

		/* The URI is handed to the request above, so it may only be
		 * released once that call has returned (it used to be freed
		 * before the call, which passed a dangling pointer).
		 */
		g_free (uri);
	} else {
		g_debug ("Failsafe extraction finished. Resuming miner...");

		if (priv->failed_extraction_pause_cookie != 0) {
			tracker_miner_resume (TRACKER_MINER (miner),
			                      priv->failed_extraction_pause_cookie,
			                      NULL);
			priv->failed_extraction_pause_cookie = 0;
		}

		priv->failsafe_extraction = FALSE;
	}
}
/* Starts failsafe extraction when, and only when, all of these hold:
 *  - failsafe extraction is not already running,
 *  - there are no pending regular extraction requests,
 *  - at least one failed extraction is queued.
 * Otherwise this is a no-op.
 */
static void
extractor_check_process_failsafe (TrackerMinerFiles *miner)
{
	TrackerMinerFilesPrivate *priv = miner->private;

	if (!priv->failsafe_extraction &&
	    !priv->extraction_queue &&
	    priv->failed_extraction_queue) {
		priv->failsafe_extraction = TRUE;
		extractor_process_failsafe (miner);
	}
}
/* Completion callback for a regular embedded metadata extraction request.
 *
 * @preupdate, @sparql: extractor results, used only on success.
 * @error:              set when the request failed.
 * @user_data:          the ProcessFileData of the processed file.
 *
 * The request is first removed from the pending extraction queue. Then:
 *  - on a D-Bus no-reply/timeout error code, the miner is paused (once) and
 *    the file is queued for failsafe extraction; @user_data is kept alive
 *    in failed_extraction_queue;
 *  - on any other error, the error is notified and @user_data is freed;
 *  - on success, the SPARQL is finished, success is notified and
 *    @user_data is freed.
 * Finally, failsafe extraction is started if nothing else is pending.
 */
static void
extractor_get_embedded_metadata_cb (const gchar *preupdate,
                                    const gchar *sparql,
                                    GError      *error,
                                    gpointer     user_data)
{
	TrackerMinerFilesPrivate *priv;
	TrackerMinerFiles *miner;
	ProcessFileData *data = user_data;

	miner = data->miner;
	priv = miner->private;
	priv->extraction_queue = g_list_remove (priv->extraction_queue, data);

	if (error) {
		/* NOTE(review): only error->code is compared here, the error
		 * domain is not checked - confirm whether g_error_matches()
		 * with G_DBUS_ERROR would be appropriate.
		 */
		if (error->code == G_DBUS_ERROR_NO_REPLY ||
		    error->code == G_DBUS_ERROR_TIMEOUT ||
		    error->code == G_DBUS_ERROR_TIMED_OUT) {
			gchar *uri;

			uri = g_file_get_uri (data->file);
			g_warning (" Got extraction DBus error on '%s': %s", uri, error->message);

			/* Pause only if we do not hold a pause cookie yet. The
			 * check used to be inverted (!= 0), which skipped the
			 * pause while the cookie was 0 and would have
			 * overwritten a cookie that was already held.
			 */
			if (priv->failed_extraction_pause_cookie == 0) {
				priv->failed_extraction_pause_cookie =
					tracker_miner_pause (TRACKER_MINER (data->miner),
					                     _("Extractor error, performing "
					                       "failsafe embedded metadata extraction"),
					                     NULL);
			}

			/* Ownership of data moves to the failed queue */
			priv->failed_extraction_queue = g_list_prepend (priv->failed_extraction_queue, data);
			g_free (uri);
		} else {
			/* Something bad happened, notify about the error */
			tracker_miner_fs_file_notify (TRACKER_MINER_FS (data->miner), data->file, error);
			process_file_data_free (data);
		}
	} else {
		sparql_builder_finish (data, preupdate, sparql);

		/* Notify about the success */
		tracker_miner_fs_file_notify (TRACKER_MINER_FS (data->miner), data->file, NULL);
		process_file_data_free (data);
	}

	/* Wait until there are no pending extraction requests
	 * before starting failsafe extraction process.
	 */
	extractor_check_process_failsafe (miner);
}
static SendAndSpliceData *
......@@ -2372,6 +2530,7 @@ process_file_cb (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
TrackerMinerFilesPrivate *priv;
TrackerSparqlBuilder *sparql;
ProcessFileData *data;
const gchar *mime_type, *urn, *parent_urn;
......@@ -2387,6 +2546,7 @@ process_file_cb (GObject *object,
file = G_FILE (object);
sparql = data->sparql;
file_info = g_file_query_info_finish (file, result, &error);
priv = TRACKER_MINER_FILES (data->miner)->private;
if (error) {
/* Something bad happened, notify about the error */
......@@ -2400,6 +2560,8 @@ process_file_cb (GObject *object,
mime_type = g_file_info_get_content_type (file_info);
urn = miner_files_get_file_urn (TRACKER_MINER_FILES (data->miner), file, &is_iri);
data->mime_type = g_strdup (mime_type);
tracker_sparql_builder_insert_silent_open (sparql, NULL);
tracker_sparql_builder_graph_open (sparql, TRACKER_MINER_FS_GRAPH_URN);
......@@ -2461,10 +2623,13 @@ process_file_cb (GObject *object,
/* Next step, if NOT a directory, get embedded metadata */
extractor_get_embedded_metadata (data, uri, mime_type);
} else {
/* For directories, don't request embedded metadata extraction.
* We setup an idle so that we keep the previous behavior. */
/* Otherwise, don't request embedded metadata extraction. */
g_debug ("Avoiding embedded metadata request for directory '%s'", uri);
extractor_get_embedded_metadata_cb (NULL, NULL, NULL, user_data);
sparql_builder_finish (data, NULL, NULL);
tracker_miner_fs_file_notify (TRACKER_MINER_FS (data->miner), data->file, NULL);
priv->extraction_queue = g_list_remove (priv->extraction_queue, data);
extractor_check_process_failsafe (data->miner);
}
g_object_unref (file_info);
......@@ -2477,6 +2642,7 @@ miner_files_process_file (TrackerMinerFS *fs,
TrackerSparqlBuilder *sparql,
GCancellable *cancellable)
{
TrackerMinerFilesPrivate *priv;
ProcessFileData *data;
const gchar *attrs;
......@@ -2486,6 +2652,9 @@ miner_files_process_file (TrackerMinerFS *fs,
data->sparql = g_object_ref (sparql);
data->file = g_object_ref (file);
priv = TRACKER_MINER_FILES (fs)->private;
priv->extraction_queue = g_list_prepend (priv->extraction_queue, data);
attrs = G_FILE_ATTRIBUTE_STANDARD_TYPE ","
G_FILE_ATTRIBUTE_STANDARD_CONTENT_TYPE ","
G_FILE_ATTRIBUTE_STANDARD_DISPLAY_NAME ","
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment