Commit 8dd604e4 authored by Felix Häcker's avatar Felix Häcker

async_client: implement a task queue

parent 090cd3eb
......@@ -48,58 +48,71 @@ pub enum Task{
}
pub struct AsyncClient{
pub next_task: Arc<Mutex<Option<Task>>>,
pub task_queue: Arc<Mutex<Vec<Task>>>,
task_id: Arc<Mutex<i32>>,
task_queue_enabled: Arc<Mutex<bool>>,
pub base_url: Arc<String>,
pub sender: Sender<Message>,
}
impl AsyncClient{
pub fn new(base_url: &str, sender: Sender<Message>) -> Self{
let mut next_task = Arc::new(Mutex::new(None));
let mut task_queue = Arc::new(Mutex::new(Vec::new()));
let task_id = Arc::new(Mutex::new(0));
let task_queue_enabled = Arc::new(Mutex::new(true));
let base_url = Arc::new(base_url.to_string());
Self{
next_task,
task_queue,
task_id,
task_queue_enabled,
base_url,
sender,
}
}
pub fn set_task(&mut self, task: Task){
/// Adds a task to the queue. If you don't want to use a queue set "set_task_queue(false)"
pub fn add_task(&mut self, task: Task){
// Increase the unique task id
*self.task_id.lock().unwrap() += 1;
// Unset old task
self.clear_task();
// add new task to the queue
if *self.task_queue_enabled.lock().unwrap() {
self.task_queue.lock().unwrap().push(task);
}else{
self.clear_queue();
self.task_queue.lock().unwrap().push(task);
}
}
*self.next_task.lock().unwrap() = Some(task);
pub fn clear_queue(&mut self){
self.task_queue.lock().unwrap().clear();
self.sender.send(Message::Clear).unwrap();
}
fn clear_task(&mut self){
*self.next_task.lock().unwrap() = None;
/// Enabled: Tasks will get added to a queue, and every task gets processed
/// Disabled: A new tasks replaces the current one
pub fn set_task_queue(&mut self, queue: bool){
*self.task_queue_enabled.lock().unwrap() = queue;
}
/// Starts the task processing loop
pub fn start_loop(&mut self){
let next_task = self.next_task.clone();
let task_queue = self.task_queue.clone();
let task_id = self.task_id.clone();
let task_queue_enabled = self.task_queue_enabled.clone();
let base_url = self.base_url.clone();
let sender = self.sender.clone();
thread::spawn(move || {
loop{
// Check if a task is available
if(next_task.lock().unwrap().clone().is_some()){
if(!task_queue.lock().unwrap().is_empty()){
let mut sync_client = Client::new(&base_url);
// Clear data from previous task
sender.send(Message::Clear);
// Get the task, and unset "next_task"
let task: Task = next_task.lock().unwrap().clone().unwrap();
*next_task.lock().unwrap() = None;
// Get the next task to process
let task: Task = task_queue.lock().unwrap().pop().unwrap();
// Copy current task id
let working_id = task_id.lock().unwrap().clone();
......@@ -150,10 +163,10 @@ impl AsyncClient{
_ => (),
};
if (working_id == *task_id.lock().unwrap()){
if ( *task_queue_enabled.lock().unwrap() || working_id == *task_id.lock().unwrap()){
sender.send(result_message.unwrap());
}else {
debug!("Task id changed, so don't send result message");
debug!("Task id changed and queue is disabled, so don't send result message");
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment