Commit 171052f3 authored by Carlos Garnacho

tracker-miner-fs: Process files from the given GFileInfo

Instead of going file by file, querying info asynchronously and
then populating the info, use the GFileInfo received indirectly from
the TrackerCrawler.

This avoids the overhead coming from being far too asynchronous in
handling. We already have all the information, so the SPARQL can be
figured out synchronously. The updates are still batched, though.
parent 915fd779
......@@ -400,8 +400,8 @@ tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
G_STRUCT_OFFSET (TrackerMinerFSClass, process_file),
NULL, NULL,
NULL,
G_TYPE_BOOLEAN,
2, G_TYPE_FILE, G_TYPE_TASK);
G_TYPE_STRING,
2, G_TYPE_FILE, G_TYPE_FILE_INFO);
/**
* TrackerMinerFS::process-file-attributes:
......@@ -436,8 +436,8 @@ tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
G_STRUCT_OFFSET (TrackerMinerFSClass, process_file_attributes),
NULL, NULL,
NULL,
G_TYPE_BOOLEAN,
2, G_TYPE_FILE, G_TYPE_TASK);
G_TYPE_STRING,
2, G_TYPE_FILE, G_TYPE_FILE_INFO);
/**
* TrackerMinerFS::finished:
......@@ -1273,73 +1273,27 @@ sparql_buffer_task_finished_cb (GObject *object,
tracker_task_unref (task);
}
static UpdateProcessingTaskContext *
update_processing_task_context_new (TrackerMiner *miner,
gint priority,
GCancellable *cancellable)
{
UpdateProcessingTaskContext *ctxt;
ctxt = g_slice_new0 (UpdateProcessingTaskContext);
ctxt->miner = miner;
ctxt->priority = priority;
if (cancellable) {
ctxt->cancellable = g_object_ref (cancellable);
}
return ctxt;
}
static void
update_processing_task_context_free (UpdateProcessingTaskContext *ctxt)
{
g_clear_pointer (&ctxt->task, tracker_task_unref);
if (ctxt->cancellable) {
g_object_unref (ctxt->cancellable);
}
g_slice_free (UpdateProcessingTaskContext, ctxt);
}
static void
on_signal_gtask_complete (GObject *source,
GAsyncResult *res,
gpointer user_data)
push_sparql_task (TrackerMinerFS *fs,
GFile *file,
gchar *sparql,
gint priority)
{
TrackerMinerFS *fs = TRACKER_MINER_FS (source);
TrackerTask *task, *sparql_task = NULL;
UpdateProcessingTaskContext *ctxt;
GError *error = NULL;
GFile *file = user_data;
gchar *uri, *sparql;
sparql = g_task_propagate_pointer (G_TASK (res), &error);
g_object_unref (res);
TrackerTask *sparql_task = NULL;
gchar *uri;
ctxt = g_task_get_task_data (G_TASK (res));
uri = g_file_get_uri (file);
task = ctxt->task;
g_assert (task != NULL);
if (error) {
g_message ("Could not process '%s': %s", uri, error->message);
g_error_free (error);
fs->priv->total_files_notified_error++;
} else {
fs->priv->total_files_notified++;
fs->priv->total_files_notified++;
TRACKER_NOTE (MINER_FS_EVENTS, g_message ("Creating/updating item '%s'", uri));
TRACKER_NOTE (MINER_FS_EVENTS, g_message ("Creating/updating item '%s'", uri));
sparql_task = tracker_sparql_task_new_take_sparql_str (file, sparql);
}
sparql_task = tracker_sparql_task_new_take_sparql_str (file, sparql);
if (sparql_task) {
tracker_sparql_buffer_push (fs->priv->sparql_buffer,
sparql_task,
ctxt->priority,
priority,
sparql_buffer_task_finished_cb,
fs);
......@@ -1367,8 +1321,6 @@ on_signal_gtask_complete (GObject *source,
}
}
tracker_task_pool_remove (fs->priv->task_pool, task);
if (tracker_miner_fs_has_items_to_process (fs) == FALSE &&
tracker_task_pool_get_size (TRACKER_TASK_POOL (fs->priv->task_pool)) == 0) {
/* We need to run this one more time to trigger process_stop() */
......@@ -1381,69 +1333,45 @@ on_signal_gtask_complete (GObject *source,
static gboolean
item_add_or_update (TrackerMinerFS *fs,
GFile *file,
GFileInfo *info,
gint priority,
gboolean attributes_update)
{
TrackerMinerFSPrivate *priv;
UpdateProcessingTaskContext *ctxt;
GCancellable *cancellable;
gboolean processing;
TrackerTask *task;
gchar *uri;
GTask *gtask;
priv = fs->priv;
gchar *uri, *sparql;
cancellable = g_cancellable_new ();
g_object_ref (file);
/* Create task and add it to the pool as a WAIT task (we need to extract
* the file metadata and such) */
ctxt = update_processing_task_context_new (TRACKER_MINER (fs),
priority,
cancellable);
/* Call ::process-file to see if we handle this resource or not */
uri = g_file_get_uri (file);
gtask = g_task_new (fs, ctxt->cancellable, on_signal_gtask_complete, file);
g_task_set_task_data (gtask, ctxt,
(GDestroyNotify) update_processing_task_context_free);
task = tracker_task_new (file, gtask, NULL);
if (!info) {
info = g_file_query_info (file,
fs->priv->file_attributes,
G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS,
NULL, NULL);
}
ctxt->task = tracker_task_ref (task);
tracker_task_pool_add (priv->task_pool, task);
tracker_task_unref (task);
if (!info)
return TRUE;
if (!attributes_update) {
TRACKER_NOTE (MINER_FS_EVENTS, g_message ("Processing file '%s'...", uri));
g_signal_emit (fs, signals[PROCESS_FILE], 0,
file, gtask,
&processing);
file, info,
&sparql);
} else {
TRACKER_NOTE (MINER_FS_EVENTS, g_message ("Processing attributes in file '%s'...", uri));
g_signal_emit (fs, signals[PROCESS_FILE_ATTRIBUTES], 0,
file, gtask,
&processing);
file, info,
&sparql);
}
if (!processing) {
GError *error;
error = g_error_new (tracker_miner_fs_error_quark (),
TRACKER_MINER_FS_ERROR_INIT,
"TrackerMinerFS::process-file returned FALSE");
g_task_return_error (gtask, error);
} else {
fs->priv->total_files_processed++;
}
fs->priv->total_files_processed++;
push_sparql_task (fs, file, sparql, priority);
g_free (uri);
g_object_unref (file);
g_object_unref (cancellable);
return !tracker_task_pool_limit_reached (priv->task_pool);
return TRUE;
}
static gboolean
......@@ -1831,7 +1759,7 @@ miner_handle_next_item (TrackerMinerFS *fs)
case TRACKER_MINER_FS_EVENT_UPDATED:
parent = g_file_get_parent (file);
keep_processing = item_add_or_update (fs, file, priority, attributes_update);
keep_processing = item_add_or_update (fs, file, info, priority, attributes_update);
if (parent) {
g_object_unref (parent);
......
......@@ -91,18 +91,18 @@ struct _TrackerMinerFS {
typedef struct {
TrackerMinerClass parent;
gboolean (* process_file) (TrackerMinerFS *fs,
gchar * (* process_file) (TrackerMinerFS *fs,
GFile *file,
GTask *task);
GFileInfo *info);
void (* finished) (TrackerMinerFS *fs,
gdouble elapsed,
gint directories_found,
gint directories_ignored,
gint files_found,
gint files_ignored);
gboolean (* process_file_attributes) (TrackerMinerFS *fs,
gchar * (* process_file_attributes) (TrackerMinerFS *fs,
GFile *file,
GTask *task);
GFileInfo *info);
void (* finished_root) (TrackerMinerFS *fs,
GFile *root,
gint directories_found,
......
......@@ -67,16 +67,6 @@
static GQuark miner_files_error_quark = 0;
typedef struct ProcessFileData ProcessFileData;
struct ProcessFileData {
TrackerMinerFiles *miner;
GCancellable *cancellable;
GFile *file;
gchar *mime_type;
GTask *task;
};
struct TrackerMinerFilesPrivate {
TrackerConfig *config;
TrackerStorage *storage;
......@@ -119,8 +109,6 @@ struct TrackerMinerFilesPrivate {
gboolean mount_points_initialized;
guint stale_volumes_check_id;
GList *extraction_queue;
};
enum {
......@@ -195,12 +183,12 @@ static void set_up_application_indexing (TrackerMinerFiles *m
static void index_applications_changed_cb (GObject *gobject,
GParamSpec *arg1,
gpointer user_data);
static gboolean miner_files_process_file (TrackerMinerFS *fs,
static gchar * miner_files_process_file (TrackerMinerFS *fs,
GFile *file,
GTask *task);
static gboolean miner_files_process_file_attributes (TrackerMinerFS *fs,
GFileInfo *info);
static gchar * miner_files_process_file_attributes (TrackerMinerFS *fs,
GFile *file,
GTask *task);
GFileInfo *info);
static gchar * miner_files_remove_children (TrackerMinerFS *fs,
GFile *file);
static gchar * miner_files_remove_file (TrackerMinerFS *fs,
......@@ -794,8 +782,6 @@ miner_files_finalize (GObject *object)
priv->stale_volumes_check_id = 0;
}
g_list_free (priv->extraction_queue);
G_OBJECT_CLASS (tracker_miner_files_parent_class)->finalize (object);
}
......@@ -2079,65 +2065,27 @@ miner_files_create_folder_information_element (TrackerMinerFiles *miner,
return resource;
}
static void
process_file_data_free (ProcessFileData *data)
{
g_object_unref (data->miner);
g_object_unref (data->cancellable);
g_object_unref (data->file);
g_object_unref (data->task);
g_free (data->mime_type);
g_slice_free (ProcessFileData, data);
}
static void
process_file_cb (GObject *object,
GAsyncResult *result,
gpointer user_data)
static gchar *
miner_files_process_file (TrackerMinerFS *fs,
GFile *file,
GFileInfo *file_info)
{
TrackerMinerFilesPrivate *priv;
TrackerResource *resource, *folder_resource = NULL;
ProcessFileData *data;
const gchar *mime_type, *graph;
gchar *parent_urn;
gchar *delete_properties_sparql = NULL;
GFileInfo *file_info;
guint64 time_;
GFile *file, *parent;
GFile *parent;
gchar *uri, *sparql_str, *sparql_update_str, *time_str, *ie_update_str = NULL, *graph_file_str = NULL;
GError *error = NULL;
gboolean is_special;
gboolean is_directory;
data = user_data;
file = G_FILE (object);
file_info = g_file_query_info_finish (file, result, &error);
priv = TRACKER_MINER_FILES (data->miner)->private;
if (error) {
/* Something bad happened, notify about the error */
tracker_miner_fs_notify_finish (TRACKER_MINER_FS (data->miner), data->task, NULL, error);
priv->extraction_queue = g_list_remove (priv->extraction_queue, data);
process_file_data_free (data);
return;
}
is_special = (g_file_info_get_file_type (file_info) == G_FILE_TYPE_SPECIAL ?
TRUE : FALSE);
if (is_special) {
error = g_error_new (TRACKER_MINER_ERROR,
0,
"File is a device, socket or pipe. Refusing to index it.");
tracker_miner_fs_notify_finish (TRACKER_MINER_FS (data->miner), data->task, NULL, error);
priv->extraction_queue = g_list_remove (priv->extraction_queue, data);
process_file_data_free (data);
return;
}
priv = TRACKER_MINER_FILES (fs)->private;
priv->start_extractor = TRUE;
uri = g_file_get_uri (file);
mime_type = g_file_info_get_content_type (file_info);
data->mime_type = g_strdup (mime_type);
is_directory = (g_file_info_get_file_type (file_info) == G_FILE_TYPE_DIRECTORY ?
TRUE : FALSE);
......@@ -2169,7 +2117,7 @@ process_file_cb (GObject *object,
tracker_resource_add_uri (resource, "rdf:type", "nfo:FileDataObject");
parent = g_file_get_parent (file);
parent_urn = folder_urn_or_bnode (data->miner, parent, FALSE, NULL);
parent_urn = folder_urn_or_bnode (TRACKER_MINER_FILES (fs), parent, FALSE, NULL);
g_object_unref (parent);
if (parent_urn) {
......@@ -2197,13 +2145,13 @@ process_file_cb (GObject *object,
if (is_directory) {
folder_resource =
miner_files_create_folder_information_element (data->miner,
miner_files_create_folder_information_element (TRACKER_MINER_FILES (fs),
file,
mime_type,
is_directory);
}
miner_files_add_to_datasource (data->miner, file, resource, folder_resource);
miner_files_add_to_datasource (TRACKER_MINER_FILES (fs), file, resource, folder_resource);
sparql_update_str = tracker_resource_print_sparql_update (resource, NULL, DEFAULT_GRAPH);
......@@ -2244,136 +2192,54 @@ process_file_cb (GObject *object,
g_free (delete_properties_sparql);
g_free (graph_file_str);
tracker_miner_fs_notify_finish (TRACKER_MINER_FS (data->miner), data->task,
sparql_str, NULL);
priv->extraction_queue = g_list_remove (priv->extraction_queue, data);
process_file_data_free (data);
g_object_run_dispose (G_OBJECT (resource));
g_object_unref (resource);
g_object_unref (file_info);
g_free (sparql_str);
g_free (uri);
g_free (sparql_update_str);
}
static gboolean
miner_files_process_file (TrackerMinerFS *fs,
GFile *file,
GTask *task)
{
TrackerMinerFilesPrivate *priv;
ProcessFileData *data;
const gchar *attrs;
data = g_slice_new0 (ProcessFileData);
data->miner = TRACKER_MINER_FILES (g_object_ref (fs));
data->cancellable = g_object_ref (g_task_get_cancellable (task));
data->file = g_object_ref (file);
data->task = g_object_ref (task);
priv = TRACKER_MINER_FILES (fs)->private;
priv->extraction_queue = g_list_prepend (priv->extraction_queue, data);
priv->start_extractor = TRUE;
attrs = G_FILE_ATTRIBUTE_STANDARD_TYPE ","
G_FILE_ATTRIBUTE_STANDARD_CONTENT_TYPE ","
G_FILE_ATTRIBUTE_STANDARD_DISPLAY_NAME ","
G_FILE_ATTRIBUTE_STANDARD_SIZE ","
G_FILE_ATTRIBUTE_TIME_MODIFIED ","
G_FILE_ATTRIBUTE_TIME_ACCESS;
g_file_query_info_async (file,
attrs,
G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS,
G_PRIORITY_DEFAULT,
data->cancellable,
process_file_cb,
data);
return sparql_str;
return TRUE;
}
static void
process_file_attributes_cb (GObject *object,
GAsyncResult *result,
gpointer user_data)
static gchar *
miner_files_process_file_attributes (TrackerMinerFS *fs,
GFile *file,
GFileInfo *info)
{
TrackerResource *resource;
ProcessFileData *data;
GFileInfo *file_info;
guint64 time_;
GFile *file;
gchar *uri, *time_str, *sparql_str;
GError *error = NULL;
data = user_data;
file = G_FILE (object);
file_info = g_file_query_info_finish (file, result, &error);
if (error) {
/* Something bad happened, notify about the error */
tracker_miner_fs_notify_finish (TRACKER_MINER_FS (data->miner), data->task, NULL, error);
process_file_data_free (data);
return;
}
uri = g_file_get_uri (file);
resource = tracker_resource_new (uri);
if (!info) {
info = g_file_query_info (file,
G_FILE_ATTRIBUTE_TIME_MODIFIED ","
G_FILE_ATTRIBUTE_TIME_ACCESS,
G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS,
NULL, NULL);
}
/* Update nfo:fileLastModified */
time_ = g_file_info_get_attribute_uint64 (file_info, G_FILE_ATTRIBUTE_TIME_MODIFIED);
time_ = g_file_info_get_attribute_uint64 (info, G_FILE_ATTRIBUTE_TIME_MODIFIED);
time_str = tracker_date_to_string (time_);
tracker_resource_set_string (resource, "nfo:fileLastModified", time_str);
g_free (time_str);
/* Update nfo:fileLastAccessed */
time_ = g_file_info_get_attribute_uint64 (file_info, G_FILE_ATTRIBUTE_TIME_ACCESS);
time_ = g_file_info_get_attribute_uint64 (info, G_FILE_ATTRIBUTE_TIME_ACCESS);
time_str = tracker_date_to_string (time_);
tracker_resource_set_string (resource, "nfo:fileLastAccessed", time_str);
g_free (time_str);
g_object_unref (file_info);
g_free (uri);
/* Notify about the success */
sparql_str = tracker_resource_print_sparql_update (resource, NULL, DEFAULT_GRAPH);
tracker_miner_fs_notify_finish (TRACKER_MINER_FS (data->miner), data->task,
sparql_str, NULL);
process_file_data_free (data);
g_object_unref (resource);
g_free (sparql_str);
}
static gboolean
miner_files_process_file_attributes (TrackerMinerFS *fs,
GFile *file,
GTask *task)
{
ProcessFileData *data;
const gchar *attrs;
data = g_slice_new0 (ProcessFileData);
data->miner = TRACKER_MINER_FILES (g_object_ref (fs));
data->cancellable = g_object_ref (g_task_get_cancellable (task));
data->file = g_object_ref (file);
data->task = g_object_ref (task);
/* Query only attributes that may change in an ATTRIBUTES_UPDATED event */
attrs = G_FILE_ATTRIBUTE_TIME_MODIFIED ","
G_FILE_ATTRIBUTE_TIME_ACCESS;
g_file_query_info_async (file,
attrs,
G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS,
G_PRIORITY_DEFAULT,
data->cancellable,
process_file_attributes_cb,
data);
return TRUE;
return sparql_str;
}
static void
......
......@@ -28,14 +28,13 @@ G_DEFINE_TYPE (TestMiner, test_miner, TRACKER_TYPE_MINER_FS)
TrackerMinerFSTestFixture, NULL, \
fixture_setup, func, fixture_teardown)
static gboolean
static gchar *
test_miner_process_file (TrackerMinerFS *miner,
GFile *file,
GTask *task)
GFileInfo *info)
{
TrackerResource *resource;
GError *error = NULL;
GFileInfo *info;
GDateTime *modification_time;
gchar *sparql, *str;
const gchar *urn;
......@@ -83,12 +82,11 @@ test_miner_process_file (TrackerMinerFS *miner,
}
sparql = tracker_resource_print_sparql_update (resource, NULL, "Graph");
tracker_miner_fs_notify_finish (miner, task, sparql, NULL);
g_object_unref (resource);
g_free (sparql);
g_object_unref (info);
return TRUE;
return sparql;
}
static gchar *
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment