Commit b00b0ade authored by Jürg Billeter's avatar Jürg Billeter Committed by Martyn Russell

libtracker-bus: Do not use GDBusProxy

The finalizer of GDBusProxy schedules an idle function to disconnect
the name owner changed signal. This leaks a GMainContext if the
corresponding loop was already terminated.

Fixes NB#285426.
parent e10a8886
...@@ -7,6 +7,7 @@ AM_VALAFLAGS = \ ...@@ -7,6 +7,7 @@ AM_VALAFLAGS = \
--header tracker-bus.h \ --header tracker-bus.h \
--vapi tracker-bus.vapi \ --vapi tracker-bus.vapi \
--pkg gio-2.0 --pkg gio-unix-2.0 --pkg posix \ --pkg gio-2.0 --pkg gio-unix-2.0 --pkg posix \
--vapidir $(top_srcdir)/src/vapi \
$(BUILD_VALAFLAGS) \ $(BUILD_VALAFLAGS) \
$(top_srcdir)/src/libtracker-common/libtracker-common.vapi \ $(top_srcdir)/src/libtracker-common/libtracker-common.vapi \
$(top_srcdir)/src/libtracker-sparql/tracker-sparql-$(TRACKER_API_VERSION).vapi $(top_srcdir)/src/libtracker-sparql/tracker-sparql-$(TRACKER_API_VERSION).vapi
......
...@@ -17,81 +17,10 @@ ...@@ -17,81 +17,10 @@
* Boston, MA 02110-1301, USA. * Boston, MA 02110-1301, USA.
*/ */
[DBus (name = "org.freedesktop.Tracker1.Resources")]
private interface Tracker.Bus.Resources : DBusProxy {
public abstract void load (string uri, Cancellable? cancellable) throws Sparql.Error, DBusError;
[DBus (name = "Load")]
public abstract async void load_async (string uri, Cancellable? cancellable) throws Sparql.Error, DBusError;
}
[DBus (name = "org.freedesktop.Tracker1.Steroids")]
private interface Tracker.Bus.Steroids : DBusProxy {
public abstract async string[] query (string query, UnixOutputStream result_stream, Cancellable? cancellable) throws Sparql.Error, DBusError;
public abstract async void update (UnixInputStream sparql_stream, Cancellable? cancellable) throws Sparql.Error, DBusError;
[DBus (signature = "aaa{ss}")]
public abstract async Variant update_blank (UnixInputStream sparql_stream, Cancellable? cancellable) throws Sparql.Error, DBusError;
public abstract async void batch_update (UnixInputStream sparql_stream, Cancellable? cancellable) throws Sparql.Error, DBusError;
[DBus (signature = "as")]
public abstract async Variant update_array (UnixInputStream sparql_stream, Cancellable? cancellable) throws Sparql.Error, DBusError;
[DBus (visible = false)]
public void update_begin (UnixInputStream sparql_stream, int priority, Cancellable? cancellable, AsyncReadyCallback callback) {
if (priority <= GLib.Priority.DEFAULT) {
update.begin (sparql_stream, cancellable, callback);
} else {
batch_update.begin (sparql_stream, cancellable, callback);
}
}
}
[DBus (name = "org.freedesktop.Tracker1.Statistics")]
private interface Tracker.Bus.Statistics : DBusProxy {
public abstract string[,] Get (Cancellable? cancellable) throws DBusError;
public async abstract string[,] Get_async (Cancellable? cancellable) throws DBusError;
}
// Actual class definition
public class Tracker.Bus.Connection : Tracker.Sparql.Connection { public class Tracker.Bus.Connection : Tracker.Sparql.Connection {
Resources resources_object;
Steroids steroids_object;
Statistics statistics_object;
public Connection () throws Sparql.Error, IOError, DBusError { public Connection () throws Sparql.Error, IOError, DBusError {
} // ensure that error domain is registered with GDBus
new Sparql.Error.INTERNAL ("");
void ensure_resources_object () throws IOError, DBusError {
if (resources_object != null) {
return;
}
resources_object = GLib.Bus.get_proxy_sync (BusType.SESSION,
TRACKER_DBUS_SERVICE,
TRACKER_DBUS_OBJECT_RESOURCES,
DBusProxyFlags.DO_NOT_LOAD_PROPERTIES | DBusProxyFlags.DO_NOT_CONNECT_SIGNALS);
resources_object.set_default_timeout (int.MAX);
}
void ensure_steroids_object () throws IOError, DBusError {
if (steroids_object != null) {
return;
}
steroids_object = GLib.Bus.get_proxy_sync (BusType.SESSION,
TRACKER_DBUS_SERVICE,
TRACKER_DBUS_OBJECT_STEROIDS,
DBusProxyFlags.DO_NOT_LOAD_PROPERTIES | DBusProxyFlags.DO_NOT_CONNECT_SIGNALS);
steroids_object.set_default_timeout (int.MAX);
}
void ensure_statistics_object () throws IOError, DBusError {
if (statistics_object != null) {
return;
}
statistics_object = GLib.Bus.get_proxy_sync (BusType.SESSION,
TRACKER_DBUS_SERVICE,
TRACKER_DBUS_OBJECT_STATISTICS,
DBusProxyFlags.DO_NOT_LOAD_PROPERTIES | DBusProxyFlags.DO_NOT_CONNECT_SIGNALS);
} }
void pipe (out UnixInputStream input, out UnixOutputStream output) throws IOError { void pipe (out UnixInputStream input, out UnixOutputStream output) throws IOError {
...@@ -103,6 +32,29 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection { ...@@ -103,6 +32,29 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection {
output = new UnixOutputStream (pipefd[1], true); output = new UnixOutputStream (pipefd[1], true);
} }
void handle_error_reply (DBusMessage message) throws Sparql.Error, IOError, DBusError {
try {
message.to_gerror ();
} catch (IOError e_io) {
throw e_io;
} catch (Sparql.Error e_sparql) {
throw e_sparql;
} catch (DBusError e_dbus) {
throw e_dbus;
} catch (Error e) {
throw new IOError.FAILED (e.message);
}
}
void send_query (DBusConnection connection, string sparql, UnixOutputStream output, Cancellable? cancellable, AsyncReadyCallback? callback) throws GLib.IOError {
var message = new DBusMessage.method_call (TRACKER_DBUS_SERVICE, TRACKER_DBUS_OBJECT_STEROIDS, TRACKER_DBUS_INTERFACE_STEROIDS, "Query");
var fd_list = new UnixFDList ();
message.set_body (new Variant ("(sh)", sparql, fd_list.append (output.fd)));
message.set_unix_fd_list (fd_list);
connection.send_message_with_reply.begin (message, DBusSendMessageFlags.NONE, int.MAX, null, cancellable, callback);
}
public override Sparql.Cursor query (string sparql, Cancellable? cancellable) throws Sparql.Error, IOError, DBusError { public override Sparql.Cursor query (string sparql, Cancellable? cancellable) throws Sparql.Error, IOError, DBusError {
// use separate main context for sync operation // use separate main context for sync operation
var context = new MainContext (); var context = new MainContext ();
...@@ -119,16 +71,16 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection { ...@@ -119,16 +71,16 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection {
} }
public async override Sparql.Cursor query_async (string sparql, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError { public async override Sparql.Cursor query_async (string sparql, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
ensure_steroids_object ();
UnixInputStream input; UnixInputStream input;
UnixOutputStream output; UnixOutputStream output;
pipe (out input, out output); pipe (out input, out output);
var connection = GLib.Bus.get_sync (BusType.SESSION, cancellable);
// send D-Bus request // send D-Bus request
AsyncResult dbus_res = null; AsyncResult dbus_res = null;
bool received_result = false; bool received_result = false;
steroids_object.query.begin (sparql, output, cancellable, (o, res) => { send_query (connection, sparql, output, cancellable, (o, res) => {
dbus_res = res; dbus_res = res;
if (received_result) { if (received_result) {
query_async.callback (); query_async.callback ();
...@@ -146,11 +98,24 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection { ...@@ -146,11 +98,24 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection {
if (dbus_res == null) { if (dbus_res == null) {
yield; yield;
} }
string[] variable_names = steroids_object.query.end (dbus_res);
var reply = connection.send_message_with_reply.end (dbus_res);
handle_error_reply (reply);
string[] variable_names = (string[]) reply.get_body ().get_child_value (0);
mem_stream.close (); mem_stream.close ();
return new FDCursor (mem_stream.steal_data (), mem_stream.data_size, variable_names); return new FDCursor (mem_stream.steal_data (), mem_stream.data_size, variable_names);
} }
void send_update (DBusConnection connection, string method, UnixInputStream input, Cancellable? cancellable, AsyncReadyCallback? callback) throws GLib.IOError {
var message = new DBusMessage.method_call (TRACKER_DBUS_SERVICE, TRACKER_DBUS_OBJECT_STEROIDS, TRACKER_DBUS_INTERFACE_STEROIDS, method);
var fd_list = new UnixFDList ();
message.set_body (new Variant ("(h)", fd_list.append (input.fd)));
message.set_unix_fd_list (fd_list);
connection.send_message_with_reply.begin (message, DBusSendMessageFlags.NONE, int.MAX, null, cancellable, callback);
}
public override void update (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError { public override void update (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
// use separate main context for sync operation // use separate main context for sync operation
var context = new MainContext (); var context = new MainContext ();
...@@ -167,16 +132,16 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection { ...@@ -167,16 +132,16 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection {
} }
public async override void update_async (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError { public async override void update_async (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
ensure_steroids_object ();
UnixInputStream input; UnixInputStream input;
UnixOutputStream output; UnixOutputStream output;
pipe (out input, out output); pipe (out input, out output);
var connection = GLib.Bus.get_sync (BusType.SESSION, cancellable);
// send D-Bus request // send D-Bus request
AsyncResult dbus_res = null; AsyncResult dbus_res = null;
bool sent_update = false; bool sent_update = false;
steroids_object.update_begin (input, priority, cancellable, (o, res) => { send_update (connection, priority <= GLib.Priority.DEFAULT ? "Update" : "BatchUpdate", input, cancellable, (o, res) => {
dbus_res = res; dbus_res = res;
if (sent_update) { if (sent_update) {
update_async.callback (); update_async.callback ();
...@@ -196,24 +161,21 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection { ...@@ -196,24 +161,21 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection {
yield; yield;
} }
if (priority <= GLib.Priority.DEFAULT) { var reply = connection.send_message_with_reply.end (dbus_res);
steroids_object.update.end (dbus_res); handle_error_reply (reply);
} else {
steroids_object.batch_update.end (dbus_res);
}
} }
public async override GenericArray<Error?>? update_array_async (string[] sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError { public async override GenericArray<Error?>? update_array_async (string[] sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
ensure_steroids_object ();
UnixInputStream input; UnixInputStream input;
UnixOutputStream output; UnixOutputStream output;
pipe (out input, out output); pipe (out input, out output);
var connection = GLib.Bus.get_sync (BusType.SESSION, cancellable);
// send D-Bus request // send D-Bus request
AsyncResult dbus_res = null; AsyncResult dbus_res = null;
bool sent_update = false; bool sent_update = false;
steroids_object.update_array.begin (input, cancellable, (o, res) => { send_update (connection, "UpdateArray", input, cancellable, (o, res) => {
dbus_res = res; dbus_res = res;
if (sent_update) { if (sent_update) {
update_array_async.callback (); update_array_async.callback ();
...@@ -236,10 +198,13 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection { ...@@ -236,10 +198,13 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection {
yield; yield;
} }
var reply = connection.send_message_with_reply.end (dbus_res);
handle_error_reply (reply);
// process results (errors) // process results (errors)
var result = new GenericArray<Error?> (); var result = new GenericArray<Error?> ();
Variant resultv; Variant resultv;
resultv = steroids_object.update_array.end (dbus_res); resultv = reply.get_body ().get_child_value (0);
var iter = resultv.iterator (); var iter = resultv.iterator ();
string code, message; string code, message;
while (iter.next ("s", out code)) { while (iter.next ("s", out code)) {
...@@ -274,16 +239,16 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection { ...@@ -274,16 +239,16 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection {
} }
public async override GLib.Variant? update_blank_async (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError { public async override GLib.Variant? update_blank_async (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
ensure_steroids_object ();
UnixInputStream input; UnixInputStream input;
UnixOutputStream output; UnixOutputStream output;
pipe (out input, out output); pipe (out input, out output);
var connection = GLib.Bus.get_sync (BusType.SESSION, cancellable);
// send D-Bus request // send D-Bus request
AsyncResult dbus_res = null; AsyncResult dbus_res = null;
bool sent_update = false; bool sent_update = false;
steroids_object.update_blank.begin (input, cancellable, (o, res) => { send_update (connection, "UpdateBlank", input, cancellable, (o, res) => {
dbus_res = res; dbus_res = res;
if (sent_update) { if (sent_update) {
update_blank_async.callback (); update_blank_async.callback ();
...@@ -303,32 +268,40 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection { ...@@ -303,32 +268,40 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection {
yield; yield;
} }
return steroids_object.update_blank.end (dbus_res); var reply = connection.send_message_with_reply.end (dbus_res);
handle_error_reply (reply);
return reply.get_body ().get_child_value (0);
} }
public override void load (File file, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError { public override void load (File file, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
ensure_resources_object (); var connection = GLib.Bus.get_sync (BusType.SESSION, cancellable);
resources_object.load (file.get_uri (), cancellable); var message = new DBusMessage.method_call (TRACKER_DBUS_SERVICE, TRACKER_DBUS_OBJECT_RESOURCES, TRACKER_DBUS_INTERFACE_RESOURCES, "Load");
message.set_body (new Variant ("(s)", file.get_uri ()));
if (cancellable != null && cancellable.is_cancelled ()) { var reply = connection.send_message_with_reply_sync (message, DBusSendMessageFlags.NONE, int.MAX, null, cancellable);
throw new IOError.CANCELLED ("Operation was cancelled"); handle_error_reply (reply);
}
} }
public async override void load_async (File file, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError { public async override void load_async (File file, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
ensure_resources_object (); var connection = GLib.Bus.get_sync (BusType.SESSION, cancellable);
yield resources_object.load_async (file.get_uri (), cancellable); var message = new DBusMessage.method_call (TRACKER_DBUS_SERVICE, TRACKER_DBUS_OBJECT_RESOURCES, TRACKER_DBUS_INTERFACE_RESOURCES, "Load");
message.set_body (new Variant ("(s)", file.get_uri ()));
if (cancellable != null && cancellable.is_cancelled ()) { var reply = yield connection.send_message_with_reply (message, DBusSendMessageFlags.NONE, int.MAX, null, cancellable);
throw new IOError.CANCELLED ("Operation was cancelled"); handle_error_reply (reply);
}
} }
public override Sparql.Cursor? statistics (Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError { public override Sparql.Cursor? statistics (Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
ensure_statistics_object (); var connection = GLib.Bus.get_sync (BusType.SESSION, cancellable);
var message = new DBusMessage.method_call (TRACKER_DBUS_SERVICE, TRACKER_DBUS_OBJECT_STATISTICS, TRACKER_DBUS_INTERFACE_STATISTICS, "Get");
string[,] results = statistics_object.Get (cancellable); var reply = connection.send_message_with_reply_sync (message, DBusSendMessageFlags.NONE, int.MAX, null, cancellable);
handle_error_reply (reply);
string[,] results = (string[,]) reply.get_body ().get_child_value (0);
Sparql.ValueType[] types = new Sparql.ValueType[2]; Sparql.ValueType[] types = new Sparql.ValueType[2];
string[] var_names = new string[2]; string[] var_names = new string[2];
...@@ -345,9 +318,14 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection { ...@@ -345,9 +318,14 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection {
} }
public async override Sparql.Cursor? statistics_async (Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError { public async override Sparql.Cursor? statistics_async (Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
ensure_statistics_object (); var connection = GLib.Bus.get_sync (BusType.SESSION, cancellable);
var message = new DBusMessage.method_call (TRACKER_DBUS_SERVICE, TRACKER_DBUS_OBJECT_STATISTICS, TRACKER_DBUS_INTERFACE_STATISTICS, "Get");
var reply = yield connection.send_message_with_reply (message, DBusSendMessageFlags.NONE, int.MAX, null, cancellable);
handle_error_reply (reply);
string[,] results = yield statistics_object.Get_async (cancellable); string[,] results = (string[,]) reply.get_body ().get_child_value (0);
Sparql.ValueType[] types = new Sparql.ValueType[2]; Sparql.ValueType[] types = new Sparql.ValueType[2];
string[] var_names = new string[2]; string[] var_names = new string[2];
......
include $(top_srcdir)/Makefile.decl include $(top_srcdir)/Makefile.decl
EXTRA_DIST = \
gio-2.0.vapi
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment