Commit 215e594d authored by Michael Gratton's avatar Michael Gratton 🤞

Merge branch 'wip/limit-replay-op-retries' into 'master'

Replay queue and IMAP IDLE cleanup

See merge request !99
parents 3a120f11 57e39084
Pipeline #57325 passed with stages
in 21 minutes and 17 seconds
......@@ -961,7 +961,7 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
return;
} catch (Error err) {
ErrorContext context = new ErrorContext(err);
if (is_unrecoverable_failure(err)) {
if (!is_recoverable_failure(err)) {
debug("Unrecoverable failure opening remote, forcing closed: %s",
context.format_full_error());
yield force_close(
......
......@@ -42,7 +42,7 @@ private abstract class Geary.ImapEngine.ReplayOperation : Geary.BaseObject, Gee.
public enum OnError {
THROW,
RETRY,
IGNORE
IGNORE_REMOTE
}
public string name { get; set; }
......
......@@ -12,10 +12,18 @@
* ensure the execution of the operations maintains consistent.
*/
private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
// this value is high because delays between back-to-back unsolicited notifications have been
// see as high as 250ms
// Maximum number of times a retry-able operation should be
// retried before failing. It's set to 1 since we only attempt to
// retry if there's some transient error, so if it doesn't work
// second time around, it probably won't work at all.
private const int MAX_OP_RETRIES = 1;
// This value is high because delays between back-to-back
// unsolicited notifications have been see as high as 250ms
private const int NOTIFICATION_QUEUE_WAIT_MSEC = 1000;
private enum State {
OPEN,
CLOSING,
......@@ -29,7 +37,7 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
public CloseReplayQueue() {
// LOCAL_AND_REMOTE to make sure this operation is flushed all the way down the pipe
base ("CloseReplayQueue", ReplayOperation.Scope.LOCAL_AND_REMOTE, OnError.IGNORE);
base ("CloseReplayQueue", ReplayOperation.Scope.LOCAL_AND_REMOTE, OnError.IGNORE_REMOTE);
}
public override async ReplayOperation.Status replay_local_async()
......@@ -522,9 +530,10 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
// If a recoverable failure and operation allows
// remote replay and not closing, re-schedule now
if ((op.on_remote_error == ReplayOperation.OnError.RETRY)
&& !is_unrecoverable_failure(replay_err)
&& state == State.OPEN) {
if (op.on_remote_error == RETRY &&
op.remote_retry_count <= MAX_OP_RETRIES &&
is_recoverable_failure(replay_err) &&
state == State.OPEN) {
debug("Schedule op retry %s on %s", op.to_string(), to_string());
// the Folder will disconnect and reconnect due to the hard error and
......@@ -532,15 +541,16 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
// normalized
op.remote_retry_count++;
remote_queue.send(op);
continue;
} else if (op.on_remote_error == ReplayOperation.OnError.IGNORE) {
} else if (op.on_remote_error == IGNORE_REMOTE &&
is_remote_error(replay_err)) {
// ignoring error, simply notify as completed and continue
debug("Ignoring op %s on %s", op.to_string(), to_string());
debug("Ignoring remote error op %s on %s", op.to_string(), to_string());
} else {
debug("Throwing remote error for op %s on %s: %s", op.to_string(), to_string(),
debug("Throwing error for op %s on %s: %s", op.to_string(), to_string(),
replay_err.message);
// store for notification
remote_err = replay_err;
}
......
......@@ -18,8 +18,8 @@ namespace Geary.ImapEngine {
* succeed if tried again unless some action is taken, such as
* authentication failures, protocol parsing errors, and so on.
*/
private static bool is_unrecoverable_failure(GLib.Error err) {
return !(
private static bool is_recoverable_failure(GLib.Error err) {
return (
err is EngineError.SERVER_UNAVAILABLE ||
err is IOError.BROKEN_PIPE ||
err is IOError.BUSY ||
......
......@@ -82,7 +82,7 @@ private abstract class Geary.ImapEngine.AbstractListEmail : Geary.ImapEngine.Sen
public AbstractListEmail(string name, MinimalFolder owner, Geary.Email.Field required_fields,
Folder.ListFlags flags, Cancellable? cancellable) {
base(name, OnError.IGNORE);
base(name, OnError.IGNORE_REMOTE);
this.owner = owner;
this.required_fields = required_fields;
......
......@@ -22,7 +22,7 @@ private class Geary.ImapEngine.ReplayAppend : Geary.ImapEngine.ReplayOperation {
Cancellable? cancellable) {
// IGNORE remote errors because the reconnect will re-normalize the folder, making this
// append moot
base ("Append", Scope.REMOTE_ONLY, OnError.IGNORE);
base ("Append", Scope.REMOTE_ONLY, OnError.IGNORE_REMOTE);
this.owner = owner;
this.remote_count = remote_count;
......
......@@ -17,7 +17,7 @@ private class Geary.ImapEngine.ReplayRemoval : Geary.ImapEngine.ReplayOperation
public ReplayRemoval(MinimalFolder owner, int remote_count, Imap.SequenceNumber position) {
// remote error will cause folder to reconnect and re-normalize, making this remove moot
base ("Removal", Scope.LOCAL_AND_REMOTE, OnError.IGNORE);
base ("Removal", Scope.LOCAL_AND_REMOTE, OnError.IGNORE_REMOTE);
this.owner = owner;
this.remote_count = remote_count;
......
......@@ -198,8 +198,7 @@ public class Geary.Imap.ClientConnection : BaseObject {
this.idle_timer.start();
}
} else {
this.idle_timer.reset();
exit_idle();
cancel_idle();
}
}
......@@ -317,7 +316,7 @@ public class Geary.Imap.ClientConnection : BaseObject {
this.pending_queue.send(new_command);
// Exit IDLE so we can get on with life
exit_idle();
cancel_idle();
}
public string to_string() {
......@@ -328,6 +327,25 @@ public class Geary.Imap.ClientConnection : BaseObject {
);
}
/**
* Returns the command that has been sent with the given tag.
*
* This should be private, but is internal for the
* ClientSession.on_received_status_response IDLE workaround.
*/
internal Command? get_sent_command(Tag tag) {
Command? sent = null;
if (tag.is_tagged()) {
foreach (Command queued in this.sent_queue) {
if (tag.equal_to(queued.tag)) {
sent = queued;
break;
}
}
}
return sent;
}
private async void open_channels_async() throws Error {
assert(ios != null);
assert(ser == null);
......@@ -394,7 +412,8 @@ public class Geary.Imap.ClientConnection : BaseObject {
}
}
private inline void exit_idle() {
private inline void cancel_idle() {
this.idle_timer.reset();
IdleCommand? idle = this.current_command as IdleCommand;
if (idle != null) {
idle.exit_idle();
......@@ -425,8 +444,7 @@ public class Geary.Imap.ClientConnection : BaseObject {
// Only send IDLE commands if they are the last in the
// queue, there's no point otherwise.
bool pending_idle = pending is IdleCommand;
if (!pending_idle || this.pending_queue.is_empty) {
if (!(pending is IdleCommand) || this.pending_queue.is_empty) {
yield flush_command(pending, cancellable);
}
......@@ -434,9 +452,6 @@ public class Geary.Imap.ClientConnection : BaseObject {
// command, since that might have changed.
if (this.pending_queue.is_empty) {
yield this.ser.flush_stream(cancellable);
if (this.idle_when_quiet && !pending_idle) {
this.idle_timer.start();
}
}
} catch (GLib.Error err) {
if (!(err is GLib.IOError.CANCELLED)) {
......@@ -482,19 +497,6 @@ public class Geary.Imap.ClientConnection : BaseObject {
}
}
private Command? get_sent_command(Tag tag) {
Command? sent = null;
if (tag.is_tagged()) {
foreach (Command queued in this.sent_queue) {
if (tag.equal_to(queued.tag)) {
sent = queued;
break;
}
}
}
return sent;
}
private void check_connection() throws ImapError {
if (this.cx == null) {
throw new ImapError.NOT_CONNECTED(
......@@ -504,37 +506,34 @@ public class Geary.Imap.ClientConnection : BaseObject {
}
private void on_parameters_ready(RootParameters root) {
ServerResponse response;
try {
response = ServerResponse.migrate_from_server(root);
StatusResponse? status = response as StatusResponse;
if (status != null) {
on_status_response(status);
return;
}
ServerData? data = response as ServerData;
if (data != null) {
on_server_data(data);
return;
}
ContinuationResponse? continuation = response as ContinuationResponse;
if (continuation != null) {
on_continuation_response(continuation);
return;
ServerResponse response = ServerResponse.migrate_from_server(root);
GLib.Type type = response.get_type();
if (type == typeof(StatusResponse)) {
on_status_response((StatusResponse) response);
} else if (type == typeof(ServerData)) {
on_server_data((ServerData) response);
} else if (type == typeof(ContinuationResponse)) {
on_continuation_response((ContinuationResponse) response);
} else {
warning(
"[%s] Unknown ServerResponse of type %s received: %s:",
to_string(), response.get_type().name(),
response.to_string()
);
}
} catch (ImapError err) {
received_bad_response(root, err);
return;
}
warning(
"[%s] Unknown ServerResponse of type %s received: %s:",
to_string(), response.get_type().name(),
response.to_string()
);
if (this.pending_queue.is_empty && this.sent_queue.is_empty) {
// There's nothing remaining to send, and every sent
// command has been dealt with, so ready an IDLE command.
if (this.idle_when_quiet) {
this.idle_timer.start();
}
}
}
private void on_status_response(StatusResponse status)
......
......@@ -1735,38 +1735,48 @@ public class Geary.Imap.ClientSession : BaseObject {
private void on_received_status_response(StatusResponse status_response) {
this.last_seen = GLib.get_real_time();
// reschedule keepalive (traffic seen on channel)
schedule_keepalive();
// If a CAPABILITIES ResponseCode, decode and update capabilities ...
// some servers do this to prevent a second round-trip
ResponseCode? response_code = status_response.response_code;
if (response_code != null) {
try {
if (response_code.get_response_code_type().is_value(ResponseCodeType.CAPABILITY)) {
capabilities = response_code.get_capabilities(ref next_capabilities_revision);
debug("[%s] %s %s", to_string(), status_response.status.to_string(),
capabilities.to_string());
capability(capabilities);
// XXX Need to ignore emitted IDLE status responses. They are
// emitted by ClientConnection because it doesn't make any
// sense not to, and so they get logged by that class's
// default handlers, but because they are snooped on here (and
// even worse are used to push FSM transitions, rather relying
// on the actual commands themselves), we need to check for
// IDLE responses and ignore them.
Command? command = this.cx.get_sent_command(status_response.tag);
if (command == null || !(command is IdleCommand)) {
// If a CAPABILITIES ResponseCode, decode and update
// capabilities ... some servers do this to prevent a
// second round-trip
ResponseCode? response_code = status_response.response_code;
if (response_code != null) {
try {
if (response_code.get_response_code_type().is_value(ResponseCodeType.CAPABILITY)) {
capabilities = response_code.get_capabilities(ref next_capabilities_revision);
debug("[%s] %s %s", to_string(), status_response.status.to_string(),
capabilities.to_string());
capability(capabilities);
}
} catch (Error err) {
debug("[%s] Unable to convert response code to capabilities: %s", to_string(),
err.message);
}
} catch (Error err) {
debug("[%s] Unable to convert response code to capabilities: %s", to_string(),
err.message);
}
}
// update state machine before notifying subscribers, who may turn around and query ClientSession
if (status_response.is_completion) {
fsm.issue(Event.RECV_COMPLETION, null, status_response, null);
} else {
fsm.issue(Event.RECV_STATUS, null, status_response, null);
}
// update state machine before notifying subscribers, who
// may turn around and query ClientSession
if (status_response.is_completion) {
fsm.issue(Event.RECV_COMPLETION, null, status_response, null);
} else {
fsm.issue(Event.RECV_STATUS, null, status_response, null);
}
status_response_received(status_response);
status_response_received(status_response);
}
}
private void notify_received_data(ServerData server_data) throws ImapError {
switch (server_data.server_data_type) {
case ServerDataType.CAPABILITY:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment