giowin32.c 59.5 KB
Newer Older
1 2 3 4 5
/* GLIB - Library of useful routines for C programming
 * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
 *
 * giowin32.c: IO Channels for Win32.
 * Copyright 1998 Owen Taylor and Tor Lillqvist
6
 * Copyright 1999-2000 Tor Lillqvist and Craig Setera
7
 * Copyright 2001-2003 Andrew Lanoix
8 9
 *
 * This library is free software; you can redistribute it and/or
10
 * modify it under the terms of the GNU Lesser General Public
11
 * License as published by the Free Software Foundation; either
12
 * version 2.1 of the License, or (at your option) any later version.
13 14 15
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
 * Lesser General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU Lesser General Public
20
 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
21 22
 */

23
/*
24
 * Modified by the GLib Team and others 1997-2000.  See the AUTHORS
25 26
 * file for a list of people on the GLib Team.  See the ChangeLog
 * files for a list of changes.  These files are distributed with
27
 * GLib at ftp://ftp.gtk.org/pub/gtk/.
28 29
 */

30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
/*
 * Bugs that are related to the code in this file:
 *
 * Bug 137968 - Sometimes a GIOFunc on Win32 is called with zero condition
 * http://bugzilla.gnome.org/show_bug.cgi?id=137968
 *
 * Bug 324234 - Using g_io_add_watch_full() to wait for connect() to return on a non-blocking socket returns prematurely
 * http://bugzilla.gnome.org/show_bug.cgi?id=324234
 *
 * Bug 331214 - g_io_channel async socket io stalls
 * http://bugzilla.gnome.org/show_bug.cgi?id=331214
 *
 * Bug 338943 - Multiple watches on the same socket
 * http://bugzilla.gnome.org/show_bug.cgi?id=338943
 *
 * Bug 357674 - 2 serious bugs in giowin32.c making glib iochannels useless
 * http://bugzilla.gnome.org/show_bug.cgi?id=357674
 *
 * Bug 425156 - GIOChannel deadlocks on a win32 socket
 * http://bugzilla.gnome.org/show_bug.cgi?id=425156
 *
 * Bug 468910 - giofunc condition=0
 * http://bugzilla.gnome.org/show_bug.cgi?id=468910
 *
 * Bug 500246 - Bug fixes for giowin32
 * http://bugzilla.gnome.org/show_bug.cgi?id=500246
 *
 * Bug 548278 - Async GETs connections are always terminated unexpectedly on windows
 * http://bugzilla.gnome.org/show_bug.cgi?id=548278
 *
 * Bug 548536 - giowin32 problem when adding and removing watches
 * http://bugzilla.gnome.org/show_bug.cgi?id=548536
 *
 * When fixing bugs related to the code in this file, either the above
 * bugs or others, make sure that the test programs attached to the
 * above bugs continue to work.
 */

Owen Taylor's avatar
Owen Taylor committed
68 69
#include "config.h"

70
#include "glib.h"
71 72

#include <stdlib.h>
73
#include <winsock2.h>
74
#include <windows.h>
75
#include <conio.h>
76 77
#include <fcntl.h>
#include <io.h>
78
#include <process.h>
79
#include <errno.h>
80
#include <sys/stat.h>
81

82
#include "gstdio.h"
83 84
#include "glibintl.h"

85

86 87 88
/* Short names for the Win32 channel and watch structures used in this file. */
typedef struct _GIOWin32Channel GIOWin32Channel;
typedef struct _GIOWin32Watch GIOWin32Watch;

89
/* Size in bytes of the circular buffer that the reader and writer
 * threads allocate for a channel.
 */
#define BUFFER_SIZE 4096
90 91

/* The kind of object a GIOWin32Channel wraps. */
typedef enum {
  /* Windows messages */
  G_IO_WIN32_WINDOWS_MESSAGES,

  /* Unix-like file descriptors from _open() or _pipe(), except for
   * console IO. Separate thread to read or write.
   */
  G_IO_WIN32_FILE_DESC,

  /* Console IO (usually stdin, stdout, stderr) */
  G_IO_WIN32_CONSOLE,

  /* Sockets. No separate thread. */
  G_IO_WIN32_SOCKET
} GIOWin32ChannelType;

struct _GIOWin32Channel {
  GIOChannel channel;
  gint fd;			/* Either a Unix-like file handle as provided
				 * by the Microsoft C runtime, or a SOCKET
				 * as provided by WinSock.
				 */
  GIOWin32ChannelType type;
112
  
113 114
  gboolean debug;

115 116
  /* Field used by G_IO_WIN32_WINDOWS_MESSAGES channels */
  HWND hwnd;			/* Handle of window, or NULL */
117
  
118
  /* Fields used by G_IO_WIN32_FILE_DESC channels. */
119 120
  CRITICAL_SECTION mutex;

121 122 123 124
  int direction;		/* 0 means we read from it,
				 * 1 means we write to it.
				 */

125 126 127
  gboolean running;		/* Is reader or writer thread
				 * running. FALSE if EOF has been
				 * reached by the reader thread.
128
				 */
129

130 131 132
  gboolean needs_close;		/* If the channel has been closed while
				 * the reader thread was still running.
				 */
133 134 135 136

  guint thread_id;		/* If non-NULL the channel has or has
				 * had a reader or writer thread.
				 */
137 138 139 140
  HANDLE data_avail_event;

  gushort revents;

141
  /* Data is kept in a circular buffer. To be able to distinguish between
142
   * empty and full buffers, we cannot fill it completely, but have to
143 144 145 146 147 148 149 150 151 152 153
   * leave a one character gap.
   *
   * Data available is between indexes rdp and wrp-1 (modulo BUFFER_SIZE).
   *
   * Empty:    wrp == rdp
   * Full:     (wrp + 1) % BUFFER_SIZE == rdp
   * Partial:  otherwise
   */
  guchar *buffer;		/* (Circular) buffer */
  gint wrp, rdp;		/* Buffer indices for writing and reading */
  HANDLE space_avail_event;
154

155
  /* Fields used by G_IO_WIN32_SOCKET channels */
156 157
  int event_mask;
  int last_events;
158
  HANDLE event;
159
  gboolean write_would_have_blocked;
160
  gboolean ever_writable;
161 162 163
};

