From 1de9e035b0c3e849ff03c5aaddde457792f9b5b4 Mon Sep 17 00:00:00 2001 From: Ignacio Casal Quinteiro Date: Fri, 30 Nov 2018 17:02:00 +0100 Subject: [PATCH] Add wing_poll This is a new polling method allowing to poll more than 64 handles based on the glib one. When we reach the limit of 64 we create a thread and we poll on that thread for a batch of handles this way we overcome the limit. --- tests/meson.build | 5 +- tests/poll.c | 619 ++++++++++++++++++++++++++++++++++++++++++++++ wing/meson.build | 2 + wing/wing.h | 1 + wing/wingpoll.c | 403 ++++++++++++++++++++++++++++++ wing/wingpoll.h | 46 ++++ 6 files changed, 1075 insertions(+), 1 deletion(-) create mode 100644 tests/poll.c create mode 100644 wing/wingpoll.c create mode 100644 wing/wingpoll.h diff --git a/tests/meson.build b/tests/meson.build index fbbfc14..9079a7e 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -1,10 +1,13 @@ unit_tests = [ 'named-pipe', + 'poll', ] +winsock2_dep = cc.find_library('ws2_32') + foreach unit: unit_tests exe = executable(unit, unit + '.c', - dependencies: wing_dep, + dependencies: [ wing_dep, winsock2_dep ], include_directories: wing_inc) test(unit, exe, args: [ '--tap', '-k' ]) endforeach diff --git a/tests/poll.c b/tests/poll.c new file mode 100644 index 0000000..2e372da --- /dev/null +++ b/tests/poll.c @@ -0,0 +1,619 @@ +/* Unit test for W32 version of wing_poll() + * + * Copyright © 2017 Руслан Ижбулатов + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, see . + */ + +#define _WIN32_WINNT 0x0600 + +#include +#include +#include + +#define NUM_POLLEES 999 +#define NUM_POLLFDS 1000 + +#define ASYNC_CONNECT_OK(r) (r == 0 || (r < 0 && GetLastError () == WSAEWOULDBLOCK)) + +#define REPEAT 1 + +static void +init_networking (void) +{ + WSADATA wsadata; + + if (WSAStartup (MAKEWORD (2, 0), &wsadata) != 0) + g_error ("Windows Sockets could not be initialized"); +} + +static void +prepare_fds (SOCKET sockets[], + GPollFD fds[], + int num_pollees) +{ + gint i; + + for (i = 0; i < num_pollees; i++) + { + fds[i].fd = (gintptr) WSACreateEvent (); + g_assert (WSAEventSelect (sockets[i], (HANDLE) fds[i].fd, FD_READ | FD_CLOSE) == 0); + } +} + +static void +reset_fds (GPollFD fds[], + int num_pollees) +{ + gint i; + + for (i = 0; i < num_pollees; i++) + { + WSAResetEvent ((HANDLE) fds[i].fd); + fds[i].events = G_IO_IN | G_IO_OUT | G_IO_ERR; + fds[i].revents = 0; + } +} + +static void +reset_fds_msg (GPollFD fds[], + int num_pollfds) +{ + fds[num_pollfds - 1].fd = G_WIN32_MSG_HANDLE; + fds[num_pollfds - 1].events = G_IO_IN; + fds[num_pollfds - 1].revents = 0; +} + +static void +check_fds (SOCKET sockets[], + GPollFD fds[], + int num_pollees) +{ + gint i; + + for (i = 0; i < num_pollees; i++) + { + if (fds[i].revents != 0) + { + WSANETWORKEVENTS events; + g_assert (WSAEnumNetworkEvents (sockets[i], 0, &events) == 0); + + fds[i].revents = 0; + if (events.lNetworkEvents & (FD_READ | FD_ACCEPT)) + fds[i].revents |= G_IO_IN; + + if (events.lNetworkEvents & FD_WRITE) + fds[i].revents |= G_IO_OUT; + else + { + /* We have called WSAEnumNetworkEvents() above but it didn't + * set FD_WRITE. + */ + if (events.lNetworkEvents & FD_CONNECT) + { + if (events.iErrorCode[FD_CONNECT_BIT] == 0) + fds[i].revents |= G_IO_OUT; + else + fds[i].revents |= (G_IO_HUP | G_IO_ERR); + } + if (fds[i].revents == 0 && (events.lNetworkEvents & (FD_CLOSE))) + fds[i].revents |= G_IO_HUP; + } + } + } +} + +static void +prepare_sockets (SOCKET sockets[], + SOCKET opp_sockets[], + GPollFD fds[], + int num_pollees) +{ + gint i; + SOCKET server; + struct sockaddr_in sa; + unsigned long ul = 1; + int sa_size; + int r; + + server = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + g_assert (server != INVALID_SOCKET); + + memset(&sa, 0, sizeof sa); + + sa.sin_family = AF_INET; + sa.sin_port = 0; + sa.sin_addr.s_addr = htonl (INADDR_LOOPBACK); + sa_size = sizeof (sa); + + g_assert (bind (server, (const struct sockaddr *) &sa, sa_size) == 0); + g_assert (getsockname (server, (struct sockaddr *) &sa, &sa_size) == 0); + g_assert (listen (server, 1) == 0); + + for (i = 0; i < num_pollees; i++) + { + opp_sockets[i] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + g_assert (opp_sockets[i] != INVALID_SOCKET); + g_assert (ioctlsocket (opp_sockets[i], FIONBIO, &ul) == 0); + + r = connect (opp_sockets[i], (const struct sockaddr *) &sa, sizeof (sa)); + g_assert (ASYNC_CONNECT_OK (r)); + + sockets[i] = accept (server, NULL, NULL); + g_assert (sockets[i] != INVALID_SOCKET); + g_assert (ioctlsocket (sockets[i], FIONBIO, &ul) == 0); + } + + closesocket (server); +} + +static void +cleanup_sockets (SOCKET sockets[], + SOCKET opp_sockets[], + int num_pollees) +{ + gint i; + + for (i = 0; i < num_pollees; i++) + { + closesocket (sockets[i]); + closesocket (opp_sockets[i]); + } +} + +static void +bucketize (gint64 val, + gint buckets[], + gint64 bucket_limits[], + gint count) +{ + gint i; + + if (val > bucket_limits[count - 1]) + { + buckets[count - 1] += 1; + return; + } + + for (i = count - 1; i > 0; i--) + if (val < bucket_limits[i] && val >= bucket_limits[i - 1]) + { + buckets[i] += 1; + return; + } + + buckets[0] += 1; +} + +static void +print_buckets (gint buckets[], + gint64 bucket_limits[], + gint count) +{ + gint i; + + for (i = 0; i < count; i++) + if (i < count - 1) + g_print ("%-4lld-%4lld|", i == 0 ? 0 : bucket_limits[i - 1], bucket_limits[i] - 1); + else + g_print (" >= %-4lld|", bucket_limits[i - 1]); + + g_print ("\n"); + + for (i = 0; i < count; i++) + { + gint len; + gint padding; + gint j; + if (buckets[i] < 10) + len = 1; + else if (buckets[i] < 100) + len = 2; + else if (buckets[i] < 1000) + len = 3; + else + len = 4; + padding = 9 - len; + for (j = 0; j < padding / 2; j++) + g_print (" "); + if (buckets[i] != 0) + g_print ("%*d", len, buckets[i]); + else + g_print (" "); + for (j = padding / 2; j < padding; j++) + g_print (" "); + g_print (" "); + } + + g_print ("\n\n"); +} + +static void +test_wing_poll (void) +{ + SOCKET sockets[NUM_POLLEES]; + GPollFD fds[NUM_POLLFDS]; + SOCKET opp_sockets[NUM_POLLEES]; + gint i; + gint activatable; + gint64 times[REPEAT][2]; +#define BUCKET_COUNT 25 + gint64 bucket_limits[BUCKET_COUNT] = {3, 5, 10, 15, 20, 25, 30, 35, 40, 50, 60, 70, 80, 90, 100, 120, 150, 180, 220, 280, 350, 450, 600, 800, 1000}; + gint buckets[BUCKET_COUNT]; + gint64 times_avg = 0, times_min = G_MAXINT64, times_max = 0; + + prepare_sockets (sockets, opp_sockets, fds, NUM_POLLEES); + prepare_fds (sockets, fds, NUM_POLLEES); + + times_avg = 0; + times_min = G_MAXINT64; + times_max = 0; + memset (buckets, 0, sizeof (gint) * BUCKET_COUNT); + + for (i = 0; i < REPEAT; i++) + { + gint r; + gint64 diff; + + reset_fds (fds, NUM_POLLEES); + reset_fds_msg (fds, NUM_POLLFDS); + times[i][0] = g_get_monotonic_time (); + r = wing_poll (fds, NUM_POLLFDS, 0); + times[i][1] = g_get_monotonic_time (); + g_assert (r == 0); + diff = times[i][1] - times[i][0]; + if (times_min > diff) + times_min = diff; + if (times_max < diff) + times_max = diff; + times_avg += diff; + bucketize (diff, buckets, bucket_limits, BUCKET_COUNT); + } + + times_avg /= NUM_POLLEES; + g_print ("\nempty poll time:\n%4lldns - %4lldns, average %4lldns\n", times_min, times_max, times_avg); + print_buckets (buckets, bucket_limits, BUCKET_COUNT); + + times_avg = 0; + times_min = G_MAXINT64; + times_max = 0; + memset (buckets, 0, sizeof (gint) * BUCKET_COUNT); + + activatable = 0; + + for (i = 0; i < REPEAT; i++) + { + gint r, s, v, t; + gint64 diff; + MSG msg; + gboolean found_app; + + reset_fds (fds, NUM_POLLEES); + reset_fds_msg (fds, NUM_POLLFDS); + s = send (opp_sockets[activatable], (const char *) &t, 1, 0); + g_assert (PostMessage (NULL, WM_APP, 1, 2)); + /* This is to ensure that all sockets catch up, otherwise some might not poll active */ + g_usleep (G_USEC_PER_SEC / 1000); + + times[i][0] = g_get_monotonic_time (); + r = wing_poll (fds, NUM_POLLFDS, 1000); + times[i][1] = g_get_monotonic_time (); + + check_fds (sockets, fds, NUM_POLLEES); + v = recv (sockets[activatable], (char *) &t, 1, 0); + found_app = FALSE; + while (!found_app && PeekMessage (&msg, NULL, 0, 0, PM_REMOVE)) + if (msg.message == WM_APP && msg.wParam == 1 && msg.lParam == 2) + found_app = TRUE; + g_assert (s == 1); + g_assert (r == 2); + g_assert (v == 1); + g_assert (found_app); + + reset_fds (fds, NUM_POLLEES); + reset_fds_msg (fds, NUM_POLLFDS); + r = wing_poll (fds, NUM_POLLFDS, 0); + check_fds (sockets, fds, NUM_POLLEES); + g_assert (r == 0); + diff = times[i][1] - times[i][0]; + if (times_min > diff) + times_min = diff; + if (times_max < diff) + times_max = diff; + times_avg += diff; + activatable = (activatable + 1) % NUM_POLLEES; + bucketize (diff, buckets, bucket_limits, BUCKET_COUNT); + } + + times_avg /= NUM_POLLEES; + g_print ("1-socket + msg poll time:\n%4lldns - %4lldns, average %4lldns\n", times_min, times_max, times_avg); + print_buckets (buckets, bucket_limits, BUCKET_COUNT); + + times_avg = 0; + times_min = G_MAXINT64; + times_max = 0; + memset (buckets, 0, sizeof (gint) * BUCKET_COUNT); + + activatable = 0; + + for (i = 0; i < REPEAT; i++) + { + gint r, s, v, t; + gint64 diff; + + reset_fds (fds, NUM_POLLEES); + reset_fds_msg (fds, NUM_POLLFDS); + s = send (opp_sockets[activatable], (const char *) &t, 1, 0); + + g_usleep (G_USEC_PER_SEC / 1000); + + times[i][0] = g_get_monotonic_time (); + r = wing_poll (fds, NUM_POLLFDS, 1000); + times[i][1] = g_get_monotonic_time (); + + check_fds (sockets, fds, NUM_POLLEES); + v = recv (sockets[activatable], (char *) &t, 1, 0); + g_assert (s == 1); + g_assert (r == 1); + g_assert (v == 1); + + reset_fds (fds, NUM_POLLEES); + reset_fds_msg (fds, NUM_POLLFDS); + r = wing_poll (fds, NUM_POLLFDS, 0); + check_fds (sockets, fds, NUM_POLLEES); + g_assert (r == 0); + + diff = times[i][1] - times[i][0]; + if (times_min > diff) + times_min = diff; + if (times_max < diff) + times_max = diff; + times_avg += diff; + activatable = (activatable + 1) % NUM_POLLEES; + bucketize (diff, buckets, bucket_limits, BUCKET_COUNT); + } + + times_avg /= NUM_POLLEES; + g_print ("1-socket poll time:\n%4lldns - %4lldns, average %4lldns\n", times_min, times_max, times_avg); + print_buckets (buckets, bucket_limits, BUCKET_COUNT); + + times_avg = 0; + times_min = G_MAXINT64; + times_max = 0; + memset (buckets, 0, sizeof (gint) * BUCKET_COUNT); + + for (i = 0; i < REPEAT; i++) + { + gint r, s, v, t; + gint64 diff; + gint j; + + reset_fds (fds, NUM_POLLEES); + reset_fds_msg (fds, NUM_POLLFDS); + s = v = 0; + + for (j = 0; j < NUM_POLLEES / 2; j++) + s += send (opp_sockets[j], (const char *) &t, 1, 0) == 1 ? 1 : 0; + + g_usleep (G_USEC_PER_SEC / 1000); + + times[i][0] = g_get_monotonic_time (); + r = wing_poll (fds, NUM_POLLFDS, 1000); + times[i][1] = g_get_monotonic_time (); + check_fds (sockets, fds, NUM_POLLEES); + for (j = 0; j < NUM_POLLEES / 2; j++) + v += recv (sockets[j], (char *) &t, 1, 0) == 1 ? 1 : 0; + g_assert (s == NUM_POLLEES / 2); + g_assert (r == NUM_POLLEES / 2); + g_assert (v == NUM_POLLEES / 2); + + reset_fds (fds, NUM_POLLEES); + reset_fds_msg (fds, NUM_POLLFDS); + r = wing_poll (fds, NUM_POLLFDS, 0); + check_fds (sockets, fds, NUM_POLLEES); + g_assert (r == 0); + + diff = times[i][1] - times[i][0]; + if (times_min > diff) + times_min = diff; + if (times_max < diff) + times_max = diff; + times_avg += diff; + bucketize (diff, buckets, bucket_limits, BUCKET_COUNT); + } + + times_avg /= NUM_POLLEES; + g_print ("half-socket poll time:\n%4lldns - %4lldns, average %4lldns\n", times_min, times_max, times_avg); + print_buckets (buckets, bucket_limits, BUCKET_COUNT); + + times_avg = 0; + times_min = G_MAXINT64; + times_max = 0; + memset (buckets, 0, sizeof (gint) * BUCKET_COUNT); + + for (i = 0; i < REPEAT; i++) + { + gint r, s, v, t; + gint64 diff; + gint j; + MSG msg; + gboolean found_app; + + reset_fds (fds, NUM_POLLEES); + reset_fds_msg (fds, NUM_POLLFDS); + s = v = 0; + + for (j = 0; j < NUM_POLLEES / 2; j++) + s += send (opp_sockets[j], (const char *) &t, 1, 0) == 1 ? 1 : 0; + g_assert (PostMessage (NULL, WM_APP, 1, 2)); + + /* This is to ensure that all sockets catch up, otherwise some might not poll active */ + g_usleep (G_USEC_PER_SEC / 1000); + + times[i][0] = g_get_monotonic_time (); + r = wing_poll (fds, NUM_POLLFDS, 1000); + times[i][1] = g_get_monotonic_time (); + check_fds (sockets, fds, NUM_POLLEES); + for (j = 0; j < NUM_POLLEES / 2; j++) + v += recv (sockets[j], (char *) &t, 1, 0) == 1 ? 1 : 0; + found_app = FALSE; + while (!found_app && PeekMessage (&msg, NULL, 0, 0, PM_REMOVE)) + if (msg.message == WM_APP && msg.wParam == 1 && msg.lParam == 2) + found_app = TRUE; + g_assert (s == NUM_POLLEES / 2); + g_assert (r == NUM_POLLEES / 2 + 1); + g_assert (v == NUM_POLLEES / 2); + g_assert (found_app); + + reset_fds (fds, NUM_POLLEES); + reset_fds_msg (fds, NUM_POLLFDS); + r = wing_poll (fds, NUM_POLLFDS, 0); + check_fds (sockets, fds, NUM_POLLEES); + g_assert (r == 0); + + diff = times[i][1] - times[i][0]; + if (times_min > diff) + times_min = diff; + if (times_max < diff) + times_max = diff; + times_avg += diff; + bucketize (diff, buckets, bucket_limits, BUCKET_COUNT); + } + + times_avg /= NUM_POLLEES; + g_print ("half-socket + msg poll time:\n%4lldns - %4lldns, average %4lldns\n", times_min, times_max, times_avg); + print_buckets (buckets, bucket_limits, BUCKET_COUNT); + + times_avg = 0; + times_min = G_MAXINT64; + times_max = 0; + memset (buckets, 0, sizeof (gint) * BUCKET_COUNT); + + for (i = 0; i < REPEAT; i++) + { + gint r, s, v, t; + gint64 diff; + gint j; + + reset_fds (fds, NUM_POLLEES); + reset_fds_msg (fds, NUM_POLLFDS); + s = v = 0; + + for (j = 0; j < NUM_POLLEES; j++) + s += send (opp_sockets[j], (const char *) &t, 1, 0) == 1 ? 1 : 0; + + g_usleep (G_USEC_PER_SEC / 1000); + + times[i][0] = g_get_monotonic_time (); + r = wing_poll (fds, NUM_POLLFDS, 1000); + times[i][1] = g_get_monotonic_time (); + check_fds (sockets, fds, NUM_POLLEES); + for (j = 0; j < NUM_POLLEES; j++) + v += recv (sockets[j], (char *) &t, 1, 0) == 1 ? 1 : 0; + g_assert (s == NUM_POLLEES); + g_assert (r == NUM_POLLEES); + g_assert (v == NUM_POLLEES); + + reset_fds (fds, NUM_POLLEES); + reset_fds_msg (fds, NUM_POLLFDS); + r = wing_poll (fds, NUM_POLLFDS, 0); + check_fds (sockets, fds, NUM_POLLEES); + g_assert (r == 0); + + diff = times[i][1] - times[i][0]; + if (times_min > diff) + times_min = diff; + if (times_max < diff) + times_max = diff; + times_avg += diff; + bucketize (diff, buckets, bucket_limits, BUCKET_COUNT); + } + + times_avg /= NUM_POLLEES; + g_print ("%d-socket poll time: \n%4lldns - %4lldns, average %4lldns\n", NUM_POLLEES, times_min, times_max, times_avg); + print_buckets (buckets, bucket_limits, BUCKET_COUNT); + + activatable = 0; + times_avg = 0; + times_min = G_MAXINT64; + times_max = 0; + memset (buckets, 0, sizeof (gint) * BUCKET_COUNT); + + for (i = 0; i < REPEAT; i++) + { + gint r, s, v, t; + gint64 diff; + gint j; + MSG msg; + gboolean found_app; + + reset_fds (fds, NUM_POLLEES); + reset_fds_msg (fds, NUM_POLLFDS); + s = v = 0; + + for (j = 0; j < activatable; j++) + s += send (opp_sockets[j], (const char *) &t, 1, 0) == 1 ? 1 : 0; + g_assert (PostMessage (NULL, WM_APP, 1, 2)); + + g_usleep (G_USEC_PER_SEC / 1000); + + times[i][0] = g_get_monotonic_time (); + r = wing_poll (fds, NUM_POLLFDS, 1000); + times[i][1] = g_get_monotonic_time (); + check_fds (sockets, fds, NUM_POLLEES); + for (j = 0; j < activatable; j++) + v += recv (sockets[j], (char *) &t, 1, 0) == 1 ? 1 : 0; + found_app = FALSE; + while (!found_app && PeekMessage (&msg, NULL, 0, 0, PM_REMOVE)) + if (msg.message == WM_APP && msg.wParam == 1 && msg.lParam == 2) + found_app = TRUE; + g_assert (s == activatable); + g_assert (r == activatable + 1); + g_assert (v == activatable); + g_assert (found_app); + + reset_fds (fds, NUM_POLLEES); + reset_fds_msg (fds, NUM_POLLFDS); + r = wing_poll (fds, NUM_POLLFDS, 0); + check_fds (sockets, fds, NUM_POLLEES); + g_assert (r == 0); + + diff = times[i][1] - times[i][0]; + if (times_min > diff) + times_min = diff; + if (times_max < diff) + times_max = diff; + times_avg += diff; + bucketize (diff, buckets, bucket_limits, BUCKET_COUNT); + activatable = (activatable + 1) % NUM_POLLEES; + } + + times_avg /= NUM_POLLEES; + g_print ("variable socket number + msg poll time: \n%4lldns - %4lldns, average %4lldns\n", times_min, times_max, times_avg); + print_buckets (buckets, bucket_limits, BUCKET_COUNT); + + cleanup_sockets (sockets, opp_sockets, NUM_POLLEES); +} + +int +main (int argc, + char *argv[]) +{ + g_test_init (&argc, &argv, NULL); + init_networking (); + + g_test_add_func ("/poll/wing-poll", test_wing_poll); + + return g_test_run (); +} diff --git a/wing/meson.build b/wing/meson.build index 596e283..8da554b 100644 --- a/wing/meson.build +++ b/wing/meson.build @@ -10,6 +10,7 @@ headers = [ 'wingnamedpipeconnection.h', 'wingnamedpipelistener.h', 'wingoutputstream.h', + 'wingpoll.h', 'wingservice.h', 'wingservicemanager.h', 'wingsource.h', @@ -27,6 +28,7 @@ sources = [ 'wingnamedpipeconnection.c', 'wingnamedpipelistener.c', 'wingoutputstream.c', + 'wingpoll.c', 'wingservice.c', 'wingservice-private.h', 'wingservicemanager.c', diff --git a/wing/wing.h b/wing/wing.h index 7113f06..ce3badd 100644 --- a/wing/wing.h +++ b/wing/wing.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include diff --git a/wing/wingpoll.c b/wing/wingpoll.c new file mode 100644 index 0000000..0b63aa6 --- /dev/null +++ b/wing/wingpoll.c @@ -0,0 +1,403 @@ +/* + * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald + * + * gpoll.c: poll(2) abstraction + * Copyright 1998 Owen Taylor + * Copyright 2008 Red Hat, Inc. + * Copyright (C) 2018 NICE s.r.l. + * + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + */ + +/* + * Modified by the GLib Team and others 1997-2000. See the AUTHORS + * file for a list of people on the GLib Team. See the ChangeLog + * files for a list of changes. These files are distributed with + * GLib at ftp://ftp.gtk.org/pub/gtk/. + */ + +#include "wingpoll.h" + +#ifdef G_OS_WIN32 +#define STRICT +#include +#include +#endif /* G_OS_WIN32 */ + +#ifdef _WIN32 +/* Always enable debugging printout on Windows, as it is more often + * needed there... + */ +#define G_MAIN_POLL_DEBUG +#endif + +#ifdef G_MAIN_POLL_DEBUG +static gboolean _wing_main_poll_debug = FALSE; +#endif + +static int +poll_rest (GPollFD *msg_fd, + GPollFD *stop_fd, + HANDLE *handles, + GPollFD *handle_to_fd[], + gint nhandles, + gint timeout) +{ + DWORD ready; + GPollFD *f; + int recursed_result; + + if (msg_fd != NULL) + { + /* Wait for either messages or handles + * -> Use MsgWaitForMultipleObjectsEx + */ + if (_wing_main_poll_debug) + g_print (" MsgWaitForMultipleObjectsEx(%d, %d)\n", nhandles, timeout); + + ready = MsgWaitForMultipleObjectsEx (nhandles, handles, timeout, + QS_ALLINPUT, MWMO_ALERTABLE); + + if (ready == WAIT_FAILED) + { + gchar *emsg; + + emsg = g_win32_error_message (GetLastError ()); + g_warning ("MsgWaitForMultipleObjectsEx failed: %s", emsg); + g_free (emsg); + } + } + else if (nhandles == 0) + { + /* No handles to wait for, just the timeout */ + if (timeout == INFINITE) + ready = WAIT_FAILED; + else + { + /* Wait for the current process to die, more efficient than SleepEx(). */ + WaitForSingleObjectEx (GetCurrentProcess (), timeout, TRUE); + ready = WAIT_TIMEOUT; + } + } + else + { + /* Wait for just handles + * -> Use WaitForMultipleObjectsEx + */ + if (_wing_main_poll_debug) + g_print (" WaitForMultipleObjectsEx(%d, %d)\n", nhandles, timeout); + + ready = WaitForMultipleObjectsEx (nhandles, handles, FALSE, timeout, TRUE); + if (ready == WAIT_FAILED) + { + gchar *emsg; + + emsg = g_win32_error_message (GetLastError ()); + g_warning ("WaitForMultipleObjectsEx failed: %s", emsg); + g_free (emsg); + } + } + + if (_wing_main_poll_debug) + g_print (" wait returns %ld%s\n", + ready, + (ready == WAIT_FAILED ? " (WAIT_FAILED)" : + (ready == WAIT_TIMEOUT ? " (WAIT_TIMEOUT)" : + (msg_fd != NULL && ready == WAIT_OBJECT_0 + nhandles ? " (msg)" : "")))); + + if (ready == WAIT_FAILED) + return -1; + else if (ready == WAIT_TIMEOUT || + ready == WAIT_IO_COMPLETION) + return 0; + else if (msg_fd != NULL && ready == WAIT_OBJECT_0 + nhandles) + { + msg_fd->revents |= G_IO_IN; + + /* If we have a timeout, or no handles to poll, be satisfied + * with just noticing we have messages waiting. + */ + if (timeout != 0 || nhandles == 0) + return 1; + + /* If no timeout and handles to poll, recurse to poll them, + * too. + */ + recursed_result = poll_rest (NULL, stop_fd, handles, handle_to_fd, nhandles, 0); + return (recursed_result == -1) ? -1 : 1 + recursed_result; + } + else if (ready >= WAIT_OBJECT_0 && ready < WAIT_OBJECT_0 + nhandles) + { + int retval; + + f = handle_to_fd[ready - WAIT_OBJECT_0]; + f->revents = f->events; + if (_wing_main_poll_debug) + g_print (" got event %p\n", (HANDLE) f->fd); + + /* Do not count the stop_fd */ + retval = (f != stop_fd) ? 1 : 0; + + /* If no timeout and polling several handles, recurse to poll + * the rest of them. + */ + if (timeout == 0 && nhandles > 1) + { + /* Poll the handles with index > ready */ + HANDLE *shorter_handles; + GPollFD **shorter_handle_to_fd; + gint shorter_nhandles; + + shorter_handles = &handles[ready - WAIT_OBJECT_0 + 1]; + shorter_handle_to_fd = &handle_to_fd[ready - WAIT_OBJECT_0 + 1]; + shorter_nhandles = nhandles - (ready - WAIT_OBJECT_0 + 1); + + recursed_result = poll_rest (NULL, stop_fd, shorter_handles, shorter_handle_to_fd, shorter_nhandles, 0); + return (recursed_result == -1) ? -1 : retval + recursed_result; + } + return retval; + } + + return 0; +} + +typedef struct +{ + HANDLE handles[MAXIMUM_WAIT_OBJECTS]; + GPollFD *handle_to_fd[MAXIMUM_WAIT_OBJECTS]; + GPollFD *msg_fd; + GPollFD *stop_fd; + gint nhandles; + gint timeout; +} GWin32PollThreadData; + +static gint +poll_single_thread (GWin32PollThreadData *data) +{ + int retval; + + /* Polling for several things? */ + if (data->nhandles > 1 || (data->nhandles > 0 && data->msg_fd != NULL)) + { + /* First check if one or several of them are immediately + * available + */ + retval = poll_rest (data->msg_fd, data->stop_fd, data->handles, data->handle_to_fd, data->nhandles, 0); + + /* If not, and we have a significant timeout, poll again with + * timeout then. Note that this will return indication for only + * one event, or only for messages. + */ + if (retval == 0 && (data->timeout == INFINITE || data->timeout > 0)) + retval = poll_rest (data->msg_fd, data->stop_fd, data->handles, data->handle_to_fd, data->nhandles, data->timeout); + } + else + { + /* Just polling for one thing, so no need to check first if + * available immediately + */ + retval = poll_rest (data->msg_fd, data->stop_fd, data->handles, data->handle_to_fd, data->nhandles, data->timeout); + } + + return retval; +} + +static void +fill_poll_thread_data (GPollFD *fds, + guint nfds, + gint timeout, + GPollFD *stop_fd, + GWin32PollThreadData *data) +{ + GPollFD *f; + + data->timeout = timeout; + + if (stop_fd != NULL) + { + if (_wing_main_poll_debug) + g_print (" Stop FD: %p", (HANDLE) stop_fd->fd); + + data->stop_fd = stop_fd; + data->handle_to_fd[data->nhandles] = stop_fd; + data->handles[data->nhandles++] = (HANDLE) stop_fd->fd; + } + + for (f = fds; f < &fds[nfds]; ++f) + { + if ((data->nhandles == MAXIMUM_WAIT_OBJECTS) || + (data->msg_fd != NULL && (data->nhandles == MAXIMUM_WAIT_OBJECTS - 1))) + { + g_warning ("Too many handles to wait for!"); + break; + } + + if (f->fd == G_WIN32_MSG_HANDLE && (f->events & G_IO_IN)) + { + if (_wing_main_poll_debug && data->msg_fd == NULL) + g_print (" MSG"); + data->msg_fd = f; + } + else if (f->fd > 0) + { + if (_wing_main_poll_debug) + g_print (" %p", (HANDLE) f->fd); + data->handle_to_fd[data->nhandles] = f; + data->handles[data->nhandles++] = (HANDLE) f->fd; + } + + f->revents = 0; + } +} + +static guint __stdcall +poll_thread_run (gpointer user_data) +{ + GWin32PollThreadData *data = user_data; + + /* Docs say that it is safer to call _endthreadex by our own */ + _endthreadex (poll_single_thread (data)); + + g_assert_not_reached (); + + return 0; +} + +/* One slot for a possible msg object and another for the stop event */ +#define MAXIMUM_WAIT_OBJECTS_PER_THREAD (MAXIMUM_WAIT_OBJECTS - 2) + +gint +wing_poll (GPollFD *fds, + guint nfds, + gint timeout) +{ + guint nthreads, threads_remain; + HANDLE thread_handles[MAXIMUM_WAIT_OBJECTS]; + GWin32PollThreadData *threads_data; + GPollFD stop_event = { 0, }; + GPollFD *f; + guint i, fds_idx = 0; + DWORD ready; + DWORD thread_retval; + int retval; + GPollFD *msg_fd = NULL; + + if (timeout == -1) + timeout = INFINITE; + + /* Simple case without extra threads */ + if (nfds <= MAXIMUM_WAIT_OBJECTS) + { + GWin32PollThreadData data = { 0, }; + + if (_wing_main_poll_debug) + g_print ("wing_poll: waiting for"); + + fill_poll_thread_data (fds, nfds, timeout, NULL, &data); + + if (_wing_main_poll_debug) + g_print ("\n"); + + retval = poll_single_thread (&data); + if (retval == -1) + for (f = fds; f < &fds[nfds]; ++f) + f->revents = 0; + + return retval; + } + + if (_wing_main_poll_debug) + g_print ("wing_poll: polling with threads\n"); + + nthreads = nfds / MAXIMUM_WAIT_OBJECTS_PER_THREAD; + threads_remain = nfds % MAXIMUM_WAIT_OBJECTS_PER_THREAD; + if (threads_remain > 0) + nthreads++; + + if (nthreads > MAXIMUM_WAIT_OBJECTS) + { + g_warning ("Too many handles to wait for in threads!"); + nthreads = MAXIMUM_WAIT_OBJECTS; + } + +#if GLIB_SIZEOF_VOID_P == 8 + stop_event.fd = (gint64)CreateEventW (NULL, TRUE, FALSE, NULL); +#else + stop_event.fd = (gint)CreateEventW (NULL, TRUE, FALSE, NULL); +#endif + stop_event.events = G_IO_IN; + + threads_data = g_new0 (GWin32PollThreadData, nthreads); + for (i = 0; i < nthreads; i++) + { + guint thread_fds; + guint ignore; + + if (i == (nthreads - 1) && threads_remain > 0) + thread_fds = threads_remain; + else + thread_fds = MAXIMUM_WAIT_OBJECTS_PER_THREAD; + + fill_poll_thread_data (fds + fds_idx, thread_fds, timeout, &stop_event, &threads_data[i]); + fds_idx += thread_fds; + + /* We must poll for messages from the same thread, so poll it along with the threads */ + if (threads_data[i].msg_fd != NULL) + { + msg_fd = threads_data[i].msg_fd; + threads_data[i].msg_fd = NULL; + } + + thread_handles[i] = (HANDLE) _beginthreadex (NULL, 0, poll_thread_run, &threads_data[i], 0, &ignore); + } + + /* Wait for at least one thread to return */ + if (msg_fd != NULL) + ready = MsgWaitForMultipleObjectsEx (nthreads, thread_handles, timeout, + QS_ALLINPUT, MWMO_ALERTABLE); + else + ready = WaitForMultipleObjects (nthreads, thread_handles, timeout > 0, timeout); + + /* Signal the stop in case any of the threads did not stop yet */ + SetEvent ((HANDLE)stop_event.fd); + + /* Wait for the rest of the threads to finish */ + WaitForMultipleObjects (nthreads, thread_handles, TRUE, INFINITE); + + /* The return value of all the threads give us all the fds that changed state */ + retval = 0; + if (msg_fd != NULL && ready == WAIT_OBJECT_0 + nthreads) + { + msg_fd->revents |= G_IO_IN; + retval = 1; + } + + for (i = 0; i < nthreads; i++) + { + if (GetExitCodeThread (thread_handles[i], &thread_retval)) + retval = retval == -1 ? -1 : thread_retval == -1 ? -1 : retval + thread_retval; + + CloseHandle (thread_handles[i]); + } + + if (retval == -1) + for (f = fds; f < &fds[nfds]; ++f) + f->revents = 0; + + g_free (threads_data); + CloseHandle ((HANDLE)stop_event.fd); + + return retval; +} diff --git a/wing/wingpoll.h b/wing/wingpoll.h new file mode 100644 index 0000000..111cbb6 --- /dev/null +++ b/wing/wingpoll.h @@ -0,0 +1,46 @@ +/* + * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald + * + * gpoll.c: poll(2) abstraction + * Copyright 1998 Owen Taylor + * Copyright 2008 Red Hat, Inc. + * Copyright (C) 2018 NICE s.r.l. + * + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + */ + +/* + * Modified by the GLib Team and others 1997-2000. See the AUTHORS + * file for a list of people on the GLib Team. See the ChangeLog + * files for a list of changes. These files are distributed with + * GLib at ftp://ftp.gtk.org/pub/gtk/. + */ + +#ifndef WING_POLL_H +#define WING_POLL_H + +#include +#include + +G_BEGIN_DECLS + +WING_AVAILABLE_IN_ALL +gint wing_poll (GPollFD *fds, + guint nfds, + gint timeout); + +G_END_DECLS + +#endif /* WING_UTILS_H */ -- GitLab