Commit 00ee06e6 authored by Dan Winship's avatar Dan Winship

gio: use GPollable* to implement fallback read_async/write_async

If a GInputStream does not provide a read_async() implementation, but
does implement GPollableInputStream, then instead of doing
read-synchronously-in-a-thread, just use
g_pollable_input_stream_read_nonblocking() and
g_pollable_input_stream_create_source() to implement an async read in
the same thread. Similarly for GOutputStream.

Remove a bunch of existing read_async()/write_async() implementations
that are basically equivalent to the new fallback method.

https://bugzilla.gnome.org/show_bug.cgi?id=673997
parent 82ec4dca
......@@ -100,16 +100,6 @@ static gssize g_buffered_input_stream_read (GInputStream *s
gsize count,
GCancellable *cancellable,
GError **error);
static void g_buffered_input_stream_read_async (GInputStream *stream,
void *buffer,
gsize count,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
static gssize g_buffered_input_stream_read_finish (GInputStream *stream,
GAsyncResult *result,
GError **error);
static gssize g_buffered_input_stream_real_fill (GBufferedInputStream *stream,
gssize count,
GCancellable *cancellable,
......@@ -150,8 +140,6 @@ g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
istream_class->skip_async = g_buffered_input_stream_skip_async;
istream_class->skip_finish = g_buffered_input_stream_skip_finish;
istream_class->read_fn = g_buffered_input_stream_read;
istream_class->read_async = g_buffered_input_stream_read_async;
istream_class->read_finish = g_buffered_input_stream_read_finish;
bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
bstream_class->fill = g_buffered_input_stream_real_fill;
......@@ -1017,189 +1005,6 @@ g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
return nread;
}
typedef struct
{
gssize bytes_read;
gssize count;
void *buffer;
} ReadAsyncData;
static void
free_read_async_data (gpointer _data)
{
ReadAsyncData *data = _data;
g_slice_free (ReadAsyncData, data);
}
static void
large_read_callback (GObject *source_object,
GAsyncResult *result,
gpointer user_data)
{
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
ReadAsyncData *data;
GError *error;
gssize nread;
data = g_simple_async_result_get_op_res_gpointer (simple);
error = NULL;
nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
result, &error);
/* Only report the error if we've not already read some data */
if (nread < 0 && data->bytes_read == 0)
g_simple_async_result_take_error (simple, error);
else if (error)
g_error_free (error);
if (nread > 0)
data->bytes_read += nread;
/* Complete immediately, not in idle, since we're already
* in a mainloop callout
*/
g_simple_async_result_complete (simple);
g_object_unref (simple);
}
static void
read_fill_buffer_callback (GObject *source_object,
GAsyncResult *result,
gpointer user_data)
{
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
GBufferedInputStream *bstream;
GBufferedInputStreamPrivate *priv;
ReadAsyncData *data;
GError *error;
gssize nread;
gsize available;
bstream = G_BUFFERED_INPUT_STREAM (source_object);
priv = bstream->priv;
data = g_simple_async_result_get_op_res_gpointer (simple);
error = NULL;
nread = g_buffered_input_stream_fill_finish (bstream,
result, &error);
if (nread < 0 && data->bytes_read == 0)
g_simple_async_result_take_error (simple, error);
else if (error)
g_error_free (error);
if (nread > 0)
{
available = priv->end - priv->pos;
data->count = MIN (data->count, available);
memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
data->bytes_read += data->count;
priv->pos += data->count;
}
/* Complete immediately, not in idle, since we're already
* in a mainloop callout
*/
g_simple_async_result_complete (simple);
g_object_unref (simple);
}
static void
g_buffered_input_stream_read_async (GInputStream *stream,
void *buffer,
gsize count,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
GBufferedInputStream *bstream;
GBufferedInputStreamPrivate *priv;
GBufferedInputStreamClass *class;
GInputStream *base_stream;
gsize available;
GSimpleAsyncResult *simple;
ReadAsyncData *data;
bstream = G_BUFFERED_INPUT_STREAM (stream);
priv = bstream->priv;
data = g_slice_new (ReadAsyncData);
data->buffer = buffer;
data->bytes_read = 0;
simple = g_simple_async_result_new (G_OBJECT (stream),
callback, user_data,
g_buffered_input_stream_read_async);
g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
available = priv->end - priv->pos;
if (count <= available)
{
memcpy (buffer, priv->buffer + priv->pos, count);
priv->pos += count;
data->bytes_read = count;
g_simple_async_result_complete_in_idle (simple);
g_object_unref (simple);
return;
}
/* Full request not available, read all currently available
* and request refill for more
*/
memcpy (buffer, priv->buffer + priv->pos, available);
priv->pos = 0;
priv->end = 0;
count -= available;
data->bytes_read = available;
data->count = count;
if (count > priv->len)
{
/* Large request, shortcut buffer */
base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
g_input_stream_read_async (base_stream,
(char *)buffer + data->bytes_read,
count,
io_priority, cancellable,
large_read_callback,
simple);
}
else
{
class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
class->fill_async (bstream, priv->len, io_priority, cancellable,
read_fill_buffer_callback, simple);
}
}
static gssize
g_buffered_input_stream_read_finish (GInputStream *stream,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple;
ReadAsyncData *data;
simple = G_SIMPLE_ASYNC_RESULT (result);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
data = g_simple_async_result_get_op_res_gpointer (simple);
return data->bytes_read;
}
typedef struct
{
gssize bytes_skipped;
......
......@@ -88,16 +88,6 @@ static gboolean g_buffered_output_stream_close (GOutputStream *stream,
GCancellable *cancellable,
GError **error);
static void g_buffered_output_stream_write_async (GOutputStream *stream,
const void *buffer,
gsize count,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer data);
static gssize g_buffered_output_stream_write_finish (GOutputStream *stream,
GAsyncResult *result,
GError **error);
static void g_buffered_output_stream_flush_async (GOutputStream *stream,
int io_priority,
GCancellable *cancellable,
......@@ -137,8 +127,6 @@ g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
ostream_class->write_fn = g_buffered_output_stream_write;
ostream_class->flush = g_buffered_output_stream_flush;
ostream_class->close_fn = g_buffered_output_stream_close;
ostream_class->write_async = g_buffered_output_stream_write_async;
ostream_class->write_finish = g_buffered_output_stream_write_finish;
ostream_class->flush_async = g_buffered_output_stream_flush_async;
ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
ostream_class->close_async = g_buffered_output_stream_close_async;
......@@ -578,102 +566,6 @@ flush_buffer_thread (GSimpleAsyncResult *result,
g_simple_async_result_take_error (result, error);
}
typedef struct {
FlushData fdata;
gsize count;
const void *buffer;
} WriteData;
static void
free_write_data (gpointer data)
{
g_slice_free (WriteData, data);
}
static void
g_buffered_output_stream_write_async (GOutputStream *stream,
const void *buffer,
gsize count,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer data)
{
GBufferedOutputStream *buffered_stream;
GBufferedOutputStreamPrivate *priv;
GSimpleAsyncResult *res;
WriteData *wdata;
buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
priv = buffered_stream->priv;
wdata = g_slice_new (WriteData);
wdata->count = count;
wdata->buffer = buffer;
res = g_simple_async_result_new (G_OBJECT (stream),
callback,
data,
g_buffered_output_stream_write_async);
g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data);
/* if we have space left directly call the
* callback (from idle) otherwise schedule a buffer
* flush in the thread. In both cases the actual
* copying of the data to the buffer will be done in
* the write_finish () func since that should
* be fast enough */
if (priv->len - priv->pos > 0)
{
g_simple_async_result_complete_in_idle (res);
}
else
{
wdata->fdata.flush_stream = FALSE;
wdata->fdata.close_stream = FALSE;
g_simple_async_result_run_in_thread (res,
flush_buffer_thread,
io_priority,
cancellable);
}
g_object_unref (res);
}
static gssize
g_buffered_output_stream_write_finish (GOutputStream *stream,
GAsyncResult *result,
GError **error)
{
GBufferedOutputStreamPrivate *priv;
GBufferedOutputStream *buffered_stream;
GSimpleAsyncResult *simple;
WriteData *wdata;
gssize count;
simple = G_SIMPLE_ASYNC_RESULT (result);
buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
priv = buffered_stream->priv;
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
g_buffered_output_stream_write_async);
wdata = g_simple_async_result_get_op_res_gpointer (simple);
/* Now do the real copying of data to the buffer */
count = priv->len - priv->pos;
count = MIN (wdata->count, count);
memcpy (priv->buffer + priv->pos, wdata->buffer, count);
priv->pos += count;
return count;
}
static void
g_buffered_output_stream_flush_async (GOutputStream *stream,
int io_priority,
......
......@@ -30,7 +30,7 @@
#include "gasyncresult.h"
#include "gsimpleasyncresult.h"
#include "gioerror.h"
#include "gpollableinputstream.h"
/**
* SECTION:ginputstream
......@@ -925,6 +925,10 @@ typedef struct {
void *buffer;
gsize count_requested;
gssize count_read;
GCancellable *cancellable;
gint io_priority;
gboolean need_idle;
} ReadData;
static void
......@@ -947,6 +951,60 @@ read_async_thread (GSimpleAsyncResult *res,
g_simple_async_result_take_error (res, error);
}
static void read_async_pollable (GPollableInputStream *stream,
GSimpleAsyncResult *result);
static gboolean
read_async_pollable_ready (GPollableInputStream *stream,
gpointer user_data)
{
GSimpleAsyncResult *result = user_data;
read_async_pollable (stream, result);
return FALSE;
}
static void
read_async_pollable (GPollableInputStream *stream,
GSimpleAsyncResult *result)
{
GError *error = NULL;
ReadData *op = g_simple_async_result_get_op_res_gpointer (result);
if (g_cancellable_set_error_if_cancelled (op->cancellable, &error))
op->count_read = -1;
else
{
op->count_read = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
read_nonblocking (stream, op->buffer, op->count_requested, &error);
}
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
{
GSource *source;
g_error_free (error);
op->need_idle = FALSE;
source = g_pollable_input_stream_create_source (stream, op->cancellable);
g_source_set_callback (source,
(GSourceFunc) read_async_pollable_ready,
g_object_ref (result), g_object_unref);
g_source_set_priority (source, op->io_priority);
g_source_attach (source, g_main_context_get_thread_default ());
g_source_unref (source);
return;
}
if (op->count_read == -1)
g_simple_async_result_take_error (result, error);
if (op->need_idle)
g_simple_async_result_complete_in_idle (result);
else
g_simple_async_result_complete (result);
}
static void
g_input_stream_real_read_async (GInputStream *stream,
void *buffer,
......@@ -964,8 +1022,15 @@ g_input_stream_real_read_async (GInputStream *stream,
g_simple_async_result_set_op_res_gpointer (res, op, g_free);
op->buffer = buffer;
op->count_requested = count;
g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
op->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
op->io_priority = io_priority;
op->need_idle = TRUE;
if (G_IS_POLLABLE_INPUT_STREAM (stream) &&
g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream)))
read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), res);
else
g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
g_object_unref (res);
}
......
......@@ -70,16 +70,6 @@ static gssize g_memory_input_stream_skip (GInputStream *stream
static gboolean g_memory_input_stream_close (GInputStream *stream,
GCancellable *cancellable,
GError **error);
static void g_memory_input_stream_read_async (GInputStream *stream,
void *buffer,
gsize count,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
static gssize g_memory_input_stream_read_finish (GInputStream *stream,
GAsyncResult *result,
GError **error);
static void g_memory_input_stream_skip_async (GInputStream *stream,
gsize count,
int io_priority,
......@@ -143,8 +133,6 @@ g_memory_input_stream_class_init (GMemoryInputStreamClass *klass)
istream_class->skip = g_memory_input_stream_skip;
istream_class->close_fn = g_memory_input_stream_close;
istream_class->read_async = g_memory_input_stream_read_async;
istream_class->read_finish = g_memory_input_stream_read_finish;
istream_class->skip_async = g_memory_input_stream_skip_async;
istream_class->skip_finish = g_memory_input_stream_skip_finish;
istream_class->close_async = g_memory_input_stream_close_async;
......@@ -352,51 +340,6 @@ g_memory_input_stream_close (GInputStream *stream,
return TRUE;
}
static void
g_memory_input_stream_read_async (GInputStream *stream,
void *buffer,
gsize count,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
GSimpleAsyncResult *simple;
GError *error = NULL;
gssize nread;
nread = G_INPUT_STREAM_GET_CLASS (stream)->read_fn (stream,
buffer,
count,
cancellable,
&error);
simple = g_simple_async_result_new (G_OBJECT (stream),
callback,
user_data,
g_memory_input_stream_read_async);
if (error)
g_simple_async_result_take_error (simple, error);
else
g_simple_async_result_set_op_res_gssize (simple, nread);
g_simple_async_result_complete_in_idle (simple);
g_object_unref (simple);
}
static gssize
g_memory_input_stream_read_finish (GInputStream *stream,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple;
gssize nread;
simple = G_SIMPLE_ASYNC_RESULT (result);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_memory_input_stream_read_async);
nread = g_simple_async_result_get_op_res_gssize (simple);
return nread;
}
static void
g_memory_input_stream_skip_async (GInputStream *stream,
gsize count,
......
......@@ -89,16 +89,6 @@ static gboolean g_memory_output_stream_close (GOutputStream *stream,
GCancellable *cancellable,
GError **error);
static void g_memory_output_stream_write_async (GOutputStream *stream,
const void *buffer,
gsize count,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer data);
static gssize g_memory_output_stream_write_finish (GOutputStream *stream,
GAsyncResult *result,
GError **error);
static void g_memory_output_stream_close_async (GOutputStream *stream,
int io_priority,
GCancellable *cancellable,
......@@ -152,8 +142,6 @@ g_memory_output_stream_class_init (GMemoryOutputStreamClass *klass)
ostream_class->write_fn = g_memory_output_stream_write;
ostream_class->close_fn = g_memory_output_stream_close;
ostream_class->write_async = g_memory_output_stream_write_async;
ostream_class->write_finish = g_memory_output_stream_write_finish;
ostream_class->close_async = g_memory_output_stream_close_async;
ostream_class->close_finish = g_memory_output_stream_close_finish;
......@@ -628,56 +616,6 @@ g_memory_output_stream_close (GOutputStream *stream,
return TRUE;
}
static void
g_memory_output_stream_write_async (GOutputStream *stream,
const void *buffer,
gsize count,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer data)
{
GSimpleAsyncResult *simple;
GError *error = NULL;
gssize nwritten;
nwritten = G_OUTPUT_STREAM_GET_CLASS (stream)->write_fn (stream,
buffer,
count,
cancellable,
&error);
simple = g_simple_async_result_new (G_OBJECT (stream),
callback,
data,
g_memory_output_stream_write_async);
if (error)
g_simple_async_result_take_error (simple, error);
else
g_simple_async_result_set_op_res_gssize (simple, nwritten);
g_simple_async_result_complete_in_idle (simple);
g_object_unref (simple);
}
static gssize
g_memory_output_stream_write_finish (GOutputStream *stream,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple;
gssize nwritten;
simple = G_SIMPLE_ASYNC_RESULT (result);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
g_memory_output_stream_write_async);
nwritten = g_simple_async_result_get_op_res_gssize (simple);
return nwritten;
}
static void
g_memory_output_stream_close_async (GOutputStream *stream,
int io_priority,
......
......@@ -28,7 +28,7 @@
#include "ginputstream.h"
#include "gioerror.h"
#include "glibintl.h"
#include "gpollableoutputstream.h"
/**
* SECTION:goutputstream
......@@ -1266,6 +1266,10 @@ typedef struct {
const void *buffer;
gsize count_requested;
gssize count_written;
GCancellable *cancellable;
gint io_priority;
gboolean need_idle;
} WriteData;
static void
......@@ -1285,6 +1289,60 @@ write_async_thread (GSimpleAsyncResult *res,
g_simple_async_result_take_error (res, error);
}
static void write_async_pollable (GPollableOutputStream *stream,
GSimpleAsyncResult *result);
static gboolean
write_async_pollable_ready (GPollableOutputStream *stream,
gpointer user_data)
{
GSimpleAsyncResult *result = user_data;
write_async_pollable (stream, result);
return FALSE;
}
static void
write_async_pollable (GPollableOutputStream *stream,
GSimpleAsyncResult *result)
{
GError *error = NULL;
WriteData *op = g_simple_async_result_get_op_res_gpointer (result);
if (g_cancellable_set_error_if_cancelled (op->cancellable, &error))
op->count_written = -1;
else
{
op->count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
write_nonblocking (stream, op->buffer, op->count_requested, &error);
}
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
{
GSource *source;
g_error_free (error);
op->need_idle = FALSE;
source = g_pollable_output_stream_create_source (stream, op->cancellable);
g_source_set_callback (source,
(GSourceFunc) write_async_pollable_ready,
g_object_ref (result), g_object_unref);
g_source_set_priority (source, op->io_priority);
g_source_attach (source, g_main_context_get_thread_default ());
g_source_unref (source);
return;
}
if (op->count_written == -1)
g_simple_async_result_take_error (result, error);
if (op->need_idle)
g_simple_async_result_complete_in_idle (result);
else
g_simple_async_result_complete (result);
}
static void
g_output_stream_real_write_async (GOutputStream *stream,
const void *buffer,
......@@ -1303,7 +1361,11 @@ g_output_stream_real_write_async (GOutputStream *stream,
op->buffer = buffer;
op->count_requested = count;
g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable);
if (G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))
write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), res);
else
g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable);
g_object_unref (res);
}
......
......@@ -132,95 +132,6 @@ g_socket_input_stream_read (GInputStream *stream,
cancellable, error);
}
static gboolean
g_socket_input_stream_read_ready (GSocket *socket,
GIOCondition condition,
GSocketInputStream *stream)
{
GSimpleAsyncResult *simple;
GError *error = NULL;
gssize result;
result = g_socket_receive_with_blocking (stream->priv->socket,
stream->priv->buffer,
stream->priv->count,
FALSE,
stream->priv->cancellable,
&error);
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
return TRUE;
simple = stream->priv->result;
stream->priv->result = NULL;
if (result >= 0)
g_simple_async_result_set_op_res_gssize (simple, result);
if (error)
g_simple_async_result_take_error (simple, error);
if (stream->priv->cancellable)
g_object_unref (stream->priv->cancellable);
g_simple_async_result_complete (simple);
g_object_unref (simple);
return FALSE;
}
static void