Commit 3357288c authored by Carlos Garcia Campos's avatar Carlos Garcia Campos
Browse files

io-http2: simplify async io handling

Use a global polling source for reading and process any pending io
operation after every successful read. Every time we submit data to the
session we try to write, using a single global polling source in case
the operation would block until everything is written. This simplifies
the io handling and avoids the creation and destruction of a lot of
polling sources.
parent c8cbfd4b
......@@ -43,11 +43,11 @@ struct _SoupBodyInputStreamHttp2 {
typedef struct {
GSList *chunks;
GPollableInputStream *parent_stream;
gsize start_offset;
gsize len;
gsize pos;
gboolean completed;
GCancellable *need_more_data_cancellable;
} SoupBodyInputStreamHttp2Private;
static void soup_body_input_stream_http2_pollable_iface_init (GPollableInputStreamInterface *iface);
......@@ -72,24 +72,15 @@ static guint signals [LAST_SIGNAL] = { 0 };
* Returns: a new #GInputStream
*/
GInputStream *
soup_body_input_stream_http2_new (GPollableInputStream *parent_stream)
soup_body_input_stream_http2_new ()
{
GInputStream *stream;
SoupBodyInputStreamHttp2Private *priv;
g_assert (G_IS_POLLABLE_INPUT_STREAM (parent_stream));
stream = g_object_new (SOUP_TYPE_BODY_INPUT_STREAM_HTTP2, NULL);
priv = soup_body_input_stream_http2_get_instance_private (SOUP_BODY_INPUT_STREAM_HTTP2 (stream));
priv->parent_stream = g_object_ref (parent_stream);
return stream;
return G_INPUT_STREAM (g_object_new (SOUP_TYPE_BODY_INPUT_STREAM_HTTP2, NULL));
}
void
soup_body_input_stream_http2_add_data (SoupBodyInputStreamHttp2 *stream,
const guint8 *data,
gsize size)
const guint8 *data,
gsize size)
{
SoupBodyInputStreamHttp2Private *priv;
......@@ -100,6 +91,21 @@ soup_body_input_stream_http2_add_data (SoupBodyInputStreamHttp2 *stream,
priv->chunks = g_slist_append (priv->chunks, g_bytes_new (data, size));
priv->len += size;
if (priv->need_more_data_cancellable) {
g_cancellable_cancel (priv->need_more_data_cancellable);
g_clear_object (&priv->need_more_data_cancellable);
}
}
gboolean
soup_body_input_stream_http2_is_blocked (SoupBodyInputStreamHttp2 *stream)
{
SoupBodyInputStreamHttp2Private *priv;
g_return_val_if_fail (SOUP_IS_BODY_INPUT_STREAM_HTTP2 (stream), FALSE);
priv = soup_body_input_stream_http2_get_instance_private (stream);
return priv->need_more_data_cancellable != NULL;
}
static gssize
......@@ -174,9 +180,7 @@ soup_body_input_stream_http2_read_real (GInputStream *stream,
if (count == 0 && blocking && !priv->completed) {
GError *read_error = NULL;
g_signal_emit (memory_stream, signals[NEED_MORE_DATA], 0,
cancellable,
TRUE,
&read_error);
cancellable, &read_error);
if (read_error) {
g_propagate_error (error, read_error);
......@@ -214,28 +218,7 @@ soup_body_input_stream_http2_read_nonblocking (GPollableInputStream *stream,
gsize read = soup_body_input_stream_http2_read_real (G_INPUT_STREAM (stream), FALSE, buffer, count, NULL, &inner_error);
if (read == 0 && !priv->completed && !inner_error) {
/* Try requesting more reads from the io backend */
GError *inner_error = NULL;
g_signal_emit (memory_stream, signals[NEED_MORE_DATA], 0,
NULL, FALSE, &inner_error);
if (inner_error) {
g_propagate_error (error, inner_error);
return -1;
}
if (priv->completed)
return soup_body_input_stream_http2_read_real (G_INPUT_STREAM (stream), FALSE, buffer, count, NULL, error);
if (priv->pos < priv->len) {
read = soup_body_input_stream_http2_read_real (G_INPUT_STREAM (stream), FALSE, buffer, count, NULL, NULL);
if (read > 0)
return read;
}
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, "Operation would block");
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, _("Operation would block"));
return -1;
}
......@@ -250,6 +233,10 @@ soup_body_input_stream_http2_complete (SoupBodyInputStreamHttp2 *stream)
{
SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private (stream);
priv->completed = TRUE;
if (priv->need_more_data_cancellable) {
g_cancellable_cancel (priv->need_more_data_cancellable);
g_clear_object (&priv->need_more_data_cancellable);
}
}
static gssize
......@@ -352,13 +339,9 @@ soup_body_input_stream_http2_close_finish (GInputStream *stream,
static gboolean
soup_body_input_stream_http2_is_readable (GPollableInputStream *stream)
{
SoupBodyInputStreamHttp2 *memory_stream = SOUP_BODY_INPUT_STREAM_HTTP2 (stream);
SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private (memory_stream);
if (priv->pos < priv->len || priv->completed)
return TRUE;
SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private (SOUP_BODY_INPUT_STREAM_HTTP2 (stream));
return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (priv->parent_stream));
return priv->pos < priv->len || priv->completed;
}
static GSource *
......@@ -368,10 +351,9 @@ soup_body_input_stream_http2_create_source (GPollableInputStream *stream,
SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private (SOUP_BODY_INPUT_STREAM_HTTP2 (stream));
GSource *base_source, *pollable_source;
if (g_pollable_input_stream_is_readable (stream))
base_source = g_timeout_source_new (0);
else
base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (priv->parent_stream), NULL);
if (!priv->need_more_data_cancellable)
priv->need_more_data_cancellable = g_cancellable_new ();
base_source = g_cancellable_source_new (priv->need_more_data_cancellable);
pollable_source = g_pollable_source_new_full (stream, base_source, cancellable);
g_source_set_name (pollable_source, "SoupMemoryStreamSource");
......@@ -387,6 +369,10 @@ soup_body_input_stream_http2_dispose (GObject *object)
SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private (stream);
priv->completed = TRUE;
if (priv->need_more_data_cancellable) {
g_cancellable_cancel (priv->need_more_data_cancellable);
g_clear_object (&priv->need_more_data_cancellable);
}
G_OBJECT_CLASS (soup_body_input_stream_http2_parent_class)->dispose (object);
}
......@@ -398,7 +384,6 @@ soup_body_input_stream_http2_finalize (GObject *object)
SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private (stream);
g_slist_free_full (priv->chunks, (GDestroyNotify)g_bytes_unref);
g_clear_object (&priv->parent_stream);
G_OBJECT_CLASS (soup_body_input_stream_http2_parent_class)->finalize (object);
}
......@@ -444,5 +429,5 @@ soup_body_input_stream_http2_class_init (SoupBodyInputStreamHttp2Class *klass)
NULL, NULL,
NULL,
G_TYPE_ERROR,
2, G_TYPE_CANCELLABLE, G_TYPE_BOOLEAN);
1, G_TYPE_CANCELLABLE);
}
......@@ -6,7 +6,7 @@
#define SOUP_TYPE_BODY_INPUT_STREAM_HTTP2 (soup_body_input_stream_http2_get_type ())
G_DECLARE_FINAL_TYPE (SoupBodyInputStreamHttp2, soup_body_input_stream_http2, SOUP, BODY_INPUT_STREAM_HTTP2, GInputStream)
GInputStream * soup_body_input_stream_http2_new (GPollableInputStream *parent_stream);
GInputStream * soup_body_input_stream_http2_new (void);
void soup_body_input_stream_http2_add_data (SoupBodyInputStreamHttp2 *stream,
const guint8 *data,
......@@ -14,4 +14,7 @@ void soup_body_input_stream_http2_add_data (SoupBodyInputStreamHttp2
void soup_body_input_stream_http2_complete (SoupBodyInputStreamHttp2 *stream);
/* This is only used for tests */
gboolean soup_body_input_stream_http2_is_blocked (SoupBodyInputStreamHttp2 *stream);
G_END_DECLS
......@@ -103,6 +103,7 @@ SoupClientMessageIO *soup_message_get_io_data (SoupMessage *msg);
void soup_message_set_content_sniffer (SoupMessage *msg,
SoupContentSniffer *sniffer);
gboolean soup_message_has_content_sniffer (SoupMessage *msg);
gboolean soup_message_try_sniff_content (SoupMessage *msg,
GInputStream *stream,
gboolean blocking,
......
......@@ -2323,6 +2323,14 @@ soup_message_set_content_sniffer (SoupMessage *msg, SoupContentSniffer *sniffer)
priv->sniffer = sniffer ? g_object_ref (sniffer) : NULL;
}
gboolean
soup_message_has_content_sniffer (SoupMessage *msg)
{
SoupMessagePrivate *priv = soup_message_get_instance_private (msg);
return priv->sniffer != NULL;
}
gboolean
soup_message_try_sniff_content (SoupMessage *msg,
GInputStream *stream,
......
......@@ -26,8 +26,7 @@ do_large_data_test (void)
#define CHUNK_SIZE (gsize)1024 * 1024 * 512 // 512 MiB
#define TEST_SIZE CHUNK_SIZE * 20 // 10 GiB
GInputStream *parent_stream = g_memory_input_stream_new ();
GInputStream *stream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM (parent_stream));
GInputStream *stream = soup_body_input_stream_http2_new ();
SoupBodyInputStreamHttp2 *mem_stream = SOUP_BODY_INPUT_STREAM_HTTP2 (stream);
gsize data_needed = TEST_SIZE;
guint8 *memory_chunk = g_new (guint8, CHUNK_SIZE);
......@@ -58,15 +57,13 @@ do_large_data_test (void)
g_free (trash_buffer);
g_free (memory_chunk);
g_object_unref (parent_stream);
g_object_unref (stream);
}
static void
do_multiple_chunk_test (void)
{
GInputStream *parent_stream = g_memory_input_stream_new ();
GInputStream *stream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM (parent_stream));
GInputStream *stream = soup_body_input_stream_http2_new ();
SoupBodyInputStreamHttp2 *mem_stream = SOUP_BODY_INPUT_STREAM_HTTP2 (stream);
const char * const chunks[] = {
"1234", "5678", "9012", "hell", "owor", "ld..",
......@@ -85,7 +82,6 @@ do_multiple_chunk_test (void)
g_assert_cmpstr (buffer, ==, chunks[i]);
}
g_object_unref (parent_stream);
g_object_unref (stream);
}
......@@ -104,8 +100,7 @@ on_skip_ready (GInputStream *stream, GAsyncResult *res, GMainLoop *loop)
static void
do_skip_async_test (void)
{
GInputStream *parent_stream = g_memory_input_stream_new ();
GInputStream *stream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM (parent_stream));
GInputStream *stream = soup_body_input_stream_http2_new ();
SoupBodyInputStreamHttp2 *bistream = SOUP_BODY_INPUT_STREAM_HTTP2 (stream);
GMainLoop *loop = g_main_loop_new (NULL, FALSE);
......@@ -114,7 +109,6 @@ do_skip_async_test (void)
g_input_stream_skip_async (stream, 2, G_PRIORITY_DEFAULT, NULL, (GAsyncReadyCallback)on_skip_ready, loop);
g_main_loop_run (loop);
g_object_unref (parent_stream);
g_object_unref (stream);
g_main_loop_unref (loop);
}
......
......@@ -297,8 +297,7 @@ do_post_blocked_async_test (Test *test, gconstpointer data)
{
GMainContext *async_context = g_main_context_ref_thread_default ();
GInputStream *parent_stream = g_memory_input_stream_new ();
GInputStream *in_stream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM (parent_stream));
GInputStream *in_stream = soup_body_input_stream_http2_new ();
soup_body_input_stream_http2_add_data (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream), (guint8*)"Part 1 -", 8);
test->msg = soup_message_new (SOUP_METHOD_POST, "https://127.0.0.1:5000/echo_post");
......@@ -307,10 +306,9 @@ do_post_blocked_async_test (Test *test, gconstpointer data)
GBytes *response = NULL;
soup_session_send_async (test->session, test->msg, G_PRIORITY_DEFAULT, NULL, on_send_complete, &response);
int iteration_count = 20;
while (!response) {
// Let it iterate for a bit waiting on blocked data
if (iteration_count-- == 0) {
if (soup_body_input_stream_http2_is_blocked (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream))) {
soup_body_input_stream_http2_add_data (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream), (guint8*)" Part 2", 8);
soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream));
}
......@@ -323,7 +321,6 @@ do_post_blocked_async_test (Test *test, gconstpointer data)
g_main_context_iteration (async_context, FALSE);
g_bytes_unref (response);
g_object_unref (parent_stream);
g_object_unref (in_stream);
g_main_context_unref (async_context);
g_object_unref (test->msg);
......@@ -869,8 +866,7 @@ do_sniffer_sync_test (Test *test, gconstpointer data)
soup_session_add_feature_by_type (test->session, SOUP_TYPE_CONTENT_SNIFFER);
do_one_sniffer_test (test->session, "https://127.0.0.1:5000/", 11, TRUE, NULL);
/* FIXME: large seems to be broken in sync mode */
/* do_one_sniffer_test (test->session, "https://127.0.0.1:5000/large", (1024 * 24) + 1, TRUE, NULL); */
do_one_sniffer_test (test->session, "https://127.0.0.1:5000/large", (1024 * 24) + 1, TRUE, NULL);
do_one_sniffer_test (test->session, "https://127.0.0.1:5000/no-content", 0, FALSE, NULL);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment