gunixoutputstream.c 18.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/* GIO - GLib Input, Output and Streaming Library
 * 
 * Copyright (C) 2006-2007 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General
 * Public License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 * Author: Alexander Larsson <alexl@redhat.com>
 */

23
#include "config.h"
24 25 26 27 28 29 30 31 32 33 34

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <stdio.h>
#include <fcntl.h>

#include <glib.h>
#include <glib/gstdio.h>
#include "gioerror.h"
35
#include "gunixoutputstream.h"
36 37 38
#include "gcancellable.h"
#include "gsimpleasyncresult.h"
#include "gasynchelper.h"
39
#include "gfiledescriptorbased.h"
40 41
#include "glibintl.h"

42

43
/**
44
 * SECTION:gunixoutputstream
Matthias Clasen's avatar
Matthias Clasen committed
45
 * @short_description: Streaming output operations for UNIX file descriptors
46
 * @include: gio/gunixoutputstream.h
47
 * @see_also: #GOutputStream
48
 *
49 50 51 52
 * #GUnixOutputStream implements #GOutputStream for writing to a UNIX
 * file descriptor, including asynchronous operations. (If the file
 * descriptor refers to a socket or pipe, this will use poll() to do
 * asynchronous I/O. If it refers to a regular file, it will fall back
53
 * to doing asynchronous I/O in another thread.)
54
 *
Matthias Clasen's avatar
Matthias Clasen committed
55
 * Note that <filename>&lt;gio/gunixoutputstream.h&gt;</filename> belongs
56 57 58
 * to the UNIX-specific GIO interfaces, thus you have to use the
 * <filename>gio-unix-2.0.pc</filename> pkg-config file when using it.
 */
59

60 61 62 63 64 65
enum {
  PROP_0,
  PROP_FD,
  PROP_CLOSE_FD
};

66
static void g_unix_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
67
static void g_unix_output_stream_file_descriptor_based_iface_init (GFileDescriptorBasedIface *iface);
68

69 70 71
G_DEFINE_TYPE_WITH_CODE (GUnixOutputStream, g_unix_output_stream, G_TYPE_OUTPUT_STREAM,
			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
						g_unix_output_stream_pollable_iface_init)
72 73 74
			 G_IMPLEMENT_INTERFACE (G_TYPE_FILE_DESCRIPTOR_BASED,
						g_unix_output_stream_file_descriptor_based_iface_init)
			 )
75

76
struct _GUnixOutputStreamPrivate {
77
  int fd;
78 79
  guint close_fd : 1;
  guint is_pipe_or_socket : 1;
80 81
};

82 83 84 85 86 87 88 89
static void     g_unix_output_stream_set_property (GObject              *object,
						   guint                 prop_id,
						   const GValue         *value,
						   GParamSpec           *pspec);
static void     g_unix_output_stream_get_property (GObject              *object,
						   guint                 prop_id,
						   GValue               *value,
						   GParamSpec           *pspec);
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
static gssize   g_unix_output_stream_write        (GOutputStream        *stream,
						   const void           *buffer,
						   gsize                 count,
						   GCancellable         *cancellable,
						   GError              **error);
static gboolean g_unix_output_stream_close        (GOutputStream        *stream,
						   GCancellable         *cancellable,
						   GError              **error);
static void     g_unix_output_stream_write_async  (GOutputStream        *stream,
						   const void           *buffer,
						   gsize                 count,
						   int                   io_priority,
						   GCancellable         *cancellable,
						   GAsyncReadyCallback   callback,
						   gpointer              data);
static gssize   g_unix_output_stream_write_finish (GOutputStream        *stream,
						   GAsyncResult         *result,
						   GError              **error);
static void     g_unix_output_stream_close_async  (GOutputStream        *stream,
						   int                   io_priority,
						   GCancellable         *cancellable,
						   GAsyncReadyCallback   callback,
						   gpointer              data);
static gboolean g_unix_output_stream_close_finish (GOutputStream        *stream,
						   GAsyncResult         *result,
						   GError              **error);
116

117 118 119
static gboolean g_unix_output_stream_pollable_is_writable   (GPollableOutputStream *stream);
static GSource *g_unix_output_stream_pollable_create_source (GPollableOutputStream *stream,
							     GCancellable         *cancellable);
120 121

static void
122
g_unix_output_stream_finalize (GObject *object)
123
{
124
  G_OBJECT_CLASS (g_unix_output_stream_parent_class)->finalize (object);
125 126 127
}

static void
128
g_unix_output_stream_class_init (GUnixOutputStreamClass *klass)
129 130 131 132
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
  GOutputStreamClass *stream_class = G_OUTPUT_STREAM_CLASS (klass);
  
133
  g_type_class_add_private (klass, sizeof (GUnixOutputStreamPrivate));
134 135 136

  gobject_class->get_property = g_unix_output_stream_get_property;
  gobject_class->set_property = g_unix_output_stream_set_property;
137 138
  gobject_class->finalize = g_unix_output_stream_finalize;

139 140
  stream_class->write_fn = g_unix_output_stream_write;
  stream_class->close_fn = g_unix_output_stream_close;
141 142 143 144
  stream_class->write_async = g_unix_output_stream_write_async;
  stream_class->write_finish = g_unix_output_stream_write_finish;
  stream_class->close_async = g_unix_output_stream_close_async;
  stream_class->close_finish = g_unix_output_stream_close_finish;
145 146 147 148 149 150 151 152 153 154 155

   /**
   * GUnixOutputStream:fd:
   *
   * The file descriptor that the stream writes to.
   *
   * Since: 2.20
   */
  g_object_class_install_property (gobject_class,
				   PROP_FD,
				   g_param_spec_int ("fd",
156 157
						     P_("File descriptor"),
						     P_("The file descriptor to write to"),
158 159 160 161 162 163 164 165 166 167 168 169 170
						     G_MININT, G_MAXINT, -1,
						     G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB));

  /**
   * GUnixOutputStream:close-fd:
   *
   * Whether to close the file descriptor when the stream is closed.
   *
   * Since: 2.20
   */
  g_object_class_install_property (gobject_class,
				   PROP_CLOSE_FD,
				   g_param_spec_boolean ("close-fd",
171 172
							 P_("Close file descriptor"),
							 P_("Whether to close the file descriptor when the stream is closed"),
173 174 175 176
							 TRUE,
							 G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB));
}

177 178 179 180 181 182 183
static void
g_unix_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
{
  iface->is_writable = g_unix_output_stream_pollable_is_writable;
  iface->create_source = g_unix_output_stream_pollable_create_source;
}

184 185 186 187 188 189
static void
g_unix_output_stream_file_descriptor_based_iface_init (GFileDescriptorBasedIface *iface)
{
  iface->get_fd = (int (*) (GFileDescriptorBased *))g_unix_output_stream_get_fd;
}

190 191 192 193 194 195 196 197 198 199 200 201 202 203
static void
g_unix_output_stream_set_property (GObject         *object,
				   guint            prop_id,
				   const GValue    *value,
				   GParamSpec      *pspec)
{
  GUnixOutputStream *unix_stream;

  unix_stream = G_UNIX_OUTPUT_STREAM (object);

  switch (prop_id)
    {
    case PROP_FD:
      unix_stream->priv->fd = g_value_get_int (value);
204 205 206 207
      if (lseek (unix_stream->priv->fd, 0, SEEK_CUR) == -1 && errno == ESPIPE)
	unix_stream->priv->is_pipe_or_socket = TRUE;
      else
	unix_stream->priv->is_pipe_or_socket = FALSE;
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
      break;
    case PROP_CLOSE_FD:
      unix_stream->priv->close_fd = g_value_get_boolean (value);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
    }
}

static void
g_unix_output_stream_get_property (GObject    *object,
				   guint       prop_id,
				   GValue     *value,
				   GParamSpec *pspec)
{
  GUnixOutputStream *unix_stream;

  unix_stream = G_UNIX_OUTPUT_STREAM (object);

  switch (prop_id)
    {
    case PROP_FD:
      g_value_set_int (value, unix_stream->priv->fd);
      break;
    case PROP_CLOSE_FD:
      g_value_set_boolean (value, unix_stream->priv->close_fd);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
    }
239 240 241
}

static void
242
g_unix_output_stream_init (GUnixOutputStream *unix_stream)
243
{
244 245 246
  unix_stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (unix_stream,
						   G_TYPE_UNIX_OUTPUT_STREAM,
						   GUnixOutputStreamPrivate);
247

248 249 250
  unix_stream->priv->fd = -1;
  unix_stream->priv->close_fd = TRUE;
}
251 252

/**
253
 * g_unix_output_stream_new:
254 255
 * @fd: a UNIX file descriptor
 * @close_fd: %TRUE to close the file descriptor when done
256
 * 
257
 * Creates a new #GUnixOutputStream for the given @fd. 
258
 * 
259 260 261 262
 * If @close_fd, is %TRUE, the file descriptor will be closed when 
 * the output stream is destroyed.
 * 
 * Returns: a new #GOutputStream
263 264
 **/
GOutputStream *
265 266
g_unix_output_stream_new (gint     fd,
			  gboolean close_fd)
267
{
268
  GUnixOutputStream *stream;
269 270 271

  g_return_val_if_fail (fd != -1, NULL);

272 273 274 275
  stream = g_object_new (G_TYPE_UNIX_OUTPUT_STREAM,
			 "fd", fd,
			 "close-fd", close_fd,
			 NULL);
276 277 278 279
  
  return G_OUTPUT_STREAM (stream);
}

280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
/**
 * g_unix_output_stream_set_close_fd:
 * @stream: a #GUnixOutputStream
 * @close_fd: %TRUE to close the file descriptor when done
 *
 * Sets whether the file descriptor of @stream shall be closed
 * when the stream is closed.
 *
 * Since: 2.20
 */
void
g_unix_output_stream_set_close_fd (GUnixOutputStream *stream,
                                   gboolean           close_fd)
{
  g_return_if_fail (G_IS_UNIX_OUTPUT_STREAM (stream));

  close_fd = close_fd != FALSE;
  if (stream->priv->close_fd != close_fd)
    {
      stream->priv->close_fd = close_fd;
      g_object_notify (G_OBJECT (stream), "close-fd");
    }
}

/**
 * g_unix_output_stream_get_close_fd:
 * @stream: a #GUnixOutputStream
 *
 * Returns whether the file descriptor of @stream will be
 * closed when the stream is closed.
 *
 * Return value: %TRUE if the file descriptor is closed when done
 *
 * Since: 2.20
 */
gboolean
g_unix_output_stream_get_close_fd (GUnixOutputStream *stream)
{
  g_return_val_if_fail (G_IS_UNIX_OUTPUT_STREAM (stream), FALSE);

  return stream->priv->close_fd;
}

/**
 * g_unix_output_stream_get_fd:
 * @stream: a #GUnixOutputStream
 *
 * Return the UNIX file descriptor that the stream writes to.
 *
 * Return value: The file descriptor of @stream
 *
 * Since: 2.20
 */
gint
g_unix_output_stream_get_fd (GUnixOutputStream *stream)
{
  g_return_val_if_fail (G_IS_UNIX_OUTPUT_STREAM (stream), -1);

  return stream->priv->fd;
}

341
static gssize
342 343 344 345 346
g_unix_output_stream_write (GOutputStream  *stream,
			    const void     *buffer,
			    gsize           count,
			    GCancellable   *cancellable,
			    GError        **error)
347
{
348
  GUnixOutputStream *unix_stream;
349
  gssize res = -1;
350
  GPollFD poll_fds[2];
351
  int nfds;
352 353
  int poll_ret;

354
  unix_stream = G_UNIX_OUTPUT_STREAM (stream);
355

356 357 358
  poll_fds[0].fd = unix_stream->priv->fd;
  poll_fds[0].events = G_IO_OUT;

359 360
  if (unix_stream->priv->is_pipe_or_socket &&
      g_cancellable_make_pollfd (cancellable, &poll_fds[1]))
361 362 363 364 365
    nfds = 2;
  else
    nfds = 1;

  while (1)
366
    {
367
      poll_fds[0].revents = poll_fds[1].revents = 0;
368
      do
369
	poll_ret = g_poll (poll_fds, nfds, -1);
370
      while (poll_ret == -1 && errno == EINTR);
371

372 373
      if (poll_ret == -1)
	{
374 375
          int errsv = errno;

376
	  g_set_error (error, G_IO_ERROR,
377
		       g_io_error_from_errno (errsv),
378
		       _("Error writing to file descriptor: %s"),
379
		       g_strerror (errsv));
380
	  break;
381
	}
382

383
      if (g_cancellable_set_error_if_cancelled (cancellable, error))
384 385 386 387
	break;

      if (!poll_fds[0].revents)
	continue;
388

389
      res = write (unix_stream->priv->fd, buffer, count);
390 391
      if (res == -1)
	{
392 393
          int errsv = errno;

394
	  if (errsv == EINTR || errsv == EAGAIN)
395
	    continue;
396

397
	  g_set_error (error, G_IO_ERROR,
398
		       g_io_error_from_errno (errsv),
399
		       _("Error writing to file descriptor: %s"),
400
		       g_strerror (errsv));
401
	}
402

403 404
      break;
    }
405

406 407
  if (nfds == 2)
    g_cancellable_release_fd (cancellable);
408 409 410 411
  return res;
}

static gboolean
412 413 414
g_unix_output_stream_close (GOutputStream  *stream,
			    GCancellable   *cancellable,
			    GError        **error)
415
{
416
  GUnixOutputStream *unix_stream;
417 418
  int res;

419
  unix_stream = G_UNIX_OUTPUT_STREAM (stream);
420

421
  if (!unix_stream->priv->close_fd)
422 423 424 425 426
    return TRUE;
  
  while (1)
    {
      /* This might block during the close. Doesn't seem to be a way to avoid it though. */
427
      res = close (unix_stream->priv->fd);
428 429
      if (res == -1)
	{
430 431
          int errsv = errno;

432
	  g_set_error (error, G_IO_ERROR,
433
		       g_io_error_from_errno (errsv),
434
		       _("Error closing file descriptor: %s"),
435
		       g_strerror (errsv));
436 437 438 439 440 441 442 443 444 445 446 447 448
	}
      break;
    }

  return res != -1;
}

typedef struct {
  gsize count;
  const void *buffer;
  GAsyncReadyCallback callback;
  gpointer user_data;
  GCancellable *cancellable;
449
  GUnixOutputStream *stream;
450 451 452
} WriteAsyncData;

static gboolean
453
write_async_cb (int             fd,
454
		GIOCondition    condition,
455
		WriteAsyncData *data)
456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471
{
  GSimpleAsyncResult *simple;
  GError *error = NULL;
  gssize count_written;

  while (1)
    {
      if (g_cancellable_set_error_if_cancelled (data->cancellable, &error))
	{
	  count_written = -1;
	  break;
	}
      
      count_written = write (data->stream->priv->fd, data->buffer, data->count);
      if (count_written == -1)
	{
472 473
          int errsv = errno;

474 475
	  if (errsv == EINTR || errsv == EAGAIN)
	    return TRUE;
476 477
	  
	  g_set_error (&error, G_IO_ERROR,
478
		       g_io_error_from_errno (errsv),
479
		       _("Error writing to file descriptor: %s"),
480
		       g_strerror (errsv));
481 482 483 484 485 486 487
	}
      break;
    }

  simple = g_simple_async_result_new (G_OBJECT (data->stream),
				      data->callback,
				      data->user_data,
488
				      g_unix_output_stream_write_async);
489 490 491 492
  
  g_simple_async_result_set_op_res_gssize (simple, count_written);

  if (count_written == -1)
493
    g_simple_async_result_take_error (simple, error);
494 495 496 497 498 499 500 501 502

  /* Complete immediately, not in idle, since we're already in a mainloop callout */
  g_simple_async_result_complete (simple);
  g_object_unref (simple);

  return FALSE;
}

static void
503 504 505 506 507 508 509
g_unix_output_stream_write_async (GOutputStream       *stream,
				  const void          *buffer,
				  gsize                count,
				  int                  io_priority,
				  GCancellable        *cancellable,
				  GAsyncReadyCallback  callback,
				  gpointer             user_data)
510 511
{
  GSource *source;
512
  GUnixOutputStream *unix_stream;
513 514
  WriteAsyncData *data;

515
  unix_stream = G_UNIX_OUTPUT_STREAM (stream);
516

517 518 519 520 521 522 523 524
  if (!unix_stream->priv->is_pipe_or_socket)
    {
      G_OUTPUT_STREAM_CLASS (g_unix_output_stream_parent_class)->
	write_async (stream, buffer, count, io_priority,
		     cancellable, callback, user_data);
      return;
    }

525 526 527 528 529 530
  data = g_new0 (WriteAsyncData, 1);
  data->count = count;
  data->buffer = buffer;
  data->callback = callback;
  data->user_data = user_data;
  data->cancellable = cancellable;
531
  data->stream = unix_stream;
532

533
  source = _g_fd_source_new (unix_stream->priv->fd,
534
			     G_IO_OUT,
535
			     cancellable);
536
  g_source_set_name (source, "GUnixOutputStream");
537 538
  
  g_source_set_callback (source, (GSourceFunc)write_async_cb, data, g_free);
539
  g_source_attach (source, g_main_context_get_thread_default ());
540 541 542 543 544
  
  g_source_unref (source);
}

static gssize
545 546 547
g_unix_output_stream_write_finish (GOutputStream  *stream,
				   GAsyncResult   *result,
				   GError        **error)
548
{
549
  GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
550 551 552
  GSimpleAsyncResult *simple;
  gssize nwritten;

553 554 555 556 557 558
  if (!unix_stream->priv->is_pipe_or_socket)
    {
      return G_OUTPUT_STREAM_CLASS (g_unix_output_stream_parent_class)->
	write_finish (stream, result, error);
    }

559
  simple = G_SIMPLE_ASYNC_RESULT (result);
560
  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_output_stream_write_async);
561 562 563 564 565 566 567 568 569 570 571 572 573 574
  
  nwritten = g_simple_async_result_get_op_res_gssize (simple);
  return nwritten;
}

typedef struct {
  GOutputStream *stream;
  GAsyncReadyCallback callback;
  gpointer user_data;
} CloseAsyncData;

static gboolean
close_async_cb (CloseAsyncData *data)
{
575
  GUnixOutputStream *unix_stream;
576 577 578 579 580
  GSimpleAsyncResult *simple;
  GError *error = NULL;
  gboolean result;
  int res;

581
  unix_stream = G_UNIX_OUTPUT_STREAM (data->stream);
582

583
  if (!unix_stream->priv->close_fd)
584 585 586 587 588 589 590
    {
      result = TRUE;
      goto out;
    }
  
  while (1)
    {
591
      res = close (unix_stream->priv->fd);
592 593
      if (res == -1)
	{
594 595
          int errsv = errno;

596
	  g_set_error (&error, G_IO_ERROR,
597
		       g_io_error_from_errno (errsv),
598
		       _("Error closing file descriptor: %s"),
599
		       g_strerror (errsv));
600 601 602 603 604 605 606 607 608 609
	}
      break;
    }
  
  result = res != -1;
  
 out:
  simple = g_simple_async_result_new (G_OBJECT (data->stream),
				      data->callback,
				      data->user_data,
610
				      g_unix_output_stream_close_async);
611 612

  if (!result)
613
    g_simple_async_result_take_error (simple, error);
614 615 616 617 618 619 620 621 622

  /* Complete immediately, not in idle, since we're already in a mainloop callout */
  g_simple_async_result_complete (simple);
  g_object_unref (simple);
  
  return FALSE;
}

static void
623 624 625 626 627
g_unix_output_stream_close_async (GOutputStream        *stream,
				  int                  io_priority,
				  GCancellable        *cancellable,
				  GAsyncReadyCallback  callback,
				  gpointer             user_data)
628 629 630 631 632 633 634 635 636 637 638 639
{
  GSource *idle;
  CloseAsyncData *data;

  data = g_new0 (CloseAsyncData, 1);

  data->stream = stream;
  data->callback = callback;
  data->user_data = user_data;
  
  idle = g_idle_source_new ();
  g_source_set_callback (idle, (GSourceFunc)close_async_cb, data, g_free);
640
  g_source_attach (idle, g_main_context_get_thread_default ());
641 642 643 644
  g_source_unref (idle);
}

static gboolean
645 646 647
g_unix_output_stream_close_finish (GOutputStream  *stream,
				   GAsyncResult   *result,
				   GError        **error)
648 649 650 651
{
  /* Failures handled in generic close_finish code */
  return TRUE;
}
652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685

static gboolean
g_unix_output_stream_pollable_is_writable (GPollableOutputStream *stream)
{
  GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
  GPollFD poll_fd;
  gint result;

  poll_fd.fd = unix_stream->priv->fd;
  poll_fd.events = G_IO_OUT;

  do
    result = g_poll (&poll_fd, 1, 0);
  while (result == -1 && errno == EINTR);

  return poll_fd.revents != 0;
}

static GSource *
g_unix_output_stream_pollable_create_source (GPollableOutputStream *stream,
					     GCancellable          *cancellable)
{
  GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
  GSource *inner_source, *pollable_source;

  pollable_source = g_pollable_source_new (G_OBJECT (stream));

  inner_source = _g_fd_source_new (unix_stream->priv->fd, G_IO_OUT, cancellable);
  g_source_set_dummy_callback (inner_source);
  g_source_add_child_source (pollable_source, inner_source);
  g_source_unref (inner_source);

  return pollable_source;
}