/* GIO - GLib Input, Output and Streaming Library
 *
 * Copyright (C) 2006-2007 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General
 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
 *
 * Author: Alexander Larsson <alexl@redhat.com>
 */

21
#include "config.h"
22 23 24 25 26 27 28 29 30 31

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <stdio.h>
#include <fcntl.h>

#include <glib.h>
#include <glib/gstdio.h>
32
#include <glib/glib-unix.h>
33
#include "gioerror.h"
34
#include "gunixinputstream.h"
35 36
#include "gcancellable.h"
#include "gasynchelper.h"
37
#include "gfiledescriptorbased.h"
38 39
#include "glibintl.h"

40

41
/**
 * SECTION:gunixinputstream
 * @short_description: Streaming input operations for UNIX file descriptors
 * @include: gio/gunixinputstream.h
 * @see_also: #GInputStream
 *
 * #GUnixInputStream implements #GInputStream for reading from a UNIX
 * file descriptor, including asynchronous operations. (If the file
 * descriptor refers to a socket or pipe, this will use poll() to do
 * asynchronous I/O. If it refers to a regular file, it will fall back
 * to doing asynchronous I/O in another thread.)
 *
 * Note that `<gio/gunixinputstream.h>` belongs to the UNIX-specific GIO
 * interfaces, thus you have to use the `gio-unix-2.0.pc` pkg-config
 * file when using it.
 */
57

58 59 60 61 62 63
enum {
  PROP_0,
  PROP_FD,
  PROP_CLOSE_FD
};

64 65 66 67 68 69
/* Private per-instance state of a GUnixInputStream. */
struct _GUnixInputStreamPrivate {
  /* Descriptor the stream reads from; set to -1 by the instance initialiser
   * until the "fd" property is assigned. */
  int fd;
  /* When set, g_unix_input_stream_close() calls close() on fd. */
  guint close_fd : 1;
  /* Set when lseek() on fd failed with ESPIPE at the time the "fd" property
   * was assigned; consulted by read() and by the can_poll vfunc. */
  guint is_pipe_or_socket : 1;
};

70
static void g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
71
static void g_unix_input_stream_file_descriptor_based_iface_init (GFileDescriptorBasedIface *iface);
72 73

G_DEFINE_TYPE_WITH_CODE (GUnixInputStream, g_unix_input_stream, G_TYPE_INPUT_STREAM,
74
                         G_ADD_PRIVATE (GUnixInputStream)
75 76
			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
						g_unix_input_stream_pollable_iface_init)
77 78 79
			 G_IMPLEMENT_INTERFACE (G_TYPE_FILE_DESCRIPTOR_BASED,
						g_unix_input_stream_file_descriptor_based_iface_init)
			 )
80

81 82 83 84 85 86 87 88
/* GObject property accessors. */
static void     g_unix_input_stream_set_property (GObject              *object,
                                                  guint                 prop_id,
                                                  const GValue         *value,
                                                  GParamSpec           *pspec);
static void     g_unix_input_stream_get_property (GObject              *object,
                                                  guint                 prop_id,
                                                  GValue               *value,
                                                  GParamSpec           *pspec);

/* GInputStream virtual methods. */
static gssize   g_unix_input_stream_read         (GInputStream         *stream,
                                                  void                 *buffer,
                                                  gsize                 count,
                                                  GCancellable         *cancellable,
                                                  GError              **error);
static gboolean g_unix_input_stream_close        (GInputStream         *stream,
                                                  GCancellable         *cancellable,
                                                  GError              **error);
static void     g_unix_input_stream_skip_async   (GInputStream         *stream,
                                                  gsize                 count,
                                                  int                   io_priority,
                                                  GCancellable         *cancellable,
                                                  GAsyncReadyCallback   callback,
                                                  gpointer              data);
static gssize   g_unix_input_stream_skip_finish  (GInputStream         *stream,
                                                  GAsyncResult         *result,
                                                  GError              **error);
static void     g_unix_input_stream_close_async  (GInputStream         *stream,
                                                  int                   io_priority,
                                                  GCancellable         *cancellable,
                                                  GAsyncReadyCallback   callback,
                                                  gpointer              data);
static gboolean g_unix_input_stream_close_finish (GInputStream         *stream,
                                                  GAsyncResult         *result,
                                                  GError              **error);

/* GPollableInputStream interface methods. */
static gboolean g_unix_input_stream_pollable_can_poll      (GPollableInputStream *stream);
static gboolean g_unix_input_stream_pollable_is_readable   (GPollableInputStream *stream);
static GSource *g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
                                                            GCancellable         *cancellable);
119 120

static void
121
g_unix_input_stream_class_init (GUnixInputStreamClass *klass)
122 123 124
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
  GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
125 126 127

  gobject_class->get_property = g_unix_input_stream_get_property;
  gobject_class->set_property = g_unix_input_stream_set_property;
128

129 130
  stream_class->read_fn = g_unix_input_stream_read;
  stream_class->close_fn = g_unix_input_stream_close;
131 132 133
  if (0)
    {
      /* TODO: Implement instead of using fallbacks */
134 135
      stream_class->skip_async = g_unix_input_stream_skip_async;
      stream_class->skip_finish = g_unix_input_stream_skip_finish;
136
    }
137 138
  stream_class->close_async = g_unix_input_stream_close_async;
  stream_class->close_finish = g_unix_input_stream_close_finish;
139 140 141 142 143 144 145 146 147 148 149

  /**
   * GUnixInputStream:fd:
   *
   * The file descriptor that the stream reads from.
   *
   * Since: 2.20
   */
  g_object_class_install_property (gobject_class,
				   PROP_FD,
				   g_param_spec_int ("fd",
150 151
						     P_("File descriptor"),
						     P_("The file descriptor to read from"),
152 153 154 155 156 157 158 159 160 161 162 163 164
						     G_MININT, G_MAXINT, -1,
						     G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB));

  /**
   * GUnixInputStream:close-fd:
   *
   * Whether to close the file descriptor when the stream is closed.
   *
   * Since: 2.20
   */
  g_object_class_install_property (gobject_class,
				   PROP_CLOSE_FD,
				   g_param_spec_boolean ("close-fd",
165 166
							 P_("Close file descriptor"),
							 P_("Whether to close the file descriptor when the stream is closed"),
167 168 169 170
							 TRUE,
							 G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB));
}

171 172 173
/* Fills in the GPollableInputStream vtable with this file's
 * implementations. */
static void
g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
{
  iface->create_source = g_unix_input_stream_pollable_create_source;
  iface->is_readable = g_unix_input_stream_pollable_is_readable;
  iface->can_poll = g_unix_input_stream_pollable_can_poll;
}

179 180 181 182 183 184
/* Fills in the GFileDescriptorBased vtable. */
static void
g_unix_input_stream_file_descriptor_based_iface_init (GFileDescriptorBasedIface *iface)
{
  /* g_unix_input_stream_get_fd() is declared as taking a GUnixInputStream *,
   * so it is cast to the signature stored in the interface's get_fd slot. */
  iface->get_fd = (int (*) (GFileDescriptorBased *))g_unix_input_stream_get_fd;
}

185 186 187 188 189 190 191 192 193 194 195 196 197 198
/* GObject set_property implementation for the "fd" and "close-fd"
 * properties.  Unknown ids are reported through
 * G_OBJECT_WARN_INVALID_PROPERTY_ID(). */
static void
g_unix_input_stream_set_property (GObject      *object,
                                  guint         prop_id,
                                  const GValue *value,
                                  GParamSpec   *pspec)
{
  GUnixInputStream *self = G_UNIX_INPUT_STREAM (object);

  switch (prop_id)
    {
    case PROP_FD:
      self->priv->fd = g_value_get_int (value);
      /* A descriptor on which lseek() fails with ESPIPE is recorded as a
       * pipe or socket; any other outcome clears the flag. */
      self->priv->is_pipe_or_socket =
        (lseek (self->priv->fd, 0, SEEK_CUR) == -1 && errno == ESPIPE);
      break;

    case PROP_CLOSE_FD:
      self->priv->close_fd = g_value_get_boolean (value);
      break;

    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
    }
}

/* GObject get_property implementation: stores the current value of the
 * "fd" or "close-fd" property into @value.  Unknown ids are reported
 * through G_OBJECT_WARN_INVALID_PROPERTY_ID(). */
static void
g_unix_input_stream_get_property (GObject    *object,
                                  guint       prop_id,
                                  GValue     *value,
                                  GParamSpec *pspec)
{
  GUnixInputStream *unix_stream;

  unix_stream = G_UNIX_INPUT_STREAM (object);

  switch (prop_id)
    {
    case PROP_FD:
      g_value_set_int (value, unix_stream->priv->fd);
      break;
    case PROP_CLOSE_FD:
      g_value_set_boolean (value, unix_stream->priv->close_fd);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      /* Explicit break, matching the default case in set_property. */
      break;
    }
}

static void
237
g_unix_input_stream_init (GUnixInputStream *unix_stream)
238
{
239
  unix_stream->priv = g_unix_input_stream_get_instance_private (unix_stream);
240 241
  unix_stream->priv->fd = -1;
  unix_stream->priv->close_fd = TRUE;
242 243 244
}

/**
245
 * g_unix_input_stream_new:
246 247
 * @fd: a UNIX file descriptor
 * @close_fd: %TRUE to close the file descriptor when done
248
 * 
249 250 251 252
 * Creates a new #GUnixInputStream for the given @fd. 
 *
 * If @close_fd is %TRUE, the file descriptor will be closed 
 * when the stream is closed.
253
 * 
254
 * Returns: a new #GUnixInputStream
255 256
 **/
GInputStream *
257 258
g_unix_input_stream_new (gint     fd,
			 gboolean close_fd)
259
{
260
  GUnixInputStream *stream;
261 262 263

  g_return_val_if_fail (fd != -1, NULL);

264 265 266 267
  stream = g_object_new (G_TYPE_UNIX_INPUT_STREAM,
			 "fd", fd,
			 "close-fd", close_fd,
			 NULL);
268 269 270 271

  return G_INPUT_STREAM (stream);
}

272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
/**
 * g_unix_input_stream_set_close_fd:
 * @stream: a #GUnixInputStream
 * @close_fd: %TRUE to close the file descriptor when done
 *
 * Sets whether the file descriptor of @stream shall be closed
 * when the stream is closed. The "close-fd" property is only
 * notified when the setting actually changes.
 *
 * Since: 2.20
 */
void
g_unix_input_stream_set_close_fd (GUnixInputStream *stream,
                                  gboolean          close_fd)
{
  gboolean wanted;

  g_return_if_fail (G_IS_UNIX_INPUT_STREAM (stream));

  /* Collapse any non-zero value to TRUE so that it compares correctly
   * against the one-bit field. */
  wanted = close_fd ? TRUE : FALSE;

  if (stream->priv->close_fd == wanted)
    return;

  stream->priv->close_fd = wanted;
  g_object_notify (G_OBJECT (stream), "close-fd");
}

/**
 * g_unix_input_stream_get_close_fd:
 * @stream: a #GUnixInputStream
 *
 * Returns whether the file descriptor of @stream will be
 * closed when the stream is closed.
 *
 * Returns: %TRUE if the file descriptor is closed when done
 *
 * Since: 2.20
 */
gboolean
g_unix_input_stream_get_close_fd (GUnixInputStream *stream)
{
  gboolean closes_fd;

  g_return_val_if_fail (G_IS_UNIX_INPUT_STREAM (stream), FALSE);

  closes_fd = stream->priv->close_fd;

  return closes_fd;
}

/**
 * g_unix_input_stream_get_fd:
 * @stream: a #GUnixInputStream
 *
 * Return the UNIX file descriptor that the stream reads from.
 *
 * Returns: The file descriptor of @stream
 *
 * Since: 2.20
 */
gint
g_unix_input_stream_get_fd (GUnixInputStream *stream)
{
  gint fd;

  g_return_val_if_fail (G_IS_UNIX_INPUT_STREAM (stream), -1);

  fd = stream->priv->fd;

  return fd;
}

333
static gssize
334 335 336 337 338
g_unix_input_stream_read (GInputStream  *stream,
			  void          *buffer,
			  gsize          count,
			  GCancellable  *cancellable,
			  GError       **error)
339
{
340
  GUnixInputStream *unix_stream;
341
  gssize res = -1;
342
  GPollFD poll_fds[2];
343
  int nfds;
344 345
  int poll_ret;

346
  unix_stream = G_UNIX_INPUT_STREAM (stream);
347

348 349
  poll_fds[0].fd = unix_stream->priv->fd;
  poll_fds[0].events = G_IO_IN;
350 351
  if (unix_stream->priv->is_pipe_or_socket &&
      g_cancellable_make_pollfd (cancellable, &poll_fds[1]))
352 353 354 355 356
    nfds = 2;
  else
    nfds = 1;

  while (1)
357
    {
358 359
      int errsv;

360
      poll_fds[0].revents = poll_fds[1].revents = 0;
361
      do
362 363 364 365 366
        {
          poll_ret = g_poll (poll_fds, nfds, -1);
          errsv = errno;
        }
      while (poll_ret == -1 && errsv == EINTR);
367

368 369 370
      if (poll_ret == -1)
	{
	  g_set_error (error, G_IO_ERROR,
371
		       g_io_error_from_errno (errsv),
372
		       _("Error reading from file descriptor: %s"),
373
		       g_strerror (errsv));
374
	  break;
375 376 377
	}

      if (g_cancellable_set_error_if_cancelled (cancellable, error))
378 379 380 381 382
	break;

      if (!poll_fds[0].revents)
	continue;

383
      res = read (unix_stream->priv->fd, buffer, count);
384 385
      if (res == -1)
	{
386 387
          int errsv = errno;

388
	  if (errsv == EINTR || errsv == EAGAIN)
389
	    continue;
390

391
	  g_set_error (error, G_IO_ERROR,
392
		       g_io_error_from_errno (errsv),
393
		       _("Error reading from file descriptor: %s"),
394
		       g_strerror (errsv));
395
	}
396

397 398 399
      break;
    }

400 401
  if (nfds == 2)
    g_cancellable_release_fd (cancellable);
402 403 404 405
  return res;
}

