module photon.ds.intrusive_queue;

import photon.ds.common;
import core.internal.spinlock;

/// FIFO queue of class objects that are chained together through each
/// element's own `next` field; every operation runs under a spin lock.
///
/// Requirements visible from the code below:
///   - `T` is a class type exposing an assignable `next` member.
///   - `Event` provides a `trigger()` method callable on a shared instance.
///
/// Wake-up protocol: `exhausted` is raised whenever `tryPop` or `drain`
/// finds the queue empty, and it starts out raised. A `push` onto an empty
/// queue calls `event.trigger()` only if the flag was raised, and lowers it.
shared struct IntrusiveQueue(T, Event)
if (is(T : Object)) {
private:
    SpinLock lock = SpinLock(SpinLock.Contention.brief);
    T head;                 // oldest element, null when the queue is empty
    T tail;                 // newest element, null when the queue is empty
    bool exhausted = true;  // a pop/drain has seen the queue empty since the last triggering push
public:
    Event event;

    /// Builds a queue that signals `ev` according to the wake-up protocol.
    this(Event ev) {
        this.event = ev;
    }

    /// Appends `item` at the back of the queue.
    ///
    /// `item.next` is reset to null first, so the newest element always
    /// terminates the chain. `event.trigger()` is invoked, after the lock
    /// has been released, when the queue was empty and `exhausted` was set.
    void push(T item) {
        item.next = null;
        auto node = cast(shared)item;

        lock.lock();
        if (tail !is null) {
            // Non-empty queue: link behind the current tail, no signalling.
            tail.next = node;
            tail = node;
            lock.unlock();
            return;
        }

        // Empty queue: the new node is both ends of the chain.
        tail = node;
        head = node;
        bool wasExhausted = exhausted;
        exhausted = false;
        lock.unlock();

        // Signal outside the critical section.
        if (wasExhausted) event.trigger();
    }

    /// Removes the front element and stores it in `item`.
    ///
    /// Returns: true if an element was removed; false if the queue was
    /// empty, in which case `item` is left untouched and `exhausted` is set.
    /// The removed element's `next` field is not cleared here.
    bool tryPop(ref T item) nothrow {
        lock.lock();
        if (head is null) {
            exhausted = true;
            lock.unlock();
            return false;
        }

        // `unshared` comes from photon.ds.common; presumably it strips the
        // `shared` qualifier -- confirm against that module.
        item = head.unshared;
        head = head.next;
        if (head is null) {
            // The last element was just taken, so there is no tail either.
            tail = null;
        }
        lock.unlock();
        return true;
    }

    /// Detaches every queued element in a single locked step.
    ///
    /// Returns: the former front element, with the remaining elements
    /// reachable through `next`; or null if the queue was empty, in which
    /// case `exhausted` is set.
    T drain() nothrow {
        lock.lock();
        if (head is null) {
            exhausted = true;
            lock.unlock();
            return null;
        }

        auto chain = head.unshared;
        tail = null;
        head = null;
        lock.unlock();
        return chain;
    }
}

unittest {
    // Minimal node type carrying the `next` link the queue relies on.
    static class Box(T) {
        Box next;
        T item;
        this(T k) {
            item = k;
        }
    }

    // Event whose trigger does nothing; enough for single-threaded checks.
    static struct EmptyEvent {
        shared nothrow void trigger(){}
    }

    alias Node = Box!int;

    shared q = IntrusiveQueue!(Node, EmptyEvent)();
    q.push(new Node(1));
    q.push(new Node(2));
    q.push(new Node(3));

    Node popped;
    q.tryPop(popped);
    assert(popped.item == 1);
    q.tryPop(popped);
    assert(popped.item == 2);

    // Pushing while elements remain must keep FIFO order.
    q.push(new Node(4));
    q.tryPop(popped);
    assert(popped.item == 3);
    q.tryPop(popped);
    assert(popped.item == 4);

    // Pushing onto a queue that was emptied by pops.
    q.push(new Node(5));
    q.tryPop(popped);
    assert(popped.item == 5);

    // Nothing left: tryPop reports failure.
    assert(!q.tryPop(popped));
}