Commit 98781aa0 authored by Martin Kampas's avatar Martin Kampas Committed by Martyn Russell

functional-tests: Don't require miner-fs being idle for testing data inserted

In many test cases the tracker_miner_fs_wait_for_idle() call is used to detect
miner operation has completed. It returns when miner's status changes to
"Idle" (or on timeout). Unfortunately at the time miner goes idle it is not
guaranteed the data are already in store - and the related test fails.

The test case 301-miner-resource-removal.py does it a better way - it listens
to GraphUpdated signal sent by store and waits until the desired resource is
announced being added or removed.

There is comment inside 301-miner-resource-removal.py: "FIXME: put this stuff
in StoreHelper". This patch is to follow that comment.

Needed by tracker-tests-310-fts-indexing-use-graph-updated-signal.patch.
parent 73e9adc0
......@@ -54,7 +54,6 @@ CONF_OPTIONS = [
REASONABLE_TIMEOUT = 30
class MinerResourceRemovalTest (ut.TestCase):
graph_updated_handler_id = 0
# Use the same instances of store and miner-fs for the whole test suite,
# because they take so long to do first-time init.
......@@ -81,175 +80,16 @@ class MinerResourceRemovalTest (ut.TestCase):
@classmethod
def tearDownClass (self):
self.store.bus._clean_up_signal_match (self.graph_updated_handler_id)
self.miner_fs.stop ()
self.extractor.stop ()
self.store.stop ()
def setUp (self):
self.inserts_list = []
self.deletes_list = []
self.inserts_match_function = None
self.deletes_match_function = None
self.match_timed_out = False
self.graph_updated_handler_id = self.store.bus.add_signal_receiver (self._graph_updated_cb,
signal_name = "GraphUpdated",
path = "/org/freedesktop/Tracker1/Resources",
dbus_interface = "org.freedesktop.Tracker1.Resources")
self.store.reset_graph_updates_tracking ()
def tearDown (self):
self.system.unset_up_environment ()
# A system to follow GraphUpdated and make sure all changes are tracked.
# This code saves every change notification received, and exposes methods
# to await insertion or deletion of a certain resource which first check
# the list of events already received and wait for more if the event has
# not yet happened.
#
# FIXME: put this stuff in StoreHelper
def _timeout_cb (self):
self.match_timed_out = True
self.store.loop.quit ()
# Don't fail here, exceptions don't get propagated correctly
# from the GMainLoop
def _graph_updated_cb (self, class_name, deletes_list, inserts_list):
"""
Process notifications from tracker-store on resource changes.
"""
matched = False
if inserts_list is not None:
if self.inserts_match_function is not None:
# The match function will remove matched entries from the list
(matched, inserts_list) = self.inserts_match_function (inserts_list)
self.inserts_list += inserts_list
if deletes_list is not None:
if self.deletes_match_function is not None:
(matched, deletes_list) = self.deletes_match_function (deletes_list)
self.deletes_list += deletes_list
def await_resource_inserted (self, rdf_class, url = None, title = None):
"""
Block until a resource matching the parameters becomes available
"""
assert (self.inserts_match_function == None)
def match_cb (inserts_list, in_main_loop = True):
matched = False
filtered_list = []
known_subjects = set ()
#print "Got inserts: ", inserts_list, "\n"
# FIXME: this could be done in an easier way: build one query that filters
# based on every subject id in inserts_list, and returns the id of the one
# that matched :)
for insert in inserts_list:
id = insert[1]
if not matched and id not in known_subjects:
known_subjects.add (id)
where = " ?urn a %s " % rdf_class
if url is not None:
where += "; nie:url \"%s\"" % url
if title is not None:
where += "; nie:title \"%s\"" % title
query = "SELECT ?urn WHERE { %s FILTER (tracker:id(?urn) = %s)}" % (where, insert[1])
#print "%s\n" % query
result_set = self.store.query (query)
#print result_set, "\n\n"
if len (result_set) > 0:
matched = True
self.matched_resource_urn = result_set[0][0]
self.matched_resource_id = insert[1]
if not matched or id != self.matched_resource_id:
filtered_list += [insert]
if matched and in_main_loop:
glib.source_remove (self.graph_updated_timeout_id)
self.graph_updated_timeout_id = 0
self.inserts_match_function = None
self.store.loop.quit ()
return (matched, filtered_list)
self.matched_resource_urn = None
self.matched_resource_id = None
log ("Await new %s (%i existing inserts)" % (rdf_class, len (self.inserts_list)))
# Check the list of previously received events for matches
(existing_match, self.inserts_list) = match_cb (self.inserts_list, False)
if not existing_match:
self.graph_updated_timeout_id = glib.timeout_add_seconds (REASONABLE_TIMEOUT, self._timeout_cb)
self.inserts_match_function = match_cb
# Run the event loop until the correct notification arrives
self.store.loop.run ()
if self.match_timed_out:
self.fail ("Timeout waiting for resource: class %s, URL %s, title %s" % (rdf_class, url, title))
return (self.matched_resource_id, self.matched_resource_urn)
def await_resource_deleted (self, id, fail_message = None):
"""
Block until we are notified of a resources deletion
"""
assert (self.deletes_match_function == None)
def match_cb (deletes_list, in_main_loop = True):
matched = False
filtered_list = []
#print "Looking for %i in " % id, deletes_list, "\n"
for delete in deletes_list:
if delete[1] == id:
matched = True
else:
filtered_list += [delete]
if matched and in_main_loop:
glib.source_remove (self.graph_updated_timeout_id)
self.graph_updated_timeout_id = 0
self.deletes_match_function = None
self.store.loop.quit ()
return (matched, filtered_list)
log ("Await deletion of %i (%i existing)" % (id, len (self.deletes_list)))
(existing_match, self.deletes_list) = match_cb (self.deletes_list, False)
if not existing_match:
self.graph_updated_timeout_id = glib.timeout_add_seconds (REASONABLE_TIMEOUT, self._timeout_cb)
self.deletes_match_function = match_cb
# Run the event loop until the correct notification arrives
self.store.loop.run ()
if self.match_timed_out:
if fail_message is not None:
self.fail (fail_message)
else:
self.fail ("Resource %i has not been deleted." % id)
return
def create_test_content (self, file_urn, title):
sparql = "INSERT { \
_:ie a nmm:MusicPiece ; \
......@@ -259,8 +99,8 @@ class MinerResourceRemovalTest (ut.TestCase):
self.store.update (sparql)
return self.await_resource_inserted (rdf_class = 'nmm:MusicPiece',
title = title)
return self.store.await_resource_inserted (rdf_class = 'nmm:MusicPiece',
title = title)
def create_test_file (self, file_name):
file_path = get_test_path (file_name)
......@@ -269,8 +109,8 @@ class MinerResourceRemovalTest (ut.TestCase):
file.write ("Test")
file.close ()
return self.await_resource_inserted (rdf_class = 'nfo:Document',
url = get_test_uri (file_name));
return self.store.await_resource_inserted (rdf_class = 'nfo:Document',
url = get_test_uri (file_name));
def assertResourceExists (self, urn):
if self.store.ask ("ASK { <%s> a rdfs:Resource }" % urn) == False:
......@@ -294,10 +134,10 @@ class MinerResourceRemovalTest (ut.TestCase):
os.unlink (get_test_path ("test_1.txt"))
self.await_resource_deleted (file_1_id)
self.await_resource_deleted (ie_1_id,
"Associated logical resource failed to be deleted " \
"when its containing file was removed.")
self.store.await_resource_deleted (file_1_id)
self.store.await_resource_deleted (ie_1_id,
"Associated logical resource failed to be deleted " \
"when its containing file was removed.")
self.assertResourceMissing (file_1_urn)
self.assertResourceMissing (ie_1_urn)
......
......@@ -186,6 +186,8 @@ class StoreHelper (Helper):
PROCESS_NAME = "tracker-store"
BUS_NAME = cfg.TRACKER_BUSNAME
graph_updated_handler_id = 0
def start (self):
Helper.start (self)
......@@ -210,6 +212,172 @@ class StoreHelper (Helper):
self.status_iface.Wait ()
log ("[%s] ready." % self.PROCESS_NAME)
self.reset_graph_updates_tracking ()
self.graph_updated_handler_id = self.bus.add_signal_receiver (self._graph_updated_cb,
signal_name = "GraphUpdated",
path = cfg.TRACKER_OBJ_PATH,
dbus_interface = cfg.RESOURCES_IFACE)
def stop (self):
Helper.stop (self)
self.bus._clean_up_signal_match (self.graph_updated_handler_id)
# A system to follow GraphUpdated and make sure all changes are tracked.
# This code saves every change notification received, and exposes methods
# to await insertion or deletion of a certain resource which first check
# the list of events already received and wait for more if the event has
# not yet happened.
def reset_graph_updates_tracking (self):
self.inserts_list = []
self.deletes_list = []
self.inserts_match_function = None
self.deletes_match_function = None
self.graph_updated_timed_out = False
def _graph_updated_timeout_cb (self):
# Don't fail here, exceptions don't get propagated correctly
# from the GMainLoop
self.graph_updated_timed_out = True
self.loop.quit ()
def _graph_updated_cb (self, class_name, deletes_list, inserts_list):
"""
Process notifications from tracker-store on resource changes.
"""
matched = False
if inserts_list is not None:
if self.inserts_match_function is not None:
# The match function will remove matched entries from the list
(matched, inserts_list) = self.inserts_match_function (inserts_list)
self.inserts_list += inserts_list
if deletes_list is not None:
if self.deletes_match_function is not None:
(matched, deletes_list) = self.deletes_match_function (deletes_list)
self.deletes_list += deletes_list
def await_resource_inserted (self, rdf_class, url = None, title = None):
"""
Block until a resource matching the parameters becomes available
"""
assert (self.inserts_match_function == None)
def match_cb (inserts_list, in_main_loop = True):
matched = False
filtered_list = []
known_subjects = set ()
#print "Got inserts: ", inserts_list, "\n"
# FIXME: this could be done in an easier way: build one query that filters
# based on every subject id in inserts_list, and returns the id of the one
# that matched :)
for insert in inserts_list:
id = insert[1]
if not matched and id not in known_subjects:
known_subjects.add (id)
where = " ?urn a %s " % rdf_class
if url is not None:
where += "; nie:url \"%s\"" % url
if title is not None:
where += "; nie:title \"%s\"" % title
query = "SELECT ?urn WHERE { %s FILTER (tracker:id(?urn) = %s)}" % (where, insert[1])
#print "%s\n" % query
result_set = self.query (query)
#print result_set, "\n\n"
if len (result_set) > 0:
matched = True
self.matched_resource_urn = result_set[0][0]
self.matched_resource_id = insert[1]
if not matched or id != self.matched_resource_id:
filtered_list += [insert]
if matched and in_main_loop:
glib.source_remove (self.graph_updated_timeout_id)
self.graph_updated_timeout_id = 0
self.inserts_match_function = None
self.loop.quit ()
return (matched, filtered_list)
self.matched_resource_urn = None
self.matched_resource_id = None
log ("Await new %s (%i existing inserts)" % (rdf_class, len (self.inserts_list)))
# Check the list of previously received events for matches
(existing_match, self.inserts_list) = match_cb (self.inserts_list, False)
if not existing_match:
self.graph_updated_timeout_id = glib.timeout_add_seconds (REASONABLE_TIMEOUT,
self._graph_updated_timeout_cb)
self.inserts_match_function = match_cb
# Run the event loop until the correct notification arrives
self.loop.run ()
if self.graph_updated_timed_out:
raise Exception ("Timeout waiting for resource: class %s, URL %s, title %s" % (rdf_class, url, title))
return (self.matched_resource_id, self.matched_resource_urn)
def await_resource_deleted (self, id, fail_message = None):
"""
Block until we are notified of a resources deletion
"""
assert (self.deletes_match_function == None)
def match_cb (deletes_list, in_main_loop = True):
matched = False
filtered_list = []
#print "Looking for %i in " % id, deletes_list, "\n"
for delete in deletes_list:
if delete[1] == id:
matched = True
else:
filtered_list += [delete]
if matched and in_main_loop:
glib.source_remove (self.graph_updated_timeout_id)
self.graph_updated_timeout_id = 0
self.deletes_match_function = None
self.loop.quit ()
return (matched, filtered_list)
log ("Await deletion of %i (%i existing)" % (id, len (self.deletes_list)))
(existing_match, self.deletes_list) = match_cb (self.deletes_list, False)
if not existing_match:
self.graph_updated_timeout_id = glib.timeout_add_seconds (REASONABLE_TIMEOUT,
self._graph_updated_timeout_cb)
self.deletes_match_function = match_cb
# Run the event loop until the correct notification arrives
self.loop.run ()
if self.graph_updated_timed_out:
if fail_message is not None:
raise Exception (fail_message)
else:
raise Exception ("Resource %i has not been deleted." % id)
return
def query (self, query, timeout=5000):
try:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment