600-applications-camera.py 10.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
#!/usr/bin/python
#
# Copyright (C) 2011, Nokia Corporation <ivan.frade@nokia.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
#

"""
Tests trying to simulate the behaviour of applications working with tracker
"""

25
import os
26 27
import random

28
import unittest as ut
29
from common.utils.applicationstest import CommonTrackerApplicationTest as CommonTrackerApplicationTest
30
from common.utils.helpers import log
31 32


33 34 35 36
NMM_PHOTO = 'http://www.tracker-project.org/temp/nmm#Photo'
NMM_VIDEO = 'http://www.tracker-project.org/temp/nmm#Video'


37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
class TrackerCameraTestSuite (CommonTrackerApplicationTest):
    """
    Common functionality for camera tests.
    """

    def insert_photo_resource_info (self, urn, file_url):
        """
        Insert new photo resource in the store, including nie:mimeType and nie:url
        """
        insert = """
        INSERT { <%(urn)s>
            a nie:InformationElement,
            nie:DataObject,
            nfo:Image,
            nfo:Media,
            nfo:Visual,
            nmm:Photo
        }

        DELETE { <%(urn)s> nie:mimeType ?_1 }
        WHERE { <%(urn)s> nie:mimeType ?_1 }

        INSERT { <%(urn)s>
            a rdfs:Resource ;
            nie:mimeType \"image/jpeg\"
        }

        DELETE { <%(urn)s> nie:url ?_2 }
        WHERE { <%(urn)s> nie:url ?_2 }

        INSERT { <%(urn)s>
            a rdfs:Resource ;
            nie:url \"%(file_url)s\" ;
            nie:isStoredAs <%(urn)s>
        }
        """ % locals()
        self.tracker.update (insert)
        self.assertEquals (self.get_urn_count_by_url (file_url), 1)

    def insert_video_resource_info (self, urn, file_url):
        """
        Insert new video resource in the store, including nie:mimeType and nie:url
        """
        insert = """
        INSERT { <%(urn)s>
            a nie:InformationElement,
            nie:DataObject,
            nfo:Video,
            nfo:Media,
            nfo:Visual,
            nmm:Video
        }

        DELETE { <%(urn)s> nie:mimeType ?_1 }
        WHERE { <%(urn)s> nie:mimeType ?_1 }

        INSERT { <%(urn)s>
            a rdfs:Resource ;
            nie:mimeType \"video/mp4\"
        }

        DELETE { <%(urn)s> nie:url ?_2 }
        WHERE { <%(urn)s> nie:url ?_2 }

        INSERT { <%(urn)s>
            a rdfs:Resource ;
            nie:url \"%(file_url)s\" ;
            nie:isStoredAs <%(urn)s>
        }
        """ % locals()
        self.tracker.update (insert)
        self.assertEquals (self.get_urn_count_by_url (file_url), 1)

    def insert_dummy_location_info (self, fileurn, geolocationurn, postaladdressurn):
        """
        Insert placeholder location info for a file
        """
        location_insert = """
        INSERT { <%s> a             nco:PostalAddress ;
                      nco:country  \"SPAIN\" ;
                      nco:locality \"Tres Cantos\"
        }

        INSERT { <%s> a                 slo:GeoLocation ;
                      slo:postalAddress <%s>
        }

        INSERT { <%s> a            rdfs:Resource ;
                      slo:location <%s>
        }
        """ % (postaladdressurn, geolocationurn, postaladdressurn, fileurn, geolocationurn)
        self.tracker.update (location_insert)


class TrackerCameraPicturesApplicationTests (TrackerCameraTestSuite):
132

133
    def test_01_camera_picture (self):
134 135 136 137 138 139 140 141 142
        """
        Camera simulation:

        1. Create resource in the store for the new file
        2. Write the file
        3. Wait for miner-fs to index it
        4. Ensure no duplicates are found
        """

143
        fileurn = "tracker://test_camera_picture_01/" + str(random.randint (0,100))
144 145 146
        origin_filepath = os.path.join (self.get_data_dir (), self.get_test_image ())
        dest_filepath = os.path.join (self.get_dest_dir (), self.get_test_image ())
        dest_fileuri = "file://" + dest_filepath
147

148
        self.insert_photo_resource_info (fileurn, dest_fileuri)
149

150
        # Copy the image to the dest path
151 152
        self.slowcopy_file (origin_filepath, dest_filepath)
        assert os.path.exists (dest_filepath)
153
        dest_id, dest_urn = self.system.store.await_resource_inserted (NMM_PHOTO, dest_fileuri)
154
        self.assertEquals (self.get_urn_count_by_url (dest_fileuri), 1)
155 156

        # Clean the new file so the test directory is as before
157
        log ("Remove and wait")
158
        os.remove (dest_filepath)
159
        self.system.store.await_resource_deleted (NMM_PHOTO, dest_id)
160
        self.assertEquals (self.get_urn_count_by_url (dest_fileuri), 0)
161

162
    def test_02_camera_picture_geolocation (self):
163 164 165 166 167 168 169 170 171
        """
        Camera simulation:

        1. Create resource in the store for the new file
        2. Set nlo:location
        2. Write the file
        3. Wait for miner-fs to index it
        4. Ensure no duplicates are found
        """
172

173
        fileurn = "tracker://test_camera_picture_02/" + str(random.randint (0,100))
174 175
        dest_filepath = os.path.join (self.get_dest_dir (), self.get_test_image ())
        dest_fileuri = "file://" + dest_filepath
176

177 178
        geolocationurn = "tracker://test_camera_picture_02_geolocation/" + str(random.randint (0,100))
        postaladdressurn = "tracker://test_camera_picture_02_postaladdress/" + str(random.randint (0,100))
179

180
        self.insert_photo_resource_info (fileurn, dest_fileuri)
181

182 183
        # FIRST, open the file for writing, and just write some garbage, to simulate that
        # we already started recording the video...
184
        fdest = open (dest_filepath, 'wb')
185 186 187
        fdest.write ("some garbage written here")
        fdest.write ("to simulate we're recording something...")
        fdest.seek (0)
188

189
        # SECOND, set slo:location
190
        self.insert_dummy_location_info (fileurn, geolocationurn, postaladdressurn)
191 192

        #THIRD, start copying the image to the dest path
193 194
        original_file = os.path.join (self.get_data_dir (),self.get_test_image ())
        self.slowcopy_file_fd (original_file, fdest)
195
        fdest.close ()
196
        assert os.path.exists (dest_filepath)
197 198

        # FOURTH, ensure we have only 1 resource
199
        dest_id, dest_urn = self.system.store.await_resource_inserted (NMM_PHOTO, dest_fileuri)
200
        self.assertEquals (self.get_urn_count_by_url (dest_fileuri), 1)
201 202

        # Clean the new file so the test directory is as before
203
        log ("Remove and wait")
204
        os.remove (dest_filepath)
205
        self.system.store.await_resource_deleted (NMM_PHOTO, dest_id)
206
        self.assertEquals (self.get_urn_count_by_url (dest_fileuri), 0)
207 208


209
class TrackerCameraVideosApplicationTests (TrackerCameraTestSuite):
210 211

    def test_01_camera_video (self):
212 213 214 215 216 217 218 219 220 221
        """
        Camera video recording simulation:

        1. Create resource in the store for the new file
        2. Write the file
        3. Wait for miner-fs to index it
        4. Ensure no duplicates are found
        """

        fileurn = "tracker://test_camera_video_01/" + str(random.randint (0,100))
222 223 224
        origin_filepath = os.path.join (self.get_data_dir (), self.get_test_video ())
        dest_filepath = os.path.join (self.get_dest_dir (), self.get_test_video ())
        dest_fileuri = "file://" + dest_filepath
225

226
        self.insert_video_resource_info(fileurn, dest_fileuri)
227

228
        # Copy the image to the dest path
229 230
        self.slowcopy_file (origin_filepath, dest_filepath)
        assert os.path.exists (dest_filepath)
231
        dest_id, dest_urn = self.system.store.await_resource_inserted (NMM_PHOTO, dest_fileuri)
232
        self.assertEquals (self.get_urn_count_by_url (dest_fileuri), 1)
233 234

        # Clean the new file so the test directory is as before
235
        log ("Remove and wait")
236
        os.remove (dest_filepath)
237
        self.system.store.await_resource_deleted (NMM_PHOTO, dest_id)
238
        self.assertEquals (self.get_urn_count_by_url (dest_fileuri), 0)
239 240


241
    def test_02_camera_video_geolocation (self):
242 243 244 245 246 247 248 249 250 251 252
        """
        Camera simulation:

        1. Create resource in the store for the new file
        2. Set nlo:location
        2. Write the file
        3. Wait for miner-fs to index it
        4. Ensure no duplicates are found
        """

        fileurn = "tracker://test_camera_video_02/" + str(random.randint (0,100))
253 254 255
        origin_filepath = os.path.join (self.get_data_dir (), self.get_test_video ())
        dest_filepath = os.path.join (self.get_dest_dir (), self.get_test_video ())
        dest_fileuri = "file://" + dest_filepath
256 257 258 259

        geolocationurn = "tracker://test_camera_video_02_geolocation/" + str(random.randint (0,100))
        postaladdressurn = "tracker://test_camera_video_02_postaladdress/" + str(random.randint (0,100))

260
        self.insert_video_resource_info (fileurn, dest_fileuri)
261

262 263
        # FIRST, open the file for writing, and just write some garbage, to simulate that
        # we already started recording the video...
264
        fdest = open (dest_filepath, 'wb')
265 266 267
        fdest.write ("some garbage written here")
        fdest.write ("to simulate we're recording something...")
        fdest.seek (0)
268

269
        # SECOND, set slo:location
270
        self.insert_dummy_location_info (fileurn, geolocationurn, postaladdressurn)
271 272

        #THIRD, start copying the image to the dest path
273
        self.slowcopy_file_fd (origin_filepath, fdest)
274
        fdest.close ()
275
        assert os.path.exists (dest_filepath)
276 277

        # FOURTH, ensure we have only 1 resource
278
        dest_id, dest_urn = self.system.store.await_resource_inserted (NMM_VIDEO, dest_fileuri)
279
        self.assertEquals (self.get_urn_count_by_url (dest_fileuri), 1)
280

281
        # Clean the new file so the test directory is as before
282
        log ("Remove and wait")
283
        os.remove (dest_filepath)
284
        self.system.store.await_resource_deleted (NMM_VIDEO, dest_id)
285
        self.assertEquals (self.get_urn_count_by_url (dest_fileuri), 0)
286 287 288 289 290

if __name__ == "__main__":
	ut.main()