Commit 760cc6b2 authored by Carlos Garnacho

libtracker-miner: Gather as many SPARQL updates as possible for every batch

We currently block the processing queue if the parent is seen in any stage
of processing. The situation is unblocked by flushing early, so that
processing can resume after the SPARQL updates have been performed.

This may lead to suboptimal buffer occupation, ultimately dependent on
the filesystem layout.

To improve this situation, rely on blank node labels being stable across
the whole SPARQL update string, and add a blank node labeling scheme that
allows files within the same SPARQL batch to reference each other through
these blank node labels instead of IRIs.

This allows maximum buffer occupation regardless of the filesystem layout.
We still have to wait after a SPARQL update if a file being processed
references (i.e. through a child/parent relationship) another file added in
the SPARQL update currently being performed, but that happens once per batch
instead of once per folder.
parent f8dffcc8
Pipeline #193974 passed with stage
in 1 minute and 50 seconds
......@@ -1536,8 +1536,7 @@ should_wait (TrackerMinerFS *fs,
GFile *parent;
/* Is the item already being processed? */
if (tracker_task_pool_find (fs->priv->task_pool, file) ||
tracker_task_pool_find (TRACKER_TASK_POOL (fs->priv->sparql_buffer), file)) {
if (tracker_sparql_buffer_get_state (fs->priv->sparql_buffer, file) == TRACKER_BUFFER_STATE_FLUSHING) {
/* Yes, a previous event on same item currently
* being processed */
fs->priv->item_queue_blocker = g_object_ref (file);
......@@ -1547,8 +1546,7 @@ should_wait (TrackerMinerFS *fs,
/* Is the item's parent being processed right now? */
parent = g_file_get_parent (file);
if (parent) {
if (tracker_task_pool_find (fs->priv->task_pool, parent) ||
tracker_task_pool_find (TRACKER_TASK_POOL (fs->priv->sparql_buffer), parent)) {
if (tracker_sparql_buffer_get_state (fs->priv->sparql_buffer, parent) == TRACKER_BUFFER_STATE_FLUSHING) {
/* Yes, a previous event on the parent of this item
* currently being processed */
fs->priv->item_queue_blocker = parent;
......@@ -1826,34 +1824,7 @@ miner_handle_next_item (TrackerMinerFS *fs)
case TRACKER_MINER_FS_EVENT_UPDATED:
parent = g_file_get_parent (file);
if (!parent ||
tracker_indexing_tree_file_is_root (fs->priv->indexing_tree, file) ||
!tracker_indexing_tree_get_root (fs->priv->indexing_tree, file, NULL) ||
tracker_file_notifier_get_file_iri (fs->priv->file_notifier, parent, FALSE)) {
keep_processing = item_add_or_update (fs, file, priority, attributes_update);
} else {
gchar *uri;
/* We got an event on a file that has not its parent indexed
* even though it should. Given item_queue_get_next_file()
* above should return FALSE whenever the parent file is
* being processed, this means the parent is neither
* being processed nor indexed, no good.
*
* Bail out in these cases by removing all queued files
* inside the missing file. Whatever it was, it shall
* hopefully be fixed on next index.
*/
uri = g_file_get_uri (parent);
g_warning ("Parent '%s' not indexed yet", uri);
g_free (uri);
tracker_priority_queue_foreach_remove (fs->priv->items,
(GEqualFunc) queue_event_is_equal_or_descendant,
parent,
(GDestroyNotify) queue_event_free);
keep_processing = TRUE;
}
keep_processing = item_add_or_update (fs, file, priority, attributes_update);
if (parent) {
g_object_unref (parent);
......@@ -2548,3 +2519,26 @@ tracker_miner_fs_get_data_provider (TrackerMinerFS *fs)
return fs->priv->data_provider;
}
/**
 * tracker_miner_fs_get_file_bnode:
 * @fs: a #TrackerMinerFS
 * @file: a #GFile
 *
 * Builds a blank node label for @file, of the form "_:" followed by the
 * MD5 checksum of the file URI. A label is only produced when @file has
 * a task in the miner task pool, or when the SPARQL buffer reports it as
 * %TRACKER_BUFFER_STATE_QUEUED.
 *
 * Returns: a newly allocated label string, or %NULL if @file is in
 * neither of those situations (or if the arguments are invalid).
 **/
gchar *
tracker_miner_fs_get_file_bnode (TrackerMinerFS *fs,
                                 GFile          *file)
{
	gchar *file_uri, *md5, *label;

	g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
	g_return_val_if_fail (G_IS_FILE (file), NULL);

	/* Bail out early unless the file is in the task pool or queued in
	 * the SPARQL buffer.
	 */
	if (!tracker_task_pool_find (fs->priv->task_pool, file) &&
	    tracker_sparql_buffer_get_state (fs->priv->sparql_buffer, file) != TRACKER_BUFFER_STATE_QUEUED)
		return NULL;

	/* The label is derived only from the URI, so the same file always
	 * yields the same label.
	 */
	file_uri = g_file_get_uri (file);
	md5 = g_compute_checksum_for_string (G_CHECKSUM_MD5, file_uri, -1);
	label = g_strdup_printf ("_:%s", md5);

	g_free (md5);
	g_free (file_uri);

	return label;
}
......@@ -161,7 +161,8 @@ void tracker_miner_fs_notify_finish (TrackerMinerFS *f
/* URNs */
const gchar *tracker_miner_fs_get_folder_urn (TrackerMinerFS *fs,
GFile *file);
gchar * tracker_miner_fs_get_file_bnode (TrackerMinerFS *fs,
GFile *file);
/* Progress */
gboolean tracker_miner_fs_has_items_to_process (TrackerMinerFS *fs);
......
......@@ -516,3 +516,25 @@ tracker_sparql_buffer_push_finish (TrackerSparqlBuffer *buffer,
return task;
}
/**
 * tracker_sparql_buffer_get_state:
 * @buffer: a #TrackerSparqlBuffer
 * @file: a #GFile
 *
 * Reports where the task for @file currently stands within @buffer.
 *
 * Returns: %TRACKER_BUFFER_STATE_UNKNOWN if the buffer's task pool holds
 * no task for @file (or the arguments are invalid),
 * %TRACKER_BUFFER_STATE_QUEUED if the task is present in the buffer's
 * private tasks array, %TRACKER_BUFFER_STATE_FLUSHING otherwise.
 **/
TrackerSparqlBufferState
tracker_sparql_buffer_get_state (TrackerSparqlBuffer *buffer,
                                 GFile               *file)
{
	TrackerSparqlBufferPrivate *priv;
	TrackerTask *found;
	gboolean in_tasks_array;

	g_return_val_if_fail (TRACKER_IS_SPARQL_BUFFER (buffer), TRACKER_BUFFER_STATE_UNKNOWN);
	g_return_val_if_fail (G_IS_FILE (file), TRACKER_BUFFER_STATE_UNKNOWN);

	found = tracker_task_pool_find (TRACKER_TASK_POOL (buffer), file);
	if (found == NULL)
		return TRACKER_BUFFER_STATE_UNKNOWN;

	priv = tracker_sparql_buffer_get_instance_private (TRACKER_SPARQL_BUFFER (buffer));

	/* A task known to the pool but absent from the tasks array is
	 * reported as flushing.
	 * NOTE(review): presumably the array only holds tasks not yet handed
	 * over to a flush -- confirm against the flush code.
	 */
	in_tasks_array = priv->tasks != NULL &&
	                 g_ptr_array_find (priv->tasks, found, NULL);

	return in_tasks_array ? TRACKER_BUFFER_STATE_QUEUED
	                      : TRACKER_BUFFER_STATE_FLUSHING;
}
......@@ -42,6 +42,13 @@ G_BEGIN_DECLS
typedef struct _TrackerSparqlBuffer TrackerSparqlBuffer;
typedef struct _TrackerSparqlBufferClass TrackerSparqlBufferClass;
/* State of a file's task as reported by tracker_sparql_buffer_get_state(). */
typedef enum {
	/* No task for the file was found in the buffer's task pool */
	TRACKER_BUFFER_STATE_UNKNOWN = 0,
	/* The task is present in the buffer's private tasks array */
	TRACKER_BUFFER_STATE_QUEUED = 1,
	/* The task is in the pool, but not in the tasks array */
	TRACKER_BUFFER_STATE_FLUSHING = 2,
} TrackerSparqlBufferState;
struct _TrackerSparqlBuffer
{
TrackerTaskPool parent_instance;
......@@ -70,6 +77,9 @@ TrackerTask * tracker_sparql_buffer_push_finish (TrackerSparqlBuffer *bu
GAsyncResult *res,
GError **error);
TrackerSparqlBufferState tracker_sparql_buffer_get_state (TrackerSparqlBuffer *buffer,
GFile *file);
TrackerTask * tracker_sparql_task_new_take_sparql_str (GFile *file,
gchar *sparql_str);
TrackerTask * tracker_sparql_task_new_with_sparql_str (GFile *file,
......
......@@ -1937,6 +1937,26 @@ index_applications_changed_cb (GObject *gobject,
}
}
/* Returns a newly allocated identifier for @file: a copy of its folder URN
 * when tracker_miner_fs_get_folder_urn() provides one, otherwise whatever
 * tracker_miner_fs_get_file_bnode() returns (possibly NULL).
 *
 * If @is_iri is non-NULL, it is set to TRUE when the folder URN was used
 * and to FALSE in every other case.
 */
static gchar *
folder_urn_or_bnode (TrackerMinerFiles *mf,
                     GFile             *file,
                     gboolean          *is_iri)
{
	const gchar *folder_urn;
	gboolean have_urn;

	folder_urn = tracker_miner_fs_get_folder_urn (TRACKER_MINER_FS (mf), file);
	have_urn = (folder_urn != NULL);

	if (is_iri)
		*is_iri = have_urn;

	return have_urn ?
		g_strdup (folder_urn) :
		tracker_miner_fs_get_file_bnode (TRACKER_MINER_FS (mf), file);
}
static void
miner_files_add_to_datasource (TrackerMinerFiles *mf,
GFile *file,
......@@ -1950,16 +1970,18 @@ miner_files_add_to_datasource (TrackerMinerFiles *mf,
if (tracker_indexing_tree_file_is_root (indexing_tree, file)) {
tracker_resource_set_relation (resource, "nie:dataSource", element_resource);
} else {
const gchar *root_urn = NULL;
gchar *identifier = NULL;
GFile *root;
root = tracker_indexing_tree_get_root (indexing_tree, file, NULL);
if (root)
root_urn = tracker_miner_fs_get_folder_urn (TRACKER_MINER_FS (mf), root);
identifier = folder_urn_or_bnode (mf, root, NULL);
if (root_urn)
tracker_resource_set_uri (resource, "nie:dataSource", root_urn);
if (identifier)
tracker_resource_set_uri (resource, "nie:dataSource", identifier);
g_free (identifier);
}
}
......@@ -1991,13 +2013,13 @@ miner_files_create_folder_information_element (TrackerMinerFiles *miner,
gboolean is_directory)
{
TrackerResource *resource, *file_resource;
const gchar *urn = NULL;
gchar *uri;
gchar *urn, *uri;
/* Preserve URN for nfo:Folders */
urn = tracker_miner_fs_get_folder_urn (TRACKER_MINER_FS (miner), file);
urn = folder_urn_or_bnode (miner, file, NULL);
resource = tracker_resource_new (urn);
g_free (urn);
tracker_resource_set_string (resource, "nie:mimeType", mime_type);
tracker_resource_add_uri (resource, "rdf:type", "nie:InformationElement");
......@@ -2094,7 +2116,7 @@ process_file_cb (GObject *object,
TrackerResource *resource, *folder_resource = NULL;
ProcessFileData *data;
const gchar *mime_type, *graph;
const gchar *parent_urn;
gchar *parent_urn;
gchar *delete_properties_sparql = NULL, *mount_point_sparql;
GFileInfo *file_info;
guint64 time_;
......@@ -2164,11 +2186,13 @@ process_file_cb (GObject *object,
tracker_resource_add_uri (resource, "rdf:type", "nfo:FileDataObject");
parent = g_file_get_parent (file);
parent_urn = tracker_miner_fs_get_folder_urn (TRACKER_MINER_FS (data->miner), parent);
parent_urn = folder_urn_or_bnode (data->miner, parent, NULL);
g_object_unref (parent);
if (parent_urn)
if (parent_urn) {
tracker_resource_set_uri (resource, "nfo:belongsToContainer", parent_urn);
g_free (parent_urn);
}
tracker_resource_set_string (resource, "nfo:fileName",
g_file_info_get_display_name (file_info));
......@@ -2443,7 +2467,6 @@ miner_files_move_file (TrackerMinerFS *fs,
gboolean recursive)
{
GString *sparql = g_string_new (NULL);
const gchar *new_parent_iri = NULL;
gchar *uri, *source_uri, *display_name, *container_clause = NULL;
gchar *path, *basename;
GFile *new_parent;
......@@ -2459,10 +2482,23 @@ miner_files_move_file (TrackerMinerFS *fs,
/* Get new parent information */
new_parent = g_file_get_parent (file);
if (new_parent)
new_parent_iri = tracker_miner_fs_get_folder_urn (fs, new_parent);
if (new_parent_iri)
container_clause = g_strdup_printf ("; nfo:belongsToContainer <%s>", new_parent_iri);
if (new_parent) {
gchar *new_parent_id;
gboolean is_iri;
new_parent_id = folder_urn_or_bnode (TRACKER_MINER_FILES (fs),
new_parent, &is_iri);
if (new_parent_id) {
container_clause =
g_strdup_printf ("; nfo:belongsToContainer %s%s%s",
is_iri ? "<" : "",
new_parent_id,
is_iri ? ">" : "");
}
g_free (new_parent_id);
}
g_string_append_printf (sparql,
"DELETE { "
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment