Commit 0b2622b1 authored by Daniel García Moreno's avatar Daniel García Moreno

Add a message queue and retry send on failure

I've added a message queue for new messages to keep the list of temp
messages, this messages are sent one by one and that means that we'll
preserve the order always.

If a send message fails the app tries to resend the same message again
and don't try to send any new message util the first one is sent
correctly.

Fix #201, #173 and #140
parent 16e6e805
Pipeline #14910 passed with stages
in 39 minutes and 22 seconds
......@@ -203,9 +203,9 @@ pub fn get_message_context(bk: &Backend, msg: Message) -> Result<(), Error> {
pub fn send_msg(bk: &Backend, msg: Message) -> Result<(), Error> {
let roomid = msg.room.clone();
let msgid = msg.get_txn_id();
let url = bk.url(&format!("rooms/{}/send/m.room.message/{}", roomid, msgid), vec![])?;
let id = msg.id.unwrap_or_default();
let url = bk.url(&format!("rooms/{}/send/m.room.message/{}", roomid, id), vec![])?;
let mut attrs = json!({
"body": msg.body.clone(),
......@@ -223,10 +223,11 @@ pub fn send_msg(bk: &Backend, msg: Message) -> Result<(), Error> {
let tx = bk.tx.clone();
query!("put", &url, &attrs,
move |_| {
tx.send(BKResponse::SendMsg).unwrap();
move |js: JsonValue| {
let evid = js["event_id"].as_str().unwrap_or_default();
tx.send(BKResponse::SentMsg(id, evid.to_string())).unwrap();
},
|err| { tx.send(BKResponse::SendMsgError(err)).unwrap(); }
|_| { tx.send(BKResponse::SendMsgError(Error::SendMsgError(id))).unwrap(); }
);
Ok(())
......
......@@ -90,9 +90,10 @@ pub fn send(bk: &Backend, roomid: String, sticker: &Sticker) -> Result<(), Error
let now = Local::now();
let msg = format!("{}{}{}", roomid, sticker.name, now.to_string());
let digest = md5::compute(msg.as_bytes());
let msgid = format!("{:x}", digest);
// TODO: we need to generate the msg.id in the frontend
let id = format!("{:x}", digest);
let url = bk.url(&format!("rooms/{}/send/m.sticker/{}", roomid, msgid), vec![])?;
let url = bk.url(&format!("rooms/{}/send/m.sticker/{}", roomid, id), vec![])?;
let attrs = json!({
"body": sticker.body.clone(),
......@@ -106,10 +107,11 @@ pub fn send(bk: &Backend, roomid: String, sticker: &Sticker) -> Result<(), Error
let tx = bk.tx.clone();
query!("put", &url, &attrs,
move |_| {
tx.send(BKResponse::SendMsg).unwrap();
move |js: JsonValue| {
let evid = js["event_id"].as_str().unwrap_or_default();
tx.send(BKResponse::SentMsg(id, evid.to_string())).unwrap();
},
|err| { tx.send(BKResponse::SendMsgError(err)).unwrap(); }
|_| { tx.send(BKResponse::SendMsgError(Error::SendMsgError(id))).unwrap(); }
);
Ok(())
......
......@@ -104,7 +104,7 @@ pub enum BKResponse {
RoomMessagesInit(Vec<Message>),
RoomMessagesTo(Vec<Message>),
RoomMembers(String, Vec<Member>),
SendMsg,
SentMsg(String, String),
DirectoryProtocols(Vec<Protocol>),
DirectorySearch(Vec<Room>),
JoinRoom,
......
......@@ -17,6 +17,7 @@ pub enum Error {
CacheError,
ReqwestError(reqwest::Error),
MatrixError(JsonValue),
SendMsgError(String),
}
impl From<reqwest::Error> for Error {
......
......@@ -23,6 +23,8 @@ pub enum InternalCommand {
SelectRoom(Room),
LoadMoreNormal,
RemoveInv(String),
AppendTmpMessages,
ForceDequeueMessage,
#[allow(dead_code)]
SendSticker(Sticker),
#[allow(dead_code)]
......@@ -63,6 +65,12 @@ pub fn appop_loop(rx: Receiver<InternalCommand>) {
Ok(InternalCommand::RemoveInv(rid)) => {
APPOP!(remove_inv, (rid));
}
Ok(InternalCommand::AppendTmpMessages) => {
APPOP!(append_tmp_msgs);
}
Ok(InternalCommand::ForceDequeueMessage) => {
APPOP!(force_dequeue_message);
}
Ok(InternalCommand::SendSticker(sticker)) => {
APPOP!(send_sticker, (sticker));
}
......
......@@ -13,6 +13,7 @@ use std::process::Command;
use glib;
use backend::BKResponse;
use fractal_api::error::Error;
use std::sync::mpsc::RecvError;
......@@ -133,7 +134,8 @@ pub fn backend_loop(rx: Receiver<BKResponse>) {
Ok(BKResponse::RoomMessagesTo(msgs)) => {
APPOP!(show_room_messages_top, (msgs));
}
Ok(BKResponse::SendMsg) => {
Ok(BKResponse::SentMsg(txid, evid)) => {
APPOP!(msg_sent, (txid, evid));
APPOP!(sync);
}
Ok(BKResponse::DirectoryProtocols(protocols)) => {
......@@ -240,9 +242,17 @@ pub fn backend_loop(rx: Receiver<BKResponse>) {
APPOP!(show_error, (error));
APPOP!(set_state, (st));
},
Ok(BKResponse::SendMsgError(_)) => {
let error = gettext("Error sending message");
APPOP!(show_error, (error));
Ok(BKResponse::SendMsgError(err)) => {
match err {
Error::SendMsgError(txid) => {
println!("ERROR sending {}: retrying send", txid);
APPOP!(retry_send);
},
_ => {
let error = gettext("Error sending message");
APPOP!(show_error, (error));
}
}
}
Ok(BKResponse::DirectoryError(_)) => {
let error = gettext("Error searching for rooms");
......
......@@ -29,7 +29,7 @@ pub enum MsgPos {
pub struct TmpMsg {
pub msg: Message,
pub widget: gtk::Widget,
pub widget: Option<gtk::Widget>,
}
#[derive(Clone, Debug, Eq, PartialEq)]
......@@ -174,7 +174,6 @@ impl AppOp {
}
self.shown_messages += 1;
}
self.remove_tmp_room_message(&msg);
}
}
......@@ -187,51 +186,52 @@ impl AppOp {
let m;
{
let mb = widgets::MessageBox::new(r, &msg, &self);
m = mb.widget();
m = mb.tmpwidget();
}
messages.add(&m);
}
if let Some(w) = messages.get_children().iter().last() {
self.tmp_msgs.push(TmpMsg {
self.msg_queue.insert(0, TmpMsg {
msg: msg.clone(),
widget: w.clone(),
widget: Some(w.clone()),
});
};
}
pub fn clear_tmp_msgs(&mut self) {
let messages = self.ui.builder
.get_object::<gtk::ListBox>("message_list")
.expect("Can't find message_list in ui file.");
for t in self.tmp_msgs.iter() {
messages.remove(&t.widget);
for t in self.msg_queue.iter_mut() {
if let Some(ref w) = t.widget {
w.destroy();
}
t.widget = None;
}
self.tmp_msgs.clear();
}
pub fn remove_tmp_room_message(&mut self, msg: &Message) {
pub fn append_tmp_msgs(&mut self) {
let messages = self.ui.builder
.get_object::<gtk::ListBox>("message_list")
.expect("Can't find message_list in ui file.");
let mut rmidx = None;
for (i, t) in self.tmp_msgs.iter().enumerate() {
if t.msg.sender == msg.sender &&
t.msg.mtype == msg.mtype &&
t.msg.room == msg.room &&
t.msg.body == msg.body {
if let Some(r) = self.rooms.get(&self.active_room.clone().unwrap_or_default()) {
let mut widgets = vec![];
for t in self.msg_queue.iter().rev().filter(|m| m.msg.room == r.id) {
let m;
{
let mb = widgets::MessageBox::new(r, &t.msg, &self);
m = mb.tmpwidget();
}
messages.remove(&t.widget);
rmidx = Some(i);
break;
messages.add(&m);
if let Some(w) = messages.get_children().iter().last() {
widgets.push(w.clone());
}
}
}
if rmidx.is_some() {
self.tmp_msgs.remove(rmidx.unwrap());
for (t, w) in self.msg_queue.iter_mut().rev().zip(widgets.iter()) {
t.widget = Some(w.clone());
}
}
}
......@@ -246,6 +246,45 @@ impl AppOp {
}
}
pub fn msg_sent(&mut self, _txid: String, evid: String) {
if let Some(ref mut m) = self.msg_queue.pop() {
if let Some(ref w) = m.widget {
w.destroy();
}
m.widget = None;
m.msg.id = Some(evid);
self.show_room_messages(vec![m.msg.clone()], false);
}
self.force_dequeue_message();
}
pub fn retry_send(&mut self) {
let tx = self.internal.clone();
gtk::timeout_add(5000, move || {
tx.send(InternalCommand::ForceDequeueMessage).unwrap();
gtk::Continue(false)
});
}
pub fn force_dequeue_message(&mut self) {
self.sending_message = false;
self.dequeue_message();
}
pub fn dequeue_message(&mut self) {
if self.sending_message {
return;
}
self.sending_message = true;
if let Some(next) = self.msg_queue.last() {
let msg = next.msg.clone();
self.backend.send(BKCommand::SendMsg(msg)).unwrap();
} else {
self.sending_message = false;
}
}
pub fn send_message(&mut self, msg: String) {
if msg.is_empty() {
// Not sending empty messages
......@@ -299,8 +338,9 @@ impl AppOp {
}
}
m.id = Some(m.get_txn_id());
self.add_tmp_room_message(m.clone());
self.backend.send(BKCommand::SendMsg(m)).unwrap();
self.dequeue_message();
}
pub fn attach_file(&mut self) {
......@@ -373,10 +413,15 @@ impl AppOp {
self.loading_more = false;
}
pub fn show_room_messages(&mut self, msgs: Vec<Message>, init: bool) -> Option<()> {
for msg in msgs.iter() {
pub fn show_room_messages(&mut self, newmsgs: Vec<Message>, init: bool) -> Option<()> {
let mut msgs = vec![];
for msg in newmsgs.iter() {
if let Some(r) = self.rooms.get_mut(&msg.room) {
r.messages.push(msg.clone());
if !r.messages.contains(msg) {
r.messages.push(msg.clone());
msgs.push(msg.clone());
}
}
}
......
......@@ -58,7 +58,8 @@ pub struct AppOp {
pub internal: Sender<InternalCommand>,
pub syncing: bool,
tmp_msgs: Vec<TmpMsg>,
pub msg_queue: Vec<TmpMsg>,
pub sending_message: bool,
shown_messages: usize,
pub last_viewed_messages: HashMap<String, Message>,
......@@ -122,7 +123,8 @@ impl AppOp {
server_url: String::from(globals::DEFAULT_HOMESERVER),
identity_url: String::from(globals::DEFAULT_IDENTITYSERVER),
syncing: false,
tmp_msgs: vec![],
msg_queue: vec![],
sending_message: false,
shown_messages: 0,
last_viewed_messages: HashMap::new(),
state: AppState::Login,
......
......@@ -188,6 +188,7 @@ impl AppOp {
self.is_last_viewed(&msg));
self.internal.send(command).unwrap();
}
self.internal.send(InternalCommand::AppendTmpMessages).unwrap();
self.internal.send(InternalCommand::SetPanel(RoomPanel::Room)).unwrap();
if !room.messages.is_empty() {
......
......@@ -46,6 +46,14 @@ impl<'a> MessageBox<'a> {
}
}
pub fn tmpwidget(&self) -> gtk::ListBoxRow {
let w = self.widget();
if let Some(style) = w.get_style_context() {
style.add_class("msg-tmp");
}
w
}
pub fn widget(&self) -> gtk::ListBoxRow {
// msg
// +--------+---------+
......@@ -181,7 +189,6 @@ impl<'a> MessageBox<'a> {
/// Add classes to the widget depending on the properties:
///
/// * msg-tmp: if the message doesn't have id
/// * msg-mention: if the message contains the username in the body and
/// sender is not app user
/// * msg-emote: if the message is an emote
......@@ -192,10 +199,6 @@ impl<'a> MessageBox<'a> {
let body: &str = &msg.body;
if let Some(style) = w.get_style_context() {
// temp msg, not sent yet
if msg.id.is_none() || msg.id.clone().unwrap_or_default().is_empty() {
style.add_class("msg-tmp");
}
// mentions
if String::from(body).contains(uname) && msg.sender != uid {
style.add_class("msg-mention");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment