Commit 35f1bac5 authored by Carlos Garcia Campos's avatar Carlos Garcia Campos Committed by Carlos Garcia Campos

WebSockets: only poll IO stream when needed

Instead of having two pollable sources constantly running, always try to
read/write without blocking and start polling if the operation returns
G_IO_ERROR_WOULD_BLOCK. This patch also fixes test
/websocket/direct/close-after-close that was passing but not actually
testing what we wanted, because the client close was never sent. When
the mutex is released, the frame has been queued, but not sent.
parent f95a047a
Pipeline #109294 passed with stage
in 1 minute and 3 seconds
......@@ -152,6 +152,7 @@ struct _SoupWebsocketConnectionPrivate {
};
#define MAX_INCOMING_PAYLOAD_SIZE_DEFAULT 128 * 1024
#define READ_BUFFER_SIZE 1024
  • 1024 seems a bit small... I think glib is using 8K these days internally. Shall we bump it here as well?

  • It shouldn't be hard to measure performance, for example with Autobahn. If it has a significant impact we should probably bump it.

Please register or sign in to reply
G_DEFINE_TYPE_WITH_PRIVATE (SoupWebsocketConnection, soup_websocket_connection, G_TYPE_OBJECT)
......@@ -163,6 +164,11 @@ static void emit_error_and_close (SoupWebsocketConnection *self,
static void protocol_error_and_close (SoupWebsocketConnection *self);
static gboolean on_web_socket_input (GObject *pollable_stream,
gpointer user_data);
static gboolean on_web_socket_output (GObject *pollable_stream,
gpointer user_data);
/* Code below is based on g_utf8_validate() implementation,
* but handling NULL characters as valid, as expected by
* WebSockets and compliant with RFC 3629.
......@@ -291,7 +297,20 @@ on_iostream_closed (GObject *source,
}
static void
stop_input (SoupWebsocketConnection *self)
soup_websocket_connection_start_input_source (SoupWebsocketConnection *self)
{
SoupWebsocketConnectionPrivate *pv = self->pv;
if (pv->input_source)
return;
pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL);
g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
g_source_attach (pv->input_source, pv->main_context);
}
static void
soup_websocket_connection_stop_input_source (SoupWebsocketConnection *self)
{
SoupWebsocketConnectionPrivate *pv = self->pv;
......@@ -304,7 +323,20 @@ stop_input (SoupWebsocketConnection *self)
}
static void
stop_output (SoupWebsocketConnection *self)
soup_websocket_connection_start_output_source (SoupWebsocketConnection *self)
{
SoupWebsocketConnectionPrivate *pv = self->pv;
if (pv->output_source)
return;
pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL);
g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
g_source_attach (pv->output_source, pv->main_context);
}
static void
soup_websocket_connection_stop_output_source (SoupWebsocketConnection *self)
{
SoupWebsocketConnectionPrivate *pv = self->pv;
......@@ -349,8 +381,8 @@ close_io_stream (SoupWebsocketConnection *self)
close_io_stop_timeout (self);
if (!pv->io_closing) {
stop_input (self);
stop_output (self);
soup_websocket_connection_stop_input_source (self);
soup_websocket_connection_stop_output_source (self);
pv->io_closing = TRUE;
g_debug ("closing io stream");
g_io_stream_close_async (pv->io_stream, G_PRIORITY_DEFAULT,
......@@ -368,7 +400,7 @@ shutdown_wr_io_stream (SoupWebsocketConnection *self)
GIOStream *base_iostream;
GError *error = NULL;
stop_output (self);
soup_websocket_connection_stop_output_source (self);
base_iostream = SOUP_IS_IO_STREAM (pv->io_stream) ?
soup_io_stream_get_base_iostream (SOUP_IO_STREAM (pv->io_stream)) :
......@@ -644,9 +676,6 @@ too_big_error_and_close (SoupWebsocketConnection *self,
self->pv->connection_type == SOUP_WEBSOCKET_CONNECTION_SERVER ? "server" : "client",
payload_len, self->pv->max_incoming_payload_size);
emit_error_and_close (self, error, TRUE);
/* The input is in an invalid state now */
stop_input (self);
}
static void
......@@ -1080,32 +1109,31 @@ process_incoming (SoupWebsocketConnection *self)
;
}
static gboolean
on_web_socket_input (GObject *pollable_stream,
gpointer user_data)
static void
soup_websocket_connection_read (SoupWebsocketConnection *self)
{
SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data);
SoupWebsocketConnectionPrivate *pv = self->pv;
GError *error = NULL;
gboolean end = FALSE;
gssize count;
gsize len;
soup_websocket_connection_stop_input_source (self);
do {
len = pv->incoming->len;
g_byte_array_set_size (pv->incoming, len + 1024);
g_byte_array_set_size (pv->incoming, len + READ_BUFFER_SIZE);
count = g_pollable_input_stream_read_nonblocking (pv->input,
pv->incoming->data + len,
1024, NULL, &error);
READ_BUFFER_SIZE, NULL, &error);
if (count < 0) {
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_error_free (error);
count = 0;
} else {
emit_error_and_close (self, error, TRUE);
return TRUE;
return;
}
} else if (count == 0) {
end = TRUE;
......@@ -1125,16 +1153,24 @@ on_web_socket_input (GObject *pollable_stream,
}
close_io_stream (self);
return;
}
return TRUE;
soup_websocket_connection_start_input_source (self);
}
static gboolean
on_web_socket_output (GObject *pollable_stream,
gpointer user_data)
on_web_socket_input (GObject *pollable_stream,
gpointer user_data)
{
soup_websocket_connection_read (SOUP_WEBSOCKET_CONNECTION (user_data));
return G_SOURCE_REMOVE;
}
static void
soup_websocket_connection_write (SoupWebsocketConnection *self)
{
SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data);
SoupWebsocketConnectionPrivate *pv = self->pv;
const guint8 *data;
GError *error = NULL;
......@@ -1142,19 +1178,18 @@ on_web_socket_output (GObject *pollable_stream,
gssize count;
gsize len;
soup_websocket_connection_stop_output_source (self);
if (soup_websocket_connection_get_state (self) == SOUP_WEBSOCKET_STATE_CLOSED) {
g_debug ("Ignoring message since the connection is closed");
stop_output (self);
return TRUE;
return;
}
frame = g_queue_peek_head (&pv->outgoing);
/* No more frames to send */
if (frame == NULL) {
stop_output (self);
return TRUE;
}
if (frame == NULL)
return;
data = g_bytes_get_data (frame->data, &len);
g_assert (len > 0);
......@@ -1174,7 +1209,7 @@ on_web_socket_output (GObject *pollable_stream,
frame->pending = TRUE;
} else {
emit_error_and_close (self, error, TRUE);
return FALSE;
return;
}
}
......@@ -1192,23 +1227,21 @@ on_web_socket_output (GObject *pollable_stream,
}
}
frame_free (frame);
if (g_queue_is_empty (&pv->outgoing))
return;
}
return TRUE;
soup_websocket_connection_start_output_source (self);
}
static void
start_output (SoupWebsocketConnection *self)
static gboolean
on_web_socket_output (GObject *pollable_stream,
gpointer user_data)
{
SoupWebsocketConnectionPrivate *pv = self->pv;
soup_websocket_connection_write (SOUP_WEBSOCKET_CONNECTION (user_data));
if (pv->output_source)
return;
g_debug ("starting output source");
pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL);
g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
g_source_attach (pv->output_source, pv->main_context);
return G_SOURCE_REMOVE;
}
static void
......@@ -1249,7 +1282,7 @@ queue_frame (SoupWebsocketConnection *self,
g_queue_push_tail (&pv->outgoing, frame);
}
start_output (self);
soup_websocket_connection_write (self);
}
static void
......@@ -1274,9 +1307,7 @@ soup_websocket_connection_constructed (GObject *object)
pv->output = G_POLLABLE_OUTPUT_STREAM (os);
g_return_if_fail (g_pollable_output_stream_can_poll (pv->output));
pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL);
g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
g_source_attach (pv->input_source, pv->main_context);
soup_websocket_connection_start_input_source (self);
}
static void
......
......@@ -1017,6 +1017,7 @@ close_after_close_server_thread (gpointer user_data)
const char frames[] =
"\x88\x09\x03\xe8""reason1"
"\x88\x09\x03\xe8""reason2";
GSocket *socket;
GError *error = NULL;
g_mutex_lock (&test->mutex);
......@@ -1026,7 +1027,8 @@ close_after_close_server_thread (gpointer user_data)
frames, sizeof (frames) -1, &written, NULL, &error);
g_assert_no_error (error);
g_assert_cmpuint (written, ==, sizeof (frames) - 1);
g_io_stream_close (test->raw_server, NULL, &error);
socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (test->raw_server));
g_socket_shutdown (socket, FALSE, TRUE, &error);
g_assert_no_error (error);
return NULL;
......@@ -1050,6 +1052,7 @@ test_close_after_close (Test *test,
WAIT_UNTIL (soup_websocket_connection_get_state (test->client) == SOUP_WEBSOCKET_STATE_CLOSED);
g_assert_cmpuint (soup_websocket_connection_get_close_code (test->client), ==, SOUP_WEBSOCKET_CLOSE_NORMAL);
g_assert_cmpstr (soup_websocket_connection_get_close_data (test->client), ==, "reason1");
g_io_stream_close (test->raw_server, NULL, NULL);
}
static gpointer
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment