From 571f0cf7d9fb389e35008cc2ec4fde5fc480a047 Mon Sep 17 00:00:00 2001 From: Adrian Szyndela Date: Wed, 2 Feb 2022 14:49:06 +0100 Subject: [PATCH 1/2] gdbus: use larger buffer for larger reads from socket When reading a new message from a socket used for D-Bus, the first read was 16 bytes to include message size, and then the exact message size was read into buffer. Now, data is read into a buffer of default size 4096 bytes. It gives some chance that the data already includes whole messages, and some of subsequent polls and reads are not needed anymore. This speeds up receiving of messages. --- gio/gdbusprivate.c | 197 ++++++++++++++++++++++++++++----------------- 1 file changed, 124 insertions(+), 73 deletions(-) diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c index 0b8630ab2d..98f7fc6e81 100644 --- a/gio/gdbusprivate.c +++ b/gio/gdbusprivate.c @@ -61,6 +61,11 @@ #include "glibintl.h" +#define READ_BUFFER_MIN_READ_SIZE_FOR_HEADER 16 +/* Buffer size of 4096 seems to be big enough to contain most of messages + * on a standard desktop. */ +#define READ_BUFFER_MIN_BUFFER_SIZE 4096 + static gboolean _g_dbus_worker_do_initial_read (gpointer data); static void schedule_pending_close (GDBusWorker *worker); @@ -565,6 +570,36 @@ _g_dbus_worker_unfreeze (GDBusWorker *worker) static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker); +static gssize _g_dbus_worker_buffer_get_next_full_message_length (GDBusWorker *worker) +{ + GError *error; + + if (worker->read_buffer == NULL) + return 0; + + if (worker->read_buffer_cur_size >= READ_BUFFER_MIN_READ_SIZE_FOR_HEADER) + { + gssize message_len; + /* OK, got the header - determine how many more bytes are needed */ + error = NULL; + message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, + READ_BUFFER_MIN_READ_SIZE_FOR_HEADER, + &error); + if (message_len == -1) + { + g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message); + _g_dbus_worker_emit_disconnected (worker, FALSE, error); + g_error_free (error); + return -1; + } + if ((gsize)message_len <= worker->read_buffer_cur_size) + return message_len; + + worker->read_buffer_bytes_wanted = message_len; + } + return 0; +} + /* called in private thread shared by all GDBusConnection instances (without read-lock held) */ static void _g_dbus_worker_do_read_cb (GInputStream *input_stream, @@ -574,6 +609,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, GDBusWorker *worker = user_data; GError *error; gssize bytes_read; + gssize message_len; g_mutex_lock (&worker->read_lock); @@ -720,97 +756,106 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, read_message_print_transport_debug (bytes_read, worker); worker->read_buffer_cur_size += bytes_read; - if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size) + + while ((message_len = _g_dbus_worker_buffer_get_next_full_message_length (worker) ) > 0) { /* OK, got what we asked for! */ - if (worker->read_buffer_bytes_wanted == 16) - { - gssize message_len; - /* OK, got the header - determine how many more bytes are needed */ - error = NULL; - message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, - 16, - &error); - if (message_len == -1) - { - g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message); - _g_dbus_worker_emit_disconnected (worker, FALSE, error); - g_error_free (error); - goto out; - } + GDBusMessage *message; + error = NULL; - worker->read_buffer_bytes_wanted = message_len; - _g_dbus_worker_do_read_unlocked (worker); + /* TODO: use connection->priv->auth to decode the message */ + + message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer, + message_len, + worker->capabilities, + &error); + if (message == NULL) + { + gchar *s; + s = _g_dbus_hexdump (worker->read_buffer, message_len, 2); + g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n" + "The error is: %s\n" + "The payload is as follows:\n" + "%s", + message_len, + error->message, + s); + g_free (s); + _g_dbus_worker_emit_disconnected (worker, FALSE, error); + g_error_free (error); + goto out; } - else + +#ifdef G_OS_UNIX + if (worker->read_fd_list != NULL) { - GDBusMessage *message; - error = NULL; + guint fds_needed; + GUnixFDList *fd_list_for_message; - /* TODO: use connection->priv->auth to decode the message */ + fds_needed = g_dbus_message_get_num_unix_fds (message); - message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer, - worker->read_buffer_cur_size, - worker->capabilities, - &error); - if (message == NULL) + if ((guint)g_unix_fd_list_get_length (worker->read_fd_list) > fds_needed) { - gchar *s; - s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2); - g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n" - "The error is: %s\n" - "The payload is as follows:\n" - "%s", - worker->read_buffer_cur_size, - error->message, - s); - g_free (s); - _g_dbus_worker_emit_disconnected (worker, FALSE, error); - g_error_free (error); - goto out; - } + gint num_fds; + GUnixFDList *old_worker_read_fd_list; + gint *fds; -#ifdef G_OS_UNIX - if (worker->read_fd_list != NULL) + old_worker_read_fd_list = worker->read_fd_list; + fds = g_unix_fd_list_steal_fds (old_worker_read_fd_list, &num_fds); + fd_list_for_message = g_unix_fd_list_new_from_array (fds, fds_needed); + worker->read_fd_list = g_unix_fd_list_new_from_array (fds + fds_needed, num_fds - fds_needed); + g_object_unref (old_worker_read_fd_list); + g_free (fds); + } + else { - g_dbus_message_set_unix_fd_list (message, worker->read_fd_list); - g_object_unref (worker->read_fd_list); + fd_list_for_message = worker->read_fd_list; worker->read_fd_list = NULL; } + + if (fds_needed > 0) + g_dbus_message_set_unix_fd_list (message, fd_list_for_message); + g_object_unref (fd_list_for_message); + } #endif - if (G_UNLIKELY (_g_dbus_debug_message ())) + if (G_UNLIKELY (_g_dbus_debug_message ())) + { + gchar *s; + _g_dbus_debug_print_lock (); + g_print ("========================================================================\n" + "GDBus-debug:Message:\n" + " <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n", + message_len); + s = g_dbus_message_print (message, 2); + g_print ("%s", s); + g_free (s); + if (G_UNLIKELY (_g_dbus_debug_payload ())) { - gchar *s; - _g_dbus_debug_print_lock (); - g_print ("========================================================================\n" - "GDBus-debug:Message:\n" - " <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n", - worker->read_buffer_cur_size); - s = g_dbus_message_print (message, 2); - g_print ("%s", s); + s = _g_dbus_hexdump (worker->read_buffer, message_len, 2); + g_print ("%s\n", s); g_free (s); - if (G_UNLIKELY (_g_dbus_debug_payload ())) - { - s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2); - g_print ("%s\n", s); - g_free (s); - } - _g_dbus_debug_print_unlock (); } + _g_dbus_debug_print_unlock (); + } - /* yay, got a message, go deliver it */ - _g_dbus_worker_queue_or_deliver_received_message (worker, g_steal_pointer (&message)); + /* yay, got a message, go deliver it */ + _g_dbus_worker_queue_or_deliver_received_message (worker, g_steal_pointer (&message)); - /* start reading another message! */ - worker->read_buffer_bytes_wanted = 0; - worker->read_buffer_cur_size = 0; - _g_dbus_worker_do_read_unlocked (worker); + /* set up reading another message */ + if (worker->read_buffer_cur_size > (gsize)message_len) + { + memmove (worker->read_buffer, + worker->read_buffer + message_len, + worker->read_buffer_cur_size - message_len); } + worker->read_buffer_cur_size -= message_len; + worker->read_buffer_bytes_wanted = 0; } - else + + if (message_len == 0) { - /* didn't get all the bytes we requested - so repeat the request... */ + /* start reading another message or repeat the request to get all the bytes */ _g_dbus_worker_do_read_unlocked (worker); } @@ -835,15 +880,21 @@ _g_dbus_worker_do_read_unlocked (GDBusWorker *worker) /* if bytes_wanted is zero, it means start reading a message */ if (worker->read_buffer_bytes_wanted == 0) { - worker->read_buffer_cur_size = 0; - worker->read_buffer_bytes_wanted = 16; + if (worker->socket != NULL) + { + worker->read_buffer_bytes_wanted = READ_BUFFER_MIN_BUFFER_SIZE; + } + else + { + worker->read_buffer_cur_size = 0; + worker->read_buffer_bytes_wanted = READ_BUFFER_MIN_READ_SIZE_FOR_HEADER; + } } /* ensure we have a (big enough) buffer */ if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size) { - /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */ - worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096); + worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, READ_BUFFER_MIN_BUFFER_SIZE); worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size); } -- GitLab From 7848a5dd9b39c7f08ad65c81e670e22d09a4424f Mon Sep 17 00:00:00 2001 From: Adrian Szyndela Date: Thu, 7 Apr 2022 19:22:49 +0200 Subject: [PATCH 2/2] gdbus: don't move complete messages after receiving Using larger reading requests to socket can result in having more than a single message in a buffer after a single call to recvmsg(). We can use the received data to create GDBusMessages without moving the data to the buffer start. This commit changes the receiving to: - create GDBusMessages from completely received messages data, regardless of its position in the receiving buffer; - move data in the buffer only if a message is truncated at the end of the buffer. Additionally, as the message data can start at non-aligned address, all the reads through the pointer are controlled. --- gio/gdbusmessage.c | 31 ++++++++++++++--------- gio/gdbusprivate.c | 61 +++++++++++++++++++++++++++++----------------- 2 files changed, 58 insertions(+), 34 deletions(-) diff --git a/gio/gdbusmessage.c b/gio/gdbusmessage.c index ecef6cd3c5..f038819453 100644 --- a/gio/gdbusmessage.c +++ b/gio/gdbusmessage.c @@ -2091,6 +2091,8 @@ g_dbus_message_bytes_needed (guchar *blob, GError **error) { gssize ret; + guint32 headers_array_size; + guint32 body_size; ret = -1; @@ -2098,23 +2100,28 @@ g_dbus_message_bytes_needed (guchar *blob, g_return_val_if_fail (error == NULL || *error == NULL, -1); g_return_val_if_fail (blob_len >= 16, -1); - if (blob[0] == 'l') - { - /* core header (12 bytes) + ARRAY of STRUCT of (BYTE,VARIANT) */ - ret = 12 + 4 + GUINT32_FROM_LE (((guint32 *) blob)[3]); - /* round up so it's a multiple of 8 */ - ret = 8 * ((ret + 7)/8); - /* finally add the body size */ - ret += GUINT32_FROM_LE (((guint32 *) blob)[1]); - } - else if (blob[0] == 'B') + if (blob[0] == 'l' || blob[0] == 'B') { + memcpy (&headers_array_size, blob + 3*sizeof (guint32), sizeof (headers_array_size)); + memcpy (&body_size, blob + 1 * sizeof (guint32), sizeof (body_size)); + + if (blob[0] == 'l') + { + headers_array_size = GUINT32_FROM_LE (headers_array_size); + body_size = GUINT32_FROM_LE (body_size); + } + else + { + headers_array_size = GUINT32_FROM_BE (headers_array_size); + body_size = GUINT32_FROM_BE (body_size); + } + /* core header (12 bytes) + ARRAY of STRUCT of (BYTE,VARIANT) */ - ret = 12 + 4 + GUINT32_FROM_BE (((guint32 *) blob)[3]); + ret = 12 + 4 + headers_array_size; /* round up so it's a multiple of 8 */ ret = 8 * ((ret + 7)/8); /* finally add the body size */ - ret += GUINT32_FROM_BE (((guint32 *) blob)[1]); + ret += body_size; } else { diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c index 98f7fc6e81..0c88cf9ae5 100644 --- a/gio/gdbusprivate.c +++ b/gio/gdbusprivate.c @@ -378,6 +378,7 @@ struct GDBusWorker gchar *read_buffer; gsize read_buffer_allocated_size; gsize read_buffer_cur_size; + gsize read_buffer_cur_pos; gsize read_buffer_bytes_wanted; GUnixFDList *read_fd_list; GSocketControlMessage **read_ancillary_messages; @@ -573,16 +574,19 @@ static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker); static gssize _g_dbus_worker_buffer_get_next_full_message_length (GDBusWorker *worker) { GError *error; + gssize remaining_data_in_buffer; if (worker->read_buffer == NULL) return 0; - if (worker->read_buffer_cur_size >= READ_BUFFER_MIN_READ_SIZE_FOR_HEADER) + remaining_data_in_buffer = worker->read_buffer_cur_size - worker->read_buffer_cur_pos; + + if (remaining_data_in_buffer >= READ_BUFFER_MIN_READ_SIZE_FOR_HEADER) { gssize message_len; /* OK, got the header - determine how many more bytes are needed */ error = NULL; - message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, + message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer + worker->read_buffer_cur_pos, READ_BUFFER_MIN_READ_SIZE_FOR_HEADER, &error); if (message_len == -1) @@ -592,11 +596,15 @@ static gssize _g_dbus_worker_buffer_get_next_full_message_length (GDBusWorker *w g_error_free (error); return -1; } - if ((gsize)message_len <= worker->read_buffer_cur_size) + if (message_len <= remaining_data_in_buffer) return message_len; worker->read_buffer_bytes_wanted = message_len; } + else + { + worker->read_buffer_bytes_wanted = 0; + } return 0; } @@ -761,18 +769,19 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, { /* OK, got what we asked for! */ GDBusMessage *message; + guchar *read_buffer = (guchar *)worker->read_buffer + worker->read_buffer_cur_pos; error = NULL; /* TODO: use connection->priv->auth to decode the message */ - message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer, + message = g_dbus_message_new_from_blob (read_buffer, message_len, worker->capabilities, &error); if (message == NULL) { gchar *s; - s = _g_dbus_hexdump (worker->read_buffer, message_len, 2); + s = _g_dbus_hexdump (read_buffer, message_len, 2); g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n" "The error is: %s\n" "The payload is as follows:\n" @@ -832,7 +841,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, g_free (s); if (G_UNLIKELY (_g_dbus_debug_payload ())) { - s = _g_dbus_hexdump (worker->read_buffer, message_len, 2); + s = _g_dbus_hexdump (read_buffer, message_len, 2); g_print ("%s\n", s); g_free (s); } @@ -842,19 +851,23 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, /* yay, got a message, go deliver it */ _g_dbus_worker_queue_or_deliver_received_message (worker, g_steal_pointer (&message)); - /* set up reading another message */ - if (worker->read_buffer_cur_size > (gsize)message_len) - { - memmove (worker->read_buffer, - worker->read_buffer + message_len, - worker->read_buffer_cur_size - message_len); - } - worker->read_buffer_cur_size -= message_len; - worker->read_buffer_bytes_wanted = 0; + worker->read_buffer_cur_pos += message_len; } if (message_len == 0) { + /* set up reading another data package */ + if (worker->read_buffer_cur_pos > 0) + { + if (worker->read_buffer_cur_pos < worker->read_buffer_cur_size) + { + memmove (worker->read_buffer, + worker->read_buffer + worker->read_buffer_cur_pos, + worker->read_buffer_cur_size - worker->read_buffer_cur_pos); + } + worker->read_buffer_cur_size -= worker->read_buffer_cur_pos; + worker->read_buffer_cur_pos = 0; + } /* start reading another message or repeat the request to get all the bytes */ _g_dbus_worker_do_read_unlocked (worker); } @@ -2584,26 +2597,30 @@ read_message_print_transport_debug (gssize bytes_read, gsize size; gint32 serial; gint32 message_length; + guchar *read_buffer; if (G_LIKELY (!_g_dbus_debug_transport ())) goto out; - size = bytes_read + worker->read_buffer_cur_size; + read_buffer = (guchar *)worker->read_buffer + worker->read_buffer_cur_pos; + + size = bytes_read + worker->read_buffer_cur_size - worker->read_buffer_cur_pos; serial = 0; message_length = 0; if (size >= 16) - message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL); + message_length = g_dbus_message_bytes_needed ((guchar *) read_buffer, size, NULL); if (size >= 1) { - switch (worker->read_buffer[0]) + if (size >= 12) + memcpy (&serial, read_buffer + 2 * (sizeof (guint32)), sizeof (serial)); + + switch (read_buffer[0]) { case 'l': - if (size >= 12) - serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]); + serial = GUINT32_FROM_LE (serial); break; case 'B': - if (size >= 12) - serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]); + serial = GUINT32_FROM_BE (serial); break; default: /* an error will be set elsewhere if this happens */ -- GitLab