Commit d691bf0b authored by Carlos Garcia Campos's avatar Carlos Garcia Campos
Browse files

Do not use the send operation cancellable after send completes

This is not a problem if the app uses the same cancellable for reading
the stream, but if a different cancellable is used, cancelling the first
one would cancel the stream read operation too.
parent 62affd25
Pipeline #287163 passed with stages
in 5 minutes and 42 seconds
......@@ -1128,6 +1128,15 @@ soup_client_message_io_http1_is_reusable (SoupClientMessageIO *iface)
return io->is_reusable;
}
static GCancellable *
soup_client_message_io_http1_get_cancellable (SoupClientMessageIO *iface,
SoupMessage *msg)
{
SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
return io->msg_io ? io->msg_io->item->cancellable : NULL;
}
static const SoupClientMessageIOFuncs io_funcs = {
soup_client_message_io_http1_destroy,
soup_client_message_io_http1_finished,
......@@ -1145,6 +1154,7 @@ static const SoupClientMessageIOFuncs io_funcs = {
soup_client_message_io_http1_is_open,
soup_client_message_io_http1_in_progress,
soup_client_message_io_http1_is_reusable,
soup_client_message_io_http1_get_cancellable
};
SoupClientMessageIO *
......
......@@ -91,7 +91,6 @@ typedef struct {
SoupMessageQueueItem *item;
SoupMessage *msg;
SoupMessageMetrics *metrics;
GCancellable *cancellable;
GInputStream *decoded_data_istream;
GInputStream *body_istream;
GTask *task;
......@@ -657,7 +656,7 @@ on_frame_recv_callback (nghttp2_session *session,
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM && data->body_istream) {
soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2 (data->body_istream));
if (data->state == STATE_READ_DATA_START)
io_try_sniff_content (data, FALSE, data->cancellable);
io_try_sniff_content (data, FALSE, data->item->cancellable);
}
break;
case NGHTTP2_RST_STREAM:
......@@ -690,7 +689,7 @@ on_data_chunk_recv_callback (nghttp2_session *session,
g_assert (msgdata->body_istream != NULL);
soup_body_input_stream_http2_add_data (SOUP_BODY_INPUT_STREAM_HTTP2 (msgdata->body_istream), data, len);
if (msgdata->state == STATE_READ_DATA_START)
io_try_sniff_content (msgdata, FALSE, msgdata->cancellable);
io_try_sniff_content (msgdata, FALSE, msgdata->item->cancellable);
return 0;
}
......@@ -897,7 +896,7 @@ on_data_source_read_callback (nghttp2_session *session,
GPollableInputStream *in_stream = G_POLLABLE_INPUT_STREAM (source->ptr);
GError *error = NULL;
gssize read = g_pollable_input_stream_read_nonblocking (in_stream, buf, length, data->cancellable, &error);
gssize read = g_pollable_input_stream_read_nonblocking (in_stream, buf, length, data->item->cancellable, &error);
if (read) {
h2_debug (data->io, data, "[SEND_BODY] Read %zd", read);
......@@ -909,7 +908,7 @@ on_data_source_read_callback (nghttp2_session *session,
g_assert (data->data_source_poll == NULL);
h2_debug (data->io, data, "[SEND_BODY] Polling");
data->data_source_poll = g_pollable_input_stream_create_source (in_stream, data->cancellable);
data->data_source_poll = g_pollable_input_stream_create_source (in_stream, data->item->cancellable);
g_source_set_callback (data->data_source_poll, (GSourceFunc)on_data_readable, data, NULL);
g_source_set_priority (data->data_source_poll, get_data_io_priority (data));
g_source_attach (data->data_source_poll, g_main_context_get_thread_default ());
......@@ -957,7 +956,7 @@ on_data_source_read_callback (nghttp2_session *session,
g_byte_array_set_size (data->data_source_buffer, length);
g_input_stream_read_async (in_stream, data->data_source_buffer->data, length,
get_data_io_priority (data),
data->cancellable,
data->item->cancellable,
(GAsyncReadyCallback)on_data_read, data);
return NGHTTP2_ERR_DEFERRED;
}
......@@ -967,7 +966,7 @@ on_data_source_read_callback (nghttp2_session *session,
/* HTTP2 IO functions */
static SoupHTTP2MessageData *
add_message_to_io_data (SoupClientMessageIOHTTP2 *io,
add_message_to_io_data (SoupClientMessageIOHTTP2 *io,
SoupMessageQueueItem *item,
SoupMessageIOCompletionFn completion_cb,
gpointer completion_data)
......@@ -977,7 +976,6 @@ add_message_to_io_data (SoupClientMessageIOHTTP2 *io,
data->item = soup_message_queue_item_ref (item);
data->msg = item->msg;
data->metrics = soup_message_get_metrics (data->msg);
data->cancellable = item->cancellable;
data->completion_cb = completion_cb;
data->completion_data = completion_data;
data->stream_id = 0;
......@@ -1003,7 +1001,6 @@ soup_http2_message_data_close (SoupHTTP2MessageData *data)
data->msg = NULL;
data->metrics = NULL;
data->cancellable = NULL;
g_clear_pointer (&data->item, soup_message_queue_item_unref);
g_clear_object (&data->decoded_data_istream);
......@@ -1296,6 +1293,16 @@ soup_client_message_io_http2_is_reusable (SoupClientMessageIO *iface)
return soup_client_message_io_http2_is_open (iface);
}
static GCancellable *
soup_client_message_io_http2_get_cancellable (SoupClientMessageIO *iface,
SoupMessage *msg)
{
SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
SoupHTTP2MessageData *data = get_data_for_message (io, msg);
return data ? data->item->cancellable : NULL;
}
static void
client_stream_eof (SoupClientInputStream *stream,
gpointer user_data)
......@@ -1530,7 +1537,8 @@ static const SoupClientMessageIOFuncs io_funcs = {
soup_client_message_io_http2_skip,
soup_client_message_io_http2_is_open,
soup_client_message_io_http2_in_progress,
soup_client_message_io_http2_is_reusable
soup_client_message_io_http2_is_reusable,
soup_client_message_io_http2_get_cancellable
};
G_GNUC_PRINTF(1, 0)
......
......@@ -110,6 +110,9 @@ soup_client_input_stream_read_fn (GInputStream *stream,
SoupClientInputStreamPrivate *priv = soup_client_input_stream_get_instance_private (SOUP_CLIENT_INPUT_STREAM (stream));
gssize nread;
if (g_cancellable_set_error_if_cancelled (soup_message_io_get_cancellable (priv->msg), error))
return -1;
nread = G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
read_fn (stream, buffer, count, cancellable, error);
......@@ -131,6 +134,9 @@ soup_client_input_stream_skip (GInputStream *stream,
SoupClientInputStreamPrivate *priv = soup_client_input_stream_get_instance_private (SOUP_CLIENT_INPUT_STREAM (stream));
gssize nread;
if (g_cancellable_set_error_if_cancelled (soup_message_io_get_cancellable (priv->msg), error))
return -1;
nread = G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
skip (stream, count, cancellable, error);
......@@ -152,6 +158,9 @@ soup_client_input_stream_read_nonblocking (GPollableInputStream *stream,
SoupClientInputStreamPrivate *priv = soup_client_input_stream_get_instance_private (SOUP_CLIENT_INPUT_STREAM (stream));
gssize nread;
if (g_cancellable_set_error_if_cancelled (soup_message_io_get_cancellable (priv->msg), error))
return -1;
nread = soup_client_input_stream_parent_pollable_interface->
read_nonblocking (stream, buffer, count, error);
......
......@@ -133,3 +133,10 @@ soup_client_message_io_is_reusable (SoupClientMessageIO *io)
{
return io->funcs->is_reusable (io);
}
GCancellable *
soup_client_message_io_get_cancellable (SoupClientMessageIO *io,
SoupMessage *msg)
{
return io->funcs->get_cancellable (io, msg);
}
......@@ -53,6 +53,8 @@ typedef struct {
gboolean (*in_progress) (SoupClientMessageIO *io,
SoupMessage *msg);
gboolean (*is_reusable) (SoupClientMessageIO *io);
GCancellable *(*get_cancellable) (SoupClientMessageIO *io,
SoupMessage *msg);
} SoupClientMessageIOFuncs;
struct _SoupClientMessageIO {
......@@ -101,3 +103,5 @@ gboolean soup_client_message_io_is_open (SoupClientMessageIO
gboolean soup_client_message_io_in_progress (SoupClientMessageIO *io,
SoupMessage *msg);
gboolean soup_client_message_io_is_reusable (SoupClientMessageIO *io);
GCancellable *soup_client_message_io_get_cancellable (SoupClientMessageIO *io,
SoupMessage *msg);
......@@ -72,6 +72,8 @@ gboolean soup_message_io_run_until_read_finish (SoupMessage *msg,
GInputStream *soup_message_io_get_response_istream (SoupMessage *msg,
GError **error);
GCancellable *soup_message_io_get_cancellable (SoupMessage *msg);
void soup_message_wrote_headers (SoupMessage *msg);
void soup_message_wrote_body_data (SoupMessage *msg,
gsize chunk_size);
......
......@@ -14,10 +14,10 @@
#include "soup.h"
SoupMessageQueueItem *
soup_message_queue_item_new (SoupSession *session,
SoupMessage *msg,
gboolean async,
GCancellable *cancellable)
soup_message_queue_item_new (SoupSession *session,
SoupMessage *msg,
gboolean async,
GCancellable *cancellable)
{
SoupMessageQueueItem *item;
......@@ -28,9 +28,6 @@ soup_message_queue_item_new (SoupSession *session,
item->cancellable = cancellable ? g_object_ref (cancellable) : g_cancellable_new ();
item->priority = soup_message_get_priority (msg);
g_signal_connect_swapped (msg, "restarted",
G_CALLBACK (g_cancellable_reset),
item->cancellable);
return item;
}
......
......@@ -2290,6 +2290,17 @@ soup_message_io_skip (SoupMessage *msg,
return soup_client_message_io_skip (priv->io_data, msg, blocking, cancellable, error);
}
GCancellable *
soup_message_io_get_cancellable (SoupMessage *msg)
{
SoupMessagePrivate *priv = soup_message_get_instance_private (msg);
if (!priv->io_data)
return NULL;
return soup_client_message_io_get_cancellable (priv->io_data, msg);
}
void
soup_message_send_item (SoupMessage *msg,
SoupMessageQueueItem *item,
......
......@@ -2903,6 +2903,10 @@ async_send_request_return_result (SoupMessageQueueItem *item,
task = item->task;
item->task = NULL;
/* This cancellable was set for the send operation that is done now */
g_object_unref (item->cancellable);
item->cancellable = g_cancellable_new ();
if (error)
g_task_return_error (task, error);
else if (item->error) {
......@@ -3439,6 +3443,10 @@ soup_session_send (SoupSession *session,
g_object_unref (ostream);
}
/* This cancellable was set for the send operation that is done now */
g_object_unref (item->cancellable);
item->cancellable = g_cancellable_new ();
if (my_error)
g_propagate_error (error, my_error);
else if (item->error) {
......@@ -3845,7 +3853,7 @@ soup_session_websocket_connect_async (SoupSession *session,
item = soup_session_append_queue_item (session, msg, TRUE, cancellable);
item->io_priority = io_priority;
task = g_task_new (session, cancellable, callback, user_data);
task = g_task_new (session, item->cancellable, callback, user_data);
g_task_set_task_data (task, item, (GDestroyNotify) soup_message_queue_item_unref);
soup_message_add_status_code_handler (msg, "got-informational",
......@@ -3948,7 +3956,7 @@ soup_session_preconnect_async (SoupSession *session,
item->connect_only = TRUE;
item->io_priority = io_priority;
task = g_task_new (session, cancellable, callback, user_data);
task = g_task_new (session, item->cancellable, callback, user_data);
g_task_set_priority (task, io_priority);
g_task_set_task_data (task, item, (GDestroyNotify)soup_message_queue_item_unref);
......
......@@ -242,6 +242,68 @@ do_cancellation_test (Test *test, gconstpointer data)
g_main_context_unref (async_context);
}
static void
do_one_cancel_after_send_request_test (SoupSession *session,
gboolean reuse_cancellable,
gboolean cancelled_by_session)
{
SoupMessage *msg;
GCancellable *cancellable;
GInputStream *istream;
GOutputStream *ostream;
guint flags = SOUP_TEST_REQUEST_CANCEL_AFTER_SEND_FINISH;
GBytes *body;
GError *error = NULL;
if (cancelled_by_session)
flags |= SOUP_TEST_REQUEST_CANCEL_BY_SESSION;
msg = soup_message_new (SOUP_METHOD_GET, "https://127.0.0.1:5000/");
cancellable = g_cancellable_new ();
istream = soup_test_request_send (session, msg, cancellable, flags, &error);
g_assert_no_error (error);
g_assert_nonnull (istream);
/* If we use a new cancellable to read the stream
* it shouldn't fail with cancelled error.
*/
if (!reuse_cancellable) {
g_object_unref (cancellable);
cancellable = g_cancellable_new ();
}
ostream = g_memory_output_stream_new_resizable ();
g_output_stream_splice (ostream, istream,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
cancellable, &error);
if (reuse_cancellable || cancelled_by_session) {
g_assert_error (error, G_IO_ERROR, G_IO_ERROR_CANCELLED);
g_clear_error (&error);
} else {
g_assert_no_error (error);
body = g_memory_output_stream_steal_as_bytes (G_MEMORY_OUTPUT_STREAM (ostream));
g_assert_cmpstr (g_bytes_get_data (body, NULL), ==, "Hello world");
g_bytes_unref (body);
}
while (g_main_context_pending (NULL))
g_main_context_iteration (NULL, FALSE);
g_object_unref (cancellable);
g_object_unref (ostream);
g_object_unref (istream);
g_object_unref (msg);
}
static void
do_cancellation_after_send_test (Test *test, gconstpointer data)
{
do_one_cancel_after_send_request_test (test->session, TRUE, FALSE);
do_one_cancel_after_send_request_test (test->session, FALSE, FALSE);
do_one_cancel_after_send_request_test (test->session, FALSE, TRUE);
}
static void
do_post_sync_test (Test *test, gconstpointer data)
{
......@@ -956,6 +1018,10 @@ main (int argc, char **argv)
setup_session,
do_cancellation_test,
teardown_session);
g_test_add ("/http2/cancellation-after-send", Test, NULL,
setup_session,
do_cancellation_after_send_test,
teardown_session);
g_test_add ("/http2/invalid-header", Test, NULL,
setup_session,
do_invalid_header_test,
......
......@@ -564,6 +564,72 @@ do_cancel_while_reading_preemptive_req_test (void)
soup_test_session_abort_unref (session);
}
static void
do_one_cancel_after_send_request_test (SoupSession *session,
gboolean reuse_cancellable,
gboolean cancelled_by_session)
{
SoupMessage *msg;
GCancellable *cancellable;
GInputStream *istream;
GOutputStream *ostream;
guint flags = SOUP_TEST_REQUEST_CANCEL_AFTER_SEND_FINISH;
GBytes *body;
GError *error = NULL;
if (cancelled_by_session)
flags |= SOUP_TEST_REQUEST_CANCEL_BY_SESSION;
msg = soup_message_new_from_uri ("GET", base_uri);
cancellable = g_cancellable_new ();
istream = soup_test_request_send (session, msg, cancellable, flags, &error);
g_assert_no_error (error);
g_assert_nonnull (istream);
/* If we use a new cancellable to read the stream
* it shouldn't fail with cancelled error.
*/
if (!reuse_cancellable) {
g_object_unref (cancellable);
cancellable = g_cancellable_new ();
}
ostream = g_memory_output_stream_new_resizable ();
g_output_stream_splice (ostream, istream,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
cancellable, &error);
if (reuse_cancellable || cancelled_by_session) {
g_assert_error (error, G_IO_ERROR, G_IO_ERROR_CANCELLED);
g_clear_error (&error);
} else {
g_assert_no_error (error);
body = g_memory_output_stream_steal_as_bytes (G_MEMORY_OUTPUT_STREAM (ostream));
g_assert_cmpstr (g_bytes_get_data (body, NULL), ==, "index");
g_bytes_unref (body);
}
while (g_main_context_pending (NULL))
g_main_context_iteration (NULL, FALSE);
g_object_unref (cancellable);
g_object_unref (ostream);
g_object_unref (istream);
g_object_unref (msg);
}
static void
do_cancel_after_send_request_tests (void)
{
SoupSession *session;
session = soup_test_session_new (NULL);
do_one_cancel_after_send_request_test (session, TRUE, FALSE);
do_one_cancel_after_send_request_test (session, FALSE, FALSE);
do_one_cancel_after_send_request_test (session, FALSE, TRUE);
soup_test_session_abort_unref (session);
}
static void
do_msg_flags_test (void)
{
......@@ -724,6 +790,7 @@ main (int argc, char **argv)
g_test_add_func ("/misc/cancel-while-reading/req/immediate", do_cancel_while_reading_immediate_req_test);
g_test_add_func ("/misc/cancel-while-reading/req/delayed", do_cancel_while_reading_delayed_req_test);
g_test_add_func ("/misc/cancel-while-reading/req/preemptive", do_cancel_while_reading_preemptive_req_test);
g_test_add_func ("/misc/cancel-after-send-request", do_cancel_after_send_request_tests);
g_test_add_func ("/misc/msg-flags", do_msg_flags_test);
g_test_add_func ("/misc/connection-id", do_connection_id_test);
g_test_add_func ("/misc/remote-address", do_remote_address_test);
......
......@@ -2,6 +2,7 @@
#include "test-utils.h"
#include "soup-misc.h"
#include "soup-session-private.h"
#include <glib/gprintf.h>
#ifdef G_OS_UNIX
......@@ -817,7 +818,10 @@ soup_test_request_send (SoupSession *session,
if (flags & SOUP_TEST_REQUEST_CANCEL_AFTER_SEND_FINISH) {
GMainContext *context;
g_cancellable_cancel (cancellable);
if (flags & SOUP_TEST_REQUEST_CANCEL_BY_SESSION)
soup_session_cancel_message (session, msg);
else
g_cancellable_cancel (cancellable);
context = g_main_loop_get_context (data.loop);
while (g_main_context_pending (context))
......
......@@ -58,6 +58,7 @@ typedef enum {
SOUP_TEST_REQUEST_CANCEL_IMMEDIATE = (1 << 1),
SOUP_TEST_REQUEST_CANCEL_PREEMPTIVE = (1 << 2),
SOUP_TEST_REQUEST_CANCEL_AFTER_SEND_FINISH = (1 << 3),
SOUP_TEST_REQUEST_CANCEL_BY_SESSION = (1 << 4)
} SoupTestRequestFlags;
SoupSession *soup_test_session_new (const char *propname, ...);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment