Commit 69258b57 authored by Carlos Garcia Campos's avatar Carlos Garcia Campos
Browse files

io-http2: ensure we stop polling the network input stream after the decoded data stream is created

In some cases we might end up polling the network input forever, if
a message read iteration handles the end of another message and there's
nothing more to read from the network. In those cases we should make
sure we stop polling the network to poll the decoded data stream
instead.
parent 9c6d4837
Pipeline #286296 failed with stages
in 5 minutes and 45 seconds
......@@ -94,6 +94,8 @@ typedef struct {
GCancellable *cancellable;
GInputStream *decoded_data_istream;
GInputStream *body_istream;
GTask *task;
gboolean in_run_until_read_async;
/* Request body logger */
SoupLogger *logger;
......@@ -120,10 +122,12 @@ typedef struct {
gboolean can_be_restarted;
} SoupHTTP2MessageData;
static void io_run_until_read_async (SoupHTTP2MessageData *data);
static gboolean io_read (SoupClientMessageIOHTTP2 *, gboolean, GCancellable *, GError **);
static void io_write_until_stream_reset_is_sent (SoupHTTP2MessageData *data);
static void io_idle_read (SoupClientMessageIOHTTP2 *io);
static void io_close (SoupClientMessageIOHTTP2 *io);
static void io_poll (SoupHTTP2MessageData *data);
static void
NGCHECK (int return_code)
......@@ -592,10 +596,23 @@ on_stream_close_callback (nghttp2_session *session,
void *user_data)
{
SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, stream_id);
h2_debug (user_data, data, "[SESSION] Closed: %s", nghttp2_http2_strerror (error_code));
if (error_code == NGHTTP2_REFUSED_STREAM && data && data->state < STATE_READ_DATA)
if (!data)
return 0;
if (error_code == NGHTTP2_REFUSED_STREAM && data->state < STATE_READ_DATA)
data->can_be_restarted = TRUE;
if (data->state < STATE_READ_DATA && !data->in_run_until_read_async) {
/* Start polling the decoded data stream instead of the network input stream. */
if (data->io_source) {
g_source_destroy (data->io_source);
g_clear_pointer (&data->io_source, g_source_unref);
}
io_poll (data);
}
return 0;
}
......@@ -1114,14 +1131,27 @@ message_source_check (GSource *source)
return FALSE;
}
static GSource *
soup_client_message_io_http2_get_source (SoupHTTP2MessageData *data,
SoupMessage *msg,
GCancellable *cancellable,
SoupMessageIOSourceFunc callback,
gpointer user_data)
static gboolean
io_poll_ready (SoupMessage *msg,
gpointer user_data)
{
SoupHTTP2MessageData *data = user_data;
io_run_until_read_async (data);
return G_SOURCE_REMOVE;
}
static void
io_poll (SoupHTTP2MessageData *data)
{
GSource *base_source;
GCancellable *cancellable;
g_assert (data->task);
g_assert (!data->io_source);
cancellable = g_task_get_cancellable (data->task);
/* TODO: Handle mixing writes in? */
if (data->paused)
......@@ -1139,11 +1169,14 @@ soup_client_message_io_http2_get_source (SoupHTTP2MessageData *data,
base_source = g_timeout_source_new (0);
}
GSource *source = soup_message_io_source_new (base_source, G_OBJECT (msg), data->paused, message_source_check);
g_source_set_callback (source, (GSourceFunc)callback, user_data, NULL);
return source;
data->io_source = soup_message_io_source_new (base_source, G_OBJECT (data->msg),
data->paused, message_source_check);
g_source_set_callback (data->io_source, (GSourceFunc)io_poll_ready, data, NULL);
g_source_set_priority (data->io_source, g_task_get_priority (data->task));
g_source_attach (data->io_source, data->io->async_context);
}
static void
client_stream_eof (SoupClientInputStream *stream,
gpointer user_data)
......@@ -1461,26 +1494,11 @@ soup_client_message_io_http2_run (SoupClientMessageIO *iface,
g_assert_not_reached ();
}
static void io_run_until_read_async (SoupMessage *msg,
GTask *task);
static gboolean
io_run_until_read_ready (SoupMessage *msg,
gpointer user_data)
{
GTask *task = user_data;
io_run_until_read_async (msg, task);
return G_SOURCE_REMOVE;
}
static void
io_run_until_read_async (SoupMessage *msg,
GTask *task)
io_run_until_read_async (SoupHTTP2MessageData *data)
{
SoupClientMessageIOHTTP2 *io = get_io_data (msg);
SoupHTTP2MessageData *data = get_data_for_message (io, msg);
SoupClientMessageIOHTTP2 *io = data->io;
GTask *task = data->task;
GError *error = NULL;
if (data->io_source) {
......@@ -1488,10 +1506,15 @@ io_run_until_read_async (SoupMessage *msg,
g_clear_pointer (&data->io_source, g_source_unref);
}
if (io_run_until (io, msg, FALSE,
data->in_run_until_read_async = TRUE;
if (io_run_until (io, data->msg, FALSE,
STATE_READ_DATA,
g_task_get_cancellable (task),
&error)) {
data->task = NULL;
data->in_run_until_read_async = FALSE;
g_task_return_boolean (task, TRUE);
g_object_unref (task);
return;
......@@ -1499,23 +1522,23 @@ io_run_until_read_async (SoupMessage *msg,
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_error_free (error);
data->io_source = soup_client_message_io_http2_get_source (data, msg, g_task_get_cancellable (task),
(SoupMessageIOSourceFunc)io_run_until_read_ready,
task);
g_source_set_priority (data->io_source, g_task_get_priority (task));
g_source_attach (data->io_source, io->async_context);
io_poll (data);
data->in_run_until_read_async = FALSE;
return;
}
if (get_io_data (msg) == io) {
if (get_io_data (data->msg) == io) {
if (data->can_be_restarted)
data->item->state = SOUP_MESSAGE_RESTARTING;
else
soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END);
soup_message_set_metrics_timestamp (data->msg, SOUP_MESSAGE_METRICS_RESPONSE_END);
soup_client_message_io_http2_finished ((SoupClientMessageIO *)io, msg);
soup_client_message_io_http2_finished ((SoupClientMessageIO *)data->io, data->msg);
}
data->task = NULL;
data->in_run_until_read_async = FALSE;
g_task_return_error (task, error);
g_object_unref (task);
}
......@@ -1528,11 +1551,12 @@ soup_client_message_io_http2_run_until_read_async (SoupClientMessageIO *iface,
GAsyncReadyCallback callback,
gpointer user_data)
{
GTask *task;
SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
SoupHTTP2MessageData *data = get_data_for_message (io, msg);
task = g_task_new (msg, cancellable, callback, user_data);
g_task_set_priority (task, io_priority);
io_run_until_read_async (msg, task);
data->task = g_task_new (msg, cancellable, callback, user_data);
g_task_set_priority (data->task, io_priority);
io_run_until_read_async (data);
}
static gboolean
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment