Commit b70e0889 authored by Ivan Frade's avatar Ivan Frade

New tests

parent 23310404
This diff is collapsed.
#!/usr/bin/env python
#
# Copyright (C) 2010, Nokia <ivan.frade@nokia.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
#
import dbus
import unittest
import random
TRACKER = 'org.freedesktop.Tracker1'
TRACKER_OBJ = '/org/freedesktop/Tracker1/Resources'
RESOURCES_IFACE = "org.freedesktop.Tracker1.Resources"
class TestMetacontacts (unittest.TestCase):
def setUp (self):
bus = dbus.SessionBus ()
tracker = bus.get_object (TRACKER, TRACKER_OBJ)
self.resources = dbus.Interface (tracker,
dbus_interface=RESOURCES_IFACE);
def test_metacontact_usage (self):
"""
1. Insert Person and IM Contact with the same Metacontact
2. Query by metacontact (there should be one)
3. Add a new IM Contact and link it to the previous Metacontact
4. Query by metacontact (there should be on)
3. Remove the instances added
"""
initial_data = """
INSERT {
<telephaty:///o/f/t/accounts/ivan_frade_gmail_com> a nco:IMAccount .
nco:default-contact-me nco:hasIMAccount <telephaty:///o/f/t/accounts/ivan_frade_gmail_com>.
nco:default-contact-me nco:hasIMAccount <telephaty:///o/f/t/accounts/ivan_jabber_org>.
<urn:uuid:metacontact-ivan> a nco:MetaContact.
<contact://test_metacontacts/person1> a nco:PersonContact ;
nco:metacontact <urn:uuid:metacontact-ivan> ;
nco:fullname 'Ivan in local addressbook'.
<contact://test_metacontacts/im1> a nco:IMContact ;
nco:metacontact <urn:uuid:metacontact-ivan> ;
nco:fromIMAccount <telephaty:///o/f/t/accounts/ivan_frade_gmail_com> ;
nco:fullname 'Ivan at gmail'.
}
"""
self.resources.SparqlUpdate (initial_data)
query = """
SELECT ?c WHERE {
?c nco:metacontact <urn:uuid:metacontact-ivan> .
}
"""
results = self.resources.SparqlQuery (query)
self.assertEquals (len(results), 2)
new_account = """
INSERT {
<contact://test_metacontacts/im2> a nco:IMContact ;
nco:metacontact <urn:uuid:metacontact-ivan> ;
nco:fromIMAccount <telephaty:///o/f/t/accounts/ivan_jabber_org> ;
nco:fullname 'Ivan at gmail'.
}
"""
self.resources.SparqlUpdate (new_account)
results = self.resources.SparqlQuery (query)
self.assertEquals (len(results), 3)
delete = """
DELETE {
<telephaty:///o/f/t/accounts/ivan_frade_gmail_com> a rdfs:Resource .
nco:default-contact-me nco:hasIMAccount <telephaty:///o/f/t/accounts/ivan_frade_gmail_com>.
nco:default-contact-me nco:hasIMAccount <telephaty:///o/f/t/accounts/ivan_jabber_org>.
<urn:uuid:metacontact-ivan> a rdfs:Resource.
<contact://test_metacontacts/person1> a rdfs:Resource.
<contact://test_metacontacts/im1> a rdfs:Resource .
<contact://test_metacontacts/im2> a rdfs:Resource .
}
"""
self.resources.SparqlUpdate (delete)
def test_metacontact_merge (self):
"""
1. Insert Person (metacontact A)
2. Insert IM Contact (no metacontact)
3. Merge IM Contact into PersonContact (sharing metacontact A)
4. Remove
3. Remove the instances added
"""
initial_data = """
INSERT {
<telephaty:///o/f/t/accounts/ivan_frade_gmail_com> a nco:IMAccount .
nco:default-contact-me nco:hasIMAccount <telephaty:///o/f/t/accounts/ivan_frade_gmail_com>.
<urn:uuid:metacontact-ivan> a nco:MetaContact.
<contact://test_metacontacts/person1> a nco:PersonContact ;
nco:metacontact <urn:uuid:metacontact-ivan> ;
nco:fullname 'Ivan in local addressbook'.
<contact://test_metacontacts/im1> a nco:IMContact ;
nco:fromIMAccount <telephaty:///o/f/t/accounts/ivan_frade_gmail_com> ;
nco:fullname 'Ivan at gmail'.
}
"""
self.resources.SparqlUpdate (initial_data)
query = """
SELECT ?c WHERE {
?c nco:metacontact <urn:uuid:metacontact-ivan> .
}
"""
results = self.resources.SparqlQuery (query)
self.assertEquals (len(results), 1)
merge = """
INSERT {
<contact://test_metacontacts/im1> nco:metacontact <urn:uuid:metacontact-ivan> .
}
"""
self.resources.SparqlUpdate (merge)
results = self.resources.SparqlQuery (query)
self.assertEquals (len(results), 2)
delete = """
DELETE {
<telephaty:///o/f/t/accounts/ivan_frade_gmail_com> a rdfs:Resource .
nco:default-contact-me nco:hasIMAccount <telephaty:///o/f/t/accounts/ivan_frade_gmail_com>.
<urn:uuid:metacontact-ivan> a rdfs:Resource.
<contact://test_metacontacts/person1> a rdfs:Resource.
<contact://test_metacontacts/im1> a rdfs:Resource .
}
"""
self.resources.SparqlUpdate (delete)
if __name__ == '__main__':
unittest.main()
#!/usr/bin/env python
#!/usr/bin/python
#
# Copyright (C) 2010, Nokia <ivan.frade@nokia.com>
#
......@@ -18,23 +18,23 @@
# 02110-1301, USA.
#
"""
These tests use only the store. They insert instances with known text
and run sparql with fts functions to check the results.
"""
import dbus
import unittest
import random
TRACKER = 'org.freedesktop.Tracker1'
TRACKER_OBJ = '/org/freedesktop/Tracker1/Resources'
RESOURCES_IFACE = "org.freedesktop.Tracker1.Resources"
class TestFTSFunctions (unittest.TestCase):
def setUp (self):
bus = dbus.SessionBus ()
tracker = bus.get_object (TRACKER, TRACKER_OBJ)
self.resources = dbus.Interface (tracker,
dbus_interface=RESOURCES_IFACE);
from common.utils import configuration as cfg
import unittest2 as ut
#import unittest as ut
from common.utils.storetest import CommonTrackerStoreTest as CommonTrackerStoreTest
class TestFTSFunctions (CommonTrackerStoreTest):
"""
Insert data with text and check the fts:xxxx functions are returning the expected results
"""
def test_fts_rank (self):
"""
1. Insert a Contact1 with 'abcdefxyz' as fullname and nickname
......@@ -58,7 +58,7 @@ class TestFTSFunctions (unittest.TestCase):
nco:nickname 'abcdefxyz abcdefxyz' .
}
"""
self.resources.SparqlUpdate (insert_sparql)
self.tracker.update (insert_sparql)
query = """
SELECT ?contact WHERE {
......@@ -66,7 +66,7 @@ class TestFTSFunctions (unittest.TestCase):
fts:match 'abcdefxyz' .
} ORDER BY DESC (fts:rank(?contact))
"""
results = self.resources.SparqlQuery (query)
results = self.tracker.query (query)
self.assertEquals (len(results), 3)
self.assertEquals (results[0][0], "contact://test/fts-function/rank/3")
......@@ -104,7 +104,7 @@ class TestFTSFunctions (unittest.TestCase):
nco:nickname 'abcdefxyz abcdefxyz' .
}
"""
self.resources.SparqlUpdate (insert_sparql)
self.tracker.update (insert_sparql)
query = """
SELECT fts:offsets (?contact) WHERE {
......@@ -112,7 +112,7 @@ class TestFTSFunctions (unittest.TestCase):
fts:match 'abcdefxyz' .
}
"""
results = self.resources.SparqlQuery (query)
results = self.tracker.query (query)
self.assertEquals (len(results), 3)
self.assertEquals (len (results[0][0].split(",")), 4) # (u'151,1,161,1')
self.assertEquals (len (results[1][0].split(",")), 2) # (u'161,1')
......@@ -129,4 +129,4 @@ class TestFTSFunctions (unittest.TestCase):
if __name__ == '__main__':
unittest.main()
ut.main()
#!/usr/bin/python
#
# Copyright (C) 2010, Nokia <ivan.frade@nokia.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
#
"""
Test that the threads in the daemon are working:
A very long query shouldn't block smaller queries.
"""
import os, dbus
import gobject
import glib
import time
from dbus.mainloop.glib import DBusGMainLoop
from common.utils import configuration as cfg
import unittest2 as ut
#import unittest as ut
from common.utils.storetest import CommonTrackerStoreTest as CommonTrackerStoreTest
MAX_TEST_TIME = 60 # seconds to finish the tests (to avoid infinite waitings)
AMOUNT_SIMPLE_QUERIES = 10
COMPLEX_QUERY_TIMEOUT = 15000 # ms (How long do we wait for an answer to the complex query)
SIMPLE_QUERY_FREQ = 2 # seconds (How freq do we send a simple query to the daemon)
class TestThreadedStore (CommonTrackerStoreTest):
"""
When the database is big, running a complex query takes ages.
After cancelling the query, any following query is queued
Reported in bug NB#183499
"""
def setUp (self):
self.main_loop = gobject.MainLoop ()
self.simple_queries_counter = AMOUNT_SIMPLE_QUERIES
self.simple_queries_answers = 0
def __populate_database (self):
self.assertTrue (os.path.exists ('ttl'))
for ttl_file in ["010-nco_EmailAddress.ttl",
"011-nco_PostalAddress.ttl",
"012-nco_PhoneNumber.ttl",
"014-nco_ContactEmail.ttl",
"015-nco_ContactCall.ttl",
"018-nco_PersonContact.ttl",
"012-nco_PhoneNumber.ttl",
"016-nco_ContactIM.ttl"]:
full_path = os.path.abspath(os.path.join ("ttl", ttl_file))
print full_path
self.tracker.get_tracker_iface ().Load ("file://" + full_path,
timeout=30000)
def test_complex_query (self):
start = time.time ()
self.__populate_database ()
end = time.time ()
print "Loading: %.3f sec." % (end-start)
COMPLEX_QUERY = """
SELECT ?url nie:url(?photo) nco:imContactStatusMessage (?url)
tracker:coalesce(nco:nameFamily (?url), nco:nameFamily (?url), nco:nameGiven (?org), ?email, ?phone, nco:blogUrl (?url))
WHERE {
{ ?url a nco:PersonContact.
?url fts:match 'fami*'.
} UNION {
?url a nco:PersonContact.
?url nco:hasEmailAddress ?add.
?add fts:match 'fami*'.
} UNION {
?url a nco:PersonContact.
?url nco:hasPostalAddress ?post.
?post fts:match 'fami*'.
}
OPTIONAL { ?url nco:photo ?photo.}
OPTIONAL { ?url nco:org ?org. }
OPTIONAL { ?url maemo:relevance ?relevance.}
OPTIONAL { ?url nco:hasPhoneNumber ?hasphone. ?hasPhone nco:phoneNumber ?phone.}
OPTIONAL { ?url nco:hasEmailAddress ?hasemail. ?hasemail nco:emailAddress ?email.}
} ORDER BY ?relevance LIMIT 100"""
# Standard timeout
print "Send complex query"
self.complex_start = time.time ()
self.tracker.get_tracker_iface ().SparqlQuery (COMPLEX_QUERY, timeout=COMPLEX_QUERY_TIMEOUT,
reply_handler=self.reply_complex,
error_handler=self.error_handler_complex)
self.timeout_id = glib.timeout_add_seconds (MAX_TEST_TIME, self.__timeout_on_idle)
glib.timeout_add_seconds (SIMPLE_QUERY_FREQ, self.__simple_query)
self.main_loop.run ()
def __simple_query (self):
print "Send simple query (%d)" % (self.simple_queries_counter)
SIMPLE_QUERY = "SELECT ?name WHERE { ?u a nco:PersonContact; nco:fullname ?name. }"
self.tracker.get_tracker_iface ().SparqlQuery (SIMPLE_QUERY,
timeout=10000,
reply_handler=self.reply_simple,
error_handler=self.error_handler)
self.simple_queries_counter -= 1
if (self.simple_queries_counter == 0):
print "Stop sending queries (wait)"
return False
return True
def reply_simple (self, results):
print "Simple query answered"
self.assertNotEquals (len (results), 0)
self.simple_queries_answers += 1
if (self.simple_queries_answers == AMOUNT_SIMPLE_QUERIES):
print "All simple queries answered"
self.main_loop.quit ()
def reply_complex (self, results):
print "Complex query: %.3f" % (time.time () - self.complex_start)
def error_handler (self, error_msg):
print "ERROR in dbus call", error_msg
def error_handler_complex (self, error_msg):
print "Complex query timedout in DBus (", error_msg, ")"
def __timeout_on_idle (self):
print "Timeout... asumming idle"
self.main_loop.quit ()
return False
if __name__ == "__main__":
ut.main ()
#!/usr/bin/python
#
# Copyright (C) 2010, Nokia <ivan.frade@nokia.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
#
"""
Test that after insertion/remove/updates in the store, the signals
are emitted. Theses tests are not extensive (only few selected signals
are tested)
"""
import unittest2 as ut
from common.utils.storetest import CommonTrackerStoreTest as CommonTrackerStoreTest
from common.utils import configuration as cfg
import gobject
import glib
import dbus
from dbus.mainloop.glib import DBusGMainLoop
SUBJECTS_ADDED_SIGNAL = "SubjectsAdded"
SUBJECTS_REMOVED_SIGNAL = "SubjectsRemoved"
SUBJECTS_CHANGED_SIGNAL = "SubjectsChanged"
NCO_CONTACT_PATH = "/org/freedesktop/Tracker1/Resources/Classes/nco/Contact"
SIGNALS_IFACE = "org.freedesktop.Tracker1.Resources.Class"
REASONABLE_TIMEOUT = 10 # Time waiting for the signal to be emitted
class TrackerStoreSignalsTests (CommonTrackerStoreTest):
"""
Insert/update/remove instances from nco:PersonContact
and check that the signals are emitted
"""
def setUp (self):
self.loop = gobject.MainLoop()
dbus_loop = DBusGMainLoop(set_as_default=True)
self.bus = dbus.SessionBus (dbus_loop)
self.timeout_id = 0
def __connect_signal (self, signal_name, callback):
"""
After connecting to the signal, call self.__wait_for_signal.
That function will wait in a loop, so make sure that the callback
calls self.loop.quit ()
"""
if not signal_name in [SUBJECTS_ADDED_SIGNAL, SUBJECTS_REMOVED_SIGNAL, SUBJECTS_CHANGED_SIGNAL]:
print "What kind of signal are you trying to connect?!"
assert False
self.cb_id = self.bus.add_signal_receiver (callback,
signal_name=signal_name,
path = NCO_CONTACT_PATH,
dbus_interface = SIGNALS_IFACE)
def __wait_for_signal (self):
"""
In the callback of the signals, there should be a self.loop.quit ()
"""
self.timeout_id = glib.timeout_add_seconds (REASONABLE_TIMEOUT, self.__timeout_on_idle)
self.loop.run ()
def __timeout_on_idle (self):
self.loop.quit ()
self.fail ("Timeout, the signal never came!")
def __disconnect_signals_after_test (fn):
"""
Here maybe i got a bit carried away with python instrospection.
This decorator makes the function run in a try/finally, and disconnect
all the signals afterwards.
It means that the signal callbacks just need to ensure the results are fine.
Don't touch this unless you know what are you doing.
"""
def new (self, *args):
try:
fn (self, *args)
finally:
if (self.timeout_id != 0):
glib.source_remove (self.timeout_id )
self.timeout_id = 0
self.loop.quit ()
self.bus._clean_up_signal_match (self.cb_id)
return new
@__disconnect_signals_after_test
def __contact_added_cb (self, contacts_added):
self.assertEquals (len (contacts_added), 1)
self.assertIn ("test://signals-contact-add", contacts_added)
def test_01_insert_contact (self):
CONTACT = """
INSERT {
<test://signals-contact-add> a nco:PersonContact ;
nco:nameGiven 'Contact-name added';
nco:nameFamily 'Contact-family added';
nie:generator 'test-14-signals' ;
nco:contactUID '1321321312312';
nco:hasPhoneNumber <tel:555555555> .
}
"""
self.__connect_signal (SUBJECTS_ADDED_SIGNAL, self.__contact_added_cb)
self.tracker.update (CONTACT)
self.__wait_for_signal ()
self.tracker.update ("""
DELETE { <test://signals-contact-add> a rdfs:Resource }
""")
@__disconnect_signals_after_test
def __contact_removed_cb (self, contacts_removed):
self.assertEquals (len (contacts_removed), 1)
self.assertIn ("test://signals-contact-remove", contacts_removed)
def test_02_remove_contact (self):
CONTACT = """
INSERT {
<test://signals-contact-remove> a nco:PersonContact ;
nco:nameGiven 'Contact-name removed';
nco:nameFamily 'Contact-family removed'.
}
"""
self.tracker.update (CONTACT)
self.__connect_signal (SUBJECTS_REMOVED_SIGNAL, self.__contact_removed_cb)
self.tracker.update ("""
DELETE { <test://signals-contact-remove> a rdfs:Resource }
""")
self.__wait_for_signal ()
@__disconnect_signals_after_test
def __contact_updated_cb (self, contacts_updated, props_updated):
self.assertEquals (len (contacts_updated), 1)
self.assertIn ("test://signals-contact-update", contacts_updated)
self.assertEquals (len (props_updated), 1)
self.assertIn ("http://www.semanticdesktop.org/ontologies/2007/03/22/nco#fullname", props_updated)
def test_03_update_contact (self):
self.tracker.update ("INSERT { <test://signals-contact-update> a nco:PersonContact }")
self.__connect_signal (SUBJECTS_CHANGED_SIGNAL, self.__contact_updated_cb)
self.tracker.update ("INSERT { <test://signals-contact-update> nco:fullname 'wohoo'}")
self.__wait_for_signal ()
self.tracker.update ("DELETE { <test://signals-contact-update> a rdfs:Resource}")
if __name__ == "__main__":
ut.main()
#!/usr/bin/python
#
# Copyright (C) 2010, Nokia <ivan.frade@nokia.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
#
import os
import dbus # For the exception handling
from common.utils.system import TrackerSystemAbstraction
from common.utils.helpers import StoreHelper
from common.utils import configuration as cfg
from common.utils.storetest import CommonTrackerStoreTest as CommonTrackerStoreTest
import unittest2 as ut
"""
Call backup, restore, force the journal replay and check the data is correct afterwards
"""
class BackupRestoreTest (CommonTrackerStoreTest):
"""
Backup and restore to/from valid/invalid files
"""
def setUp (self):
self.TEST_INSTANCE = "test://backup-restore/1"
self.BACKUP_FILE = "file://" + os.path.join (cfg.TEST_TMP_DIR, "tracker-backup-test-1")
if (os.path.exists (self.BACKUP_FILE)):
os.unlink (self.BACKUP_FILE)
def __insert_test_instance (self):
self.tracker.update ("INSERT { <%s> a nco:Contact; nco:fullname 'test-backup' } "
% (self.TEST_INSTANCE))
def __delete_test_instance (self):
self.tracker.update ("DELETE { <%s> a rdfs:Resource } " % (self.TEST_INSTANCE))
def __is_test_instance_there (self):
result = self.tracker.query ("SELECT ?u WHERE { ?u a nco:Contact; nco:fullname 'test-backup'}")
if (len (result) == 1 and len (result[0]) == 1 and result[0][0] == self.TEST_INSTANCE):
return True
return False
def test_backup_01(self):
"""
Inserted data is restored after backup
1.Insert contact
2.Take backup.
3.Delete contact. (check it is not there)
4.Restore the file.
5.Check the contact is back there
"""
self.__insert_test_instance ()
instances_before = self.tracker.count_instances ("nco:Contact")
self.tracker.backup (self.BACKUP_FILE)
self.__delete_test_instance ()
instances_now = self.tracker.count_instances ("nco:Contact")
self.assertEquals (instances_before-1, instances_now)
self.tracker.restore (self.BACKUP_FILE)
instances_after = self.tracker.count_instances ("nco:Contact")
self.assertEquals (instances_before, instances_after)
self.assertTrue (self.__is_test_instance_there ())
# Clean the DB for the next test
self.__delete_test_instance ()
def test_backup_02 (self):
"""
Data inserted after backup is lost in restore
1.Take backup of db.
2.Insert a contact.
3.Restore the db.
4.Search for the contact inserted.
"""
# Precondition: test backup contact shouldn't be there
self.assertFalse (self.__is_test_instance_there ())
self.tracker.backup (self.BACKUP_FILE)
self.__insert_test_instance ()
self.assertTrue (self.__is_test_instance_there ())
self.tracker.restore (self.BACKUP_FILE)
self.assertFalse (self.__is_test_instance_there ())
def test_backup_03 (self):
"""
Restore from a random text file
"""
TEST_FILE = os.path.join (cfg.TEST_TMP_DIR, "trash_file")
trashfile = open (TEST_FILE, "w")
trashfile.write ("Here some useless text that obviously is NOT a backup")
trashfile.close ()
self.assertRaises (dbus.DBusException,
self.tracker.restore,
"file://" + TEST_FILE)
os.unlink (TEST_FILE)
def test_backup_04 (self):
"""
Restore from a random binary file
"""
TEST_FILE = os.path.join (cfg.TEST_TMP_DIR, "trash_file.dat")
import struct
trashfile = open (TEST_FILE, "wb")
for n in range (0, 50):
data = struct.pack ('i', n)
trashfile.write (data)
trashfile.close ()
instances_before = self.tracker.count_instances ("nie:InformationElement")
self.assertRaises (dbus.DBusException,
self.tracker.restore,
"file://" + TEST_FILE)
os.unlink (TEST_FILE)
def test_backup_05(self):
"""
Take backup of db to a invalid path.
Expected: Backup should not be taken and tracker should behave normally.
"""
self.assertRaises (dbus.DBusException,
self.tracker.backup,
"file://%s/this/is/a/non-existant/folder/backup"