Commit 13327be6 authored by Adrien Bustany's avatar Adrien Bustany Committed by Martyn Russell

libtracker-common: Add "send and splice"

The tracker_dbus_send_and_splice function sends a DBusMessage, splices
from a file descriptor and then blocks on the reply of the message.
parent 026337df
......@@ -20,6 +20,8 @@
#include "config.h"
#include <gio/gio.h>
#include <gio/gunixinputstream.h>
#include <gio/gunixoutputstream.h>
#include <dbus/dbus-glib-bindings.h>
......@@ -47,6 +49,15 @@ typedef struct {
GTimeVal last_time;
} ClientData;
typedef struct {
GInputStream *unix_input_stream;
GInputStream *buffered_input_stream;
GOutputStream *output_stream;
DBusPendingCall *call;
TrackerSendAndSpliceCallback callback;
gpointer user_data;
} SendAndSpliceData;
static GSList *hooks;
static gboolean block_hooks;
......@@ -693,3 +704,229 @@ tracker_dbus_enable_client_lookup (gboolean enabled)
client_lookup_enabled = enabled;
}
/*
* /!\ BIG FAT WARNING /!\
* The message must be destroyed for this function to succeed, so pass a
* message with a refcount of 1 (and say goodbye to it, 'cause you'll never
* see it again
*/
gboolean
tracker_dbus_send_and_splice (DBusConnection *connection,
DBusMessage *message,
int fd,
GCancellable *cancellable,
void **dest_buffer,
gssize *dest_buffer_size,
GError **error)
{
DBusPendingCall *call;
DBusMessage *reply = NULL;
GInputStream *unix_input_stream;
GInputStream *buffered_input_stream;
GOutputStream *output_stream;
GError *inner_error = NULL;
gboolean ret_value = FALSE;
g_return_val_if_fail (connection, FALSE);
g_return_val_if_fail (message, FALSE);
g_return_val_if_fail (dest_buffer, FALSE);
dbus_connection_send_with_reply (connection,
message,
&call,
-1);
dbus_message_unref (message);
if (!call) {
g_set_error (error,
TRACKER_DBUS_ERROR,
TRACKER_DBUS_ERROR_UNSUPPORTED,
"FD passing unsupported or connection disconnected");
return FALSE;
}
unix_input_stream = g_unix_input_stream_new (fd, TRUE);
buffered_input_stream = g_buffered_input_stream_new_sized (unix_input_stream,
TRACKER_DBUS_PIPE_BUFFER_SIZE);
output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
g_output_stream_splice (output_stream,
buffered_input_stream,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
cancellable,
&inner_error);
if (G_LIKELY (!inner_error)) {
/* Wait for any current d-bus call to finish */
dbus_pending_call_block (call);
/* Check we didn't get an error */
reply = dbus_pending_call_steal_reply (call);
if (G_UNLIKELY (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR)) {
DBusError dbus_error;
dbus_error_init (&dbus_error);
dbus_set_error_from_message (&dbus_error, reply);
dbus_set_g_error (error, &dbus_error);
dbus_error_free (&dbus_error);
} else {
*dest_buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (output_stream));
if (dest_buffer_size) {
*dest_buffer_size = g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (output_stream));
}
ret_value = TRUE;
}
} else {
g_set_error (error,
TRACKER_DBUS_ERROR,
TRACKER_DBUS_ERROR_BROKEN_PIPE,
"Couldn't get results from server");
g_error_free (inner_error);
}
g_object_unref (output_stream);
g_object_unref (buffered_input_stream);
g_object_unref (unix_input_stream);
if (reply) {
dbus_message_unref (reply);
}
dbus_pending_call_unref (call);
return ret_value;
}
static SendAndSpliceData*
send_and_splice_data_new (GInputStream *unix_input_stream,
GInputStream *buffered_input_stream,
GOutputStream *output_stream,
DBusPendingCall *call,
TrackerSendAndSpliceCallback callback,
gpointer user_data)
{
SendAndSpliceData *data;
data = g_slice_new0 (SendAndSpliceData);
data->unix_input_stream = unix_input_stream;
data->buffered_input_stream = buffered_input_stream;
data->output_stream = output_stream;
data->call = call;
data->callback = callback;
data->user_data = user_data;
return data;
}
static void
send_and_splice_data_free (SendAndSpliceData *data)
{
g_object_unref (data->unix_input_stream);
g_object_unref (data->buffered_input_stream);
g_object_unref (data->output_stream);
dbus_pending_call_unref (data->call);
}
static void
send_and_splice_async_callback (GObject *source,
GAsyncResult *result,
gpointer user_data)
{
SendAndSpliceData *data = user_data;
DBusMessage *reply = NULL;
GError *error = NULL;
g_output_stream_splice_finish (data->output_stream,
result,
&error);
if (G_LIKELY (!error)) {
dbus_pending_call_block (data->call);
reply = dbus_pending_call_steal_reply (data->call);
if (G_UNLIKELY (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR)) {
DBusError dbus_error;
dbus_error_init (&dbus_error);
dbus_set_error_from_message (&dbus_error, reply);
dbus_set_g_error (&error, &dbus_error);
dbus_error_free (&dbus_error);
(* data->callback) (NULL, -1, error, data->user_data);
g_error_free (error);
} else {
dbus_pending_call_cancel (data->call);
(* data->callback) (g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (data->output_stream)),
g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (data->output_stream)),
NULL,
data->user_data);
}
} else {
(* data->callback) (NULL, -1, error, data->user_data);
g_error_free (error);
}
if (reply) {
dbus_message_unref (reply);
}
send_and_splice_data_free (data);
}
void
tracker_dbus_send_and_splice_async (DBusConnection *connection,
DBusMessage *message,
int fd,
GCancellable *cancellable,
TrackerSendAndSpliceCallback callback,
gpointer user_data)
{
DBusPendingCall *call;
GInputStream *unix_input_stream;
GInputStream *buffered_input_stream;
GOutputStream *output_stream;
SendAndSpliceData *data;
g_return_if_fail (connection);
g_return_if_fail (message);
dbus_connection_send_with_reply (connection,
message,
&call,
-1);
dbus_message_unref (message);
if (!call) {
g_critical ("FD passing unsupported or connection disconnected");
return;
}
unix_input_stream = g_unix_input_stream_new (fd, TRUE);
buffered_input_stream = g_buffered_input_stream_new_sized (unix_input_stream,
TRACKER_DBUS_PIPE_BUFFER_SIZE);
output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
data = send_and_splice_data_new (unix_input_stream,
buffered_input_stream,
output_stream,
call,
callback,
user_data);
g_output_stream_splice_async (output_stream,
buffered_input_stream,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
0,
cancellable,
send_and_splice_async_callback,
data);
}
......@@ -22,6 +22,8 @@
#include <glib/gi18n.h>
#include <gio/gio.h>
#include <dbus/dbus.h>
#include <dbus/dbus-glib-lowlevel.h>
#include <dbus/dbus-glib.h>
......@@ -54,7 +56,7 @@ G_BEGIN_DECLS
\
g_set_error (&assert_error, \
TRACKER_DBUS_ERROR, \
0, \
TRACKER_DBUS_ERROR_ASSERTION_FAILED, \
_("Assertion `%s' failed"), \
#expr); \
\
......@@ -70,7 +72,7 @@ G_BEGIN_DECLS
if G_LIKELY(expr) { } else { \
g_set_error (error, \
TRACKER_DBUS_ERROR, \
0, \
TRACKER_DBUS_ERROR_ASSERTION_FAILED, \
_("Assertion `%s' failed"), \
#expr); \
\
......@@ -78,11 +80,23 @@ G_BEGIN_DECLS
}; \
} G_STMT_END
/* Size of buffers used when sending data over a pipe, using DBus FD passing */
#define TRACKER_DBUS_PIPE_BUFFER_SIZE 65536
#define TRACKER_DBUS_SERVICE_EXTRACT "org.freedesktop.Tracker1.Extract"
#define TRACKER_DBUS_PATH_EXTRACT "/org/freedesktop/Tracker1/Extract"
#define TRACKER_DBUS_INTERFACE_EXTRACT "org.freedesktop.Tracker1.Extract"
typedef struct TrackerDBusRequestHandler TrackerDBusRequestHandler;
typedef void (*TrackerDBusRequestFunc) (guint request_id,
gpointer user_data);
typedef void (*TrackerSendAndSpliceCallback) (void *buffer,
gssize buffer_size,
GError *error,
gpointer user_data);
typedef struct {
guint id;
gpointer data1;
......@@ -95,6 +109,12 @@ typedef enum {
TRACKER_DBUS_EVENTS_TYPE_DELETE
} TrackerDBusEventsType;
typedef enum {
TRACKER_DBUS_ERROR_ASSERTION_FAILED,
TRACKER_DBUS_ERROR_UNSUPPORTED,
TRACKER_DBUS_ERROR_BROKEN_PIPE
} TrackerDBusError;
GQuark tracker_dbus_error_quark (void);
TrackerDBusData *tracker_dbus_data_new (const gpointer arg1,
const gpointer arg2);
......@@ -146,6 +166,20 @@ void tracker_dbus_request_block_hooks (void);
void tracker_dbus_request_unblock_hooks (void);
void tracker_dbus_enable_client_lookup (gboolean enable);
gboolean tracker_dbus_send_and_splice (DBusConnection *connection,
DBusMessage *message,
int fd,
GCancellable *cancellable,
void **dest_buffer,
gssize *dest_buffer_size,
GError **error);
void tracker_dbus_send_and_splice_async (DBusConnection *connection,
DBusMessage *message,
int fd,
GCancellable *cancellable,
TrackerSendAndSpliceCallback callback,
gpointer user_data);
G_END_DECLS
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment