Commit e02749b4 authored by Carlos Garcia Campos's avatar Carlos Garcia Campos
Browse files

io-http2: use the item cancellable for send data operations

And use the data source cancellable to create the io source to wait for
the send data operations.
parent c83062d1
Pipeline #285521 failed with stages
in 5 minutes and 25 seconds
......@@ -562,6 +562,9 @@ on_data_readable (GInputStream *stream,
{
SoupHTTP2MessageData *data = (SoupHTTP2MessageData*)user_data;
g_cancellable_cancel (data->data_source_cancellable);
g_clear_object (&data->data_source_cancellable);
NGCHECK (nghttp2_session_resume_data (data->io->session, data->stream_id));
g_clear_pointer (&data->data_source_poll, g_source_unref);
......@@ -579,6 +582,9 @@ on_data_read (GInputStream *source,
h2_debug (data->io, data, "[SEND_BODY] Read %zd", read);
g_cancellable_cancel (data->data_source_cancellable);
g_clear_object (&data->data_source_cancellable);
/* This operation may have outlived the message data in which
case this will have been cancelled. */
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
......@@ -625,13 +631,6 @@ on_data_source_read_callback (nghttp2_session *session,
SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, stream_id);
SoupClientMessageIOHTTP2 *io = get_io_data (data->msg);
/* This cancellable is only used for async data source operations,
* only exists while reading is happening, and will be cancelled
* at any point if the data is freed.
*/
if (!data->data_source_cancellable)
data->data_source_cancellable = g_cancellable_new ();
/* We support pollable streams in the best case because they
* should perform better with one fewer copy of each buffer and no threading. */
if (G_IS_POLLABLE_INPUT_STREAM (source->ptr) && g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (source->ptr))) {
......@@ -650,12 +649,14 @@ on_data_source_read_callback (nghttp2_session *session,
g_assert (data->data_source_poll == NULL);
h2_debug (io, data, "[SEND_BODY] Polling");
data->data_source_poll = g_pollable_input_stream_create_source (in_stream, data->data_source_cancellable);
data->data_source_poll = g_pollable_input_stream_create_source (in_stream, data->cancellable);
g_source_set_callback (data->data_source_poll, (GSourceFunc)on_data_readable, data, NULL);
g_source_set_priority (data->data_source_poll, get_data_io_priority (data));
g_source_attach (data->data_source_poll, g_main_context_get_thread_default ());
g_error_free (error);
g_assert (!data->data_source_cancellable);
data->data_source_cancellable = g_cancellable_new ();
return NGHTTP2_ERR_DEFERRED;
}
......@@ -678,7 +679,7 @@ on_data_source_read_callback (nghttp2_session *session,
if (!data->data_source_buffer)
data->data_source_buffer = g_byte_array_new ();
gsize buffer_len = data->data_source_buffer->len;
guint buffer_len = data->data_source_buffer->len;
if (buffer_len) {
h2_debug (io, data, "[SEND_BODY] Sending %zu", buffer_len);
g_assert (buffer_len <= length); /* QUESTION: Maybe not reliable */
......@@ -688,19 +689,19 @@ on_data_source_read_callback (nghttp2_session *session,
return buffer_len;
} else if (data->data_source_eof) {
h2_debug (io, data, "[SEND_BODY] EOF");
g_clear_object (&data->data_source_cancellable);
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
return 0;
} else if (data->data_source_error) {
g_clear_object (&data->data_source_cancellable);
set_error_for_data (data, g_steal_pointer (&data->data_source_error));
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
} else {
h2_debug (io, data, "[SEND_BODY] Reading async");
g_byte_array_set_size (data->data_source_buffer, length);
g_assert (!data->data_source_cancellable);
data->data_source_cancellable = g_cancellable_new ();
g_input_stream_read_async (in_stream, data->data_source_buffer->data, length,
get_data_io_priority (data),
data->data_source_cancellable,
data->cancellable,
(GAsyncReadyCallback)on_data_read, data);
return NGHTTP2_ERR_DEFERRED;
}
......@@ -754,10 +755,7 @@ soup_http2_message_data_free (SoupHTTP2MessageData *data)
g_clear_error (&data->data_source_error);
g_clear_pointer (&data->data_source_buffer, g_byte_array_unref);
if (data->data_source_cancellable) {
g_cancellable_cancel (data->data_source_cancellable);
g_clear_object (&data->data_source_cancellable);
}
g_clear_object (&data->data_source_cancellable);
g_clear_error (&data->error);
......@@ -1024,6 +1022,8 @@ soup_client_message_io_http2_get_source (SoupMessage *msg,
/* TODO: Handle mixing writes in? */
if (data->paused)
base_source = cancellable ? g_cancellable_source_new (cancellable) : NULL;
else if (data->state < STATE_WRITE_DONE && data->data_source_cancellable)
base_source = g_cancellable_source_new (data->data_source_cancellable);
else if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (io->session))
base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable);
else if (data->state < STATE_READ_DONE && data->decoded_data_istream)
......@@ -1196,7 +1196,7 @@ io_run_until (SoupClientMessageIOHTTP2 *io,
g_object_ref (msg);
while (progress && get_io_data (msg) == io && !data->paused && data->state < state)
while (progress && get_io_data (msg) == io && !data->paused && !data->data_source_cancellable && data->state < state)
progress = io_run (data, blocking, cancellable, &my_error);
if (my_error) {
......
......@@ -314,7 +314,7 @@ do_post_blocked_async_test (Test *test, gconstpointer data)
soup_body_input_stream_http2_add_data (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream), (guint8*)" Part 2", 8);
soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream));
}
g_main_context_iteration (async_context, FALSE);
g_main_context_iteration (async_context, TRUE);
}
g_assert_cmpstr (g_bytes_get_data (response, NULL), ==, "Part 1 - Part 2");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment