Cannot Close socket connection after calling soup_session_websocket_connect_async but without waiting for the connected callback
After calling soup_session_websocket_connect_async, libsoup seems to establish a connection with the server, when the callback(passed to soup_session_websocket_connect_async) is triggered, I can get a connection(using soup_session_websocket_connect_finish). Then I can use soup_websocket_connection_close to close the connection.
What I want is closing the connection even before the callback is triggered. I tried soup_session_abort, g_cancellable_cancel, but they do not seem to work, the connection is not closed on the server side. It seems the connection to the server only closes after client calls soup_websocket_connection_close(From the server side log, calling soup_session_abort and g_cancellable_cancel does not trigger soup_websocket_closed_cb on server side, sometimes g_alive_connection_count will not decrease after calling stop_client on client, especially the stop_client is called few time after start_client).
So how to close the connection after calling soup_session_websocket_connect_async but before the connection callback is triggered?
I am using libsoup 2.74.3
Please help, Thanks!
The codes
Server Codes
// Serializes every update of g_alive_connection_count made by the
// websocket callbacks below.
std::mutex g_lock_alive;
// Connections counted by soup_websocket_handler() and not yet reported closed.
int g_alive_connection_count{0};
/**
 * "closed" signal handler for a server-side websocket connection.
 *
 * Decrements the alive-connection counter, prints it, and releases the
 * reference that soup_websocket_handler() took on the connection.
 *
 * @param connection the connection that was closed
 * @param user_data  unused (the signal is connected with NULL)
 */
void soup_websocket_closed_cb(SoupWebsocketConnection* connection, gpointer user_data) {
    (void) user_data;
    {
        std::lock_guard<std::mutex> auto_lock(g_lock_alive);
        g_alive_connection_count--;
        printf("aliving connection count:%d \n", g_alive_connection_count);
    }
    // Balance the g_object_ref() done in soup_websocket_handler().
    g_object_unref(connection);
}

/**
 * Websocket handler registered with soup_server_add_websocket_handler().
 *
 * Keeps the new connection alive, subscribes to its "closed" signal and
 * increments the alive-connection counter.
 *
 * @param server     the server that accepted the connection (unused)
 * @param connection the newly accepted websocket connection
 * @param path       request path (unused)
 * @param client     client context (unused)
 * @param user_data  unused (registered with NULL)
 */
void soup_websocket_handler(SoupServer* server, SoupWebsocketConnection* connection, const char* path, SoupClientContext* client, gpointer user_data) {
    (void) server;
    (void) path;
    (void) client;
    (void) user_data;
    // The handler only borrows `connection`; libsoup 2.x is expected to drop
    // its own reference once this function returns, so hold one until the
    // "closed" signal fires (released in soup_websocket_closed_cb()).
    g_object_ref(connection);
    g_signal_connect(G_OBJECT(connection), "closed", G_CALLBACK(soup_websocket_closed_cb), NULL);
    std::lock_guard<std::mutex> auto_lock(g_lock_alive);
    g_alive_connection_count++;
    printf("aliving connection count:%d \n", g_alive_connection_count);
}
void start_server() {
int argc = 0;
gst_init(&argc, NULL);
GMainContext *m_context = g_main_context_default();
GMainLoop *m_loop = g_main_loop_new(m_context, false);
SoupServer* m_server = soup_server_new(SOUP_SERVER_SERVER_HEADER, "Demo Server", NULL);
// Set up the request handler
soup_server_add_websocket_handler(m_server, "/test", NULL, NULL,
soup_websocket_handler, NULL, NULL);
// Set the port to listen on
soup_server_listen_all(m_server, 8080, SOUP_SERVER_LISTEN_IPV4_ONLY, NULL);
g_main_loop_run(m_loop);
}
Client Codes
class SocketClient {
public:
SocketClient() {
}
virtual ~SocketClient() {
Quit();
}
public:
void Start(const char *server_ip);
protected:
void RunLoop();
static gboolean AsyncQuit(gpointer user_data);
void OnQuitting();
static gboolean AsyncConnectFun(gpointer user_data);
void ConnectServer();
static gboolean
ServerConnectedCallback(SoupSession *session, GAsyncResult *res,
void *user_data);
bool OnServerConnected(SoupSession *session, GAsyncResult *res);
static void
ServerClosedCallback(SoupWebsocketConnection *conn, void *user_data);
void OnServerClosed(SoupWebsocketConnection *connection);
protected:
std::string m_server_ip;
bool m_receive_loop_running = true;
std::shared_ptr<std::thread> m_receive_thread;
std::mutex m_resource_lock;
GMainContext *m_context = NULL;
GMainLoop *m_receive_loop = NULL;
SoupSession *m_soup_session = NULL;
GCancellable *m_cancellable = NULL;
SoupMessage *m_soup_message = NULL;
SoupLogger *m_logger = NULL;
SoupWebsocketConnection *m_connection = NULL;
};
/**
 * Starts the receive thread, which runs RunLoop().
 *
 * @param server_ip URL of the websocket server; when NULL the previously
 *                  stored value (initially empty) is kept.
 *
 * Calling Start() while a receive thread already exists is ignored:
 * replacing a still-joinable std::thread would terminate the process.
 */
void SocketClient::Start(const char *server_ip) {
    if (m_receive_thread) {
        return;
    }
    if (server_ip) {
        m_server_ip = server_ip;
    }
    m_receive_thread = std::make_shared<std::thread>([this]() {
        RunLoop();
    });
}
void SocketClient::RunLoop() {
GMainContext *context = g_main_context_new();
g_main_context_invoke(context, (GSourceFunc) AsyncConnectFun, this);
g_main_context_push_thread_default(context);
GMainLoop *loop = g_main_loop_new(context, FALSE);
m_receive_loop = loop;
m_context = context;
g_main_loop_run(loop);
{
std::lock_guard<std::mutex> auto_lock(m_resource_lock);
m_context = NULL;
}
m_receive_loop = NULL;
if (m_connection) {
SoupWebsocketState state = soup_websocket_connection_get_state(m_connection);
if (SOUP_WEBSOCKET_STATE_OPEN == state)
{
soup_websocket_connection_close(m_connection, 1000, "");
}
g_object_unref(m_connection);
m_connection = NULL;
}
if (m_cancellable) {
g_object_unref(m_cancellable);
m_cancellable = NULL;
}
if (m_soup_session) {
soup_session_abort(m_soup_session);
g_object_unref(m_soup_session);
m_soup_session = NULL;
}
if (m_soup_message) {
g_object_unref(m_soup_message);
m_soup_message = NULL;
}
if (m_logger) {
g_object_unref(m_logger);
m_logger = NULL;
}
g_main_context_pop_thread_default(context);
g_main_loop_unref(loop);
g_main_context_unref(context);
m_receive_loop_running = false;
}
/**
 * Asks the receive loop to stop and waits for the receive thread to end.
 *
 * Does nothing when no receive thread exists (Start() was never called or
 * Quit() already ran). Joining the thread is the only wait needed; it
 * returns after RunLoop() has finished its teardown.
 *
 * NOTE(review): if this runs before RunLoop() has stored m_context, no
 * quit request is queued and the join only returns once the loop ends for
 * another reason (connect error or connection closed) - confirm callers
 * never stop that early.
 */
void SocketClient::Quit() {
    if (!m_receive_thread) {
        return;
    }
    {
        std::lock_guard<std::mutex> auto_lock(m_resource_lock);
        if (m_context) {
            g_main_context_invoke(m_context, (GSourceFunc) AsyncQuit, this);
        }
    }
    if (m_receive_thread->joinable()) {
        m_receive_thread->join();
    }
    m_receive_thread = nullptr;
}
/**
 * Source callback queued by Quit(): forwards to OnQuitting() on the client
 * passed as user_data and asks GLib to remove the source afterwards.
 */
gboolean SocketClient::AsyncQuit(gpointer user_data) {
    if (auto *self = static_cast<SocketClient *>(user_data)) {
        self->OnQuitting();
    }
    return G_SOURCE_REMOVE;
}
void SocketClient::OnQuitting() {
if (!m_connection && m_cancellable) {
g_cancellable_cancel(m_cancellable);
}
if (m_receive_loop) {
g_main_loop_quit(m_receive_loop);
}
}
/**
 * Source callback queued by RunLoop(): forwards to ConnectServer() on the
 * client passed as user_data and asks GLib to remove the source afterwards.
 */
gboolean SocketClient::AsyncConnectFun(gpointer user_data) {
    if (auto *self = static_cast<SocketClient *>(user_data)) {
        self->ConnectServer();
    }
    return G_SOURCE_REMOVE;
}
void SocketClient::ConnectServer() {
const char *https_aliases[] = {"wss", NULL};
SoupSession *soup_session = soup_session_new_with_options(
SOUP_SESSION_SSL_STRICT, TRUE,
SOUP_SESSION_HTTPS_ALIASES, https_aliases, NULL);
// Set up SoupLogger
SoupLogger *logger = soup_logger_new(SOUP_LOGGER_LOG_BODY, -1);
soup_logger_set_printer(logger, log_callback, NULL, NULL);
soup_session_add_feature(soup_session, SOUP_SESSION_FEATURE(logger));
gchar *server_url = g_strdup(m_server_ip.c_str());
SoupMessage *soup_message = soup_message_new(SOUP_METHOD_GET, server_url);
g_free(server_url);
// Once connected, we will register
m_cancellable = g_cancellable_new();
soup_session_websocket_connect_async(soup_session, soup_message, NULL,
NULL, m_cancellable,
(GAsyncReadyCallback) ServerConnectedCallback,
this);
m_soup_session = soup_session;
m_logger = logger;
m_soup_message = soup_message;
}
/**
 * Completion callback of the asynchronous websocket connect.
 *
 * Forwards to OnServerConnected() on the client passed as user_data and
 * returns its result; returns FALSE when user_data is null.
 */
gboolean SocketClient::ServerConnectedCallback(SoupSession *session,
                                               GAsyncResult *res,
                                               void *user_data) {
    auto *self = static_cast<SocketClient *>(user_data);
    if (!self) {
        return FALSE;
    }
    return self->OnServerConnected(session, res);
}
/**
 * Finishes the asynchronous websocket connect.
 *
 * On failure the error is reported (a cancellation is expected during
 * shutdown and stays silent) and the receive loop is asked to quit.
 * On success the connection is configured, its "message" and "closed"
 * signals are hooked up, and it replaces any previous connection.
 *
 * @return true when a connection was established, false otherwise.
 */
bool
SocketClient::OnServerConnected(SoupSession *session, GAsyncResult *res) {
    GError *error = NULL;
    SoupWebsocketConnection *connection = soup_session_websocket_connect_finish(session, res, &error);
    if (error || !connection) {
        if (error && !g_error_matches(error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
            fprintf(stderr, "websocket connect failed: %s\n", error->message);
        }
        g_clear_error(&error);
        if (m_receive_loop) {
            g_main_loop_quit(m_receive_loop);
        }
        return false;
    }
    soup_websocket_connection_set_max_incoming_payload_size(connection,
                                                            16 * 1024 * 1024);
    soup_websocket_connection_set_keepalive_interval(connection, 1);
    g_signal_connect (connection, "message",
                      G_CALLBACK(ServerMessageCallback), this);
    g_signal_connect (connection, "closed", G_CALLBACK(ServerClosedCallback),
                      this);
    // The connect has completed, so its cancellable is no longer needed.
    if (m_cancellable) {
        g_object_unref(m_cancellable);
        m_cancellable = NULL;
    }
    if (m_connection) {
        // Same guard as in RunLoop(): only an open connection may be closed.
        if (SOUP_WEBSOCKET_STATE_OPEN == soup_websocket_connection_get_state(m_connection)) {
            soup_websocket_connection_close(m_connection, 1000, "");
        }
        g_object_unref(m_connection);
    }
    m_connection = connection;
    return true;
}
/**
 * "closed" signal handler: forwards the connection to OnServerClosed() on
 * the client passed as user_data; does nothing when user_data is null.
 */
void SocketClient::ServerClosedCallback(SoupWebsocketConnection *conn,
                                        void *user_data) {
    auto *self = static_cast<SocketClient *>(user_data);
    if (!self) {
        return;
    }
    self->OnServerClosed(conn);
}
/**
 * Reacts to the websocket connection being closed by asking the receive
 * loop (if there is one) to quit.
 *
 * @param connection the connection that was closed; not inspected here.
 */
void SocketClient::OnServerClosed(SoupWebsocketConnection *connection) {
    (void) connection;
    if (m_receive_loop) {
        g_main_loop_quit(m_receive_loop);
    }
}
/**
 * "message" signal handler: forwards the received message to
 * OnServerMessage() on the client passed as user_data; does nothing when
 * user_data is null.
 */
void SocketClient::ServerMessageCallback(SoupWebsocketConnection *conn,
                                         SoupWebsocketDataType type,
                                         GBytes *message, void *user_data) {
    auto *self = static_cast<SocketClient *>(user_data);
    if (!self) {
        return;
    }
    self->OnServerMessage(conn, type, message);
}
void SocketClient::OnServerMessage(SoupWebsocketConnection *connection,
SoupWebsocketDataType type,
GBytes *message) {
}
}
// Held by start_client() and stop_client() while they replace g_client.
std::mutex g_lock;
// The current client instance; null when no client exists.
std::shared_ptr<SocketClient> g_client;
/**
 * Replaces the global client with a fresh one and starts it.
 *
 * Any existing client is destroyed before the new one is created.
 *
 * @param server_ip passed through to SocketClient::Start().
 */
void start_client(const char *server_ip) {
    std::lock_guard<std::mutex> guard(g_lock);
    g_client.reset();
    g_client = std::make_shared<SocketClient>();
    g_client->Start(server_ip);
}
/**
 * Drops the global client, if there is one, while holding g_lock.
 */
void stop_client() {
    std::lock_guard<std::mutex> guard(g_lock);
    g_client.reset();
}
/**
 * Stress loop for the client: repeatedly starts a client, waits briefly
 * and stops it again, so that stop_client() regularly runs while the
 * connect may still be in flight. Never returns.
 */
int main() {
    // Port and path match start_server() (8080, "/test"); the loopback host
    // assumes the demo server runs on the same machine - adjust as needed.
    const char *server_url = "ws://127.0.0.1:8080/test";
    while (true) {
        start_client(server_url);
        g_usleep(200000); // 200000 microseconds = 200 milliseconds
        stop_client();
    }
}