struct _GIOWin32Watch {
164
  GSource       source;
165 166 167 168 169
  GPollFD       pollfd;
  GIOChannel   *channel;
  GIOCondition  condition;
};

170
static void
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
g_win32_print_access_mode (int flags)
{
  g_print ("%s%s%s%s%s%s%s%s%s%s",
	   ((flags & 0x3) == _O_RDWR ? "O_RDWR" :
	    ((flags & 0x3) == _O_RDONLY ? "O_RDONLY" :
	     ((flags & 0x3) == _O_WRONLY ? "O_WRONLY" : "0"))),
	   (flags & _O_APPEND ? "|O_APPEND" : ""),
	   (flags & _O_RANDOM ? "|O_RANDOM" : ""),
	   (flags & _O_SEQUENTIAL ? "|O_SEQUENTIAL" : ""),
	   (flags & _O_TEMPORARY ? "|O_TEMPORARY" : ""),
	   (flags & _O_CREAT ? "|O_CREAT" : ""),
	   (flags & _O_TRUNC ? "|O_TRUNC" : ""),
	   (flags & _O_EXCL ? "|O_EXCL" : ""),
	   (flags & _O_TEXT ? "|O_TEXT" : ""),
	   (flags & _O_BINARY ? "|O_BINARY" : ""));
}

/* Debugging helper: prints the GIOFlags bits set in @flags as a
 * '|'-separated list with g_print(). No newline is emitted.
 */
static void
g_win32_print_gioflags (GIOFlags flags)
{
  /* Separator to print before the next flag name; empty until the
   * first name has been printed.
   */
  const char *bar = "";

  if (flags & G_IO_FLAG_APPEND)
    {
      g_print ("APPEND");
      bar = "|";
    }
  if (flags & G_IO_FLAG_NONBLOCK)
    {
      g_print ("%sNONBLOCK", bar);
      bar = "|";
    }
  if (flags & G_IO_FLAG_IS_READABLE)
    {
      g_print ("%sREADABLE", bar);
      bar = "|";
    }
  if (flags & G_IO_FLAG_IS_WRITABLE)
    {
      g_print ("%sWRITABLE", bar);
      bar = "|";
    }
  if (flags & G_IO_FLAG_IS_SEEKABLE)
    g_print ("%sSEEKABLE", bar);
}

205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
static const char *
event_mask_to_string (int mask)
{
  char buf[100];
  int checked_bits = 0;
  char *bufp = buf;

  if (mask == 0)
    return "";

#define BIT(n) checked_bits |= FD_##n; if (mask & FD_##n) bufp += sprintf (bufp, "%s" #n, (bufp>buf ? "|" : ""))

  BIT (READ);
  BIT (WRITE);
  BIT (OOB);
  BIT (ACCEPT);
  BIT (CONNECT);
  BIT (CLOSE);
  BIT (QOS);
  BIT (GROUP_QOS);
  BIT (ROUTING_INTERFACE_CHANGE);
  BIT (ADDRESS_LIST_CHANGE);
  
#undef BIT

  if ((mask & ~checked_bits) != 0)
	  bufp += sprintf (bufp, "|%#x", mask & ~checked_bits);
  
  return g_quark_to_string (g_quark_from_string (buf));
}

/* Debugging helper: returns a '|'-separated symbolic representation of
 * the G_IO_* bits set in @condition. Bits that have no known name are
 * appended in hexadecimal.
 *
 * The returned string is interned through the quark table, so it stays
 * valid and must not be freed. Returns "" if @condition is zero.
 */
static const char *
condition_to_string (GIOCondition condition)
{
  char buf[100];
  int checked_bits = 0;
  gsize len = 0;

  if (condition == 0)
    return "";

  buf[0] = '\0';

#define BIT(n)								\
  do									\
    {									\
      checked_bits |= G_IO_##n;						\
      if ((condition & G_IO_##n) && len < sizeof (buf))			\
	len += g_snprintf (buf + len, sizeof (buf) - len, "%s" #n,	\
			   (len > 0 ? "|" : ""));			\
    }									\
  while (0)

  BIT (IN);
  BIT (OUT);
  BIT (PRI);
  BIT (ERR);
  BIT (HUP);
  BIT (NVAL);

#undef BIT

  /* Whatever is left has no symbolic name. Only emit a separator if
   * something precedes it.
   */
  if ((condition & ~checked_bits) != 0 && len < sizeof (buf))
    len += g_snprintf (buf + len, sizeof (buf) - len, "%s%#x",
		       (len > 0 ? "|" : ""), condition & ~checked_bits);

  return g_quark_to_string (g_quark_from_string (buf));
}

263 264
/* Returns TRUE if the G_IO_WIN32_DEBUG environment variable is set
 * (to any value, including the empty string).
 */
static gboolean
g_io_win32_get_debug_flag (void)
{
  return (getenv ("G_IO_WIN32_DEBUG") != NULL);
}

269 270 271 272
/* Puts the Win32-specific state of @channel into its initial, idle
 * state: no thread, no events, no buffer, no socket events selected.
 * Also initializes the channel's critical section and picks up the
 * debug flag from the environment.
 *
 * fd, type, hwnd, direction and the buffer indices are not touched here.
 */
static void
g_io_channel_win32_init (GIOWin32Channel *channel)
{
  channel->debug = g_io_win32_get_debug_flag ();

  /* State used by G_IO_WIN32_FILE_DESC channels */
  InitializeCriticalSection (&channel->mutex);
  channel->running = FALSE;
  channel->needs_close = FALSE;
  channel->thread_id = 0;
  channel->data_avail_event = NULL;
  channel->revents = 0;
  channel->buffer = NULL;
  channel->space_avail_event = NULL;

  /* State used by G_IO_WIN32_SOCKET channels */
  channel->event_mask = 0;
  channel->last_events = 0;
  channel->event = NULL;
  channel->write_would_have_blocked = FALSE;
  channel->ever_writable = FALSE;
}

290 291 292 293 294
/* Creates the two unnamed, non-inheritable event objects that the
 * reader/writer thread and the main thread use to signal each other,
 * and stores them in @channel. Both start out non-signalled.
 *
 * Failure to create either event is reported with g_error().
 */
static void
create_events (GIOWin32Channel *channel)
{
  SECURITY_ATTRIBUTES sec_attrs;

  sec_attrs.nLength = sizeof (SECURITY_ATTRIBUTES);
  sec_attrs.lpSecurityDescriptor = NULL;
  sec_attrs.bInheritHandle = FALSE;

  /* The data available event is manual reset, the space available event
   * is automatic reset.
   */
  if (!(channel->data_avail_event = CreateEvent (&sec_attrs, TRUE, FALSE, NULL))
      || !(channel->space_avail_event = CreateEvent (&sec_attrs, FALSE, FALSE, NULL)))
    {
      gchar *emsg = g_win32_error_message (GetLastError ());

      g_error ("Error creating event: %s", emsg);
      g_free (emsg);
    }
}
311

312
static unsigned __stdcall
313
read_thread (void *parameter)
314 315 316
{
  GIOWin32Channel *channel = parameter;
  guchar *buffer;
317
  gint nbytes;
318

319
  g_io_channel_ref ((GIOChannel *)channel);
320

321
  if (channel->debug)
322
    g_print ("read_thread %#x: start fd=%d, data_avail=%p space_avail=%p\n",
323 324
	     channel->thread_id,
	     channel->fd,
325 326
	     channel->data_avail_event,
	     channel->space_avail_event);
327 328

  channel->direction = 0;
329 330 331
  channel->buffer = g_malloc (BUFFER_SIZE);
  channel->rdp = channel->wrp = 0;
  channel->running = TRUE;
332

333 334
  SetEvent (channel->space_avail_event);
  
335
  EnterCriticalSection (&channel->mutex);
336 337
  while (channel->running)
    {
338
      if (channel->debug)
339
	g_print ("read_thread %#x: rdp=%d, wrp=%d\n",
340
		 channel->thread_id, channel->rdp, channel->wrp);
341 342 343
      if ((channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
	{
	  /* Buffer is full */
344
	  if (channel->debug)
345
	    g_print ("read_thread %#x: resetting space_avail\n",
346
		     channel->thread_id);
347
	  ResetEvent (channel->space_avail_event);
348
	  if (channel->debug)
349 350
	    g_print ("read_thread %#x: waiting for space\n",
		     channel->thread_id);
351
	  LeaveCriticalSection (&channel->mutex);
352
	  WaitForSingleObject (channel->space_avail_event, INFINITE);
353
	  EnterCriticalSection (&channel->mutex);
354
	  if (channel->debug)
355
	    g_print ("read_thread %#x: rdp=%d, wrp=%d\n",
356
		     channel->thread_id, channel->rdp, channel->wrp);
357 358 359 360 361 362 363 364 365
	}
      
      buffer = channel->buffer + channel->wrp;
      
      /* Always leave at least one byte unused gap to be able to
       * distinguish between the full and empty condition...
       */
      nbytes = MIN ((channel->rdp + BUFFER_SIZE - channel->wrp - 1) % BUFFER_SIZE,
		    BUFFER_SIZE - channel->wrp);
366

367
      if (channel->debug)
368
	g_print ("read_thread %#x: calling read() for %d bytes\n",
369 370
		 channel->thread_id, nbytes);

371
      LeaveCriticalSection (&channel->mutex);
372

373
      nbytes = read (channel->fd, buffer, nbytes);
374
      
375
      EnterCriticalSection (&channel->mutex);
376

377 378 379 380 381 382
      channel->revents = G_IO_IN;
      if (nbytes == 0)
	channel->revents |= G_IO_HUP;
      else if (nbytes < 0)
	channel->revents |= G_IO_ERR;

383
      if (channel->debug)
384
	g_print ("read_thread %#x: read() returned %d, rdp=%d, wrp=%d\n",
385
		 channel->thread_id, nbytes, channel->rdp, channel->wrp);
386 387 388 389

      if (nbytes <= 0)
	break;

390
      channel->wrp = (channel->wrp + nbytes) % BUFFER_SIZE;
391
      if (channel->debug)
392
	g_print ("read_thread %#x: rdp=%d, wrp=%d, setting data_avail\n",
393
		 channel->thread_id, channel->rdp, channel->wrp);
394 395 396 397
      SetEvent (channel->data_avail_event);
    }
  
  channel->running = FALSE;
398 399 400
  if (channel->needs_close)
    {
      if (channel->debug)
401
	g_print ("read_thread %#x: channel fd %d needs closing\n",
402
		 channel->thread_id, channel->fd);
403
      close (channel->fd);
404 405 406
      channel->fd = -1;
    }

407 408 409
  if (channel->debug)
    g_print ("read_thread %#x: EOF, rdp=%d, wrp=%d, setting data_avail\n",
	     channel->thread_id, channel->rdp, channel->wrp);
410
  SetEvent (channel->data_avail_event);
411
  LeaveCriticalSection (&channel->mutex);
412
  
413
  g_io_channel_unref ((GIOChannel *)channel);
414
  
415 416 417
  /* No need to call _endthreadex(), the actual thread starter routine
   * in MSVCRT (see crt/src/threadex.c:_threadstartex) calls
   * _endthreadex() for us.
418
   */
419

420 421
  return 0;
}
422

423 424 425 426 427
/* Thread routine for a G_IO_WIN32_FILE_DESC channel that is written to.
 *
 * @parameter is the GIOWin32Channel. The thread allocates the circular
 * buffer, signals space_avail_event once to tell its creator that it
 * has started, and then loops: wait until the buffer holds data,
 * write() it to channel->fd (without holding the mutex), advance rdp
 * and signal data_avail_event.
 *
 * The loop ends when write() fails (revents is set to G_IO_ERR and the
 * buffer indices are left untouched), or when the thread is woken up
 * while the buffer is still empty. On exit the thread closes the fd if
 * needs_close is set.
 *
 * The thread holds its own reference to the channel while it runs.
 * Always returns 0.
 */
static unsigned __stdcall
write_thread (void *parameter)
{
  GIOWin32Channel *channel = parameter;
  guchar *buffer;
  gint nbytes;

  g_io_channel_ref ((GIOChannel *)channel);

  if (channel->debug)
    g_print ("write_thread %#x: start fd=%d, data_avail=%p space_avail=%p\n",
	     channel->thread_id,
	     channel->fd,
	     channel->data_avail_event,
	     channel->space_avail_event);

  channel->direction = 1;
  channel->buffer = g_malloc (BUFFER_SIZE);
  channel->rdp = channel->wrp = 0;
  channel->running = TRUE;

  /* Tell create_thread() that we are up and running. */
  SetEvent (channel->space_avail_event);

  /* We use the same event objects as for a reader thread, but with
   * reversed meaning. So, space_avail is used if data is available
   * for writing, and data_avail is used if space is available in the
   * write buffer.
   */

  EnterCriticalSection (&channel->mutex);
  while (channel->running || channel->rdp != channel->wrp)
    {
      if (channel->debug)
	g_print ("write_thread %#x: rdp=%d, wrp=%d\n",
		 channel->thread_id, channel->rdp, channel->wrp);
      if (channel->wrp == channel->rdp)
	{
	  /* Buffer is empty. */
	  if (channel->debug)
	    g_print ("write_thread %#x: resetting space_avail\n",
		     channel->thread_id);
	  ResetEvent (channel->space_avail_event);
	  if (channel->debug)
	    g_print ("write_thread %#x: waiting for data\n",
		     channel->thread_id);
	  channel->revents = G_IO_OUT;
	  SetEvent (channel->data_avail_event);
	  LeaveCriticalSection (&channel->mutex);
	  WaitForSingleObject (channel->space_avail_event, INFINITE);

	  EnterCriticalSection (&channel->mutex);
	  /* Woken up without any data to write: time to quit. */
	  if (channel->rdp == channel->wrp)
	    break;

	  if (channel->debug)
	    g_print ("write_thread %#x: rdp=%d, wrp=%d\n",
		     channel->thread_id, channel->rdp, channel->wrp);
	}

      /* Write the contiguous run of data starting at rdp. */
      buffer = channel->buffer + channel->rdp;
      if (channel->rdp < channel->wrp)
	nbytes = channel->wrp - channel->rdp;
      else
	nbytes = BUFFER_SIZE - channel->rdp;

      if (channel->debug)
	g_print ("write_thread %#x: calling write() for %d bytes\n",
		 channel->thread_id, nbytes);

      /* Do not hold the mutex across the (possibly blocking) write(). */
      LeaveCriticalSection (&channel->mutex);
      nbytes = write (channel->fd, buffer, nbytes);
      EnterCriticalSection (&channel->mutex);

      if (channel->debug)
	g_print ("write_thread %#x: write(%i) returned %d, rdp=%d, wrp=%d\n",
		 channel->thread_id, channel->fd, nbytes, channel->rdp, channel->wrp);

      if (nbytes <= 0)
	{
	  /* Nothing was consumed. In particular, do not add the -1
	   * error return to rdp, which would move the index backwards
	   * (and out of range when rdp is 0).
	   */
	  channel->revents = G_IO_ERR;
	  break;
	}

      channel->revents = G_IO_OUT;
      channel->rdp = (channel->rdp + nbytes) % BUFFER_SIZE;

      if (channel->debug)
	g_print ("write_thread: setting data_avail for thread %#x\n",
		 channel->thread_id);
      SetEvent (channel->data_avail_event);
    }

  channel->running = FALSE;
  if (channel->needs_close)
    {
      if (channel->debug)
	g_print ("write_thread %#x: channel fd %d needs closing\n",
		 channel->thread_id, channel->fd);
      close (channel->fd);
      channel->fd = -1;
    }

  LeaveCriticalSection (&channel->mutex);

  g_io_channel_unref ((GIOChannel *)channel);

  return 0;
}

534
static void
535 536 537
create_thread (GIOWin32Channel     *channel,
	       GIOCondition         condition,
	       unsigned (__stdcall *thread) (void *parameter))
538
{
539
  HANDLE thread_handle;
540
  int errsv;
541 542 543

  thread_handle = (HANDLE) _beginthreadex (NULL, 0, thread, channel, 0,
					   &channel->thread_id);
544
  errsv = errno;
545
  if (thread_handle == 0)
546
    g_warning ("Error creating thread: %s.",
547
	       g_strerror (errsv));
548
  else if (!CloseHandle (thread_handle))
549 550 551 552 553 554
    {
      gchar *emsg = g_win32_error_message (GetLastError ());

      g_warning ("Error closing thread handle: %s.", emsg);
      g_free (emsg);
    }
555

556 557
  WaitForSingleObject (channel->space_avail_event, INFINITE);
}
558

559
static GIOStatus
560
buffer_read (GIOWin32Channel *channel,
561
	     gchar           *dest,
562 563 564
	     gsize            count,
	     gsize           *bytes_read,
	     GError         **err)
565
{
566 567 568
  guint nbytes;
  guint left = count;
  
569
  EnterCriticalSection (&channel->mutex);
570
  if (channel->debug)
571
    g_print ("reading from thread %#x %" G_GSIZE_FORMAT " bytes, rdp=%d, wrp=%d\n",
572
	     channel->thread_id, count, channel->rdp, channel->wrp);
573
  
574
  if (channel->wrp == channel->rdp)
575
    {
576
      LeaveCriticalSection (&channel->mutex);
577 578 579
      if (channel->debug)
	g_print ("waiting for data from thread %#x\n", channel->thread_id);
      WaitForSingleObject (channel->data_avail_event, INFINITE);
580 581
      if (channel->debug)
	g_print ("done waiting for data from thread %#x\n", channel->thread_id);
582
      EnterCriticalSection (&channel->mutex);
583
      if (channel->wrp == channel->rdp && !channel->running)
584
	{
585 586
	  if (channel->debug)
	    g_print ("wrp==rdp, !running\n");
587
	  LeaveCriticalSection (&channel->mutex);
588
          *bytes_read = 0;
589
	  return G_IO_STATUS_EOF;
590
	}
591
    }
592 593 594 595 596
  
  if (channel->rdp < channel->wrp)
    nbytes = channel->wrp - channel->rdp;
  else
    nbytes = BUFFER_SIZE - channel->rdp;
597
  LeaveCriticalSection (&channel->mutex);
598 599 600 601 602 603 604
  nbytes = MIN (left, nbytes);
  if (channel->debug)
    g_print ("moving %d bytes from thread %#x\n",
	     nbytes, channel->thread_id);
  memcpy (dest, channel->buffer + channel->rdp, nbytes);
  dest += nbytes;
  left -= nbytes;
605
  EnterCriticalSection (&channel->mutex);
606 607
  channel->rdp = (channel->rdp + nbytes) % BUFFER_SIZE;
  if (channel->debug)
608
    g_print ("setting space_avail for thread %#x\n", channel->thread_id);
609 610 611 612
  SetEvent (channel->space_avail_event);
  if (channel->debug)
    g_print ("for thread %#x: rdp=%d, wrp=%d\n",
	     channel->thread_id, channel->rdp, channel->wrp);
613
  if (channel->running && channel->wrp == channel->rdp)
614 615
    {
      if (channel->debug)
616
	g_print ("resetting data_avail of thread %#x\n",
617 618 619
		 channel->thread_id);
      ResetEvent (channel->data_avail_event);
    };
620
  LeaveCriticalSection (&channel->mutex);
621
  
622 623 624
  /* We have no way to indicate any errors form the actual
   * read() or recv() call in the reader thread. Should we have?
   */
625 626
  *bytes_read = count - left;
  return (*bytes_read > 0) ? G_IO_STATUS_NORMAL : G_IO_STATUS_EOF;
627
}
628

629 630 631

static GIOStatus
buffer_write (GIOWin32Channel *channel,
632
	      const gchar     *dest,
633 634 635 636 637 638 639
	      gsize            count,
	      gsize           *bytes_written,
	      GError         **err)
{
  guint nbytes;
  guint left = count;
  
640
  EnterCriticalSection (&channel->mutex);
641
  if (channel->debug)
642
    g_print ("buffer_write: writing to thread %#x %" G_GSIZE_FORMAT " bytes, rdp=%d, wrp=%d\n",
643 644 645 646 647 648 649 650 651 652 653 654
	     channel->thread_id, count, channel->rdp, channel->wrp);
  
  if ((channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
    {
      /* Buffer is full */
      if (channel->debug)
	g_print ("buffer_write: tid %#x: resetting data_avail\n",
		 channel->thread_id);
      ResetEvent (channel->data_avail_event);
      if (channel->debug)
	g_print ("buffer_write: tid %#x: waiting for space\n",
		 channel->thread_id);
655
      LeaveCriticalSection (&channel->mutex);
656
      WaitForSingleObject (channel->data_avail_event, INFINITE);
657
      EnterCriticalSection (&channel->mutex);
658 659 660 661 662 663 664 665
      if (channel->debug)
	g_print ("buffer_write: tid %#x: rdp=%d, wrp=%d\n",
		 channel->thread_id, channel->rdp, channel->wrp);
    }
   
  nbytes = MIN ((channel->rdp + BUFFER_SIZE - channel->wrp - 1) % BUFFER_SIZE,
		BUFFER_SIZE - channel->wrp);

666
  LeaveCriticalSection (&channel->mutex);
667 668 669 670 671 672 673
  nbytes = MIN (left, nbytes);
  if (channel->debug)
    g_print ("buffer_write: tid %#x: writing %d bytes\n",
	     channel->thread_id, nbytes);
  memcpy (channel->buffer + channel->wrp, dest, nbytes);
  dest += nbytes;
  left -= nbytes;
674
  EnterCriticalSection (&channel->mutex);
675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690

  channel->wrp = (channel->wrp + nbytes) % BUFFER_SIZE;
  if (channel->debug)
    g_print ("buffer_write: tid %#x: rdp=%d, wrp=%d, setting space_avail\n",
	     channel->thread_id, channel->rdp, channel->wrp);
  SetEvent (channel->space_avail_event);

  if ((channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
    {
      /* Buffer is full */
      if (channel->debug)
	g_print ("buffer_write: tid %#x: resetting data_avail\n",
		 channel->thread_id);
      ResetEvent (channel->data_avail_event);
    }

691
  LeaveCriticalSection (&channel->mutex);
692 693 694 695 696 697 698 699 700
  
  /* We have no way to indicate any errors form the actual
   * write() call in the writer thread. Should we have?
   */
  *bytes_written = count - left;
  return (*bytes_written > 0) ? G_IO_STATUS_NORMAL : G_IO_STATUS_EOF;
}


701
static gboolean
702 703
g_io_win32_prepare (GSource *source,
		    gint    *timeout)
704
{
705
  GIOWin32Watch *watch = (GIOWin32Watch *)source;
706
  GIOCondition buffer_condition = g_io_channel_get_buffer_condition (watch->channel);
707
  GIOWin32Channel *channel = (GIOWin32Channel *)watch->channel;
708
  int event_mask;
709
  
710
  *timeout = -1;
711
  
712 713 714
  if (channel->debug)
    g_print ("g_io_win32_prepare: source=%p channel=%p", source, channel);

715
  switch (channel->type)
716
    {
717
    case G_IO_WIN32_WINDOWS_MESSAGES:
718 719 720 721
      if (channel->debug)
	g_print (" MSG");
      break;

722
    case G_IO_WIN32_CONSOLE:
723 724
      if (channel->debug)
	g_print (" CON");
725 726 727 728
      break;

    case G_IO_WIN32_FILE_DESC:
      if (channel->debug)
729 730
	g_print (" FD thread=%#x buffer_condition:{%s}"
		 "\n  watch->pollfd.events:{%s} watch->pollfd.revents:{%s} channel->revents:{%s}",
731 732 733 734 735
		 channel->thread_id, condition_to_string (buffer_condition),
		 condition_to_string (watch->pollfd.events),
		 condition_to_string (watch->pollfd.revents),
		 condition_to_string (channel->revents));
      
736
      EnterCriticalSection (&channel->mutex);
737
      if (channel->running)
738
	{
739 740 741
	  if (channel->direction == 0 && channel->wrp == channel->rdp)
	    {
	      if (channel->debug)
742
		g_print ("\n  setting revents=0");
743 744
	      channel->revents = 0;
	    }
745
	}
746 747 748 749 750 751
      else
	{
	  if (channel->direction == 1
	      && (channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
	    {
	      if (channel->debug)
752
		g_print ("\n setting revents=0");
753 754 755
	      channel->revents = 0;
	    }
	}	  
756
      LeaveCriticalSection (&channel->mutex);
757
      break;
758

759
    case G_IO_WIN32_SOCKET:
760 761
      if (channel->debug)
	g_print (" SOCK");
762 763 764 765 766
      event_mask = 0;
      if (watch->condition & G_IO_IN)
	event_mask |= (FD_READ | FD_ACCEPT);
      if (watch->condition & G_IO_OUT)
	event_mask |= (FD_WRITE | FD_CONNECT);
767
      event_mask |= FD_CLOSE;
768

769
      if (channel->event_mask != event_mask)
770 771
	{
	  if (channel->debug)
772 773
	    g_print ("\n  WSAEventSelect(%d,%p,{%s})",
		     channel->fd, (HANDLE) watch->pollfd.fd,
774 775 776
		     event_mask_to_string (event_mask));
	  if (WSAEventSelect (channel->fd, (HANDLE) watch->pollfd.fd,
			      event_mask) == SOCKET_ERROR)
777 778 779 780 781 782 783
	    if (channel->debug)
	      {
		gchar *emsg = g_win32_error_message (WSAGetLastError ());

		g_print (" failed: %s", emsg);
		g_free (emsg);
	      }
784
	  channel->event_mask = event_mask;
Tor Lillqvist's avatar
Tor Lillqvist committed
785

786 787
	  if (channel->debug)
	    g_print ("\n  setting last_events=0");
788
	  channel->last_events = 0;
789

790 791 792
	  if ((event_mask & FD_WRITE) &&
	      channel->ever_writable &&
	      !channel->write_would_have_blocked)
793 794
	    {
	      if (channel->debug)
795
		g_print (" WSASetEvent(%p)", (WSAEVENT) watch->pollfd.fd);
796 797
	      WSASetEvent ((WSAEVENT) watch->pollfd.fd);
	    }
798 799 800 801 802
	}
      break;

    default:
      g_assert_not_reached ();
803
      g_abort ();
804
    }
805 806 807
  if (channel->debug)
    g_print ("\n");

808
  return ((watch->condition & buffer_condition) == watch->condition);
809 810
}

811
static gboolean
812
g_io_win32_check (GSource *source)
813
{
814
  MSG msg;
815
  GIOWin32Watch *watch = (GIOWin32Watch *)source;
816
  GIOWin32Channel *channel = (GIOWin32Channel *)watch->channel;
817
  GIOCondition buffer_condition = g_io_channel_get_buffer_condition (watch->channel);
818
  WSANETWORKEVENTS events;
819

820 821 822
  if (channel->debug)
    g_print ("g_io_win32_check: source=%p channel=%p", source, channel);

823
  switch (channel->type)
824
    {
825
    case G_IO_WIN32_WINDOWS_MESSAGES:
826 827
      if (channel->debug)
	g_print (" MSG\n");
828
      return (PeekMessage (&msg, channel->hwnd, 0, 0, PM_NOREMOVE));
829 830

    case G_IO_WIN32_FILE_DESC:
831
      if (channel->debug)
832
	g_print (" FD thread=%#x buffer_condition=%s\n"
833 834 835 836 837 838 839 840 841 842
		 "  watch->pollfd.events={%s} watch->pollfd.revents={%s} channel->revents={%s}\n",
		 channel->thread_id, condition_to_string (buffer_condition),
		 condition_to_string (watch->pollfd.events),
		 condition_to_string (watch->pollfd.revents),
		 condition_to_string (channel->revents));
      
      watch->pollfd.revents = (watch->pollfd.events & channel->revents);

      return ((watch->pollfd.revents | buffer_condition) & watch->condition);

843
    case G_IO_WIN32_CONSOLE:
844 845
      if (channel->debug)
	g_print (" CON\n");
846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867
      if (watch->channel->is_writeable)
	return TRUE;
      else if (watch->channel->is_readable)
        {
	  INPUT_RECORD buffer;
	  DWORD n;
	  if (PeekConsoleInput ((HANDLE) watch->pollfd.fd, &buffer, 1, &n) &&
	      n == 1)
	    {
	      /* _kbhit() does quite complex processing to find out
	       * whether at least one of the key events pending corresponds
	       * to a "real" character that can be read.
	       */
	      if (_kbhit ())
		return TRUE;
	      
	      /* Discard all other kinds of events */
	      ReadConsoleInput ((HANDLE) watch->pollfd.fd, &buffer, 1, &n);
	    }
        }
      return FALSE;

868
    case G_IO_WIN32_SOCKET:
869 870
      if (channel->debug)
	g_print (" SOCK");
871 872 873
      if (channel->last_events & FD_WRITE)
	{
	  if (channel->debug)
874 875
	    g_print (" sock=%d event=%p last_events has FD_WRITE",
		     channel->fd, (HANDLE) watch->pollfd.fd);
876 877 878 879 880 881
	}
      else
	{
	  WSAEnumNetworkEvents (channel->fd, 0, &events);

	  if (channel->debug)
882 883
	    g_print ("\n  revents={%s} condition={%s}"
		     "\n  WSAEnumNetworkEvents(%d,0) sets events={%s}",
884 885
		     condition_to_string (watch->pollfd.revents),
		     condition_to_string (watch->condition),
886
		     channel->fd, 
887 888 889 890 891 892 893 894
		     event_mask_to_string (events.lNetworkEvents));
	  
	  if (watch->pollfd.revents != 0 &&
	      events.lNetworkEvents == 0 &&
	      !(channel->event_mask & FD_WRITE))
	    {
	      channel->event_mask = 0;
	      if (channel->debug)
895 896
		g_print ("\n  WSAEventSelect(%d,%p,{})",
			 channel->fd, (HANDLE) watch->pollfd.fd);
897 898
	      WSAEventSelect (channel->fd, (HANDLE) watch->pollfd.fd, 0);
	      if (channel->debug)
899 900
		g_print ("  ResetEvent(%p)",
			 (HANDLE) watch->pollfd.fd);
901 902
	      ResetEvent ((HANDLE) watch->pollfd.fd);
	    }
903 904
	  else if (events.lNetworkEvents & FD_WRITE)
	    channel->ever_writable = TRUE;
905 906
	  channel->last_events = events.lNetworkEvents;
	}
907

908 909 910
      watch->pollfd.revents = 0;
      if (channel->last_events & (FD_READ | FD_ACCEPT))
	watch->pollfd.revents |= G_IO_IN;
911

912
      if (channel->last_events & FD_WRITE)
913
	watch->pollfd.revents |= G_IO_OUT;
914 915 916 917 918 919 920 921 922 923 924 925 926 927 928
      else
	{
	  /* We have called WSAEnumNetworkEvents() above but it didn't
	   * set FD_WRITE.
	   */
	  if (events.lNetworkEvents & FD_CONNECT)
	    {
	      if (events.iErrorCode[FD_CONNECT_BIT] == 0)
		watch->pollfd.revents |= G_IO_OUT;
	      else
		watch->pollfd.revents |= (G_IO_HUP | G_IO_ERR);
	    }
	  if (watch->pollfd.revents == 0 && (channel->last_events & (FD_CLOSE)))
	    watch->pollfd.revents |= G_IO_HUP;
	}
929

930
      /* Regardless of WSAEnumNetworkEvents() result, if watching for
931 932 933
       * writability, and if we have ever got a FD_WRITE event, and
       * unless last write would have blocked, set G_IO_OUT. But never
       * set both G_IO_OUT and G_IO_HUP.
934 935
       */
      if (!(watch->pollfd.revents & G_IO_HUP) &&
936
	  channel->ever_writable &&
937 938 939
	  !channel->write_would_have_blocked &&
	  (channel->event_mask & FD_WRITE))
	watch->pollfd.revents |= G_IO_OUT;
940

941 942 943 944 945
      if (channel->debug)
	g_print ("\n  revents={%s} retval={%s}\n",
		 condition_to_string (watch->pollfd.revents),
		 condition_to_string ((watch->pollfd.revents | buffer_condition) & watch->condition));

946 947 948 949
      return ((watch->pollfd.revents | buffer_condition) & watch->condition);

    default:
      g_assert_not_reached ();
950
      g_abort ();
951
    }
952 953 954
}

static gboolean
955 956 957
g_io_win32_dispatch (GSource     *source,
		     GSourceFunc  callback,
		     gpointer     user_data)
958
{
959
  GIOFunc func = (GIOFunc)callback;
960
  GIOWin32Watch *watch = (GIOWin32Watch *)source;
961
  GIOWin32Channel *channel = (GIOWin32Channel *)watch->channel;
962
  GIOCondition buffer_condition = g_io_channel_get_buffer_condition (watch->channel);
963
  
964 965
  if (!func)
    {
966
      g_warning ("IO Watch dispatched without callback. "
967 968 969 970
		 "You must call g_source_connect().");
      return FALSE;
    }
  
971 972 973 974 975 976
  if (channel->debug)
    g_print ("g_io_win32_dispatch: pollfd.revents=%s condition=%s result=%s\n",
	     condition_to_string (watch->pollfd.revents),
	     condition_to_string (watch->condition),
	     condition_to_string ((watch->pollfd.revents | buffer_condition) & watch->condition));

977
  return (*func) (watch->channel,
978
		  (watch->pollfd.revents | buffer_condition) & watch->condition,
979
		  user_data);
980 981 982
}

static void
983
g_io_win32_finalize (GSource *source)
984
{
985
  GIOWin32Watch *watch = (GIOWin32Watch *)source;
986
  GIOWin32Channel *channel = (GIOWin32Channel *)watch->channel;
987
  
988 989 990
  if (channel->debug)
    g_print ("g_io_win32_finalize: source=%p channel=%p", source, channel);

991 992 993
  switch (channel->type)
    {
    case G_IO_WIN32_WINDOWS_MESSAGES:
994 995 996 997
      if (channel->debug)
	g_print (" MSG");
      break;

998
    case G_IO_WIN32_CONSOLE:
999 1000
      if (channel->debug)
	g_print (" CON");
1001
      break;
1002

1003 1004
    case G_IO_WIN32_FILE_DESC:
      if (channel->debug)
1005
	g_print (" FD thread=%#x", channel->thread_id);
1006 1007 1008 1009
      break;

    case G_IO_WIN32_SOCKET:
      if (channel->debug)
1010
	g_print (" SOCK sock=%d", channel->fd);
1011 1012 1013 1014
      break;

    default:
      g_assert_not_reached ();
1015
      g_abort ();
1016
    }
1017 1018
  if (channel->debug)
    g_print ("\n");
1019
  g_io_channel_unref (watch->channel);
1020 1021
}

1022
GSourceFuncs g_io_watch_funcs = {
1023 1024 1025
  g_io_win32_prepare,
  g_io_win32_check,
  g_io_win32_dispatch,
1026
  g_io_win32_finalize
1027
};
1028

1029
static GIOStatus
1030 1031
g_io_win32_msg_read (GIOChannel *channel,
		     gchar      *buf,
1032 1033 1034
		     gsize       count,
		     gsize      *bytes_read,
		     GError    **err)
1035
{
1036
  GIOWin32Channel *win32_channel = (GIOWin32Channel *)channel;
1037
  MSG msg;               /* In case of alignment problems */
1038 1039 1040

  *bytes_read = 0;

1041
  if (count < sizeof (MSG))
1042
    {
1043 1044
      g_set_error_literal (err, G_IO_CHANNEL_ERROR, G_IO_CHANNEL_ERROR_INVAL,
                           "Incorrect message size"); /* Informative enough error message? */
1045 1046
      return G_IO_STATUS_ERROR;
    }
1047
  
1048
  if (win32_channel->debug)
1049 1050
    g_print ("g_io_win32_msg_read: channel=%p hwnd=%p\n",
	     channel, win32_channel->hwnd);
1051
  if (!PeekMessage (&msg, win32_channel->hwnd, 0, 0, PM_REMOVE))
1052
    return G_IO_STATUS_AGAIN;
1053

1054 1055
  memmove (buf, &msg, sizeof (MSG));
  *bytes_read = sizeof (MSG);
1056

1057
  return G_IO_STATUS_NORMAL;
1058
}
1059

1060 1061 1062 1063 1064 1065
/*
 * g_io_win32_msg_write:
 * @channel: a Windows-message channel (accessed as a GIOWin32Channel)
 * @buf: buffer holding exactly one raw MSG structure
 * @count: size of @buf in bytes; must equal sizeof (MSG)
 * @bytes_written: set to sizeof (MSG) on success and to 0 otherwise
 * @err: return location for an error
 *
 * Posts the message described by @buf to the channel's window. Only the
 * message, wParam and lParam fields of the MSG are used; the message is
 * always posted to the channel's own hwnd.
 *
 * Returns: G_IO_STATUS_NORMAL on success, or G_IO_STATUS_ERROR (with @err
 * set) when @count is wrong or PostMessage() fails.
 */
static GIOStatus
g_io_win32_msg_write (GIOChannel  *channel,
		      const gchar *buf,
		      gsize        count,
		      gsize       *bytes_written,
		      GError     **err)
{
  GIOWin32Channel *win32_channel = (GIOWin32Channel *) channel;
  MSG outgoing;

  *bytes_written = 0;

  if (count != sizeof outgoing)
    {
      g_set_error_literal (err, G_IO_CHANNEL_ERROR, G_IO_CHANNEL_ERROR_INVAL,
			   "Incorrect message size"); /* Informative enough error message? */
      return G_IO_STATUS_ERROR;
    }

  /* Copy into a local MSG first, in case of alignment problems with buf. */
  memmove (&outgoing, buf, sizeof outgoing);

  if (!PostMessage (win32_channel->hwnd,
		    outgoing.message, outgoing.wParam, outgoing.lParam))
    {
      gchar *emsg = g_win32_error_message (GetLastError ());

      g_set_error_literal (err, G_IO_CHANNEL_ERROR, G_IO_CHANNEL_ERROR_FAILED, emsg);
      g_free (emsg);

      return G_IO_STATUS_ERROR;
    }

  *bytes_written = sizeof outgoing;

  return G_IO_STATUS_NORMAL;
}

1096 1097 1098
/*
 * g_io_win32_msg_close:
 * @channel: a Windows-message channel (not touched)
 * @err: return location for an error (never set)
 *
 * Close function for Windows-message channels. It does no work and
 * always succeeds.
 *
 * Returns: G_IO_STATUS_NORMAL, unconditionally.
 */
static GIOStatus
g_io_win32_msg_close (GIOChannel *channel,
		      GError    **err)
{
  /* Nothing to be done. Or should we set hwnd to some invalid value? */
  return G_IO_STATUS_NORMAL;
}

1105
static void
1106 1107
g_io_win32_free (GIOChannel *channel)
{
1108
  GIOWin32Channel *win32_channel = (GIOWin32Channel *)channel;
1109
  
1110
  if (win32_channel->debug)
1111
    g_print ("g_io_win32_free channel=%p fd=%d\n", channel, win32_channel->fd);
1112

1113 1114
  DeleteCriticalSection (&win32_channel->mutex);

1115
  if (win32_channel->data_avail_event)
1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127
    if (!CloseHandle (win32_channel->data_avail_event))
      if (win32_channel->debug)
	{
	  gchar *emsg = g_win32_error_message (GetLastError ());

	  g_print ("  CloseHandle(%p) failed: %s\n",
		   win32_channel->data_avail_event, emsg);
	  g_free (emsg);
	}

  g_free (win32_channel->buffer);

1128
  if (win32_channel->space_avail_event)
1129 1130 1131 1132 1133 1134 1135 1136 1137 1138
    if (!CloseHandle (win32_channel->space_avail_event))
      if (win32_channel->debug)
	{
	  gchar *emsg = g_win32_error_message (GetLastError ());

	  g_print ("  CloseHandle(%p) failed: %s\n",
		   win32_channel->space_avail_event, emsg);
	  g_free (emsg);
	}

1139 1140
  if (win32_channel->type == G_IO_WIN32_SOCKET &&
      win32_channel->fd != -1)
1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160
    if (WSAEventSelect (win32_channel->fd, NULL, 0) == SOCKET_ERROR)
      if (win32_channel->debug)
	{
	  gchar *emsg = g_win32_error_message (WSAGetLastError ());

	  g_print ("  WSAEventSelect(%d,NULL,{}) failed: %s\n",
		   win32_channel->fd, emsg);
	  g_free (emsg);
	}

  if (win32_channel->event)
    if (!WSACloseEvent (win32_channel->event))
      if (win32_channel->debug)
	{
	  gchar *emsg = g_win32_error_message (WSAGetLastError ());

	  g_print ("  WSACloseEvent(%p) failed: %s\n",
		   win32_channel->event, emsg);
	  g_free (emsg);
	}
1161 1162 1163 1164

  g_free (win32_channel);
}

1165
static GSource *
1166 1167
g_io_win32_msg_create_watch (GIOChannel   *channel,
			     GIOCondition  condition)
1168
{
1169 1170 1171
  GIOWin32Watch *watch;
  GSource *source;

1172
  source = g_source_new (&g_io_watch_funcs, sizeof (GIOWin32Watch));
1173
  g_source_set_name (source, "GIOChannel (Win32)");
1174
  watch = (GIOWin32Watch *)source;
1175 1176 1177
  
  watch->channel = channel;
  g_io_channel_ref (channel);
1178
  
1179
  watch->condition = condition;
1180
  
1181
  watch->pollfd.fd = (gintptr) G_WIN32_MSG_HANDLE;
1182
  watch->pollfd.events = condition;
1183
  
1184
  g_source_add_poll (source, &watch->pollfd);
1185
  
1186
  return source;
1187 1188
}

1189
static GIOStatus
1190 1191 1192 1193 1194
g_io_win32_fd_and_console_read (GIOChannel *channel,
				gchar      *buf,
				gsize       count,
				gsize      *bytes_read,
				GError    **err)
1195
{
1196
  GIOWin32Channel *win32_channel = (GIOWin32Channel *)channel;
1197
  gint result;
1198
  int errsv;
1199
  
1200
  if (win32_channel->debug)
1201
    g_print ("g_io_win32_fd_read: fd=%d count=%" G_GSIZE_FORMAT "\n",
1202 1203
	     win32_channel->fd, count);
  
1204 1205
  if (win32_channel->thread_id)
    {
1206
      return buffer_read (win32_channel, buf, count, bytes_read, err);
1207
    }
1208 1209

  result = read (win32_channel->fd, buf, count);
1210
  errsv = errno;
1211

1212
  if (win32_channel->debug)
1213
    g_print ("g_io_win32_fd_read: read() => %d\n", result);
1214

1215 1216 1217
  if (result < 0)
    {
      *bytes_read = 0;
1218

1219
      switch (errsv)
1220 1221
        {
#ifdef EAGAIN
1222 1223
	case EAGAIN:
	  return G_IO_STATUS_AGAIN;
1224
#endif
1225
	default:
1226
	  g_set_error_literal (err, G_IO_CHANNEL_ERROR,
1227 1228
                               g_io_channel_error_from_errno (errsv),
                               g_strerror (errsv));