Commit 846aef66 authored by Carlos Garcia Campos's avatar Carlos Garcia Campos
Browse files

io-http2: ensure we actually send the reset stream frames

In case there's no more IO after the message is finished, we don't
really send the reset stream frame. We should ensure there's a valid
write after calling nghttp2_submit_rst_stream.
parent bdefdf01
......@@ -69,6 +69,7 @@ typedef struct {
GMainContext *async_context;
GHashTable *messages;
GHashTable *closed_messages;
nghttp2_session *session;
......@@ -77,6 +78,7 @@ typedef struct {
gssize write_buffer_size;
gssize written_bytes;
GSource *reset_stream_source;
gboolean is_shutdown;
} SoupClientMessageIOHTTP2;
......@@ -110,9 +112,11 @@ typedef struct {
GError *error;
gboolean paused;
guint32 stream_id;
gboolean can_be_restarted;
} SoupHTTP2MessageData;
static gboolean io_read (SoupClientMessageIOHTTP2 *, gboolean, GCancellable *, GError **);
static void io_write_until_stream_reset_is_sent (SoupHTTP2MessageData *data);
static void
NGCHECK (int return_code)
......@@ -487,17 +491,26 @@ on_before_frame_send_callback (nghttp2_session *session,
return 0;
}
static gboolean
remove_closed_stream (SoupHTTP2MessageData *data,
gpointer value,
nghttp2_frame *frame)
{
return data->stream_id == frame->hd.stream_id;
}
static int
on_frame_send_callback (nghttp2_session *session,
const nghttp2_frame *frame,
void *user_data)
{
SoupClientMessageIOHTTP2 *io = user_data;
SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, frame->hd.stream_id);
switch (frame->hd.type) {
case NGHTTP2_HEADERS:
g_assert (data);
h2_debug (user_data, data, "[SEND] [HEADERS] finished=%d",
h2_debug (io, data, "[SEND] [HEADERS] finished=%d",
(frame->hd.flags & NGHTTP2_FLAG_END_HEADERS) ? 1 : 0);
if (data->metrics)
......@@ -516,7 +529,7 @@ on_frame_send_callback (nghttp2_session *session,
if (data->state < STATE_WRITE_DATA)
advance_state_from (data, STATE_WRITE_HEADERS, STATE_WRITE_DATA);
h2_debug (user_data, data, "[SEND] [DATA] bytes=%zu, finished=%d",
h2_debug (io, data, "[SEND] [DATA] bytes=%zu, finished=%d",
frame->data.hd.length, frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
if (data->metrics) {
data->metrics->request_body_bytes_sent += frame->hd.length + FRAME_HEADER_SIZE;
......@@ -530,10 +543,11 @@ on_frame_send_callback (nghttp2_session *session,
}
break;
case NGHTTP2_RST_STREAM:
h2_debug (user_data, data, "[SEND] [RST_STREAM] stream_id=%u", frame->hd.stream_id);
h2_debug (io, data, "[SEND] [RST_STREAM] stream_id=%u", frame->hd.stream_id);
g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream, (gpointer)frame);
break;
default:
h2_debug (user_data, data, "[SEND] [%s]", frame_type_to_string (frame->hd.type));
h2_debug (io, data, "[SEND] [%s]", frame_type_to_string (frame->hd.type));
break;
}
......@@ -743,13 +757,21 @@ add_message_to_io_data (SoupClientMessageIOHTTP2 *io,
}
static void
soup_http2_message_data_free (SoupHTTP2MessageData *data)
soup_http2_message_data_close (SoupHTTP2MessageData *data)
{
if (data->body_istream)
/* Message data in close state is just waiting for reset stream to be sent
* to be removed from the messages hash table. Everything is reset but
* stream_id and io.
*/
if (data->body_istream) {
g_signal_handlers_disconnect_by_data (data->body_istream, data);
g_clear_object (&data->body_istream);
}
data->msg = NULL;
data->metrics = NULL;
data->cancellable = NULL;
g_clear_pointer (&data->item, soup_message_queue_item_unref);
g_clear_object (&data->body_istream);
g_clear_object (&data->decoded_data_istream);
if (data->io_source) {
......@@ -757,9 +779,10 @@ soup_http2_message_data_free (SoupHTTP2MessageData *data)
g_clear_pointer (&data->io_source, g_source_unref);
}
if (data->data_source_poll)
if (data->data_source_poll) {
g_source_destroy (data->data_source_poll);
g_clear_pointer (&data->data_source_poll, g_source_unref);
g_clear_pointer (&data->data_source_poll, g_source_unref);
}
g_clear_error (&data->data_source_error);
g_clear_pointer (&data->data_source_buffer, g_byte_array_unref);
......@@ -768,6 +791,14 @@ soup_http2_message_data_free (SoupHTTP2MessageData *data)
g_clear_error (&data->error);
data->completion_cb = NULL;
data->completion_data = NULL;
}
static void
soup_http2_message_data_free (SoupHTTP2MessageData *data)
{
soup_http2_message_data_close (data);
g_free (data);
}
......@@ -920,16 +951,29 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface,
g_object_ref (msg);
NGCHECK (nghttp2_submit_rst_stream (io->session, NGHTTP2_FLAG_NONE, data->stream_id,
completion == SOUP_MESSAGE_IO_COMPLETE ? NGHTTP2_NO_ERROR : NGHTTP2_CANCEL));
nghttp2_session_set_stream_user_data (io->session, data->stream_id, NULL);
if (!g_hash_table_remove (io->messages, msg))
g_warn_if_reached ();
if (!io->is_shutdown) {
NGCHECK (nghttp2_submit_rst_stream (io->session, NGHTTP2_FLAG_NONE, data->stream_id,
completion == SOUP_MESSAGE_IO_COMPLETE ? NGHTTP2_NO_ERROR : NGHTTP2_CANCEL));
soup_http2_message_data_close (data);
if (!g_hash_table_steal (io->messages, msg))
g_warn_if_reached ();
if (!g_hash_table_add (io->closed_messages, data))
g_warn_if_reached ();
} else {
if (!g_hash_table_remove (io->messages, msg))
g_warn_if_reached ();
}
if (completion_cb)
completion_cb (G_OBJECT (msg), SOUP_MESSAGE_IO_COMPLETE, completion_data);
g_object_unref (msg);
if (!io->is_shutdown)
io_write_until_stream_reset_is_sent (data);
}
static void
......@@ -1242,6 +1286,43 @@ io_run_until (SoupClientMessageIOHTTP2 *io,
return done;
}
static gboolean
io_write_until_stream_reset_is_sent_ready (GObject *stream,
SoupHTTP2MessageData *data)
{
io_write_until_stream_reset_is_sent (data);
return G_SOURCE_REMOVE;
}
static void
io_write_until_stream_reset_is_sent (SoupHTTP2MessageData *data)
{
SoupClientMessageIOHTTP2 *io = data->io;
GError *error = NULL;
if (io->reset_stream_source) {
g_source_destroy (io->reset_stream_source);
g_clear_pointer (&io->reset_stream_source, g_source_unref);
}
while (g_hash_table_lookup (io->closed_messages, data)) {
if (!nghttp2_session_want_write (io->session)) {
error = g_error_new_literal (G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, _("Operation would block"));
break;
}
if (!io_write (io, FALSE, FALSE, &error))
break;
}
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
io->reset_stream_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (io->ostream), NULL);
g_source_set_callback (io->reset_stream_source, (GSourceFunc)io_write_until_stream_reset_is_sent_ready, data, NULL);
g_source_attach (io->reset_stream_source, g_main_context_get_thread_default ());
}
g_clear_error (&error);
}
static gboolean
soup_client_message_io_http2_run_until_read (SoupClientMessageIO *iface,
......@@ -1379,10 +1460,15 @@ soup_client_message_io_http2_destroy (SoupClientMessageIO *iface)
{
SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
if (io->reset_stream_source) {
g_source_destroy (io->reset_stream_source);
g_clear_pointer (&io->reset_stream_source, g_source_unref);
}
g_clear_object (&io->stream);
g_clear_pointer (&io->async_context, g_main_context_unref);
g_clear_pointer (&io->session, nghttp2_session_del);
g_clear_pointer (&io->messages, g_hash_table_unref);
g_clear_pointer (&io->closed_messages, g_hash_table_unref);
g_free (io);
}
......@@ -1448,6 +1534,7 @@ soup_client_message_io_http2_init (SoupClientMessageIOHTTP2 *io)
nghttp2_session_callbacks_del (callbacks);
io->messages = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify)soup_http2_message_data_free);
io->closed_messages = g_hash_table_new_full (g_direct_hash, g_direct_equal, (GDestroyNotify)soup_http2_message_data_free, NULL);
io->iface.funcs = &io_funcs;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment