Commit 799f8dcd authored by Sebastian Dröge's avatar Sebastian Dröge

GSocket: Fix race conditions on Win32 if multiple threads are waiting on...

GSocket: Fix race conditions on Win32 if multiple threads are waiting on conditions for the same socket

WSAWaitForMultipleEvents() only returns for one of the waiting threads, and
that one might not even be the one waiting for the condition that changed. As
such, only let a single thread wait on the event and use a GCond for all other
threads.

With this it is possible to e.g. have an UDP socket that is written to from
one thread and read from in another thread on Win32 too. On POSIX systems this
was working before already.

https://bugzilla.gnome.org/show_bug.cgi?id=762283
parent 620b3c1e
......@@ -249,11 +249,14 @@ struct _GSocketPrivate
guint connect_pending : 1;
#ifdef G_OS_WIN32
WSAEVENT event;
gboolean waiting;
DWORD waiting_result;
int current_events;
int current_errors;
int selected_events;
GList *requested_conditions; /* list of requested GIOCondition * */
GMutex win32_source_lock;
GCond win32_source_cond;
#endif
struct {
......@@ -333,8 +336,10 @@ socket_strerror (int err)
static void
_win32_unset_event_mask (GSocket *socket, int mask)
{
g_mutex_lock (&socket->priv->win32_source_lock);
socket->priv->current_events &= ~mask;
socket->priv->current_errors &= ~mask;
g_mutex_unlock (&socket->priv->win32_source_lock);
}
#else
#define win32_unset_event_mask(_socket, _mask)
......@@ -831,6 +836,7 @@ g_socket_finalize (GObject *object)
g_assert (socket->priv->requested_conditions == NULL);
g_mutex_clear (&socket->priv->win32_source_lock);
g_cond_clear (&socket->priv->win32_source_cond);
#endif
for (i = 0; i < RECV_ADDR_CACHE_SIZE; i++)
......@@ -1058,6 +1064,7 @@ g_socket_init (GSocket *socket)
#ifdef G_OS_WIN32
socket->priv->event = WSA_INVALID_EVENT;
g_mutex_init (&socket->priv->win32_source_lock);
g_cond_init (&socket->priv->win32_source_cond);
#endif
}
......@@ -2441,6 +2448,8 @@ g_socket_accept (GSocket *socket,
while (TRUE)
{
win32_unset_event_mask (socket, FD_ACCEPT);
if ((ret = accept (socket->priv->fd, NULL, 0)) < 0)
{
int errsv = get_socket_errno ();
......@@ -2455,8 +2464,6 @@ g_socket_accept (GSocket *socket,
errsv == EAGAIN)
#endif
{
win32_unset_event_mask (socket, FD_ACCEPT);
if (socket->priv->blocking)
{
if (!g_socket_condition_wait (socket,
......@@ -2473,8 +2480,6 @@ g_socket_accept (GSocket *socket,
break;
}
win32_unset_event_mask (socket, FD_ACCEPT);
#ifdef G_OS_WIN32
{
/* The socket inherits the accepting sockets event mask and even object,
......@@ -2563,6 +2568,8 @@ g_socket_connect (GSocket *socket,
while (1)
{
win32_unset_event_mask (socket, FD_CONNECT);
if (connect (socket->priv->fd, (struct sockaddr *) &buffer,
g_socket_address_get_native_size (address)) < 0)
{
......@@ -2577,8 +2584,6 @@ g_socket_connect (GSocket *socket,
if (errsv == WSAEWOULDBLOCK)
#endif
{
win32_unset_event_mask (socket, FD_CONNECT);
if (socket->priv->blocking)
{
if (g_socket_condition_wait (socket, G_IO_OUT, cancellable, error))
......@@ -2604,8 +2609,6 @@ g_socket_connect (GSocket *socket,
break;
}
win32_unset_event_mask (socket, FD_CONNECT);
socket->priv->connected_read = TRUE;
socket->priv->connected_write = TRUE;
......@@ -2785,6 +2788,8 @@ g_socket_receive_with_timeout (GSocket *socket,
while (1)
{
win32_unset_event_mask (socket, FD_READ);
if ((ret = recv (socket->priv->fd, buffer, size, 0)) < 0)
{
int errsv = get_socket_errno ();
......@@ -2799,8 +2804,6 @@ g_socket_receive_with_timeout (GSocket *socket,
errsv == EAGAIN)
#endif
{
win32_unset_event_mask (socket, FD_READ);
if (timeout != 0)
{
if (!block_on_timeout (socket, G_IO_IN, timeout, start_time,
......@@ -2811,14 +2814,10 @@ g_socket_receive_with_timeout (GSocket *socket,
}
}
win32_unset_event_mask (socket, FD_READ);
socket_set_error_lazy (error, errsv, _("Error receiving data: %s"));
return -1;
}
win32_unset_event_mask (socket, FD_READ);
break;
}
......@@ -2984,6 +2983,8 @@ g_socket_send_with_timeout (GSocket *socket,
while (1)
{
win32_unset_event_mask (socket, FD_WRITE);
if ((ret = send (socket->priv->fd, buffer, size, G_SOCKET_DEFAULT_SEND_FLAGS)) < 0)
{
int errsv = get_socket_errno ();
......@@ -2998,8 +2999,6 @@ g_socket_send_with_timeout (GSocket *socket,
errsv == EAGAIN)
#endif
{
win32_unset_event_mask (socket, FD_WRITE);
if (timeout != 0)
{
if (!block_on_timeout (socket, G_IO_OUT, timeout, start_time,
......@@ -3414,7 +3413,7 @@ remove_condition_watch (GSocket *socket,
}
static GIOCondition
update_condition (GSocket *socket)
update_condition_unlocked (GSocket *socket)
{
WSANETWORKEVENTS events;
GIOCondition condition;
......@@ -3481,6 +3480,16 @@ update_condition (GSocket *socket)
return condition;
}
static GIOCondition
update_condition (GSocket *socket)
{
GIOCondition res;
g_mutex_lock (&socket->priv->win32_source_lock);
res = update_condition_unlocked (socket);
g_mutex_unlock (&socket->priv->win32_source_lock);
return res;
}
#endif
typedef struct {
......@@ -3876,11 +3885,44 @@ g_socket_condition_timed_wait (GSocket *socket,
if (timeout == -1)
timeout = WSA_INFINITE;
current_condition = update_condition (socket);
g_mutex_lock (&socket->priv->win32_source_lock);
current_condition = update_condition_unlocked (socket);
while ((condition & current_condition) == 0)
{
res = WSAWaitForMultipleEvents (num_events, events,
FALSE, timeout, FALSE);
if (!socket->priv->waiting)
{
socket->priv->waiting = TRUE;
socket->priv->waiting_result = 0;
g_mutex_unlock (&socket->priv->win32_source_lock);
res = WSAWaitForMultipleEvents (num_events, events, FALSE, timeout, FALSE);
g_mutex_lock (&socket->priv->win32_source_lock);
socket->priv->waiting = FALSE;
socket->priv->waiting_result = res;
g_cond_broadcast (&socket->priv->win32_source_cond);
}
else
{
if (timeout != WSA_INFINITE)
{
if (!g_cond_wait_until (&socket->priv->win32_source_cond, &socket->priv->win32_source_lock, timeout))
{
res = WSA_WAIT_TIMEOUT;
break;
}
else
{
res = socket->priv->waiting_result;
}
}
else
{
g_cond_wait (&socket->priv->win32_source_cond, &socket->priv->win32_source_lock);
res = socket->priv->waiting_result;
}
}
if (res == WSA_WAIT_FAILED)
{
int errsv = get_socket_errno ();
......@@ -3901,7 +3943,7 @@ g_socket_condition_timed_wait (GSocket *socket,
if (g_cancellable_set_error_if_cancelled (cancellable, error))
break;
current_condition = update_condition (socket);
current_condition = update_condition_unlocked (socket);
if (timeout != WSA_INFINITE)
{
......@@ -3910,6 +3952,7 @@ g_socket_condition_timed_wait (GSocket *socket,
timeout = 0;
}
}
g_mutex_unlock (&socket->priv->win32_source_lock);
remove_condition_watch (socket, &condition);
if (num_events > 1)
g_cancellable_release_fd (cancellable);
......@@ -4405,6 +4448,8 @@ g_socket_send_message_with_timeout (GSocket *socket,
while (1)
{
win32_unset_event_mask (socket, FD_WRITE);
if (address)
result = WSASendTo (socket->priv->fd,
bufs, num_vectors,
......@@ -4426,8 +4471,6 @@ g_socket_send_message_with_timeout (GSocket *socket,
if (errsv == WSAEWOULDBLOCK)
{
win32_unset_event_mask (socket, FD_WRITE);
if (timeout != 0)
{
if (!block_on_timeout (socket, G_IO_OUT, timeout,
......@@ -4875,6 +4918,8 @@ g_socket_receive_message_with_timeout (GSocket *socket,
/* do it */
while (1)
{
win32_unset_event_mask (socket, FD_READ);
addrlen = sizeof addr;
if (address)
result = WSARecvFrom (socket->priv->fd,
......@@ -4896,8 +4941,6 @@ g_socket_receive_message_with_timeout (GSocket *socket,
if (errsv == WSAEWOULDBLOCK)
{
win32_unset_event_mask (socket, FD_READ);
if (timeout != 0)
{
if (!block_on_timeout (socket, G_IO_IN, timeout,
......@@ -4911,7 +4954,6 @@ g_socket_receive_message_with_timeout (GSocket *socket,
socket_set_error_lazy (error, errsv, _("Error receiving message: %s"));
return -1;
}
win32_unset_event_mask (socket, FD_READ);
break;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment