module photon.ds.blocking_queue;

import photon.ds.common;
import core.sync.condition;
import std.container;

/**
 * FIFO queue backed by a `DList!T` in which every operation runs while
 * holding the mutex associated with an internal `Condition`.
 *
 * Items are appended at the back and removed from the front. `pop` waits on
 * the condition while the list is empty; `push` notifies it after inserting.
 *
 * NOTE(review): `WorkQueue!T` and the `unshared` helper come from
 * `photon.ds.common`, which is not visible here — presumably `unshared`
 * strips the `shared` qualifier so the guarded members can be used; confirm.
 */
shared class BlockingQueue(T) : WorkQueue!T {
    private shared Condition cond;
    private shared DList!T queue;

    /// Creates the condition variable together with a fresh mutex for it.
    this() {
        cond = cast(shared)(new Condition(new Mutex));
    }

    /**
     * Appends `item` to the back of the queue and signals the condition.
     *
     * Params:
     *     item = value to enqueue
     */
    void push(T item) {
        auto guard = cond.unshared.mutex;
        guard.lock();
        scope(exit) guard.unlock();

        queue.unshared.insertBack(item);
        // Notify while still holding the mutex, after the item is in place.
        cond.unshared.notify();
    }

    /**
     * Removes and returns the front item, waiting on the condition for as
     * long as the queue is empty.
     *
     * Returns: the item that was at the front of the queue.
     */
    T pop() {
        auto guard = cond.unshared.mutex;
        guard.lock();
        scope(exit) guard.unlock();

        // Re-check emptiness after every wake-up before touching the list.
        while (queue.unshared.empty()) {
            cond.unshared.wait();
        }

        T head = queue.unshared.front;
        queue.unshared.removeFront();
        return head;
    }

    /**
     * Non-waiting variant of `pop`.
     *
     * Params:
     *     item = receives the front element when one is available;
     *            left untouched otherwise
     *
     * Returns: `true` if an element was removed and stored into `item`,
     *          `false` if the queue was empty.
     */
    bool tryPop(ref T item) {
        auto guard = cond.unshared.mutex;
        guard.lock();
        scope(exit) guard.unlock();

        if (!queue.unshared.empty) {
            item = queue.unshared.front;
            queue.unshared.removeFront();
            return true;
        }
        return false;
    }

    /// Returns: `true` when the queue holds no items at the time of the call.
    @property bool empty() {
        auto guard = cond.unshared.mutex;
        guard.lock();
        scope(exit) guard.unlock();

        return queue.unshared.empty;
    }
}

// Single-threaded checks: FIFO ordering, tryPop, and draining via empty.
unittest
{
    shared BlockingQueue!int bq = new shared BlockingQueue!int;
    assert(bq.empty);

    bq.push(3);
    assert(bq.pop == 3);

    bq.push(2);
    bq.push(1);
    bq.push(0);
    bq.push(-5);
    assert(bq.pop == 2);

    int got;
    assert(bq.tryPop(got));
    assert(got == 1);

    while (!bq.empty) {
        bq.pop();
    }
    assert(bq.empty);
}

// One producer thread and one consumer thread: values must arrive in order.
unittest
{
    import core.thread;

    shared BlockingQueue!int q = new shared BlockingQueue!int;

    auto prod = new Thread({
        foreach (n; 0 .. 100) {
            q.push(n);
        }
    });
    auto cons = new Thread({
        foreach (expected; 0 .. 100) {
            assert(q.pop == expected);
        }
    });

    // The consumer is started first, so it may begin by waiting in pop.
    cons.start();
    prod.start();
    prod.join();
    cons.join();
}