sync.rs 6.42 KB
Newer Older
1
use globals;
2
use std::{thread, time};
3 4 5 6
use error::Error;
use util::json_q;
use util::get_rooms_from_json;
use util::get_rooms_timeline_from_json;
7
use util::get_rooms_notifies_from_json;
8
use util::parse_sync_events;
9
use util::parse_m_direct;
10 11 12 13
use backend::types::BKResponse;
use backend::types::Backend;
use types::Room;

14
pub fn sync(bk: &Backend, new_since: Option<String>) -> Result<(), Error> {
15 16 17 18 19
    let tk = bk.data.lock().unwrap().access_token.clone();
    if tk.is_empty() {
        return Err(Error::BackendError);
    }

20
    let mut since = bk.data.lock().unwrap().since.clone().or(new_since);
21 22 23
    let userid = bk.data.lock().unwrap().user_id.clone();

    let mut params: Vec<(&str, String)> = vec![];
24
    params.push(("full_state", String::from("false")));
25

26 27
    let timeout;

28 29 30 31 32
    if let Some(since) = since.clone() {
        params.push(("since", since));
        params.push(("timeout", String::from("30000")));
        timeout = 30;
    } else {
33 34 35
        let filter = format!("{{
            \"room\": {{
                \"state\": {{
36 37
                    \"types\": [\"m.room.*\"],
                    \"not_types\": [\"m.room.member\"]
38 39
                }},
                \"timeline\": {{
40
                    \"types\": [\"m.room.message\", \"m.sticker\"],
41
                    \"limit\": {}
42
                }},
43
                \"ephemeral\": {{ \"types\": [] }}
44 45 46 47 48 49
            }},
            \"presence\": {{ \"types\": [] }},
            \"event_format\": \"client\",
            \"event_fields\": [\"type\", \"content\", \"sender\", \"event_id\", \"age\", \"unsigned\"]
        }}", globals::PAGE_LIMIT);

50 51
        params.push(("filter", String::from(filter)));
        params.push(("timeout", String::from("0")));
52
        timeout = 0;
53 54 55 56 57 58 59 60 61 62 63
    }

    let baseu = bk.get_base_url()?;
    let url = bk.url("sync", params)?;

    let tx = bk.tx.clone();
    let data = bk.data.clone();

    let attrs = json!(null);

    thread::spawn(move || {
64
        match json_q("get", &url, &attrs, timeout) {
65 66
            Ok(r) => {
                let next_batch = String::from(r["next_batch"].as_str().unwrap_or(""));
67
                if let Some(since) = since {
68 69 70 71 72 73
                    // New rooms
                    match get_rooms_from_json(&r, &userid, &baseu) {
                        Ok(rs) => tx.send(BKResponse::NewRooms(rs)).unwrap(),
                        Err(err) => tx.send(BKResponse::SyncError(err)).unwrap(),
                    };

74
                    // Message events
75
                    match get_rooms_timeline_from_json(&baseu, &r, tk.clone(), since.clone()) {
76 77 78
                        Ok(msgs) => tx.send(BKResponse::RoomMessages(msgs)).unwrap(),
                        Err(err) => tx.send(BKResponse::RoomMessagesError(err)).unwrap(),
                    };
79
                    // Room notifications
80
                    match get_rooms_notifies_from_json(&r) {
81 82 83 84 85 86 87
                        Ok(notifies) => {
                            for (r, n, h) in notifies {
                                tx.send(BKResponse::RoomNotifications(r.clone(), n, h)).unwrap();
                            }
                        },
                        Err(_) => {}
                    };
88 89 90 91 92 93 94
                    // Other events
                    match parse_sync_events(&r) {
                        Err(err) => tx.send(BKResponse::SyncError(err)).unwrap(),
                        Ok(events) => {
                            for ev in events {
                                match ev.stype.as_ref() {
                                    "m.room.name" => {
95
                                        let name = String::from(ev.content["name"].as_str().unwrap_or(""));
96 97 98
                                        tx.send(BKResponse::RoomName(ev.room.clone(), name)).unwrap();
                                    }
                                    "m.room.topic" => {
99
                                        let t = String::from(ev.content["topic"].as_str().unwrap_or(""));
100 101 102 103 104 105 106 107
                                        tx.send(BKResponse::RoomTopic(ev.room.clone(), t)).unwrap();
                                    }
                                    "m.room.avatar" => {
                                        tx.send(BKResponse::NewRoomAvatar(ev.room.clone())).unwrap();
                                    }
                                    "m.room.member" => {
                                        tx.send(BKResponse::RoomMemberEvent(ev)).unwrap();
                                    }
108 109 110
                                    "m.sticker" => {
                                        // This event is managed in the room list
                                    }
111
                                    _ => {
112
                                        eprintln!("EVENT NOT MANAGED: {:?}", ev);
113 114 115 116 117
                                    }
                                }
                            }
                        }
                    };
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
                } else {
                   data.lock().unwrap().m_direct = parse_m_direct(&r);

                    let rooms = match get_rooms_from_json(&r, &userid, &baseu) {
                        Ok(rs) => rs,
                        Err(err) => {
                            tx.send(BKResponse::SyncError(err)).unwrap();
                            vec![]
                        }
                    };

                    let mut def: Option<Room> = None;
                    let jtr = data.lock().unwrap().join_to_room.clone();
                    if !jtr.is_empty() {
                        if let Some(r) = rooms.iter().find(|x| x.id == jtr) {
                            def = Some(r.clone());
                        }
                    }
                    tx.send(BKResponse::Rooms(rooms, def)).unwrap();
137 138 139
                }

                tx.send(BKResponse::Sync(next_batch.clone())).unwrap();
140 141 142 143 144
                data.lock().unwrap().since = if !next_batch.is_empty() {
                    Some(next_batch)
                } else {
                    None
                }
145
            },
146 147 148 149 150 151 152 153
            Err(err) => {
                // we wait if there's an error to avoid 100% CPU
                eprintln!("Sync Error, waiting 10 seconds to respond for the next sync");
                let ten_seconds = time::Duration::from_millis(10000);
                thread::sleep(ten_seconds);

                tx.send(BKResponse::SyncError(err)).unwrap();
            }
154 155 156 157 158 159 160
        };
    });

    Ok(())
}

pub fn force_sync(bk: &Backend) -> Result<(), Error> {
161
    bk.data.lock().unwrap().since = None;
162
    sync(bk, None)
163
}