Commit 14203118 authored by Sam Thursfield

Merge branch 'wip/carlosg/perf-squeeze' into 'master'

Squeeze miner-fs performance

See merge request !233
parents 5ff38262 760cc6b2
Pipeline #194225 passed with stages
in 13 minutes and 12 seconds
......@@ -83,6 +83,7 @@ typedef struct {
TrackerSparqlStatement *content_query;
TrackerSparqlStatement *urn_query;
TrackerSparqlStatement *exists_query;
GTimer *timer;
......@@ -703,14 +704,40 @@ sparql_urn_ensure_statement (TrackerFileNotifier *notifier,
tracker_sparql_connection_query_statement (priv->connection,
"SELECT ?ie "
"{"
" ~file a nfo:FileDataObject ;"
" nie:interpretedAs ?ie ."
" GRAPH tracker:FileSystem {"
" ~file a nfo:FileDataObject ;"
" nie:interpretedAs ?ie ."
" }"
"}",
priv->cancellable,
error);
return priv->urn_query;
}
/*
 * Hands back the notifier's "does this file exist" ASK statement.
 *
 * The statement is built from priv->connection the first time it is
 * needed and kept in priv->exists_query; as long as that field is
 * non-NULL it is returned untouched and @error is not written to.
 * Whatever the connection returns on creation (including NULL) is
 * stored and returned to the caller.
 */
static TrackerSparqlStatement *
sparql_exists_ensure_statement (TrackerFileNotifier  *notifier,
                                GError              **error)
{
	TrackerFileNotifierPrivate *self_priv =
		tracker_file_notifier_get_instance_private (notifier);

	if (self_priv->exists_query == NULL) {
		/* ~file is the parameter bound by callers before execution */
		self_priv->exists_query =
			tracker_sparql_connection_query_statement (self_priv->connection,
			                                           "ASK "
			                                           "{"
			                                           " GRAPH tracker:FileSystem {"
			                                           " ~file a nfo:FileDataObject ."
			                                           " }"
			                                           "}",
			                                           self_priv->cancellable,
			                                           error);
	}

	return self_priv->exists_query;
}
static void
query_execute_cb (TrackerSparqlStatement *statement,
GAsyncResult *res,
......@@ -1060,6 +1087,12 @@ monitor_item_updated_cb (TrackerMonitor *monitor,
/* Fetch the interned copy */
canonical = tracker_file_system_get_file (priv->file_system,
file, file_type, NULL);
if (is_directory) {
/* Ensure the folder iri is cached */
tracker_file_notifier_get_file_iri (notifier, canonical, TRUE);
}
g_signal_emit (notifier, signals[FILE_UPDATED], 0, canonical, FALSE);
if (!is_directory) {
......@@ -1526,6 +1559,7 @@ tracker_file_notifier_finalize (GObject *object)
g_clear_object (&priv->content_query);
g_clear_object (&priv->urn_query);
g_clear_object (&priv->exists_query);
g_object_unref (priv->crawler);
g_object_unref (priv->monitor);
......@@ -1943,6 +1977,66 @@ tracker_file_notifier_get_file_iri (TrackerFileNotifier *notifier,
return iri;
}
/*
 * Tells whether @file is already known.
 *
 * The interned copy of @file is looked up first; if it carries the
 * mimetype property the answer is taken from that lookup and no query
 * is run. Otherwise the ASK statement is bound to the file URI and
 * executed synchronously, and its boolean result is returned.
 *
 * Returns FALSE when there is no connection, no interned file, no
 * statement, no cursor, or no result row. Errors from the SPARQL calls
 * are not collected (NULL is passed for every GError argument).
 */
gboolean
tracker_file_notifier_query_file_exists (TrackerFileNotifier *notifier,
                                         GFile               *file)
{
	TrackerFileNotifierPrivate *priv;
	TrackerSparqlStatement *ask_stmt;
	TrackerSparqlCursor *result;
	GFile *interned;
	gboolean exists;
	gchar *file_uri;

	g_return_val_if_fail (TRACKER_IS_FILE_NOTIFIER (notifier), FALSE);
	g_return_val_if_fail (G_IS_FILE (file), FALSE);

	priv = tracker_file_notifier_get_instance_private (notifier);

	if (G_UNLIKELY (priv->connection == NULL))
		return FALSE;

	interned = tracker_file_system_get_file (priv->file_system,
	                                         file,
	                                         G_FILE_TYPE_REGULAR,
	                                         NULL);
	if (interned == NULL)
		return FALSE;

	exists = tracker_file_system_get_property_full (priv->file_system,
	                                                interned,
	                                                quark_property_mimetype,
	                                                NULL);
	if (exists)
		return exists;

	/* Fetch data for this file synchronously */
	ask_stmt = sparql_exists_ensure_statement (notifier, NULL);
	if (ask_stmt == NULL)
		return FALSE;

	file_uri = g_file_get_uri (file);
	tracker_sparql_statement_bind_string (ask_stmt, "file", file_uri);
	g_free (file_uri);

	result = tracker_sparql_statement_execute (ask_stmt, NULL, NULL);
	if (result == NULL)
		return FALSE;

	/* No row at all is treated the same as a negative answer */
	exists = tracker_sparql_cursor_next (result, NULL, NULL)
		? tracker_sparql_cursor_get_boolean (result, 0)
		: FALSE;

	g_object_unref (result);

	return exists;
}
static gboolean
file_notifier_invalidate_file_iri_foreach (GFile *file,
gpointer user_data)
......
......@@ -92,6 +92,8 @@ const gchar * tracker_file_notifier_get_file_iri (TrackerFileNotifier *notif
void tracker_file_notifier_invalidate_file_iri (TrackerFileNotifier *notifier,
GFile *file,
gboolean recursive);
gboolean tracker_file_notifier_query_file_exists (TrackerFileNotifier *notifier,
GFile *file);
GFileType tracker_file_notifier_get_file_type (TrackerFileNotifier *notifier,
GFile *file);
......
......@@ -157,15 +157,6 @@ struct _TrackerMinerFSPrivate {
guint total_files_notified_error;
};
/* Identifiers for queue states.
 * NOTE(review): no user of these values is visible in this excerpt;
 * confirm the enum is still referenced before relying on it. */
typedef enum {
QUEUE_NONE,
QUEUE_CREATED,
QUEUE_UPDATED,
QUEUE_DELETED,
QUEUE_MOVED,
QUEUE_WAIT,
} QueueState;
typedef enum {
QUEUE_ACTION_NONE = 0,
QUEUE_ACTION_DELETE_FIRST = 1 << 0,
......@@ -1224,14 +1215,12 @@ sparql_buffer_task_finished_cb (GObject *object,
static UpdateProcessingTaskContext *
update_processing_task_context_new (TrackerMiner *miner,
gint priority,
const gchar *urn,
GCancellable *cancellable)
{
UpdateProcessingTaskContext *ctxt;
ctxt = g_slice_new0 (UpdateProcessingTaskContext);
ctxt->miner = miner;
ctxt->urn = g_strdup (urn);
ctxt->priority = priority;
if (cancellable) {
......@@ -1244,8 +1233,6 @@ update_processing_task_context_new (TrackerMiner *miner,
static void
update_processing_task_context_free (UpdateProcessingTaskContext *ctxt)
{
g_free (ctxt->urn);
if (ctxt->cancellable) {
g_object_unref (ctxt->cancellable);
}
......@@ -1253,6 +1240,18 @@ update_processing_task_context_free (UpdateProcessingTaskContext *ctxt)
g_slice_free (UpdateProcessingTaskContext, ctxt);
}
/*
 * Asks the file notifier for the IRI of @file's parent folder.
 *
 * The returned IRI is discarded; the call is made only for the lookup
 * itself (callers use this to have the folder URN cached before
 * emitting signals).
 *
 * @fs: the miner whose file notifier is queried.
 * @file: the file whose parent folder should be looked up.
 *
 * Does nothing when @file has no parent.
 */
static void
cache_parent_folder_urn (TrackerMinerFS *fs,
                         GFile          *file)
{
	GFile *parent;

	parent = g_file_get_parent (file);

	/* g_file_get_parent() yields NULL for a file without a parent
	 * (e.g. a filesystem root); passing that on would trip the
	 * notifier's argument checks and g_object_unref (NULL).
	 */
	if (parent == NULL)
		return;

	tracker_file_notifier_get_file_iri (fs->priv->file_notifier,
	                                    parent, TRUE);
	g_object_unref (parent);
}
static void
on_signal_gtask_complete (GObject *source,
GAsyncResult *res,
......@@ -1271,7 +1270,7 @@ on_signal_gtask_complete (GObject *source,
task = tracker_task_pool_find (fs->priv->task_pool, file);
g_assert (task != NULL);
ctxt = tracker_task_get_data (task);
ctxt = g_task_get_task_data (G_TASK (res));
uri = g_file_get_uri (file);
if (error) {
......@@ -1282,14 +1281,7 @@ on_signal_gtask_complete (GObject *source,
} else {
fs->priv->total_files_notified++;
if (ctxt->urn) {
/* The SPARQL builder will already contain the necessary
* DELETE statements for the properties being updated */
TRACKER_NOTE (MINER_FS_EVENTS, g_message ("Updating item '%s' with urn '%s'",
uri, ctxt->urn));
} else {
TRACKER_NOTE (MINER_FS_EVENTS, g_message ("Creating new item '%s'", uri));
}
TRACKER_NOTE (MINER_FS_EVENTS, g_message ("Creating/updating item '%s'", uri));
sparql_task = tracker_sparql_task_new_take_sparql_str (file, sparql);
}
......@@ -1325,13 +1317,6 @@ on_signal_gtask_complete (GObject *source,
}
}
/* Last reference is kept by the pool, removing the task from
* the pool cleans up the task too!
*
* NOTE that calling this any earlier actually causes invalid
* reads because the task frees up the
* UpdateProcessingTaskContext and GFile.
*/
tracker_task_pool_remove (fs->priv->task_pool, task);
if (tracker_miner_fs_has_items_to_process (fs) == FALSE &&
......@@ -1354,7 +1339,6 @@ item_add_or_update (TrackerMinerFS *fs,
GCancellable *cancellable;
gboolean processing;
TrackerTask *task;
const gchar *urn;
gchar *uri;
GTask *gtask;
......@@ -1363,25 +1347,23 @@ item_add_or_update (TrackerMinerFS *fs,
cancellable = g_cancellable_new ();
g_object_ref (file);
urn = tracker_file_notifier_get_file_iri (fs->priv->file_notifier,
file, FALSE);
/* Create task and add it to the pool as a WAIT task (we need to extract
* the file metadata and such) */
ctxt = update_processing_task_context_new (TRACKER_MINER (fs),
priority,
urn,
cancellable);
task = tracker_task_new (file, ctxt,
(GDestroyNotify) update_processing_task_context_free);
tracker_task_pool_add (priv->task_pool, task);
tracker_task_unref (task);
/* Call ::process-file to see if we handle this resource or not */
uri = g_file_get_uri (file);
gtask = g_task_new (fs, ctxt->cancellable, on_signal_gtask_complete, file);
g_task_set_task_data (gtask, ctxt,
(GDestroyNotify) update_processing_task_context_free);
task = tracker_task_new (file, g_object_ref (gtask), g_object_unref);
tracker_task_pool_add (priv->task_pool, task);
tracker_task_unref (task);
if (!attributes_update) {
TRACKER_NOTE (MINER_FS_EVENTS, g_message ("Processing file '%s'...", uri));
......@@ -1455,7 +1437,6 @@ item_move (TrackerMinerFS *fs,
{
gchar *uri, *source_uri, *sparql;
GFileInfo *file_info;
const gchar *source_iri;
gboolean source_exists;
TrackerDirectoryFlags source_flags, flags;
gboolean recursive;
......@@ -1470,9 +1451,8 @@ item_move (TrackerMinerFS *fs,
NULL, NULL);
/* Get 'source' ID */
source_iri = tracker_file_notifier_get_file_iri (fs->priv->file_notifier,
source_file, TRUE);
source_exists = (source_iri != NULL);
source_exists = tracker_file_notifier_query_file_exists (fs->priv->file_notifier,
source_file);
if (!file_info) {
gboolean retval;
......@@ -1529,6 +1509,10 @@ item_move (TrackerMinerFS *fs,
(source_flags & TRACKER_DIRECTORY_FLAG_RECURSE) != 0)
item_remove (fs, source_file, TRUE, source_task_sparql);
/* Cache URN for source/dest folders */
cache_parent_folder_urn (fs, source_file);
cache_parent_folder_urn (fs, dest_file);
g_signal_emit (fs, signals[MOVE_FILE], 0, dest_file, source_file, recursive, &sparql);
if (sparql && sparql[0] != '\0') {
......@@ -1552,8 +1536,7 @@ should_wait (TrackerMinerFS *fs,
GFile *parent;
/* Is the item already being processed? */
if (tracker_task_pool_find (fs->priv->task_pool, file) ||
tracker_task_pool_find (TRACKER_TASK_POOL (fs->priv->sparql_buffer), file)) {
if (tracker_sparql_buffer_get_state (fs->priv->sparql_buffer, file) == TRACKER_BUFFER_STATE_FLUSHING) {
/* Yes, a previous event on same item currently
* being processed */
fs->priv->item_queue_blocker = g_object_ref (file);
......@@ -1563,8 +1546,7 @@ should_wait (TrackerMinerFS *fs,
/* Is the item's parent being processed right now? */
parent = g_file_get_parent (file);
if (parent) {
if (tracker_task_pool_find (fs->priv->task_pool, parent) ||
tracker_task_pool_find (TRACKER_TASK_POOL (fs->priv->sparql_buffer), parent)) {
if (tracker_sparql_buffer_get_state (fs->priv->sparql_buffer, parent) == TRACKER_BUFFER_STATE_FLUSHING) {
/* Yes, a previous event on the parent of this item
* currently being processed */
fs->priv->item_queue_blocker = parent;
......@@ -1842,34 +1824,7 @@ miner_handle_next_item (TrackerMinerFS *fs)
case TRACKER_MINER_FS_EVENT_UPDATED:
parent = g_file_get_parent (file);
if (!parent ||
tracker_indexing_tree_file_is_root (fs->priv->indexing_tree, file) ||
!tracker_indexing_tree_get_root (fs->priv->indexing_tree, file, NULL) ||
tracker_file_notifier_get_file_iri (fs->priv->file_notifier, parent, TRUE)) {
keep_processing = item_add_or_update (fs, file, priority, attributes_update);
} else {
gchar *uri;
/* We got an event on a file that has not its parent indexed
* even though it should. Given item_queue_get_next_file()
* above should return FALSE whenever the parent file is
* being processed, this means the parent is neither
* being processed nor indexed, no good.
*
* Bail out in these cases by removing all queued files
* inside the missing file. Whatever it was, it shall
* hopefully be fixed on next index.
*/
uri = g_file_get_uri (parent);
g_warning ("Parent '%s' not indexed yet", uri);
g_free (uri);
tracker_priority_queue_foreach_remove (fs->priv->items,
(GEqualFunc) queue_event_is_equal_or_descendant,
parent,
(GDestroyNotify) queue_event_free);
keep_processing = TRUE;
}
keep_processing = item_add_or_update (fs, file, priority, attributes_update);
if (parent) {
g_object_unref (parent);
......@@ -2077,10 +2032,6 @@ miner_fs_queue_event (TrackerMinerFS *fs,
(GDestroyNotify) queue_event_free);
}
/* Ensure IRI is cached */
tracker_file_notifier_get_file_iri (fs->priv->file_notifier,
event->file, TRUE);
trace_eq_event (event);
link = tracker_priority_queue_add (fs->priv->items, event, priority);
......@@ -2089,20 +2040,6 @@ miner_fs_queue_event (TrackerMinerFS *fs,
}
}
/*
 * Forwards an event to the class' optional filter_event vfunc.
 *
 * Returns the vfunc's result, or FALSE when the class does not
 * implement filter_event.
 */
static gboolean
filter_event (TrackerMinerFS          *fs,
              TrackerMinerFSEventType  type,
              GFile                   *file,
              GFile                   *source_file)
{
	TrackerMinerFSClass *fs_class = TRACKER_MINER_FS_GET_CLASS (fs);

	return fs_class->filter_event != NULL
		? fs_class->filter_event (fs, type, file, source_file)
		: FALSE;
}
static void
file_notifier_file_created (TrackerFileNotifier *notifier,
GFile *file,
......@@ -2111,9 +2048,6 @@ file_notifier_file_created (TrackerFileNotifier *notifier,
TrackerMinerFS *fs = user_data;
QueueEvent *event;
if (filter_event (fs, TRACKER_MINER_FS_EVENT_CREATED, file, NULL))
return;
event = queue_event_new (TRACKER_MINER_FS_EVENT_CREATED, file);
miner_fs_queue_event (fs, event, miner_fs_get_queue_priority (fs, file));
}
......@@ -2126,9 +2060,6 @@ file_notifier_file_deleted (TrackerFileNotifier *notifier,
TrackerMinerFS *fs = user_data;
QueueEvent *event;
if (filter_event (fs, TRACKER_MINER_FS_EVENT_DELETED, file, NULL))
return;
if (tracker_file_notifier_get_file_type (notifier, file) == G_FILE_TYPE_DIRECTORY) {
/* Cancel all pending tasks on files inside the path given by file */
tracker_task_pool_foreach (fs->priv->task_pool,
......@@ -2149,10 +2080,6 @@ file_notifier_file_updated (TrackerFileNotifier *notifier,
TrackerMinerFS *fs = user_data;
QueueEvent *event;
if (!attributes_only &&
filter_event (fs, TRACKER_MINER_FS_EVENT_UPDATED, file, NULL))
return;
event = queue_event_new (TRACKER_MINER_FS_EVENT_UPDATED, file);
event->attributes_update = attributes_only;
miner_fs_queue_event (fs, event, miner_fs_get_queue_priority (fs, file));
......@@ -2167,9 +2094,6 @@ file_notifier_file_moved (TrackerFileNotifier *notifier,
TrackerMinerFS *fs = user_data;
QueueEvent *event;
if (filter_event (fs, TRACKER_MINER_FS_EVENT_MOVED, dest, source))
return;
event = queue_event_moved_new (source, dest);
miner_fs_queue_event (fs, event, miner_fs_get_queue_priority (fs, source));
}
......@@ -2280,8 +2204,10 @@ task_pool_cancel_foreach (gpointer data,
GFile *file = user_data;
GFile *task_file;
UpdateProcessingTaskContext *ctxt;
GTask *gtask;
ctxt = tracker_task_get_data (task);
gtask = tracker_task_get_data (task);
ctxt = g_task_get_task_data (gtask);
task_file = tracker_task_get_file (task);
if (ctxt &&
......@@ -2408,9 +2334,6 @@ tracker_miner_fs_check_file (TrackerMinerFS *fs,
return;
}
tracker_file_notifier_get_file_iri (fs->priv->file_notifier,
file, TRUE);
event = queue_event_new (TRACKER_MINER_FS_EVENT_UPDATED, file);
trace_eq_event (event);
miner_fs_queue_event (fs, event, priority);
......@@ -2509,66 +2432,7 @@ tracker_miner_fs_get_throttle (TrackerMinerFS *fs)
}
/**
 * tracker_miner_fs_get_urn:
 * @fs: a #TrackerMinerFS
 * @file: a #GFile obtained in #TrackerMinerFS::process-file
 *
 * Retrieves the URN stored for a #GFile that @fs is currently
 * processing. A critical is logged and %NULL is returned when @file
 * has no task in the processing pool, or when its task carries no
 * created/updated context. The stored URN itself may also be %NULL
 * if the item does not exist in the store yet.
 *
 * Returns: (transfer none) (nullable): The URN containing the data associated to @file,
 *          or %NULL.
 *
 * Since: 0.8
 **/
const gchar *
tracker_miner_fs_get_urn (TrackerMinerFS *fs,
                          GFile          *file)
{
	UpdateProcessingTaskContext *task_ctxt;
	TrackerTask *pool_task;
	gchar *file_uri;

	g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
	g_return_val_if_fail (G_IS_FILE (file), NULL);

	/* Check if found in currently processed data */
	pool_task = tracker_task_pool_find (fs->priv->task_pool, file);

	if (pool_task == NULL) {
		file_uri = g_file_get_uri (file);
		g_critical ("File '%s' is not being currently processed, "
		            "so the URN cannot be retrieved.", file_uri);
		g_free (file_uri);

		return NULL;
	}

	/* We are only storing the URN in the created/updated tasks */
	task_ctxt = tracker_task_get_data (pool_task);

	if (task_ctxt == NULL) {
		file_uri = g_file_get_uri (file);
		g_critical ("File '%s' is being processed, but not as a "
		            "CREATED/UPDATED task, so cannot get URN",
		            file_uri);
		g_free (file_uri);

		return NULL;
	}

	return task_ctxt->urn;
}
/**
* tracker_miner_fs_query_urn:
* tracker_miner_fs_get_folder_urn:
* @fs: a #TrackerMinerFS
* @file: a #GFile
*
......@@ -2577,19 +2441,19 @@ tracker_miner_fs_get_urn (TrackerMinerFS *fs,
* If @file doesn't exist in the store yet, %NULL will be returned.
*
* Returns: (transfer full): A newly allocated string with the URN containing the data associated
* Returns: The URN containing the data associated
* to @file, or %NULL.
*
* Since: 0.10
**/
gchar *
tracker_miner_fs_query_urn (TrackerMinerFS *fs,
GFile *file)
const gchar *
tracker_miner_fs_get_folder_urn (TrackerMinerFS *fs,
GFile *file)
{
g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
g_return_val_if_fail (G_IS_FILE (file), NULL);
return g_strdup (tracker_file_notifier_get_file_iri (fs->priv->file_notifier, file, TRUE));
return tracker_file_notifier_get_file_iri (fs->priv->file_notifier, file, FALSE);
}
/**
......@@ -2655,3 +2519,26 @@ tracker_miner_fs_get_data_provider (TrackerMinerFS *fs)
return fs->priv->data_provider;
}
/*
 * Builds a blank node label for @file.
 *
 * A label is only produced while @file has a task in the miner's task
 * pool, or while the SPARQL buffer reports it as queued. The label is
 * "_:" followed by the MD5 checksum of the file URI.
 *
 * Returns a newly allocated string, or NULL when neither condition
 * holds (or when the arguments fail the type checks).
 */
gchar *
tracker_miner_fs_get_file_bnode (TrackerMinerFS *fs,
                                 GFile          *file)
{
	gchar *file_uri, *md5, *label;

	g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
	g_return_val_if_fail (G_IS_FILE (file), NULL);

	/* The buffer is consulted only if the task pool has no entry */
	if (!tracker_task_pool_find (fs->priv->task_pool, file) &&
	    tracker_sparql_buffer_get_state (fs->priv->sparql_buffer, file) != TRACKER_BUFFER_STATE_QUEUED)
		return NULL;

	file_uri = g_file_get_uri (file);
	md5 = g_compute_checksum_for_string (G_CHECKSUM_MD5, file_uri, -1);
	label = g_strdup_printf ("_:%s", md5);

	g_free (md5);
	g_free (file_uri);

	return label;
}
......@@ -83,7 +83,6 @@ struct _TrackerMinerFS {
* @remove_file: Called when a file is removed.
* @remove_children: Called when children have been removed.
* @move_file: Called when a file has moved.
* @filter_event: Called to filter the event happening to a file.
* @padding: Reserved for future API improvements.
*
* Prototype for the abstract class, @process_file must be implemented
......@@ -118,12 +117,6 @@ typedef struct {
GFile *dest,
GFile *source,
gboolean recursive);
gboolean (* filter_event) (TrackerMinerFS *fs,
TrackerMinerFSEventType type,
GFile *file,
GFile *source_file);
/* <Private> */
gpointer padding[20];
} TrackerMinerFSClass;
......@@ -166,11 +159,10 @@ void tracker_miner_fs_notify_finish (TrackerMinerFS *f
GError *error);
/* URNs */
const gchar *tracker_miner_fs_get_urn (TrackerMinerFS *fs,
const gchar *tracker_miner_fs_get_folder_urn (TrackerMinerFS *fs,
GFile *file);
gchar *tracker_miner_fs_query_urn (TrackerMinerFS *fs,
GFile *file);
gchar * tracker_miner_fs_get_file_bnode (TrackerMinerFS *fs,
GFile *file);
/* Progress */
gboolean tracker_miner_fs_has_items_to_process (TrackerMinerFS *fs);
......
......@@ -417,9 +417,6 @@ sparql_buffer_push_to_pool (TrackerSparqlBuffer *buffer,
if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (buffer))) {
tracker_sparql_buffer_flush (buffer, "SPARQL buffer limit reached");
} else if (priv->tasks->len > tracker_task_pool_get_limit (TRACKER_TASK_POOL (buffer)) / 2) {
/* We've filled half of the buffer, flush it as we receive more tasks */
tracker_sparql_buffer_flush (buffer, "SPARQL buffer half-full");
}
}
......@@ -519,3 +516,25 @@ tracker_sparql_buffer_push_finish (TrackerSparqlBuffer *buffer,
return task;
}
TrackerSparqlBufferState
tracker_sparql_buffer_get_state (TrackerSparqlBuffer *buffer,
GFile *file)
{
TrackerSparqlBufferPrivate *priv;
TrackerTask *task;
g_return_val_if_fail (TRACKER_IS_SPARQL_BUFFER (buffer), TRACKER_BUFFER_STATE_UNKNOWN);
g_return_val_if_fail (G_IS_FILE (file), TRACKER_BUFFER_STATE_UNKNOWN);
priv = tracker_sparql_buffer_get_instance_private (TRACKER_SPARQL_BUFFER (buffer));