TimedQueue.vala 8.1 KB
Newer Older
1
/* Copyright 2016 Software Freedom Conservancy Inc.
2 3
 *
 * This software is licensed under the GNU Lesser General Public License
4
 * (version 2.1 or later).  See the COPYING file in this distribution.
5 6
 */

7
// TimedQueue is a specialized collection class.  It holds items in order, but rather than being
8 9 10 11 12 13 14 15
// manually dequeued, they are dequeued automatically after a specified amount of time has elapsed
// for that item.  As of today, it's possible the item will be dequeued a bit later than asked
// for, but it will never be early.  Future implementations might tighten up the lateness.
//
// The original design was to use a signal to notify when an item has been dequeued, but Vala has
// a bug with passing an unnamed type as a signal parameter:
// https://bugzilla.gnome.org/show_bug.cgi?id=628639
//
16 17
// The rate the items come off the queue can be spaced out.  Note that this can cause items to back
// up.  As of today, TimedQueue makes no effort to combat this.
18 19 20

public delegate void DequeuedCallback<G>(G item);

21
public class TimedQueue<G> {
22 23
    private class Element<G> {
        public G item;
24
        public ulong ready;
25
        
26
        public Element(G item, ulong ready) {
27 28 29 30 31 32 33 34 35 36
            this.item = item;
            this.ready = ready;
        }
        
        public static int64 comparator(void *a, void *b) {
            return (int64) ((Element *) a)->ready - (int64) ((Element *) b)->ready;
        }
    }
    
    private uint hold_msec;
37
    private unowned DequeuedCallback<G> callback;
38
    private Gee.EqualDataFunc<G> equal_func;
39 40 41 42
    private int priority;
    private uint timer_id = 0;
    private SortedList<Element<G>> queue;
    private uint dequeue_spacing_msec = 0;
43
    private ulong last_dequeue = 0;
44 45 46 47
    private bool paused_state = false;
    
    public virtual signal void paused(bool is_paused) {
    }
48 49 50 51
    
    // Initial design was to have a signal that passed the dequeued G, but bug in valac meant
    // finding a workaround, namely using a delegate:
    // https://bugzilla.gnome.org/show_bug.cgi?id=628639
52
    public TimedQueue(uint hold_msec, DequeuedCallback<G> callback,
53
        owned Gee.EqualDataFunc? equal_func = null, int priority = Priority.DEFAULT) {
54 55
        this.hold_msec = hold_msec;
        this.callback = callback;
56 57 58 59 60 61
        
        if (equal_func != null)
            this.equal_func = (owned) equal_func;
        else
            this.equal_func = (Gee.EqualDataFunc<G>) (Gee.Functions.get_equal_func_for(typeof(G)));
            
62 63 64 65 66 67 68
        this.priority = priority;
        
        queue = new SortedList<Element<G>>(Element.comparator);
        
        timer_id = Timeout.add(get_heartbeat_timeout(), on_heartbeat, priority);
    }
    
69
    ~TimedQueue() {
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
        if (timer_id != 0)
            Source.remove(timer_id);
    }
    
    public uint get_dequeue_spacing_msec() {
        return dequeue_spacing_msec;
    }
    
    public void set_dequeue_spacing_msec(uint msec) {
        if (msec == dequeue_spacing_msec)
            return;
        
        if (timer_id != 0)
            Source.remove(timer_id);
        
        dequeue_spacing_msec = msec;
        
        timer_id = Timeout.add(get_heartbeat_timeout(), on_heartbeat, priority);
    }
    
    private uint get_heartbeat_timeout() {
        return ((dequeue_spacing_msec == 0)
            ? (hold_msec / 10) 
            : (dequeue_spacing_msec / 2)).clamp(10, uint.MAX);
    }
    
    protected virtual void notify_dequeued(G item) {
        callback(item);
    }
    
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
    public bool is_paused() {
        return paused_state;
    }
    
    public void pause() {
        if (paused_state)
            return;
        
        paused_state = true;
        
        paused(true);
    }
    
    public void unpause() {
        if (!paused_state)
            return;
        
        paused_state = false;
        
        paused(false);
    }
    
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
    public virtual void clear() {
        queue.clear();
    }
    
    public virtual bool contains(G item) {
        foreach (Element<G> e in queue) {
            if (equal_func(item, e.item))
                return true;
        }
        
        return false;
    }
    
    public virtual bool enqueue(G item) {
        return queue.add(new Element<G>(item, calc_ready_time()));
    }
    
139
    public virtual bool enqueue_many(Gee.Collection<G> items) {
140
        ulong ready_time = calc_ready_time();
141 142 143 144 145 146 147 148
        
        Gee.ArrayList<Element<G>> elements = new Gee.ArrayList<Element<G>>();
        foreach (G item in items)
            elements.add(new Element<G>(item, ready_time));
        
        return queue.add_list(elements);
    }
    
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
    public virtual bool remove_first(G item) {
        Gee.Iterator<Element<G>> iter = queue.iterator();
        while (iter.next()) {
            Element<G> e = iter.get();
            if (equal_func(item, e.item)) {
                iter.remove();
                
                return true;
            }
        }
        
        return false;
    }
    
    public virtual int size {
        get {
            return queue.size;
        }
    }
    
169 170
    private ulong calc_ready_time() {
        return now_ms() + (ulong) hold_msec;
171 172 173
    }
    
    private bool on_heartbeat() {
174 175 176
        if (paused_state)
            return true;
        
177
        ulong now = 0;
178 179 180 181 182 183 184 185 186
        
        for (;;) {
            if (queue.size == 0)
                break;
            
            Element<G>? head = queue.get_at(0);
            assert(head != null);
            
            if (now == 0)
187
                now = now_ms();
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
            
            if (head.ready > now)
                break;
            
            // if a space of time is required between dequeues, check now
            if ((dequeue_spacing_msec != 0) && ((now - last_dequeue) < dequeue_spacing_msec))
                break;
            
            Element<G>? h = queue.remove_at(0);
            assert(head == h);
            
            notify_dequeued(head.item);
            last_dequeue = now;
            
            // if a dequeue spacing is in place, it's a lock that only one item is dequeued per
            // heartbeat
            if (dequeue_spacing_msec != 0)
                break;
        }
        
        return true;
    }
}

212
// HashTimedQueue uses a HashMap for quick lookups of elements via contains().
213

214
public class HashTimedQueue<G> : TimedQueue<G> {
215 216
    private Gee.HashMap<G, int> item_count;
    
217
    public HashTimedQueue(uint hold_msec, DequeuedCallback<G> callback,
218
        owned Gee.HashDataFunc<G>? hash_func = null, owned Gee.EqualDataFunc<G>? equal_func = null,
219
        int priority = Priority.DEFAULT) {
220
        base (hold_msec, callback, (owned) equal_func, priority);
221
        
222
        item_count = new Gee.HashMap<G, int>((owned) hash_func, (owned) equal_func);
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
    }
    
    protected override void notify_dequeued(G item) {
        removed(item);
        
        base.notify_dequeued(item);
    }
    
    public override void clear() {
        item_count.clear();
        
        base.clear();
    }
    
    public override bool contains(G item) {
        return item_count.has_key(item);
    }
    
    public override bool enqueue(G item) {
        if (!base.enqueue(item))
            return false;
        
        item_count.set(item, item_count.has_key(item) ? item_count.get(item) + 1 : 1);
        
        return true;
    }
    
250 251 252 253 254 255 256 257 258 259
    public override bool enqueue_many(Gee.Collection<G> items) {
        if (!base.enqueue_many(items))
            return false;
        
        foreach (G item in items)
            item_count.set(item, item_count.has_key(item) ? item_count.get(item) + 1 : 1);
        
        return true;
    }
    
260 261 262 263 264 265 266 267 268 269
    public override bool remove_first(G item) {
        if (!base.remove_first(item))
            return false;
        
        removed(item);
        
        return true;
    }
    
    private void removed(G item) {
270 271 272 273
        // item in question is either already removed
        // or was never added, safe to do nothing here
        if (!item_count.has_key(item))
            return;
274 275 276 277 278 279 280 281 282 283 284
        
        int count = item_count.get(item);
        assert(count > 0);
        
        if (--count == 0)
            item_count.unset(item);
        else
            item_count.set(item, count);
    }
}