tracker-backend.vala 12.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (C) 2010, Nokia <ivan.frade@nokia.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
 * Boston, MA  02110-1301, USA.
 */

20
class Tracker.Sparql.Backend : Connection {
21
	bool initialized;
22 23 24
	Tracker.Sparql.Connection direct = null;
	Tracker.Sparql.Connection bus = null;
	enum Backend {
25 26 27 28
		AUTO,
		DIRECT,
		BUS
	}
29

30
	public Backend () throws Sparql.Error, IOError, DBusError, SpawnError {
31 32 33
		try {
			// Makes sure the sevice is available
			debug ("Waiting for service to become available...");
34

35 36 37 38 39 40 41
			// do not use proxy to work around race condition in GDBus
			// NB#259760
			var bus = GLib.Bus.get_sync (BusType.SESSION);
			var msg = new DBusMessage.method_call (TRACKER_DBUS_SERVICE, TRACKER_DBUS_OBJECT_STATUS, TRACKER_DBUS_INTERFACE_STATUS, "Wait");
			bus.send_message_with_reply_sync (msg, 0, /* timeout */ int.MAX, null).to_gerror ();

			debug ("Service is ready");
42

43 44
			debug ("Constructing connection");
			load_plugins ();
45
			debug ("Backend is ready");
46 47 48
		} catch (GLib.Error e) {
			throw new Sparql.Error.INTERNAL (e.message);
		}
49 50

		initialized = true;
51 52
	}

53
	public override void dispose () {
54 55 56
		// trying to lock on partially initialized instances will deadlock
		if (initialized) {
			door.lock ();
57

58 59 60 61 62 63 64 65 66 67
			try {
				// Ensure this instance is not used for any new calls to Tracker.Sparql.Connection.get.
				// However, a call to Tracker.Sparql.Connection.get between g_object_unref and the
				// above lock might have increased the reference count of this instance to 2 (or more).
				// Therefore, we must not clean up direct/bus connection in dispose.
				if (singleton == this) {
					singleton = null;
				}
			} finally {
				door.unlock ();
68 69 70 71 72 73
			}
		}

		base.dispose ();
	}

74
	public override Cursor query (string sparql, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
75
		debug ("%s(): '%s'", Log.METHOD, sparql);
76 77 78 79 80 81 82
		if (direct != null) {
			return direct.query (sparql, cancellable);
		} else {
			return bus.query (sparql, cancellable);
		}
	}

83
	public async override Cursor query_async (string sparql, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
84
		debug ("%s(): '%s'", Log.METHOD, sparql);
85 86 87 88 89 90 91
		if (direct != null) {
			return yield direct.query_async (sparql, cancellable);
		} else {
			return yield bus.query_async (sparql, cancellable);
		}
	}

92
	public override void update (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
93
		debug ("%s(priority:%d): '%s'", Log.METHOD, priority, sparql);
94 95 96
		if (bus == null) {
			throw new Sparql.Error.UNSUPPORTED ("Update support not available for direct-only connection");
		}
97
		bus.update (sparql, priority, cancellable);
98 99
	}

100
	public override GLib.Variant? update_blank (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
101
		debug ("%s(priority:%d): '%s'", Log.METHOD, priority, sparql);
102 103 104
		if (bus == null) {
			throw new Sparql.Error.UNSUPPORTED ("Update support not available for direct-only connection");
		}
105
		return bus.update_blank (sparql, priority, cancellable);
106 107
	}

108
	public async override void update_async (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
109
		debug ("%s(priority:%d): '%s'", Log.METHOD, priority, sparql);
110 111 112
		if (bus == null) {
			throw new Sparql.Error.UNSUPPORTED ("Update support not available for direct-only connection");
		}
113 114 115
		yield bus.update_async (sparql, priority, cancellable);
	}

116 117 118 119
	public async override GenericArray<Error?>? update_array_async (string[] sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
		if (bus == null) {
			throw new Sparql.Error.UNSUPPORTED ("Update support not available for direct-only connection");
		}
120 121 122
		return yield bus.update_array_async (sparql, priority, cancellable);
	}

123
	public async override GLib.Variant? update_blank_async (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
124
		debug ("%s(priority:%d): '%s'", Log.METHOD, priority, sparql);
125 126 127
		if (bus == null) {
			throw new Sparql.Error.UNSUPPORTED ("Update support not available for direct-only connection");
		}
128 129 130
		return yield bus.update_blank_async (sparql, priority, cancellable);
	}

131
	public override void load (File file, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
132 133
		var uri = file.get_uri ();
		debug ("%s(): '%s'", Log.METHOD, uri);
134 135 136
		if (bus == null) {
			throw new Sparql.Error.UNSUPPORTED ("Update support not available for direct-only connection");
		}
137
		bus.load (file, cancellable);
138 139
	}

140
	public async override void load_async (File file, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
141 142
		var uri = file.get_uri ();
		debug ("%s(): '%s'", Log.METHOD, uri);
143 144 145
		if (bus == null) {
			throw new Sparql.Error.UNSUPPORTED ("Update support not available for direct-only connection");
		}
146
		yield bus.load_async (file, cancellable);
147 148
	}

149
	public override Cursor? statistics (Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
150
		debug ("%s()", Log.METHOD);
151 152 153
		if (bus == null) {
			throw new Sparql.Error.UNSUPPORTED ("Statistics support not available for direct-only connection");
		}
154 155 156
		return bus.statistics (cancellable);
	}

157
	public async override Cursor? statistics_async (Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
158
		debug ("%s()", Log.METHOD);
159 160 161
		if (bus == null) {
			throw new Sparql.Error.UNSUPPORTED ("Statistics support not available for direct-only connection");
		}
162 163 164
		return yield bus.statistics_async (cancellable);
	}

165
	// Plugin loading functions
166
	private void load_plugins () throws GLib.Error {
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
		string env_backend = Environment.get_variable ("TRACKER_SPARQL_BACKEND");
		Backend backend = Backend.AUTO;

		if (env_backend != null) {
			if (env_backend.ascii_casecmp ("direct") == 0) {
				backend = Backend.DIRECT;
				debug ("Using backend = 'DIRECT'");
			} else if (env_backend.ascii_casecmp ("bus") == 0) {
				backend = Backend.BUS;
				debug ("Using backend = 'BUS'");
			} else {
				warning ("Environment variable TRACKER_SPARQL_BACKEND set to unknown value '%s'", env_backend);
			}
		}

		if (backend == Backend.AUTO) {
183
			debug ("Using backend = 'AUTO'");
184 185 186 187
		}

		switch (backend) {
		case backend.AUTO:
188 189 190
			try {
				direct = new Tracker.Direct.Connection ();
			} catch (Error e) {
191
				debug ("Unable to initialize direct backend: " + e.message);
192
			}
193

194
			bus = new Tracker.Bus.Connection ();
195
			break;
196

197
		case backend.DIRECT:
198
			direct = new Tracker.Direct.Connection ();
199 200 201
			break;

		case backend.BUS:
202
			bus = new Tracker.Bus.Connection ();
203 204 205 206
			break;

		default:
			assert_not_reached ();
207
		}
208 209
	}

210 211 212
	static weak Connection? singleton;
	static bool log_initialized;
	static StaticMutex door;
213

214
	static new Connection get (Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, SpawnError {
215
		door.lock ();
216

217 218 219
		try {
			// assign to owned variable to ensure it doesn't get freed between check and return
			var result = singleton;
220

221 222
			if (result == null) {
				log_init ();
223

224
				result = new Tracker.Sparql.Backend ();
225

226 227 228
				if (cancellable != null && cancellable.is_cancelled ()) {
					throw new IOError.CANCELLED ("Operation was cancelled");
				}
229

230 231
				singleton = result;
			}
232

233
			return result;
234 235 236
		} finally {
			door.unlock ();
		}
237 238
	}

239
	public static new Connection get_internal (Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, SpawnError {
240 241
		if (MainContext.get_thread_default () == null) {
			// ok to initialize without extra thread
242
			return get (cancellable);
243 244 245 246 247 248 249 250 251
		}

		// run with separate main context to be able to wait for async method
		var context = new MainContext ();
		var loop = new MainLoop (context);
		AsyncResult async_result = null;

		context.push_thread_default ();

252
		get_internal_async.begin (cancellable, (obj, res) => {
253 254 255 256 257 258 259 260 261 262 263
			async_result = res;
			loop.quit ();
		});

		loop.run ();

		context.pop_thread_default ();

		return get_internal_async.end (async_result);
	}

264
	public async static new Connection get_internal_async (Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, SpawnError {
265 266 267 268 269 270 271 272 273
		// fast path: avoid extra thread if connection is already available
		if (door.trylock ()) {
			// assign to owned variable to ensure it doesn't get freed between unlock and return
			var result = singleton;

			door.unlock ();

			if (result != null) {
				return result;
274 275
			}
		}
276 277 278 279 280 281 282 283 284 285 286

		// run in a separate thread
		Sparql.Error sparql_error = null;
		IOError io_error = null;
		DBusError dbus_error = null;
		SpawnError spawn_error = null;
		Connection result = null;
		var context = MainContext.get_thread_default ();

		g_io_scheduler_push_job (job => {
			try {
287
				result = get (cancellable);
288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332
			} catch (IOError e_io) {
				io_error = e_io;
			} catch (Sparql.Error e_spql) {
				sparql_error = e_spql;
			} catch (DBusError e_dbus) {
				dbus_error = e_dbus;
			} catch (SpawnError e_spawn) {
				spawn_error = e_spawn;
			}

			var source = new IdleSource ();
			source.set_callback (() => {
				get_internal_async.callback ();
				return false;
			});
			source.attach (context);

			return false;
		});
		yield;

		if (sparql_error != null) {
			throw sparql_error;
		} else if (io_error != null) {
			throw io_error;
		} else if (dbus_error != null) {
			throw dbus_error;
		} else if (spawn_error != null) {
			throw spawn_error;
		} else {
			return result;
		}
	}

	private static void log_init () {
		if (log_initialized) {
			return;
		}

		log_initialized = true;

		// Avoid debug messages
		int verbosity = 0;
		string env_verbosity = Environment.get_variable ("TRACKER_VERBOSITY");
		if (env_verbosity != null)
Jürg Billeter's avatar
Jürg Billeter committed
333
			verbosity = int.parse (env_verbosity);
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368

		LogLevelFlags remove_levels = 0;

		switch (verbosity) {
		// Log level 3: EVERYTHING
		case 3:
			break;

		// Log level 2: CRITICAL/ERROR/WARNING/INFO/MESSAGE only
		case 2:
			remove_levels = LogLevelFlags.LEVEL_DEBUG;
			break;

		// Log level 1: CRITICAL/ERROR/WARNING/INFO only
		case 1:
			remove_levels = LogLevelFlags.LEVEL_DEBUG |
			              LogLevelFlags.LEVEL_MESSAGE;
			break;

		// Log level 0: CRITICAL/ERROR/WARNING only (default)
		default:
		case 0:
			remove_levels = LogLevelFlags.LEVEL_DEBUG |
			              LogLevelFlags.LEVEL_MESSAGE |
			              LogLevelFlags.LEVEL_INFO;
			break;
		}

		if (remove_levels != 0) {
			GLib.Log.set_handler ("Tracker", remove_levels, remove_log_handler);
		}
	}

	private static void remove_log_handler (string? log_domain, LogLevelFlags log_level, string message) {
		/* do nothing */
369 370 371
	}
}

372
public async static Tracker.Sparql.Connection tracker_sparql_connection_get_async (Cancellable? cancellable = null) throws Tracker.Sparql.Error, IOError, DBusError, SpawnError {
373
	return yield Tracker.Sparql.Backend.get_internal_async (cancellable);
374 375 376
}

public static Tracker.Sparql.Connection tracker_sparql_connection_get (Cancellable? cancellable = null) throws Tracker.Sparql.Error, IOError, DBusError, SpawnError {
377
	return Tracker.Sparql.Backend.get_internal (cancellable);
378 379 380
}

public async static Tracker.Sparql.Connection tracker_sparql_connection_get_direct_async (Cancellable? cancellable = null) throws Tracker.Sparql.Error, IOError, DBusError, SpawnError {
381
	return yield Tracker.Sparql.Backend.get_internal_async (cancellable);
382 383 384
}

public static Tracker.Sparql.Connection tracker_sparql_connection_get_direct (Cancellable? cancellable = null) throws Tracker.Sparql.Error, IOError, DBusError, SpawnError {
385
	return Tracker.Sparql.Backend.get_internal (cancellable);
386
}