Commit 82ec4dca authored by Dan Winship's avatar Dan Winship

gio: implement GPollableInput/OutputStream in more stream types

Implement GPollableInputStream in GMemoryInputStream and
GConverterInputStream, and likewise implement GPollableOutputStream in
the corresponding output streams.

https://bugzilla.gnome.org/show_bug.cgi?id=673997
parent 111ba203
......@@ -25,6 +25,7 @@
#include <string.h>
#include "gconverterinputstream.h"
#include "gpollableinputstream.h"
#include "gsimpleasyncresult.h"
#include "gcancellable.h"
#include "gioenumtypes.h"
......@@ -41,6 +42,8 @@
* Converter input stream implements #GInputStream and allows
* conversion of data of various types during reading.
*
* As of GLib 2.34, #GConverterInputStream implements
* #GPollableInputStream.
**/
#define INITIAL_BUFFER_SIZE 4096
......@@ -55,6 +58,7 @@ typedef struct {
struct _GConverterInputStreamPrivate {
gboolean at_input_end;
gboolean finished;
gboolean need_input;
GConverter *converter;
Buffer input_buffer;
Buffer converted_buffer;
......@@ -80,9 +84,24 @@ static gssize g_converter_input_stream_read (GInputStream *stream,
GCancellable *cancellable,
GError **error);
G_DEFINE_TYPE (GConverterInputStream,
g_converter_input_stream,
G_TYPE_FILTER_INPUT_STREAM)
static gboolean g_converter_input_stream_can_poll (GPollableInputStream *stream);
static gboolean g_converter_input_stream_is_readable (GPollableInputStream *stream);
static gssize g_converter_input_stream_read_nonblocking (GPollableInputStream *stream,
void *buffer,
gsize size,
GError **error);
static GSource *g_converter_input_stream_create_source (GPollableInputStream *stream,
GCancellable *cancellable);
static void g_converter_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
G_DEFINE_TYPE_WITH_CODE (GConverterInputStream,
g_converter_input_stream,
G_TYPE_FILTER_INPUT_STREAM,
G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
g_converter_input_stream_pollable_iface_init);
)
static void
g_converter_input_stream_class_init (GConverterInputStreamClass *klass)
......@@ -112,6 +131,15 @@ g_converter_input_stream_class_init (GConverterInputStreamClass *klass)
}
static void
g_converter_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
{
iface->can_poll = g_converter_input_stream_can_poll;
iface->is_readable = g_converter_input_stream_is_readable;
iface->read_nonblocking = g_converter_input_stream_read_nonblocking;
iface->create_source = g_converter_input_stream_create_source;
}
static void
g_converter_input_stream_finalize (GObject *object)
{
......@@ -320,6 +348,7 @@ buffer_ensure_space (Buffer *buffer,
static gssize
fill_input_buffer (GConverterInputStream *stream,
gsize at_least_size,
gboolean blocking,
GCancellable *cancellable,
GError **error)
{
......@@ -332,25 +361,30 @@ fill_input_buffer (GConverterInputStream *stream,
buffer_ensure_space (&priv->input_buffer, at_least_size);
base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
nread = g_input_stream_read (base_stream,
priv->input_buffer.data + priv->input_buffer.end,
buffer_tailspace (&priv->input_buffer),
cancellable,
error);
nread = g_pollable_stream_read (base_stream,
priv->input_buffer.data + priv->input_buffer.end,
buffer_tailspace (&priv->input_buffer),
blocking,
cancellable,
error);
if (nread > 0)
priv->input_buffer.end += nread;
{
priv->input_buffer.end += nread;
priv->need_input = FALSE;
}
return nread;
}
static gssize
g_converter_input_stream_read (GInputStream *stream,
void *buffer,
gsize count,
GCancellable *cancellable,
GError **error)
read_internal (GInputStream *stream,
void *buffer,
gsize count,
gboolean blocking,
GCancellable *cancellable,
GError **error)
{
GConverterInputStream *cstream;
GConverterInputStreamPrivate *priv;
......@@ -389,7 +423,7 @@ g_converter_input_stream_read (GInputStream *stream,
total_bytes_read == 0 &&
!priv->at_input_end)
{
nread = fill_input_buffer (cstream, count, cancellable, error);
nread = fill_input_buffer (cstream, count, blocking, cancellable, error);
if (nread < 0)
return -1;
if (nread == 0)
......@@ -497,6 +531,7 @@ g_converter_input_stream_read (GInputStream *stream,
my_error2 = NULL;
nread = fill_input_buffer (cstream,
buffer_data_size (&priv->input_buffer) + 4096,
blocking,
cancellable,
&my_error2);
if (nread < 0)
......@@ -504,6 +539,7 @@ g_converter_input_stream_read (GInputStream *stream,
/* Can't read any more data, return that error */
g_error_free (my_error);
g_propagate_error (error, my_error2);
priv->need_input = TRUE;
return -1;
}
else if (nread == 0)
......@@ -536,6 +572,70 @@ g_converter_input_stream_read (GInputStream *stream,
g_assert_not_reached ();
}
static gssize
g_converter_input_stream_read (GInputStream *stream,
void *buffer,
gsize count,
GCancellable *cancellable,
GError **error)
{
return read_internal (stream, buffer, count, TRUE, cancellable, error);
}
static gboolean
g_converter_input_stream_can_poll (GPollableInputStream *stream)
{
GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
return (G_IS_POLLABLE_INPUT_STREAM (base_stream) &&
g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (base_stream)));
}
static gboolean
g_converter_input_stream_is_readable (GPollableInputStream *stream)
{
GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
GConverterInputStream *cstream = G_CONVERTER_INPUT_STREAM (stream);
if (buffer_data_size (&cstream->priv->converted_buffer))
return TRUE;
else if (buffer_data_size (&cstream->priv->input_buffer) &&
!cstream->priv->need_input)
return TRUE;
else
return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (base_stream));
}
static gssize
g_converter_input_stream_read_nonblocking (GPollableInputStream *stream,
void *buffer,
gsize count,
GError **error)
{
return read_internal (G_INPUT_STREAM (stream), buffer, count,
FALSE, NULL, error);
}
static GSource *
g_converter_input_stream_create_source (GPollableInputStream *stream,
GCancellable *cancellable)
{
GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
GSource *base_source, *pollable_source;
if (g_pollable_input_stream_is_readable (stream))
base_source = g_timeout_source_new (0);
else
base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (base_stream), NULL);
pollable_source = g_pollable_source_new_full (stream, base_source,
cancellable);
g_source_unref (base_source);
return pollable_source;
}
/**
* g_converter_input_stream_get_converter:
* @converter_stream: a #GConverterInputStream
......
......@@ -25,6 +25,7 @@
#include <string.h>
#include "gconverteroutputstream.h"
#include "gpollableoutputstream.h"
#include "gsimpleasyncresult.h"
#include "gcancellable.h"
#include "gioenumtypes.h"
......@@ -41,6 +42,8 @@
* Converter output stream implements #GOutputStream and allows
* conversion of data of various types during reading.
*
* As of GLib 2.34, #GConverterOutputStream implements
* #GPollableOutputStream.
**/
#define INITIAL_BUFFER_SIZE 4096
......@@ -96,9 +99,24 @@ static gboolean g_converter_output_stream_flush (GOutputStream *stream,
GCancellable *cancellable,
GError **error);
G_DEFINE_TYPE (GConverterOutputStream,
g_converter_output_stream,
G_TYPE_FILTER_OUTPUT_STREAM)
static gboolean g_converter_output_stream_can_poll (GPollableOutputStream *stream);
static gboolean g_converter_output_stream_is_writable (GPollableOutputStream *stream);
static gssize g_converter_output_stream_write_nonblocking (GPollableOutputStream *stream,
const void *buffer,
gsize size,
GError **error);
static GSource *g_converter_output_stream_create_source (GPollableOutputStream *stream,
GCancellable *cancellable);
static void g_converter_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
G_DEFINE_TYPE_WITH_CODE (GConverterOutputStream,
g_converter_output_stream,
G_TYPE_FILTER_OUTPUT_STREAM,
G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
g_converter_output_stream_pollable_iface_init);
)
static void
g_converter_output_stream_class_init (GConverterOutputStreamClass *klass)
......@@ -129,6 +147,15 @@ g_converter_output_stream_class_init (GConverterOutputStreamClass *klass)
}
static void
g_converter_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
{
iface->can_poll = g_converter_output_stream_can_poll;
iface->is_writable = g_converter_output_stream_is_writable;
iface->write_nonblocking = g_converter_output_stream_write_nonblocking;
iface->create_source = g_converter_output_stream_create_source;
}
static void
g_converter_output_stream_finalize (GObject *object)
{
......@@ -339,7 +366,7 @@ buffer_append (Buffer *buffer,
static gboolean
flush_buffer (GConverterOutputStream *stream,
Buffer *buffer,
gboolean blocking,
GCancellable *cancellable,
GError **error)
{
......@@ -356,12 +383,13 @@ flush_buffer (GConverterOutputStream *stream,
available = buffer_data_size (&priv->converted_buffer);
if (available > 0)
{
res = g_output_stream_write_all (base_stream,
buffer_data (&priv->converted_buffer),
available,
&nwritten,
cancellable,
error);
res = g_pollable_stream_write_all (base_stream,
buffer_data (&priv->converted_buffer),
available,
blocking,
&nwritten,
cancellable,
error);
buffer_consumed (&priv->converted_buffer, nwritten);
return res;
}
......@@ -370,11 +398,12 @@ flush_buffer (GConverterOutputStream *stream,
static gssize
g_converter_output_stream_write (GOutputStream *stream,
const void *buffer,
gsize count,
GCancellable *cancellable,
GError **error)
write_internal (GOutputStream *stream,
const void *buffer,
gsize count,
gboolean blocking,
GCancellable *cancellable,
GError **error)
{
GConverterOutputStream *cstream;
GConverterOutputStreamPrivate *priv;
......@@ -392,7 +421,7 @@ g_converter_output_stream_write (GOutputStream *stream,
/* Write out all available pre-converted data and fail if
not possible */
if (!flush_buffer (cstream, &priv->converted_buffer, cancellable, error))
if (!flush_buffer (cstream, blocking, cancellable, error))
return -1;
if (priv->finished)
......@@ -499,11 +528,21 @@ g_converter_output_stream_write (GOutputStream *stream,
even if writing this to the base stream fails. If it does we'll just
stop early and report this error when we try again on the next
write call. */
flush_buffer (cstream, &priv->converted_buffer, cancellable, NULL);
flush_buffer (cstream, blocking, cancellable, NULL);
return retval;
}
static gssize
g_converter_output_stream_write (GOutputStream *stream,
const void *buffer,
gsize count,
GCancellable *cancellable,
GError **error)
{
return write_internal (stream, buffer, count, TRUE, cancellable, error);
}
static gboolean
g_converter_output_stream_flush (GOutputStream *stream,
GCancellable *cancellable,
......@@ -525,7 +564,7 @@ g_converter_output_stream_flush (GOutputStream *stream,
/* Write out all available pre-converted data and fail if
not possible */
if (!flush_buffer (cstream, &priv->converted_buffer, cancellable, error))
if (!flush_buffer (cstream, TRUE, cancellable, error))
return FALSE;
/* Ensure we have *some* initial target space */
......@@ -590,12 +629,54 @@ g_converter_output_stream_flush (GOutputStream *stream,
}
/* Now write all converted data to base stream */
if (!flush_buffer (cstream, &priv->converted_buffer, cancellable, error))
if (!flush_buffer (cstream, TRUE, cancellable, error))
return FALSE;
return TRUE;
}
static gboolean
g_converter_output_stream_can_poll (GPollableOutputStream *stream)
{
GOutputStream *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
return (G_IS_POLLABLE_OUTPUT_STREAM (base_stream) &&
g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (base_stream)));
}
static gboolean
g_converter_output_stream_is_writable (GPollableOutputStream *stream)
{
GOutputStream *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
return g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (base_stream));
}
static gssize
g_converter_output_stream_write_nonblocking (GPollableOutputStream *stream,
const void *buffer,
gsize count,
GError **error)
{
return write_internal (G_OUTPUT_STREAM (stream), buffer, count, FALSE,
NULL, error);
}
static GSource *
g_converter_output_stream_create_source (GPollableOutputStream *stream,
GCancellable *cancellable)
{
GOutputStream *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
GSource *base_source, *pollable_source;
base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (base_stream), NULL);
pollable_source = g_pollable_source_new_full (stream, base_source,
cancellable);
g_source_unref (base_source);
return pollable_source;
}
/**
* g_converter_output_stream_get_converter:
* @converter_stream: a #GConverterOutputStream
......
......@@ -22,6 +22,7 @@
#include "config.h"
#include "gmemoryinputstream.h"
#include "gpollableinputstream.h"
#include "ginputstream.h"
#include "gseekable.h"
#include "string.h"
......@@ -39,6 +40,8 @@
* #GMemoryInputStream is a class for using arbitrary
* memory chunks as input for GIO streaming input operations.
*
* As of GLib 2.34, #GMemoryInputStream implements
* #GPollableInputStream.
*/
typedef struct _Chunk Chunk;
......@@ -108,11 +111,20 @@ static gboolean g_memory_input_stream_truncate (GSeekable *seek
goffset offset,
GCancellable *cancellable,
GError **error);
static void g_memory_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
static gboolean g_memory_input_stream_is_readable (GPollableInputStream *stream);
static GSource *g_memory_input_stream_create_source (GPollableInputStream *stream,
GCancellable *cancellable);
static void g_memory_input_stream_finalize (GObject *object);
G_DEFINE_TYPE_WITH_CODE (GMemoryInputStream, g_memory_input_stream, G_TYPE_INPUT_STREAM,
G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
g_memory_input_stream_seekable_iface_init))
g_memory_input_stream_seekable_iface_init);
G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
g_memory_input_stream_pollable_iface_init);
)
static void
......@@ -174,6 +186,13 @@ g_memory_input_stream_seekable_iface_init (GSeekableIface *iface)
iface->truncate_fn = g_memory_input_stream_truncate;
}
static void
g_memory_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
{
iface->is_readable = g_memory_input_stream_is_readable;
iface->create_source = g_memory_input_stream_create_source;
}
static void
g_memory_input_stream_init (GMemoryInputStream *stream)
{
......@@ -526,3 +545,23 @@ g_memory_input_stream_truncate (GSeekable *seekable,
_("Cannot truncate GMemoryInputStream"));
return FALSE;
}
static gboolean
g_memory_input_stream_is_readable (GPollableInputStream *stream)
{
return TRUE;
}
static GSource *
g_memory_input_stream_create_source (GPollableInputStream *stream,
GCancellable *cancellable)
{
GSource *base_source, *pollable_source;
base_source = g_timeout_source_new (0);
pollable_source = g_pollable_source_new_full (stream, base_source,
cancellable);
g_source_unref (base_source);
return pollable_source;
}
......@@ -25,6 +25,7 @@
#include "config.h"
#include "gmemoryoutputstream.h"
#include "goutputstream.h"
#include "gpollableoutputstream.h"
#include "gseekable.h"
#include "gsimpleasyncresult.h"
#include "gioerror.h"
......@@ -41,6 +42,8 @@
* #GMemoryOutputStream is a class for using arbitrary
* memory chunks as output for GIO streaming output operations.
*
* As of GLib 2.34, #GMemoryOutputStream implements
* #GPollableOutputStream.
*/
#define MIN_ARRAY_SIZE 16
......@@ -119,9 +122,17 @@ static gboolean g_memory_output_stream_truncate (GSeekable *see
GCancellable *cancellable,
GError **error);
static gboolean g_memory_output_stream_is_writable (GPollableOutputStream *stream);
static GSource *g_memory_output_stream_create_source (GPollableOutputStream *stream,
GCancellable *cancellable);
static void g_memory_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
G_DEFINE_TYPE_WITH_CODE (GMemoryOutputStream, g_memory_output_stream, G_TYPE_OUTPUT_STREAM,
G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
g_memory_output_stream_seekable_iface_init))
g_memory_output_stream_seekable_iface_init);
G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
g_memory_output_stream_pollable_iface_init))
static void
......@@ -224,6 +235,13 @@ g_memory_output_stream_class_init (GMemoryOutputStreamClass *klass)
G_PARAM_STATIC_STRINGS));
}
static void
g_memory_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
{
iface->is_writable = g_memory_output_stream_is_writable;
iface->create_source = g_memory_output_stream_create_source;
}
static void
g_memory_output_stream_set_property (GObject *object,
guint prop_id,
......@@ -800,3 +818,23 @@ g_memory_output_stream_truncate (GSeekable *seekable,
return TRUE;
}
static gboolean
g_memory_output_stream_is_writable (GPollableOutputStream *stream)
{
return TRUE;
}
static GSource *
g_memory_output_stream_create_source (GPollableOutputStream *stream,
GCancellable *cancellable)
{
GSource *base_source, *pollable_source;
base_source = g_timeout_source_new (0);
pollable_source = g_pollable_source_new_full (stream, base_source,
cancellable);
g_source_unref (base_source);
return pollable_source;
}
......@@ -724,6 +724,219 @@ test_charset (gconstpointer data)
g_object_unref (conv);
}
static void
client_connected (GObject *source,
GAsyncResult *result,
gpointer user_data)
{
GSocketClient *client = G_SOCKET_CLIENT (source);
GSocketConnection **conn = user_data;
GError *error = NULL;
*conn = g_socket_client_connect_finish (client, result, &error);
g_assert_no_error (error);
}
static void
server_connected (GObject *source,
GAsyncResult *result,
gpointer user_data)
{
GSocketListener *listener = G_SOCKET_LISTENER (source);
GSocketConnection **conn = user_data;
GError *error = NULL;
*conn = g_socket_listener_accept_finish (listener, result, NULL, &error);
g_assert_no_error (error);
}
static void
make_socketpair (GIOStream **left,
GIOStream **right)
{
GInetAddress *iaddr;
GSocketAddress *saddr, *effective_address;
GSocketListener *listener;
GSocketClient *client;
GError *error = NULL;
GSocketConnection *client_conn = NULL, *server_conn = NULL;
iaddr = g_inet_address_new_loopback (G_SOCKET_FAMILY_IPV4);
saddr = g_inet_socket_address_new (iaddr, 0);
g_object_unref (iaddr);
listener = g_socket_listener_new ();
g_socket_listener_add_address (listener, saddr,
G_SOCKET_TYPE_STREAM,
G_SOCKET_PROTOCOL_TCP,
NULL,
&effective_address,
&error);
g_assert_no_error (error);
g_object_unref (saddr);
client = g_socket_client_new ();
g_socket_client_connect_async (client,
G_SOCKET_CONNECTABLE (effective_address),
NULL, client_connected, &client_conn);
g_socket_listener_accept_async (listener, NULL,
server_connected, &server_conn);
while (!client_conn || !server_conn)
g_main_context_iteration (NULL, TRUE);
g_object_unref (client);
g_object_unref (listener);
g_object_unref (effective_address);
*left = G_IO_STREAM (client_conn);
*right = G_IO_STREAM (server_conn);
}
static void
test_converter_pollable (void)
{
GIOStream *left, *right;
guint8 *converted, *inptr;
guint8 *expanded, *outptr, *expanded_end;
gsize n_read, expanded_size;
gsize total_read;
gssize res;
gboolean is_readable;
GConverterResult cres;
GInputStream *cstream;
GPollableInputStream *pollable_in;
GOutputStream *socket_out, *mem_out, *cstream_out;
GPollableOutputStream *pollable_out;
GConverter *expander, *compressor;
GError *error;
int i;
expander = g_expander_converter_new ();
expanded = g_malloc (100*1000); /* Large enough */
cres = g_converter_convert (expander,
unexpanded_data, sizeof(unexpanded_data),
expanded, 100*1000,
G_CONVERTER_INPUT_AT_END,
&n_read, &expanded_size, NULL);
g_assert (cres == G_CONVERTER_FINISHED);
g_assert (n_read == 11);
g_assert (expanded_size == 41030);
expanded_end = expanded + expanded_size;
make_socketpair (&left, &right);
compressor = g_compressor_converter_new ();
converted = g_malloc (100*1000); /* Large enough */
cstream = g_converter_input_stream_new (g_io_stream_get_input_stream (left),
compressor);
pollable_in = G_POLLABLE_INPUT_STREAM (cstream);
g_assert (g_pollable_input_stream_can_poll (pollable_in));
socket_out = g_io_stream_get_output_stream (right);
total_read = 0;
outptr = expanded;
inptr = converted;
while (TRUE)
{
error = NULL;
if (outptr < expanded_end)
{
res = g_output_stream_write (socket_out,
outptr,
MIN (1000, (expanded_end - outptr)),
NULL, &error);
g_assert_cmpint (res, >, 0);
outptr += res;
}
else if (socket_out)
{
g_object_unref (right);
socket_out = NULL;
}
is_readable = g_pollable_input_stream_is_readable (pollable_in);
res = g_pollable_input_stream_read_nonblocking (pollable_in,
inptr, 1,
NULL, &error);
/* is_readable can be a false positive, but not a false negative */
if (!is_readable)
g_assert_cmpint (res, ==, -1);
/* After closing the write end, we can't get WOULD_BLOCK any more */
if (!socket_out)
g_assert_cmpint (res, !=, -1);
if (res == -1)
{
g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
g_error_free (error);
continue;
}
if (res == 0)
break;
inptr += res;
total_read += res;
}
g_assert (total_read == n_read - 1); /* Last 2 zeros are combined */
g_assert (memcmp (converted, unexpanded_data, total_read) == 0);
g_object_unref (cstream);
g_object_unref (left);
g_converter_reset (compressor);
/* This doesn't actually test the behavior on
* G_IO_ERROR_WOULD_BLOCK; to do that we'd need to implement a
* custom GOutputStream that we could control blocking on.
*/
mem_out = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
cstream_out = g_converter_output_stream_new (mem_out, compressor);
g_object_unref (mem_out);
pollable_out = G_POLLABLE_OUTPUT_STREAM (cstream_out);
for (i = 0; i < expanded_size; i++)
{
error = NULL;
res = g_pollable_output_stream_write_nonblocking (pollable_out,
expanded + i,