Commit 1a4c60ae authored by Carlos Garcia Campos's avatar Carlos Garcia Campos
Browse files

http2: add support for content sniffing

parent f89b9315
Pipeline #284862 failed with stages
in 5 minutes and 30 seconds
......@@ -53,6 +53,7 @@ typedef enum {
STATE_WRITE_DATA,
STATE_WRITE_DONE,
STATE_READ_HEADERS,
STATE_READ_DATA_START,
STATE_READ_DATA,
STATE_READ_DONE,
} SoupHTTP2IOState;
......@@ -172,6 +173,8 @@ state_to_string (SoupHTTP2IOState state)
return "WRITE_DONE";
case STATE_READ_HEADERS:
return "READ_HEADERS";
case STATE_READ_DATA_START:
return "READ_DATA_START";
case STATE_READ_DATA:
return "READ_DATA";
case STATE_READ_DONE:
......@@ -332,23 +335,23 @@ on_begin_frame_callback (nghttp2_session *session,
if (data->state < STATE_READ_HEADERS)
advance_state_from (data, STATE_WRITE_DONE, STATE_READ_HEADERS);
break;
case NGHTTP2_DATA: {
if (data->state < STATE_READ_DATA)
advance_state_from (data, STATE_READ_HEADERS, STATE_READ_DATA);
if (!data->body_istream) {
case NGHTTP2_DATA:
if (data->state < STATE_READ_DATA_START) {
g_assert (!data->body_istream);
data->body_istream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM (data->io->istream));
g_signal_connect (data->body_istream, "need-more-data",
G_CALLBACK (memory_stream_need_more_data_callback), data);
}
if (!data->decoded_data_istream)
g_assert (!data->decoded_data_istream);
data->decoded_data_istream = soup_session_setup_message_body_input_stream (data->item->session,
data->msg,
data->body_istream,
SOUP_STAGE_MESSAGE_BODY);
advance_state_from (data, STATE_READ_HEADERS, STATE_READ_DATA_START);
}
break;
}
}
return 0;
}
......@@ -406,7 +409,6 @@ on_frame_recv_callback (nghttp2_session *session,
if (SOUP_STATUS_IS_INFORMATIONAL (soup_message_get_status (data->msg))) {
soup_message_got_informational (data->msg);
soup_message_cleanup_response (data->msg);
soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2 (data->body_istream));
advance_state_from (data, STATE_READ_HEADERS, STATE_READ_DONE);
return 0;
}
......@@ -422,7 +424,7 @@ on_frame_recv_callback (nghttp2_session *session,
case NGHTTP2_DATA:
if (data->metrics)
data->metrics->response_body_bytes_received += frame->data.hd.length + FRAME_HEADER_SIZE;
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM && data->body_istream)
soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2 (data->body_istream));
break;
case NGHTTP2_RST_STREAM:
......@@ -1021,6 +1023,8 @@ soup_client_message_io_http2_get_source (SoupMessage *msg,
base_source = cancellable ? g_cancellable_source_new (cancellable) : NULL;
else if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (io->session))
base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable);
else if (data->state < STATE_READ_DONE && data->decoded_data_istream)
base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (data->decoded_data_istream), cancellable);
else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (io->session))
base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream), cancellable);
else {
......@@ -1126,17 +1130,43 @@ io_write (SoupClientMessageIOHTTP2 *io,
return TRUE;
}
static void
io_try_sniff_content (SoupHTTP2MessageData *data,
gboolean blocking,
GCancellable *cancellable)
{
GError *error = NULL;
if (soup_message_try_sniff_content (data->msg, data->decoded_data_istream, blocking, cancellable, &error)) {
h2_debug (data->io, data, "[DATA] Sniffed content");
advance_state_from (data, STATE_READ_DATA_START, STATE_READ_DATA);
} else {
h2_debug (data->io, data, "[DATA] Sniffer stream was not ready %s", error->message);
g_clear_error (&error);
}
}
static gboolean
io_read_or_write (SoupHTTP2MessageData *data,
gboolean blocking,
GCancellable *cancellable,
GError **error)
io_run (SoupHTTP2MessageData *data,
gboolean blocking,
GCancellable *cancellable,
GError **error)
{
gboolean progress = FALSE;
if (data->state == STATE_READ_DATA_START)
io_try_sniff_content (data, blocking, cancellable);
if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (data->io->session))
progress = io_write (data->io, blocking, cancellable, error);
else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session))
else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session)) {
progress = io_read (data->io, blocking, cancellable, error);
if (progress && data->state == STATE_READ_DATA_START)
io_try_sniff_content (data, blocking, cancellable);
}
return progress;
}
......@@ -1163,9 +1193,8 @@ io_run_until (SoupClientMessageIOHTTP2 *io,
g_object_ref (msg);
while (progress && get_io_data (msg) == io && !data->paused && data->state < state) {
progress = io_read_or_write (data, blocking, cancellable, &my_error);
}
while (progress && get_io_data (msg) == io && !data->paused && data->state < state)
progress = io_run (data, blocking, cancellable, &my_error);
if (my_error) {
g_propagate_error (error, my_error);
......
......@@ -116,7 +116,7 @@ read_stream_to_bytes_sync (GInputStream *stream)
NULL, &error);
g_assert_no_error (error);
g_assert_cmpint (read, >, 0);
g_assert_cmpint (read, >=, 0);
GBytes *bytes = g_memory_output_stream_steal_as_bytes (G_MEMORY_OUTPUT_STREAM (out));
g_object_unref (out);
......@@ -605,6 +605,113 @@ do_invalid_header_test (Test *test, gconstpointer data)
}
}
static void
content_sniffed (SoupMessage *msg,
char *content_type,
GHashTable *params)
{
soup_test_assert (g_object_get_data (G_OBJECT (msg), "got-chunk") == NULL,
"got-chunk got emitted before content-sniffed");
g_object_set_data (G_OBJECT (msg), "content-sniffed", GINT_TO_POINTER (TRUE));
}
static void
got_headers (SoupMessage *msg)
{
soup_test_assert (g_object_get_data (G_OBJECT (msg), "content-sniffed") == NULL,
"content-sniffed got emitted before got-headers");
g_object_set_data (G_OBJECT (msg), "got-headers", GINT_TO_POINTER (TRUE));
}
static void
sniffer_test_send_ready_cb (SoupSession *session,
GAsyncResult *result,
GInputStream **stream)
{
GError *error = NULL;
*stream = soup_session_send_finish (session, result, &error);
g_assert_no_error (error);
g_assert_nonnull (*stream);
}
static void
do_one_sniffer_test (SoupSession *session,
const char *uri,
gsize expected_size,
gboolean should_sniff,
GMainContext *async_context)
{
SoupMessage *msg;
GInputStream *stream = NULL;
GBytes *bytes;
msg = soup_message_new (SOUP_METHOD_GET, uri);
g_object_connect (msg,
"signal::got-headers", got_headers, NULL,
"signal::content-sniffed", content_sniffed, NULL,
NULL);
if (async_context) {
soup_session_send_async (session, msg, G_PRIORITY_DEFAULT, NULL,
(GAsyncReadyCallback)sniffer_test_send_ready_cb,
&stream);
while (!stream)
g_main_context_iteration (async_context, TRUE);
} else {
GError *error = NULL;
stream = soup_session_send (session, msg, NULL, &error);
g_assert_no_error (error);
g_assert_nonnull (stream);
}
if (should_sniff) {
soup_test_assert (g_object_get_data (G_OBJECT (msg), "content-sniffed") != NULL,
"content-sniffed did not get emitted");
} else {
soup_test_assert (g_object_get_data (G_OBJECT (msg), "content-sniffed") == NULL,
"content-sniffed got emitted without a sniffer");
}
bytes = read_stream_to_bytes_sync (stream);
g_assert_cmpuint (g_bytes_get_size (bytes), ==, expected_size);
g_object_unref (stream);
g_bytes_unref (bytes);
g_object_unref (msg);
}
static void
do_sniffer_async_test (Test *test, gconstpointer data)
{
GMainContext *async_context = g_main_context_ref_thread_default ();
soup_session_add_feature_by_type (test->session, SOUP_TYPE_CONTENT_SNIFFER);
do_one_sniffer_test (test->session, "https://127.0.0.1:5000/", 11, TRUE, async_context);
do_one_sniffer_test (test->session, "https://127.0.0.1:5000/large", (1024 * 24) + 1, TRUE, async_context);
do_one_sniffer_test (test->session, "https://127.0.0.1:5000/no-content", 0, FALSE, async_context);
while (g_main_context_pending (async_context))
g_main_context_iteration (async_context, FALSE);
g_main_context_unref (async_context);
}
static void
do_sniffer_sync_test (Test *test, gconstpointer data)
{
soup_session_add_feature_by_type (test->session, SOUP_TYPE_CONTENT_SNIFFER);
do_one_sniffer_test (test->session, "https://127.0.0.1:5000/", 11, TRUE, NULL);
/* FIXME: large seems to be broken in sync mode */
/* do_one_sniffer_test (test->session, "https://127.0.0.1:5000/large", (1024 * 24) + 1, TRUE, NULL); */
do_one_sniffer_test (test->session, "https://127.0.0.1:5000/no-content", 0, FALSE, NULL);
}
int
main (int argc, char **argv)
{
......@@ -691,8 +798,14 @@ main (int argc, char **argv)
setup_session,
do_invalid_header_test,
teardown_session);
g_test_add ("/http2/sniffer/async", Test, NULL,
setup_session,
do_sniffer_async_test,
teardown_session);
g_test_add ("/http2/sniffer/sync", Test, NULL,
setup_session,
do_sniffer_sync_test,
teardown_session);
ret = g_test_run ();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment