Commit 51c08d95 authored by Carlos Garcia Campos's avatar Carlos Garcia Campos
Browse files

Do not allow to queue the same message twice

Add a new error to indicate the message is already in the session queue.
parent 7e39ea20
Pipeline #289338 passed with stages
in 4 minutes and 55 seconds
......@@ -41,7 +41,8 @@ soup_message_queue_item_ref (SoupMessageQueueItem *item)
static void
soup_message_queue_item_destroy (SoupMessageQueueItem *item)
{
g_warn_if_fail (soup_message_get_connection (item->msg) == NULL);
if (!g_error_matches (item->error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE))
g_warn_if_fail (soup_message_get_connection (item->msg) == NULL);
g_object_unref (item->session);
g_object_unref (item->msg);
......
......@@ -200,6 +200,8 @@ static GParamSpec *properties[LAST_PROPERTY] = { NULL, };
* Location header was missing or empty in response
* @SOUP_SESSION_ERROR_REDIRECT_BAD_URI: failed to redirect message because
* Location header contains an invalid URI
* @SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE: the message is already in the
* session queue. Messages can only be reused after unqueued.
*
* A #SoupSession error.
*/
......@@ -3290,6 +3292,35 @@ async_respond_from_cache (SoupSession *session,
return FALSE;
}
static gboolean
soup_session_return_error_if_message_already_in_queue (SoupSession *session,
SoupMessage *msg,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
SoupMessageQueueItem *item;
GTask *task;
if (!soup_session_lookup_queue_item (session, msg))
return FALSE;
/* Set a new SoupMessageQueueItem in finished state as task data for
* soup_session_get_async_result_message() and soup_session_send_finish().
*/
item = soup_message_queue_item_new (session, msg, TRUE, cancellable);
item->state = SOUP_MESSAGE_FINISHED;
item->error = g_error_new_literal (SOUP_SESSION_ERROR,
SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE,
_("Message is already in session queue"));
task = g_task_new (session, cancellable, callback, user_data);
g_task_set_task_data (task, item, (GDestroyNotify)soup_message_queue_item_unref);
g_task_return_error (task, g_error_copy (item->error));
g_object_unref (task);
return TRUE;
}
/**
* soup_session_send_async:
* @session: a #SoupSession
......@@ -3320,6 +3351,9 @@ soup_session_send_async (SoupSession *session,
g_return_if_fail (SOUP_IS_SESSION (session));
if (soup_session_return_error_if_message_already_in_queue (session, msg, cancellable, callback, user_data))
return;
item = soup_session_append_queue_item (session, msg, TRUE, cancellable);
item->io_priority = io_priority;
g_signal_connect (msg, "restarted",
......@@ -3364,13 +3398,15 @@ soup_session_send_finish (SoupSession *session,
if (g_task_had_error (task)) {
SoupMessageQueueItem *item = g_task_get_task_data (task);
if (soup_message_io_in_progress (item->msg))
soup_message_io_finished (item->msg);
else if (item->state != SOUP_MESSAGE_FINISHED)
item->state = SOUP_MESSAGE_FINISHING;
if (!g_error_matches (item->error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE)) {
if (soup_message_io_in_progress (item->msg))
soup_message_io_finished (item->msg);
else if (item->state != SOUP_MESSAGE_FINISHED)
item->state = SOUP_MESSAGE_FINISHING;
if (item->state != SOUP_MESSAGE_FINISHED)
soup_session_process_queue_item (session, item, NULL, FALSE);
if (item->state != SOUP_MESSAGE_FINISHED)
soup_session_process_queue_item (session, item, NULL, FALSE);
}
}
return g_task_propagate_pointer (task, error);
......@@ -3421,6 +3457,14 @@ soup_session_send (SoupSession *session,
g_return_val_if_fail (SOUP_IS_SESSION (session), NULL);
if (soup_session_lookup_queue_item (session, msg)) {
g_set_error_literal (error,
SOUP_SESSION_ERROR,
SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE,
_("Message is already in session queue"));
return NULL;
}
item = soup_session_append_queue_item (session, msg, FALSE, cancellable);
while (!stream) {
......@@ -3869,6 +3913,9 @@ soup_session_websocket_connect_async (SoupSession *session,
g_return_if_fail (SOUP_IS_SESSION (session));
g_return_if_fail (SOUP_IS_MESSAGE (msg));
if (soup_session_return_error_if_message_already_in_queue (session, msg, cancellable, callback, user_data))
return;
supported_extensions = soup_session_get_supported_websocket_extensions_for_message (session, msg);
soup_websocket_client_prepare_handshake (msg, origin, protocols, supported_extensions);
......@@ -3987,6 +4034,9 @@ soup_session_preconnect_async (SoupSession *session,
g_return_if_fail (SOUP_IS_SESSION (session));
g_return_if_fail (SOUP_IS_MESSAGE (msg));
if (soup_session_return_error_if_message_already_in_queue (session, msg, cancellable, callback, user_data))
return;
item = soup_session_append_queue_item (session, msg, TRUE, cancellable);
item->connect_only = TRUE;
item->io_priority = io_priority;
......
......@@ -43,7 +43,8 @@ typedef enum {
SOUP_SESSION_ERROR_TOO_MANY_REDIRECTS,
SOUP_SESSION_ERROR_TOO_MANY_RESTARTS,
SOUP_SESSION_ERROR_REDIRECT_NO_LOCATION,
SOUP_SESSION_ERROR_REDIRECT_BAD_URI
SOUP_SESSION_ERROR_REDIRECT_BAD_URI,
SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE,
} SoupSessionError;
SOUP_AVAILABLE_IN_ALL
......
......@@ -255,6 +255,22 @@ reuse_test_authenticate (SoupMessage *msg,
return TRUE;
}
static void
reuse_preconnect_finished (SoupSession *session,
GAsyncResult *result,
GError **error)
{
g_assert_false (soup_session_preconnect_finish (session, result, error));
}
static void
reuse_websocket_connect_finished (SoupSession *session,
GAsyncResult *result,
GError **error)
{
g_assert_false (soup_session_websocket_connect_finish (session, result, error));
}
static void
do_msg_reuse_test (void)
{
......@@ -263,6 +279,8 @@ do_msg_reuse_test (void)
GBytes *body;
GUri *uri;
guint *signal_ids, n_signal_ids;
GInputStream *stream;
GError *error = NULL;
g_test_bug ("559054");
......@@ -303,6 +321,41 @@ do_msg_reuse_test (void)
ensure_no_signal_handlers (msg, signal_ids, n_signal_ids);
g_bytes_unref (body);
debug_printf (1, " Reuse before finishing\n");
msg = soup_message_new_from_uri ("GET", base_uri);
stream = soup_test_request_send (session, msg, NULL, 0, &error);
g_assert_no_error (error);
g_assert_null (soup_test_request_send (session, msg, NULL, 0, &error));
g_assert_error (error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE);
g_clear_error (&error);
g_assert_null (soup_test_session_async_send (session, msg, NULL, &error));
g_assert_error (error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE);
g_clear_error (&error);
g_assert_null (soup_session_send (session, msg, NULL, &error));
g_assert_error (error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE);
g_clear_error (&error);
g_assert_null (soup_session_send_and_read (session, msg, NULL, &error));
g_assert_error (error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE);
g_clear_error (&error);
soup_session_preconnect_async (session, msg, G_PRIORITY_DEFAULT, NULL,
(GAsyncReadyCallback)reuse_preconnect_finished, &error);
while (error == NULL)
g_main_context_iteration (NULL, TRUE);
g_assert_error (error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE);
g_clear_error (&error);
soup_session_websocket_connect_async (session, msg, NULL, NULL, G_PRIORITY_DEFAULT, NULL,
(GAsyncReadyCallback)reuse_websocket_connect_finished, &error);
while (error == NULL)
g_main_context_iteration (NULL, TRUE);
g_assert_error (error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE);
g_clear_error (&error);
g_object_unref (stream);
while (g_main_context_pending (NULL))
g_main_context_iteration (NULL, FALSE);
ensure_no_signal_handlers (msg, signal_ids, n_signal_ids);
soup_test_session_abort_unref (session);
g_object_unref (msg);
g_free (signal_ids);
......
......@@ -379,6 +379,7 @@ typedef struct {
GBytes *body;
GError *error;
gboolean done;
gboolean message_finished;
} SendAsyncData;
static void
......@@ -389,13 +390,15 @@ send_and_read_async_ready_cb (SoupSession *session,
data->done = TRUE;
g_assert_true (soup_session_get_async_result_message (session, result) == data->msg);
data->body = soup_session_send_and_read_finish (session, result, &data->error);
if (g_error_matches (data->error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE))
data->message_finished = TRUE;
}
static void
on_message_finished (SoupMessage *msg,
gboolean *message_finished)
on_message_finished (SoupMessage *msg,
SendAsyncData *data)
{
*message_finished = TRUE;
data->message_finished = TRUE;
}
GBytes *
......@@ -404,18 +407,17 @@ soup_test_session_async_send (SoupSession *session,
GCancellable *cancellable,
GError **error)
{
gboolean message_finished = FALSE;
GMainContext *async_context = g_main_context_ref_thread_default ();
gulong signal_id;
SendAsyncData data = { msg, NULL, NULL, FALSE };
SendAsyncData data = { msg, NULL, NULL, FALSE, FALSE };
signal_id = g_signal_connect (msg, "finished",
G_CALLBACK (on_message_finished), &message_finished);
G_CALLBACK (on_message_finished), &data);
soup_session_send_and_read_async (session, msg, G_PRIORITY_DEFAULT, cancellable,
(GAsyncReadyCallback)send_and_read_async_ready_cb, &data);
while (!data.done || !message_finished)
while (!data.done || !data.message_finished)
g_main_context_iteration (async_context, TRUE);
g_signal_handler_disconnect (msg, signal_id);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment