1 module photon.ds.blocking_queue;
2 
3 import photon.ds.common;
4 import core.sync.condition;
5 import std.container;
6 
7 shared class BlockingQueue(T) : WorkQueue!T {
8     private shared Condition cond;
9     private shared DList!T queue;
10 
11     this() {
12         cond = cast(shared)(new Condition(new Mutex));
13     }
14 
15     void push(T item) {
16         cond.unshared.mutex.lock();
17         scope(exit) cond.unshared.mutex.unlock();
18         queue.unshared.insertBack(item);
19         cond.unshared.notify();
20     }
21 
22     T pop() {
23         cond.unshared.mutex.lock();
24         scope(exit) cond.unshared.mutex.unlock();
25         while(queue.unshared.empty())
26             cond.unshared.wait();
27         T tmp = queue.unshared.front;
28         queue.unshared.removeFront();
29         return tmp;
30     }
31 
32     bool tryPop(ref T item) {
33         cond.unshared.mutex.lock();
34         scope(exit) cond.unshared.mutex.unlock();
35         if (queue.unshared.empty) {
36             return false;
37         }
38         item = queue.unshared.front;
39         queue.unshared.removeFront();
40         return true;
41     }
42 
43     @property bool empty() {
44         cond.unshared.mutex.lock();
45         scope(exit) cond.unshared.mutex.unlock();
46         return queue.unshared.empty;
47     }
48 }
49 
50 unittest
51 {
52     shared BlockingQueue!int bq = new shared BlockingQueue!int;
53     assert(bq.empty == true);
54     bq.push(3);
55     assert(bq.pop == 3);
56     bq.push(2);
57     bq.push(1);
58     bq.push(0);
59     bq.push(-5);
60     assert(bq.pop == 2);
61     int i;
62     assert(bq.tryPop(i) == true);
63     assert(i == 1);
64     while(!bq.empty)
65         bq.pop();
66     assert(bq.empty == true);
67 }
68 
69 unittest
70 {
71     import core.thread;
72     shared BlockingQueue!int q = new shared BlockingQueue!int;
73     void producer() {
74         foreach (v; 0..100) {
75             q.push(v);
76         }
77     }
78     void consumer() {
79         foreach (v; 0..100) {
80             assert(q.pop == v);
81         }
82     }
83     auto prod = new Thread(&producer);
84     auto cons = new Thread(&consumer);
85     cons.start();
86     prod.start();
87     prod.join();
88     cons.join();
89 }