static gboolean
406 407 408
g_unix_input_stream_close (GInputStream  *stream,
			   GCancellable  *cancellable,
			   GError       **error)
409
{
410
  GUnixInputStream *unix_stream;
411 412
  int res;

413
  unix_stream = G_UNIX_INPUT_STREAM (stream);
414

415
  if (!unix_stream->priv->close_fd)
416 417
    return TRUE;
  
418 419 420
  /* This might block during the close. Doesn't seem to be a way to avoid it though. */
  res = close (unix_stream->priv->fd);
  if (res == -1)
421
    {
422
      int errsv = errno;
423

424 425 426 427
      g_set_error (error, G_IO_ERROR,
		   g_io_error_from_errno (errsv),
		   _("Error closing file descriptor: %s"),
		   g_strerror (errsv));
428 429 430 431 432 433
    }
  
  return res != -1;
}

static void
434 435 436 437 438 439
g_unix_input_stream_skip_async (GInputStream        *stream,
				gsize                count,
				int                  io_priority,
				GCancellable        *cancellable,
				GAsyncReadyCallback  callback,
				gpointer             data)
440
{
441
  g_warn_if_reached ();
442 443 444 445
  /* TODO: Not implemented */
}

static gssize
446 447 448
g_unix_input_stream_skip_finish  (GInputStream  *stream,
				  GAsyncResult  *result,
				  GError       **error)
449
{
450
  g_warn_if_reached ();
451
  return 0;
452 453 454 455
  /* TODO: Not implemented */
}

static void
456 457 458 459 460
g_unix_input_stream_close_async (GInputStream        *stream,
				 int                  io_priority,
				 GCancellable        *cancellable,
				 GAsyncReadyCallback  callback,
				 gpointer             user_data)
461
{
462 463
  GTask *task;
  GError *error = NULL;
464

465
  task = g_task_new (stream, cancellable, callback, user_data);
466
  g_task_set_source_tag (task, g_unix_input_stream_close_async);
467
  g_task_set_priority (task, io_priority);
468

469 470 471 472 473
  if (g_unix_input_stream_close (stream, cancellable, &error))
    g_task_return_boolean (task, TRUE);
  else
    g_task_return_error (task, error);
  g_object_unref (task);
474 475 476
}

static gboolean
477 478 479
g_unix_input_stream_close_finish (GInputStream  *stream,
				  GAsyncResult  *result,
				  GError       **error)
480
{
481 482 483
  g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);

  return g_task_propagate_boolean (G_TASK (result), error);
484
}
485

486 487 488 489 490 491
/* GPollableInputStream can_poll implementation: reports the
 * is_pipe_or_socket flag recorded when the "fd" property was set. */
static gboolean
g_unix_input_stream_pollable_can_poll (GPollableInputStream *stream)
{
  GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);

  return unix_stream->priv->is_pipe_or_socket;
}

492 493 494 495 496 497 498 499 500
/* GPollableInputStream is_readable implementation.
 *
 * Polls the descriptor for G_IO_IN with a zero timeout, retrying while
 * g_poll() is interrupted (EINTR).  Returns TRUE if the poll reported any
 * event at all on the descriptor. */
static gboolean
g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream)
{
  GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
  GPollFD pfd;
  gint poll_status;

  pfd.fd = unix_stream->priv->fd;
  pfd.events = G_IO_IN;
  pfd.revents = 0;

  for (;;)
    {
      poll_status = g_poll (&pfd, 1, 0);
      if (poll_status != -1 || errno != EINTR)
        break;
    }

  return pfd.revents != 0;
}

/* GPollableInputStream create_source implementation.
 *
 * Builds a pollable source for @stream with a child source watching the
 * descriptor for G_IO_IN and, when @cancellable is non-NULL, a second child
 * source for the cancellable.  The local reference to each child is dropped
 * once it has been added to the parent.
 *
 * Returns the new pollable source. */
static GSource *
g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
                                            GCancellable         *cancellable)
{
  GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
  GSource *parent;
  GSource *fd_source;

  parent = g_pollable_source_new (G_OBJECT (stream));

  fd_source = g_unix_fd_source_new (unix_stream->priv->fd, G_IO_IN);
  g_source_set_dummy_callback (fd_source);
  g_source_add_child_source (parent, fd_source);
  g_source_unref (fd_source);

  if (cancellable != NULL)
    {
      GSource *cancel_source = g_cancellable_source_new (cancellable);

      g_source_set_dummy_callback (cancel_source);
      g_source_add_child_source (parent, cancel_source);
      g_source_unref (cancel_source);
    }

  return parent;
}