GitLab repository storage has been migrated to hashed layout. Please contact Infrastructure team if you notice any issues with repositories or hooks.

giowin32.c 55.9 KB
Newer Older
1 2 3 4 5
/* GLIB - Library of useful routines for C programming
 * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
 *
 * giowin32.c: IO Channels for Win32.
 * Copyright 1998 Owen Taylor and Tor Lillqvist
 * Copyright 1999-2000 Tor Lillqvist and Craig Setera
 * Copyright 2001-2003 Andrew Lanoix
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

/*
 * Modified by the GLib Team and others 1997-2000.  See the AUTHORS
 * file for a list of people on the GLib Team.  See the ChangeLog
 * files for a list of changes.  These files are distributed with
 * GLib at ftp://ftp.gtk.org/pub/gtk/.
 */

32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
/*
 * Bugs that are related to the code in this file:
 *
 * Bug 137968 - Sometimes a GIOFunc on Win32 is called with zero condition
 * http://bugzilla.gnome.org/show_bug.cgi?id=137968
 *
 * Bug 324234 - Using g_io_add_watch_full() to wait for connect() to return on a non-blocking socket returns prematurely
 * http://bugzilla.gnome.org/show_bug.cgi?id=324234
 *
 * Bug 331214 - g_io_channel async socket io stalls
 * http://bugzilla.gnome.org/show_bug.cgi?id=331214
 *
 * Bug 338943 - Multiple watches on the same socket
 * http://bugzilla.gnome.org/show_bug.cgi?id=338943
 *
 * Bug 357674 - 2 serious bugs in giowin32.c making glib iochannels useless
 * http://bugzilla.gnome.org/show_bug.cgi?id=357674
 *
 * Bug 425156 - GIOChannel deadlocks on a win32 socket
 * http://bugzilla.gnome.org/show_bug.cgi?id=425156
 *
 * Bug 468910 - giofunc condition=0
 * http://bugzilla.gnome.org/show_bug.cgi?id=468910
 *
 * Bug 500246 - Bug fixes for giowin32
 * http://bugzilla.gnome.org/show_bug.cgi?id=500246
 *
 * Bug 548278 - Async GETs connections are always terminated unexpectedly on windows
 * http://bugzilla.gnome.org/show_bug.cgi?id=548278
 *
 * Bug 548536 - giowin32 problem when adding and removing watches
 * http://bugzilla.gnome.org/show_bug.cgi?id=548536
 *
 * When fixing bugs related to the code in this file, either the above
 * bugs or others, make sure that the test programs attached to the
 * above bugs continue to work.
 */

Owen Taylor's avatar
Owen Taylor committed
70 71
#include "config.h"

72
#include "glib.h"
73 74

#include <stdlib.h>
75
#include <winsock2.h>
76
#include <windows.h>
77
#include <conio.h>
78 79
#include <fcntl.h>
#include <io.h>
80
#include <process.h>
81
#include <errno.h>
82
#include <sys/stat.h>
83

84
#include "gstdio.h"
85 86
#include "glibintl.h"

87 88
#include "galias.h"

89 90 91
/* Forward declarations of the channel and watch structures defined below. */
typedef struct _GIOWin32Channel GIOWin32Channel;
typedef struct _GIOWin32Watch GIOWin32Watch;

/* Size in bytes of the circular buffer that the reader and writer
 * threads allocate for a file descriptor channel.
 */
#define BUFFER_SIZE 4096
93 94

/* The kind of object a GIOWin32Channel wraps. The type selects which
 * fields of the channel structure are used and how the channel is
 * polled.
 */
typedef enum {
  G_IO_WIN32_WINDOWS_MESSAGES,	/* Windows messages */

  G_IO_WIN32_FILE_DESC,		/* Unix-like file descriptors from
				 * _open() or _pipe(), except for
				 * console IO. Separate thread to read
				 * or write.
				 */

  G_IO_WIN32_CONSOLE,		/* Console IO (usually stdin, stdout, stderr) */

  G_IO_WIN32_SOCKET		/* Sockets. No separate thread. */
} GIOWin32ChannelType;

struct _GIOWin32Channel {
  GIOChannel channel;
  gint fd;			/* Either a Unix-like file handle as provided
				 * by the Microsoft C runtime, or a SOCKET
				 * as provided by WinSock.
				 */
  GIOWin32ChannelType type;
115
  
116 117
  gboolean debug;

Tor Lillqvist's avatar
Tor Lillqvist committed
118 119
  /* Field used by G_IO_WIN32_WINDOWS_MESSAGES channels */
  HWND hwnd;			/* Handle of window, or NULL */
120
  
Tor Lillqvist's avatar
Tor Lillqvist committed
121
  /* Fields used by G_IO_WIN32_FILE_DESC channels. */
122 123
  CRITICAL_SECTION mutex;

124 125 126 127
  int direction;		/* 0 means we read from it,
				 * 1 means we write to it.
				 */

Tor Lillqvist's avatar
Tor Lillqvist committed
128 129 130
  gboolean running;		/* Is reader or writer thread
				 * running. FALSE if EOF has been
				 * reached by the reader thread.
131
				 */
Tor Lillqvist's avatar
Tor Lillqvist committed
132

133 134 135
  gboolean needs_close;		/* If the channel has been closed while
				 * the reader thread was still running.
				 */
Tor Lillqvist's avatar
Tor Lillqvist committed
136 137 138 139

  guint thread_id;		/* If non-NULL the channel has or has
				 * had a reader or writer thread.
				 */
140 141 142 143
  HANDLE data_avail_event;

  gushort revents;

144
  /* Data is kept in a circular buffer. To be able to distinguish between
Tor Lillqvist's avatar
Tor Lillqvist committed
145
   * empty and full buffers, we cannot fill it completely, but have to
146 147 148 149 150 151 152 153 154 155 156
   * leave a one character gap.
   *
   * Data available is between indexes rdp and wrp-1 (modulo BUFFER_SIZE).
   *
   * Empty:    wrp == rdp
   * Full:     (wrp + 1) % BUFFER_SIZE == rdp
   * Partial:  otherwise
   */
  guchar *buffer;		/* (Circular) buffer */
  gint wrp, rdp;		/* Buffer indices for writing and reading */
  HANDLE space_avail_event;
157

Tor Lillqvist's avatar
Tor Lillqvist committed
158
  /* Fields used by G_IO_WIN32_SOCKET channels */
159 160
  int event_mask;
  int last_events;
161
  HANDLE event;
162
  gboolean write_would_have_blocked;
163
  gboolean ever_writable;
164 165
};

166 167 168
/* Enter and leave the critical section object named by the argument.
 * The argument is the CRITICAL_SECTION itself (an lvalue), not a
 * pointer to it.
 */
#define LOCK(mutex)   EnterCriticalSection (&(mutex))
#define UNLOCK(mutex) LeaveCriticalSection (&(mutex))

169
struct _GIOWin32Watch {
170
  GSource       source;
171 172 173 174 175
  GPollFD       pollfd;
  GIOChannel   *channel;
  GIOCondition  condition;
};

176
static void
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
g_win32_print_access_mode (int flags)
{
  g_print ("%s%s%s%s%s%s%s%s%s%s",
	   ((flags & 0x3) == _O_RDWR ? "O_RDWR" :
	    ((flags & 0x3) == _O_RDONLY ? "O_RDONLY" :
	     ((flags & 0x3) == _O_WRONLY ? "O_WRONLY" : "0"))),
	   (flags & _O_APPEND ? "|O_APPEND" : ""),
	   (flags & _O_RANDOM ? "|O_RANDOM" : ""),
	   (flags & _O_SEQUENTIAL ? "|O_SEQUENTIAL" : ""),
	   (flags & _O_TEMPORARY ? "|O_TEMPORARY" : ""),
	   (flags & _O_CREAT ? "|O_CREAT" : ""),
	   (flags & _O_TRUNC ? "|O_TRUNC" : ""),
	   (flags & _O_EXCL ? "|O_EXCL" : ""),
	   (flags & _O_TEXT ? "|O_TEXT" : ""),
	   (flags & _O_BINARY ? "|O_BINARY" : ""));
}

/* Print the names of the GIOFlags set in @flags with g_print(),
 * separated by '|', for debugging output. Nothing is printed when
 * none of the known flags is set, and no newline is added.
 */
static void
g_win32_print_gioflags (GIOFlags flags)
{
  /* Becomes "|" as soon as one name has been printed. */
  const char *separator = "";

  if (flags & G_IO_FLAG_APPEND)
    {
      g_print ("APPEND");
      separator = "|";
    }
  if (flags & G_IO_FLAG_NONBLOCK)
    {
      g_print ("%sNONBLOCK", separator);
      separator = "|";
    }
  if (flags & G_IO_FLAG_IS_READABLE)
    {
      g_print ("%sREADABLE", separator);
      separator = "|";
    }
  if (flags & G_IO_FLAG_IS_WRITEABLE)
    {
      g_print ("%sWRITEABLE", separator);
      separator = "|";
    }
  if (flags & G_IO_FLAG_IS_SEEKABLE)
    g_print ("%sSEEKABLE", separator);
}

211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
/* Return a printable form of the WinSock FD_* event bits in @mask,
 * for debugging output: the names of the known bits separated by '|',
 * followed by any remaining unknown bits in hexadecimal. Returns ""
 * for an empty mask.
 *
 * The result is interned through a GQuark, so the caller does not free
 * it and several results can be used in one g_print() call.
 */
static const char *
event_mask_to_string (int mask)
{
  /* The ten known names joined by '|' need 94 characters and the
   * hexadecimal remainder up to 11 more ("|0xfffffc00"), so the
   * buffer must hold at least 106 bytes including the terminator.
   */
  char buf[128];
  int checked_bits = 0;
  char *bufp = buf;

  if (mask == 0)
    return "";

#define BIT(n) \
  do { \
    checked_bits |= FD_##n; \
    if (mask & FD_##n) \
      bufp += sprintf (bufp, "%s" #n, (bufp > buf ? "|" : "")); \
  } while (0)

  BIT (READ);
  BIT (WRITE);
  BIT (OOB);
  BIT (ACCEPT);
  BIT (CONNECT);
  BIT (CLOSE);
  BIT (QOS);
  BIT (GROUP_QOS);
  BIT (ROUTING_INTERFACE_CHANGE);
  BIT (ADDRESS_LIST_CHANGE);

#undef BIT

  /* Only put a separator in front of the unknown bits when a name
   * precedes them.
   */
  if ((mask & ~checked_bits) != 0)
    bufp += sprintf (bufp, "%s%#x", (bufp > buf ? "|" : ""),
		     (unsigned int) (mask & ~checked_bits));

  return g_quark_to_string (g_quark_from_string (buf));
}

/* Return a printable form of the GIOCondition bits in @condition, for
 * debugging output: the names of the known bits separated by '|',
 * followed by any remaining unknown bits in hexadecimal. Returns ""
 * for an empty condition.
 *
 * The result is interned through a GQuark, so the caller does not free
 * it and several results can be used in one g_print() call.
 */
static const char *
condition_to_string (GIOCondition condition)
{
  /* Worst case is "IN|OUT|PRI|ERR|HUP|NVAL" (23 characters) plus a
   * '|' and a 32-bit hexadecimal remainder (11 characters).
   */
  char buf[100];
  int checked_bits = 0;
  char *bufp = buf;

  if (condition == 0)
    return "";

#define BIT(n) \
  do { \
    checked_bits |= G_IO_##n; \
    if (condition & G_IO_##n) \
      bufp += sprintf (bufp, "%s" #n, (bufp > buf ? "|" : "")); \
  } while (0)

  BIT (IN);
  BIT (OUT);
  BIT (PRI);
  BIT (ERR);
  BIT (HUP);
  BIT (NVAL);

#undef BIT

  /* Only put a separator in front of the unknown bits when a name
   * precedes them.
   */
  if ((condition & ~checked_bits) != 0)
    bufp += sprintf (bufp, "%s%#x", (bufp > buf ? "|" : ""),
		     (unsigned int) (condition & ~checked_bits));

  return g_quark_to_string (g_quark_from_string (buf));
}

269 270
/* Tell whether debugging output is wanted: TRUE when the environment
 * variable G_IO_WIN32_DEBUG is set, whatever its value.
 */
static gboolean
g_io_win32_get_debug_flag (void)
{
  return (getenv ("G_IO_WIN32_DEBUG") != NULL);
}

275 276 277 278
/* Put the Win32-specific part of @channel into its initial state: no
 * thread, no buffer, no event objects, no socket events seen. The
 * critical section is initialised as well.
 *
 * The fd, type, hwnd, direction, rdp and wrp fields are not touched
 * here.
 */
static void
g_io_channel_win32_init (GIOWin32Channel *channel)
{
  channel->debug = g_io_win32_get_debug_flag ();

  /* State used by file descriptor channels */
  channel->buffer = NULL;
  channel->running = FALSE;
  channel->needs_close = FALSE;
  channel->thread_id = 0;
  channel->data_avail_event = NULL;
  channel->revents = 0;
  channel->space_avail_event = NULL;

  /* State used by socket channels */
  channel->event_mask = 0;
  channel->last_events = 0;
  channel->event = NULL;
  channel->write_would_have_blocked = FALSE;
  channel->ever_writable = FALSE;

  InitializeCriticalSection (&channel->mutex);
}

294 295 296 297 298
/* Create the two unnamed, non-inheritable event objects of @channel,
 * both initially non-signalled. If either cannot be created the
 * failure is reported through g_error().
 */
static void
create_events (GIOWin32Channel *channel)
{
  SECURITY_ATTRIBUTES sec_attrs;

  sec_attrs.nLength = sizeof (SECURITY_ATTRIBUTES);
  sec_attrs.lpSecurityDescriptor = NULL;
  sec_attrs.bInheritHandle = FALSE;

  /* The data available event is manual reset, the space available event
   * is automatic reset.
   */
  if (!(channel->data_avail_event = CreateEvent (&sec_attrs, TRUE, FALSE, NULL))
      || !(channel->space_avail_event = CreateEvent (&sec_attrs, FALSE, FALSE, NULL)))
    {
      gchar *emsg = g_win32_error_message (GetLastError ());
      g_error ("Error creating event: %s", emsg);
      g_free (emsg);
    }
}
314

315
static unsigned __stdcall
316
read_thread (void *parameter)
317 318 319
{
  GIOWin32Channel *channel = parameter;
  guchar *buffer;
320
  gint nbytes;
321

322
  g_io_channel_ref ((GIOChannel *)channel);
323

324
  if (channel->debug)
325
    g_print ("read_thread %#x: start fd=%d, data_avail=%p space_avail=%p\n",
326 327
	     channel->thread_id,
	     channel->fd,
328 329
	     channel->data_avail_event,
	     channel->space_avail_event);
330 331

  channel->direction = 0;
332 333 334
  channel->buffer = g_malloc (BUFFER_SIZE);
  channel->rdp = channel->wrp = 0;
  channel->running = TRUE;
335

336 337
  SetEvent (channel->space_avail_event);
  
338
  LOCK (channel->mutex);
339 340
  while (channel->running)
    {
341
      if (channel->debug)
342
	g_print ("read_thread %#x: rdp=%d, wrp=%d\n",
343
		 channel->thread_id, channel->rdp, channel->wrp);
344 345 346
      if ((channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
	{
	  /* Buffer is full */
347
	  if (channel->debug)
348
	    g_print ("read_thread %#x: resetting space_avail\n",
349
		     channel->thread_id);
350
	  ResetEvent (channel->space_avail_event);
351
	  if (channel->debug)
352 353
	    g_print ("read_thread %#x: waiting for space\n",
		     channel->thread_id);
354 355 356
	  UNLOCK (channel->mutex);
	  WaitForSingleObject (channel->space_avail_event, INFINITE);
	  LOCK (channel->mutex);
357
	  if (channel->debug)
358
	    g_print ("read_thread %#x: rdp=%d, wrp=%d\n",
359
		     channel->thread_id, channel->rdp, channel->wrp);
360 361 362 363 364 365 366 367 368
	}
      
      buffer = channel->buffer + channel->wrp;
      
      /* Always leave at least one byte unused gap to be able to
       * distinguish between the full and empty condition...
       */
      nbytes = MIN ((channel->rdp + BUFFER_SIZE - channel->wrp - 1) % BUFFER_SIZE,
		    BUFFER_SIZE - channel->wrp);
369

370
      if (channel->debug)
371
	g_print ("read_thread %#x: calling read() for %d bytes\n",
372 373
		 channel->thread_id, nbytes);

374
      UNLOCK (channel->mutex);
375

376
      nbytes = read (channel->fd, buffer, nbytes);
377 378
      
      LOCK (channel->mutex);
379

380 381 382 383 384 385
      channel->revents = G_IO_IN;
      if (nbytes == 0)
	channel->revents |= G_IO_HUP;
      else if (nbytes < 0)
	channel->revents |= G_IO_ERR;

386
      if (channel->debug)
387
	g_print ("read_thread %#x: read() returned %d, rdp=%d, wrp=%d\n",
388
		 channel->thread_id, nbytes, channel->rdp, channel->wrp);
389 390 391 392

      if (nbytes <= 0)
	break;

393
      channel->wrp = (channel->wrp + nbytes) % BUFFER_SIZE;
394
      if (channel->debug)
395
	g_print ("read_thread %#x: rdp=%d, wrp=%d, setting data_avail\n",
396
		 channel->thread_id, channel->rdp, channel->wrp);
397 398 399 400
      SetEvent (channel->data_avail_event);
    }
  
  channel->running = FALSE;
401 402 403
  if (channel->needs_close)
    {
      if (channel->debug)
404
	g_print ("read_thread %#x: channel fd %d needs closing\n",
405
		 channel->thread_id, channel->fd);
406
      close (channel->fd);
407 408 409
      channel->fd = -1;
    }

410 411 412
  if (channel->debug)
    g_print ("read_thread %#x: EOF, rdp=%d, wrp=%d, setting data_avail\n",
	     channel->thread_id, channel->rdp, channel->wrp);
413 414 415
  SetEvent (channel->data_avail_event);
  UNLOCK (channel->mutex);
  
416
  g_io_channel_unref ((GIOChannel *)channel);
417
  
418 419 420
  /* No need to call _endthreadex(), the actual thread starter routine
   * in MSVCRT (see crt/src/threadex.c:_threadstartex) calls
   * _endthreadex() for us.
421
   */
422

423 424
  return 0;
}
425

426 427 428 429 430
/* Thread function that writes the contents of a channel's circular
 * buffer to the channel's file descriptor.
 *
 * @parameter is the GIOWin32Channel. The thread holds its own
 * reference on the channel while it runs. It allocates the buffer,
 * signals space_avail_event once to tell its creator that it has
 * started, and then loops: wait until the buffer holds data, write()
 * the contiguous part starting at rdp with the mutex released, advance
 * rdp and set data_avail_event. The loop ends when write() fails or
 * writes nothing, or when the thread is woken while the buffer is
 * still empty.
 *
 * Always returns 0.
 *
 * NOTE(review): unlike read_thread(), no event is set when this thread
 * exits, so a waiter on data_avail_event is not woken after a failed
 * write. Confirm whether that is intended.
 */
static unsigned __stdcall
write_thread (void *parameter)
{
  GIOWin32Channel *channel = parameter;
  guchar *buffer;
  gint nbytes;

  g_io_channel_ref ((GIOChannel *)channel);

  if (channel->debug)
    g_print ("write_thread %#x: start fd=%d, data_avail=%p space_avail=%p\n",
	     channel->thread_id,
	     channel->fd,
	     channel->data_avail_event,
	     channel->space_avail_event);

  channel->direction = 1;
  channel->buffer = g_malloc (BUFFER_SIZE);
  channel->rdp = channel->wrp = 0;
  channel->running = TRUE;

  /* Tell create_thread() that the thread is up and the buffer exists. */
  SetEvent (channel->space_avail_event);

  /* We use the same event objects as for a reader thread, but with
   * reversed meaning. So, space_avail is used if data is available
   * for writing, and data_avail is used if space is available in the
   * write buffer.
   */

  LOCK (channel->mutex);
  while (channel->running || channel->rdp != channel->wrp)
    {
      if (channel->debug)
	g_print ("write_thread %#x: rdp=%d, wrp=%d\n",
		 channel->thread_id, channel->rdp, channel->wrp);
      if (channel->wrp == channel->rdp)
	{
	  /* Buffer is empty. */
	  if (channel->debug)
	    g_print ("write_thread %#x: resetting space_avail\n",
		     channel->thread_id);
	  ResetEvent (channel->space_avail_event);
	  if (channel->debug)
	    g_print ("write_thread %#x: waiting for data\n",
		     channel->thread_id);
	  channel->revents = G_IO_OUT;
	  SetEvent (channel->data_avail_event);
	  UNLOCK (channel->mutex);
	  WaitForSingleObject (channel->space_avail_event, INFINITE);

	  LOCK (channel->mutex);
	  /* Woken up with nothing to write: stop. */
	  if (channel->rdp == channel->wrp)
	    break;

	  if (channel->debug)
	    g_print ("write_thread %#x: rdp=%d, wrp=%d\n",
		     channel->thread_id, channel->rdp, channel->wrp);
	}

      /* Write only the contiguous run that starts at rdp; data that
       * wraps around the end of the buffer goes out on the next turn.
       */
      buffer = channel->buffer + channel->rdp;
      if (channel->rdp < channel->wrp)
	nbytes = channel->wrp - channel->rdp;
      else
	nbytes = BUFFER_SIZE - channel->rdp;

      if (channel->debug)
	g_print ("write_thread %#x: calling write() for %d bytes\n",
		 channel->thread_id, nbytes);

      /* Do not hold the mutex across the write() call. */
      UNLOCK (channel->mutex);
      nbytes = write (channel->fd, buffer, nbytes);
      LOCK (channel->mutex);

      if (channel->debug)
	g_print ("write_thread %#x: write(%i) returned %d, rdp=%d, wrp=%d\n",
		 channel->thread_id, channel->fd, nbytes, channel->rdp, channel->wrp);

      channel->revents = 0;
      if (nbytes > 0)
	channel->revents |= G_IO_OUT;
      else
	channel->revents |= G_IO_ERR;

      /* Stop before touching rdp when nothing was written: adding a
       * negative return value would move the read index backwards,
       * possibly to -1.
       */
      if (nbytes <= 0)
	break;

      channel->rdp = (channel->rdp + nbytes) % BUFFER_SIZE;

      if (channel->debug)
	g_print ("write_thread: setting data_avail for thread %#x\n",
		 channel->thread_id);
      SetEvent (channel->data_avail_event);
    }

  channel->running = FALSE;
  if (channel->needs_close)
    {
      if (channel->debug)
	g_print ("write_thread %#x: channel fd %d needs closing\n",
		 channel->thread_id, channel->fd);
      close (channel->fd);
      channel->fd = -1;
    }

  UNLOCK (channel->mutex);

  g_io_channel_unref ((GIOChannel *)channel);

  return 0;
}

537
static void
538 539 540
create_thread (GIOWin32Channel     *channel,
	       GIOCondition         condition,
	       unsigned (__stdcall *thread) (void *parameter))
541
{
542 543 544 545 546
  HANDLE thread_handle;

  thread_handle = (HANDLE) _beginthreadex (NULL, 0, thread, channel, 0,
					   &channel->thread_id);
  if (thread_handle == 0)
547
    g_warning ("Error creating thread: %s.",
Owen Taylor's avatar
Owen Taylor committed
548
	       g_strerror (errno));
549
  else if (!CloseHandle (thread_handle))
550
    g_warning ("Error closing thread handle: %s.\n",
551 552
	       g_win32_error_message (GetLastError ()));

553 554
  WaitForSingleObject (channel->space_avail_event, INFINITE);
}
555

556
static GIOStatus
557
buffer_read (GIOWin32Channel *channel,
558
	     gchar           *dest,
559 560 561
	     gsize            count,
	     gsize           *bytes_read,
	     GError         **err)
562
{
563 564 565 566
  guint nbytes;
  guint left = count;
  
  LOCK (channel->mutex);
567
  if (channel->debug)
568
    g_print ("reading from thread %#x %" G_GSIZE_FORMAT " bytes, rdp=%d, wrp=%d\n",
569
	     channel->thread_id, count, channel->rdp, channel->wrp);
570
  
571
  if (channel->wrp == channel->rdp)
572 573
    {
      UNLOCK (channel->mutex);
574 575 576
      if (channel->debug)
	g_print ("waiting for data from thread %#x\n", channel->thread_id);
      WaitForSingleObject (channel->data_avail_event, INFINITE);
577 578
      if (channel->debug)
	g_print ("done waiting for data from thread %#x\n", channel->thread_id);
579
      LOCK (channel->mutex);
580
      if (channel->wrp == channel->rdp && !channel->running)
581
	{
582 583
	  if (channel->debug)
	    g_print ("wrp==rdp, !running\n");
584
	  UNLOCK (channel->mutex);
585
          *bytes_read = 0;
586
	  return G_IO_STATUS_EOF;
587
	}
588
    }
589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604
  
  if (channel->rdp < channel->wrp)
    nbytes = channel->wrp - channel->rdp;
  else
    nbytes = BUFFER_SIZE - channel->rdp;
  UNLOCK (channel->mutex);
  nbytes = MIN (left, nbytes);
  if (channel->debug)
    g_print ("moving %d bytes from thread %#x\n",
	     nbytes, channel->thread_id);
  memcpy (dest, channel->buffer + channel->rdp, nbytes);
  dest += nbytes;
  left -= nbytes;
  LOCK (channel->mutex);
  channel->rdp = (channel->rdp + nbytes) % BUFFER_SIZE;
  if (channel->debug)
605
    g_print ("setting space_avail for thread %#x\n", channel->thread_id);
606 607 608 609
  SetEvent (channel->space_avail_event);
  if (channel->debug)
    g_print ("for thread %#x: rdp=%d, wrp=%d\n",
	     channel->thread_id, channel->rdp, channel->wrp);
610
  if (channel->running && channel->wrp == channel->rdp)
611 612
    {
      if (channel->debug)
613
	g_print ("resetting data_avail of thread %#x\n",
614 615 616
		 channel->thread_id);
      ResetEvent (channel->data_avail_event);
    };
617 618
  UNLOCK (channel->mutex);
  
619 620 621
  /* We have no way to indicate any errors form the actual
   * read() or recv() call in the reader thread. Should we have?
   */
622 623
  *bytes_read = count - left;
  return (*bytes_read > 0) ? G_IO_STATUS_NORMAL : G_IO_STATUS_EOF;
624
}
625

626 627 628

static GIOStatus
buffer_write (GIOWin32Channel *channel,
629
	      const gchar     *dest,
630 631 632 633 634 635 636 637 638
	      gsize            count,
	      gsize           *bytes_written,
	      GError         **err)
{
  guint nbytes;
  guint left = count;
  
  LOCK (channel->mutex);
  if (channel->debug)
639
    g_print ("buffer_write: writing to thread %#x %" G_GSIZE_FORMAT " bytes, rdp=%d, wrp=%d\n",
640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697
	     channel->thread_id, count, channel->rdp, channel->wrp);
  
  if ((channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
    {
      /* Buffer is full */
      if (channel->debug)
	g_print ("buffer_write: tid %#x: resetting data_avail\n",
		 channel->thread_id);
      ResetEvent (channel->data_avail_event);
      if (channel->debug)
	g_print ("buffer_write: tid %#x: waiting for space\n",
		 channel->thread_id);
      UNLOCK (channel->mutex);
      WaitForSingleObject (channel->data_avail_event, INFINITE);
      LOCK (channel->mutex);
      if (channel->debug)
	g_print ("buffer_write: tid %#x: rdp=%d, wrp=%d\n",
		 channel->thread_id, channel->rdp, channel->wrp);
    }
   
  nbytes = MIN ((channel->rdp + BUFFER_SIZE - channel->wrp - 1) % BUFFER_SIZE,
		BUFFER_SIZE - channel->wrp);

  UNLOCK (channel->mutex);
  nbytes = MIN (left, nbytes);
  if (channel->debug)
    g_print ("buffer_write: tid %#x: writing %d bytes\n",
	     channel->thread_id, nbytes);
  memcpy (channel->buffer + channel->wrp, dest, nbytes);
  dest += nbytes;
  left -= nbytes;
  LOCK (channel->mutex);

  channel->wrp = (channel->wrp + nbytes) % BUFFER_SIZE;
  if (channel->debug)
    g_print ("buffer_write: tid %#x: rdp=%d, wrp=%d, setting space_avail\n",
	     channel->thread_id, channel->rdp, channel->wrp);
  SetEvent (channel->space_avail_event);

  if ((channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
    {
      /* Buffer is full */
      if (channel->debug)
	g_print ("buffer_write: tid %#x: resetting data_avail\n",
		 channel->thread_id);
      ResetEvent (channel->data_avail_event);
    }

  UNLOCK (channel->mutex);
  
  /* We have no way to indicate any errors form the actual
   * write() call in the writer thread. Should we have?
   */
  *bytes_written = count - left;
  return (*bytes_written > 0) ? G_IO_STATUS_NORMAL : G_IO_STATUS_EOF;
}


698
static gboolean
699 700
g_io_win32_prepare (GSource *source,
		    gint    *timeout)
701
{
702
  GIOWin32Watch *watch = (GIOWin32Watch *)source;
703
  GIOCondition buffer_condition = g_io_channel_get_buffer_condition (watch->channel);
704
  GIOWin32Channel *channel = (GIOWin32Channel *)watch->channel;
705
  int event_mask;
706
  
707
  *timeout = -1;
708
  
709 710 711
  if (channel->debug)
    g_print ("g_io_win32_prepare: source=%p channel=%p", source, channel);

712
  switch (channel->type)
713
    {
714
    case G_IO_WIN32_WINDOWS_MESSAGES:
715 716 717 718
      if (channel->debug)
	g_print (" MSG");
      break;

719
    case G_IO_WIN32_CONSOLE:
720 721
      if (channel->debug)
	g_print (" CON");
722 723 724 725
      break;

    case G_IO_WIN32_FILE_DESC:
      if (channel->debug)
726 727
	g_print (" FD thread=%#x buffer_condition:{%s}"
		 "\n  watch->pollfd.events:{%s} watch->pollfd.revents:{%s} channel->revents:{%s}",
728 729 730 731 732
		 channel->thread_id, condition_to_string (buffer_condition),
		 condition_to_string (watch->pollfd.events),
		 condition_to_string (watch->pollfd.revents),
		 condition_to_string (channel->revents));
      
733
      LOCK (channel->mutex);
734
      if (channel->running)
735
	{
736 737 738
	  if (channel->direction == 0 && channel->wrp == channel->rdp)
	    {
	      if (channel->debug)
739
		g_print ("\n  setting revents=0");
740 741
	      channel->revents = 0;
	    }
742
	}
743 744 745 746 747 748
      else
	{
	  if (channel->direction == 1
	      && (channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
	    {
	      if (channel->debug)
749
		g_print ("\n setting revents=0");
750 751 752
	      channel->revents = 0;
	    }
	}	  
753
      UNLOCK (channel->mutex);
754
      break;
755

756
    case G_IO_WIN32_SOCKET:
757 758
      if (channel->debug)
	g_print (" SOCK");
759 760 761 762 763
      event_mask = 0;
      if (watch->condition & G_IO_IN)
	event_mask |= (FD_READ | FD_ACCEPT);
      if (watch->condition & G_IO_OUT)
	event_mask |= (FD_WRITE | FD_CONNECT);
764
      event_mask |= FD_CLOSE;
765

766
      if (channel->event_mask != event_mask)
767 768
	{
	  if (channel->debug)
769 770
	    g_print ("\n  WSAEventSelect(%d,%p,{%s})",
		     channel->fd, (HANDLE) watch->pollfd.fd,
771 772 773 774 775 776 777 778
		     event_mask_to_string (event_mask));
	  if (WSAEventSelect (channel->fd, (HANDLE) watch->pollfd.fd,
			      event_mask) == SOCKET_ERROR)
	    ;			/* What? */
	  channel->event_mask = event_mask;
#if 0
	  channel->event = watch->pollfd.fd;
#endif
779 780
	  if (channel->debug)
	    g_print ("\n  setting last_events=0");
781
	  channel->last_events = 0;
782

783 784 785
	  if ((event_mask & FD_WRITE) &&
	      channel->ever_writable &&
	      !channel->write_would_have_blocked)
786 787
	    {
	      if (channel->debug)
788
		g_print (" WSASetEvent(%p)", (WSAEVENT) watch->pollfd.fd);
789 790
	      WSASetEvent ((WSAEVENT) watch->pollfd.fd);
	    }
791 792 793 794 795 796 797
	}
      break;

    default:
      g_assert_not_reached ();
      abort ();
    }
798 799 800
  if (channel->debug)
    g_print ("\n");

801
  return ((watch->condition & buffer_condition) == watch->condition);
802 803
}

804
static gboolean
805
g_io_win32_check (GSource *source)
806
{
807
  MSG msg;
808
  GIOWin32Watch *watch = (GIOWin32Watch *)source;
809
  GIOWin32Channel *channel = (GIOWin32Channel *)watch->channel;
810
  GIOCondition buffer_condition = g_io_channel_get_buffer_condition (watch->channel);
811
  WSANETWORKEVENTS events;
812

813 814 815
  if (channel->debug)
    g_print ("g_io_win32_check: source=%p channel=%p", source, channel);

816
  switch (channel->type)
817
    {
818
    case G_IO_WIN32_WINDOWS_MESSAGES:
819 820
      if (channel->debug)
	g_print (" MSG\n");
821
      return (PeekMessage (&msg, channel->hwnd, 0, 0, PM_NOREMOVE));
822 823

    case G_IO_WIN32_FILE_DESC:
824
      if (channel->debug)
825
	g_print (" FD thread=%#x buffer_condition=%s\n"
826 827 828 829 830 831 832 833 834 835
		 "  watch->pollfd.events={%s} watch->pollfd.revents={%s} channel->revents={%s}\n",
		 channel->thread_id, condition_to_string (buffer_condition),
		 condition_to_string (watch->pollfd.events),
		 condition_to_string (watch->pollfd.revents),
		 condition_to_string (channel->revents));
      
      watch->pollfd.revents = (watch->pollfd.events & channel->revents);

      return ((watch->pollfd.revents | buffer_condition) & watch->condition);

836
    case G_IO_WIN32_CONSOLE:
837 838
      if (channel->debug)
	g_print (" CON\n");
839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860
      if (watch->channel->is_writeable)
	return TRUE;
      else if (watch->channel->is_readable)
        {
	  INPUT_RECORD buffer;
	  DWORD n;
	  if (PeekConsoleInput ((HANDLE) watch->pollfd.fd, &buffer, 1, &n) &&
	      n == 1)
	    {
	      /* _kbhit() does quite complex processing to find out
	       * whether at least one of the key events pending corresponds
	       * to a "real" character that can be read.
	       */
	      if (_kbhit ())
		return TRUE;
	      
	      /* Discard all other kinds of events */
	      ReadConsoleInput ((HANDLE) watch->pollfd.fd, &buffer, 1, &n);
	    }
        }
      return FALSE;

861
    case G_IO_WIN32_SOCKET:
862 863
      if (channel->debug)
	g_print (" SOCK");
864 865 866
      if (channel->last_events & FD_WRITE)
	{
	  if (channel->debug)
867 868
	    g_print (" sock=%d event=%p last_events has FD_WRITE",
		     channel->fd, (HANDLE) watch->pollfd.fd);
869 870 871 872 873 874
	}
      else
	{
	  WSAEnumNetworkEvents (channel->fd, 0, &events);

	  if (channel->debug)
875 876
	    g_print ("\n  revents={%s} condition={%s}"
		     "\n  WSAEnumNetworkEvents(%d,0) sets events={%s}",
877 878
		     condition_to_string (watch->pollfd.revents),
		     condition_to_string (watch->condition),
879
		     channel->fd, 
880 881 882 883 884 885 886 887
		     event_mask_to_string (events.lNetworkEvents));
	  
	  if (watch->pollfd.revents != 0 &&
	      events.lNetworkEvents == 0 &&
	      !(channel->event_mask & FD_WRITE))
	    {
	      channel->event_mask = 0;
	      if (channel->debug)
888 889
		g_print ("\n  WSAEventSelect(%d,%p,{})",
			 channel->fd, (HANDLE) watch->pollfd.fd);
890 891
	      WSAEventSelect (channel->fd, (HANDLE) watch->pollfd.fd, 0);
	      if (channel->debug)
892 893
		g_print ("  ResetEvent(%p)",
			 (HANDLE) watch->pollfd.fd);
894 895
	      ResetEvent ((HANDLE) watch->pollfd.fd);
	    }
896 897
	  else if (events.lNetworkEvents & FD_WRITE)
	    channel->ever_writable = TRUE;
898 899
	  channel->last_events = events.lNetworkEvents;
	}
900

901 902 903
      watch->pollfd.revents = 0;
      if (channel->last_events & (FD_READ | FD_ACCEPT))
	watch->pollfd.revents |= G_IO_IN;
904

905
      if (channel->last_events & FD_WRITE)
906
	watch->pollfd.revents |= G_IO_OUT;
907 908 909 910 911 912 913 914 915 916 917 918 919 920 921
      else
	{
	  /* We have called WSAEnumNetworkEvents() above but it didn't
	   * set FD_WRITE.
	   */
	  if (events.lNetworkEvents & FD_CONNECT)
	    {
	      if (events.iErrorCode[FD_CONNECT_BIT] == 0)
		watch->pollfd.revents |= G_IO_OUT;
	      else
		watch->pollfd.revents |= (G_IO_HUP | G_IO_ERR);
	    }
	  if (watch->pollfd.revents == 0 && (channel->last_events & (FD_CLOSE)))
	    watch->pollfd.revents |= G_IO_HUP;
	}
922

923
      /* Regardless of WSAEnumNetworkEvents() result, if watching for
924 925 926
       * writability, and if we have ever got a FD_WRITE event, and
       * unless last write would have blocked, set G_IO_OUT. But never
       * set both G_IO_OUT and G_IO_HUP.
927 928
       */
      if (!(watch->pollfd.revents & G_IO_HUP) &&
929
	  channel->ever_writable &&
930 931 932
	  !channel->write_would_have_blocked &&
	  (channel->event_mask & FD_WRITE))
	watch->pollfd.revents |= G_IO_OUT;
933

934 935 936 937 938
      if (channel->debug)
	g_print ("\n  revents={%s} retval={%s}\n",
		 condition_to_string (watch->pollfd.revents),
		 condition_to_string ((watch->pollfd.revents | buffer_condition) & watch->condition));

939 940 941 942 943 944
      return ((watch->pollfd.revents | buffer_condition) & watch->condition);

    default:
      g_assert_not_reached ();
      abort ();
    }
945 946 947
}

static gboolean
948 949 950
g_io_win32_dispatch (GSource     *source,
		     GSourceFunc  callback,
		     gpointer     user_data)
951
{
952
  GIOFunc func = (GIOFunc)callback;
953
  GIOWin32Watch *watch = (GIOWin32Watch *)source;
954
  GIOWin32Channel *channel = (GIOWin32Channel *)watch->channel;
955
  GIOCondition buffer_condition = g_io_channel_get_buffer_condition (watch->channel);
956
  
957 958
  if (!func)
    {
959
      g_warning ("IO Watch dispatched without callback\n"
960 961 962 963
		 "You must call g_source_connect().");
      return FALSE;
    }
  
964 965 966 967 968 969
  if (channel->debug)
    g_print ("g_io_win32_dispatch: pollfd.revents=%s condition=%s result=%s\n",
	     condition_to_string (watch->pollfd.revents),
	     condition_to_string (watch->condition),
	     condition_to_string ((watch->pollfd.revents | buffer_condition) & watch->condition));

970
  return (*func) (watch->channel,
971
		  (watch->pollfd.revents | buffer_condition) & watch->condition,
972
		  user_data);
973 974 975
}

static void
976
g_io_win32_finalize (GSource *source)
977
{
978
  GIOWin32Watch *watch = (GIOWin32Watch *)source;
979
  GIOWin32Channel *channel = (GIOWin32Channel *)watch->channel;
980
  
981 982 983
  if (channel->debug)
    g_print ("g_io_win32_finalize: source=%p channel=%p", source, channel);

984 985 986
  switch (channel->type)
    {
    case G_IO_WIN32_WINDOWS_MESSAGES:
987 988 989 990
      if (channel->debug)
	g_print (" MSG");
      break;

991
    case G_IO_WIN32_CONSOLE:
992 993
      if (channel->debug)
	g_print (" CON");
994
      break;
995

996 997
    case G_IO_WIN32_FILE_DESC:
      if (channel->debug)
998
	g_print (" FD thread=%#x", channel->thread_id);
999 1000 1001 1002
      break;

    case G_IO_WIN32_SOCKET:
      if (channel->debug)
1003
	g_print (" SOCK sock=%d", channel->fd);
1004 1005 1006 1007 1008 1009
      break;

    default:
      g_assert_not_reached ();
      abort ();
    }
1010 1011
  if (channel->debug)
    g_print ("\n");
1012
  g_io_channel_unref (watch->channel);
1013 1014
}

1015
GSourceFuncs g_io_watch_funcs = {
1016 1017 1018
  g_io_win32_prepare,
  g_io_win32_check,
  g_io_win32_dispatch,
1019
  g_io_win32_finalize
1020
};
1021

1022
static GIOStatus
1023 1024
g_io_win32_msg_read (GIOChannel *channel,
		     gchar      *buf,
1025 1026 1027
		     gsize       count,
		     gsize      *bytes_read,
		     GError    **err)
1028
{
1029
  GIOWin32Channel *win32_channel = (GIOWin32Channel *)channel;
1030 1031
  MSG msg;               /* In case of alignment problems */
  
1032
  if (count < sizeof (MSG))
1033
    {
1034 1035
      g_set_error_literal (err, G_IO_CHANNEL_ERROR, G_IO_CHANNEL_ERROR_INVAL,
                           "Incorrect message size"); /* Informative enough error message? */
1036 1037
      return G_IO_STATUS_ERROR;
    }
1038
  
1039
  if (win32_channel->debug)
1040 1041
    g_print ("g_io_win32_msg_read: channel=%p hwnd=%p\n",
	     channel, win32_channel->hwnd);
1042
  if (!PeekMessage (&msg, win32_channel->hwnd, 0, 0, PM_REMOVE))
1043
    return G_IO_STATUS_AGAIN;
1044

1045 1046
  memmove (buf, &msg, sizeof (MSG));
  *bytes_read = sizeof (MSG);
1047

1048
  return G_IO_STATUS_NORMAL;
1049
}
1050

1051 1052 1053 1054 1055 1056
/*
 * Post one Windows message through a message channel.
 *
 * @channel:       the channel (actually a GIOWin32Channel)
 * @buf:           source buffer holding exactly one raw MSG structure
 * @count:         size of @buf in bytes; must equal sizeof (MSG)
 * @bytes_written: set to sizeof (MSG) on success and to 0 otherwise
 * @err:           return location for an error
 *
 * Only the message, wParam and lParam fields of the MSG are used; the
 * message is posted to the channel's own window.
 *
 * Returns G_IO_STATUS_ERROR if @count is wrong or PostMessage() fails,
 * G_IO_STATUS_NORMAL otherwise.
 */
static GIOStatus
g_io_win32_msg_write (GIOChannel  *channel,
                      const gchar *buf,
                      gsize        count,
                      gsize       *bytes_written,
                      GError     **err)
{
  GIOWin32Channel *win32_channel = (GIOWin32Channel *)channel;
  MSG msg;

  /* Always give the caller a defined count, including on the error
   * paths below where nothing is written.
   */
  *bytes_written = 0;

  if (count != sizeof (MSG))
    {
      g_set_error_literal (err, G_IO_CHANNEL_ERROR, G_IO_CHANNEL_ERROR_INVAL,
                           "Incorrect message size"); /* Informative enough error message? */
      return G_IO_STATUS_ERROR;
    }

  /* In case of alignment problems */
  memmove (&msg, buf, sizeof (MSG));
  if (!PostMessage (win32_channel->hwnd, msg.message, msg.wParam, msg.lParam))
    {
      gchar *emsg = g_win32_error_message (GetLastError ());

      g_set_error_literal (err, G_IO_CHANNEL_ERROR, G_IO_CHANNEL_ERROR_FAILED, emsg);
      g_free (emsg);
      return G_IO_STATUS_ERROR;
    }

  *bytes_written = sizeof (MSG);

  return G_IO_STATUS_NORMAL;
}

1083 1084 1085
/*
 * Close handler for Windows message channels.
 *
 * There is no resource to release here, so this always reports
 * success and never touches @err.
 */
static GIOStatus
g_io_win32_msg_close (GIOChannel *channel,
                      GError    **err)
{
  /* Intentionally a no-op.  Open question carried over from the
   * original code: should hwnd be set to some invalid value here?
   */
  return G_IO_STATUS_NORMAL;
}

1092
static void
1093 1094
g_io_win32_free (GIOChannel *channel)
{
1095
  GIOWin32Channel *win32_channel = (GIOWin32Channel *)channel;
1096
  
1097
  if (win32_channel->debug)
1098
    g_print ("g_io_win32_free channel=%p fd=%d\n", channel, win32_channel->fd);
1099

1100 1101 1102 1103
  if (win32_channel->data_avail_event)
    CloseHandle (win32_channel->data_avail_event);
  if (win32_channel->space_avail_event)
    CloseHandle (win32_channel->space_avail_event);
1104 1105
  if (win32_channel->type == G_IO_WIN32_SOCKET)
    WSAEventSelect (win32_channel->fd, NULL, 0);
1106
  DeleteCriticalSection (&win32_channel->mutex);
1107

1108
  g_free (win32_channel->buffer);
1109 1110 1111
  g_free (win32_channel);
}

1112
static GSource *
1113 1114
g_io_win32_msg_create_watch (GIOChannel   *channel,
			     GIOCondition  condition)
1115
{
1116 1117 1118
  GIOWin32Watch *watch;
  GSource *source;

1119
  source = g_source_new (&g_io_watch_funcs, sizeof (GIOWin32Watch));
1120
  watch = (GIOWin32Watch *)source;
1121 1122 1123
  
  watch->channel = channel;
  g_io_channel_ref (channel);
1124
  
1125
  watch->condition = condition;
1126
  
1127
  watch->pollfd.fd = (gintptr) G_WIN32_MSG_HANDLE;
1128
  watch->pollfd.events = condition;
1129
  
1130
  g_source_add_poll (source, &watch->pollfd);
1131
  
1132
  return source;
1133 1134
}