1 module photon.ds.ring_queue; 2 3 import core.atomic; 4 import core.internal.spinlock; 5 import core.stdc.stdlib; 6 import core.lifetime; 7 8 import photon.exceptions; 9 10 11 struct RingQueue(T, Event) 12 { 13 T* store; 14 size_t length; 15 size_t fetch, insert, size; 16 Event cts, rtr; // clear to send, ready to recieve 17 bool closed; 18 shared size_t refCount; 19 AlignedSpinLock lock; 20 21 this(size_t capacity, Event cts, Event rtr) 22 { 23 store = cast(T*)malloc(T.sizeof * capacity); 24 length = capacity; 25 size = 0; 26 fetch = insert = 0; 27 this.cts = move(cts); 28 this.rtr = move(rtr); 29 closed = false; 30 refCount = 1; 31 lock = AlignedSpinLock(SpinLock.Contention.brief); 32 } 33 34 void push(T ctx) 35 { 36 lock.lock(); 37 while (size == length) { 38 lock.unlock(); 39 cts.waitAndReset(); 40 lock.lock(); 41 } 42 if (closed) { 43 lock.unlock(); 44 throw new ChannelClosed(); 45 } 46 bool notify = false; 47 move(ctx, store[insert++]); 48 if (insert == length) insert = 0; 49 if (size == 0) notify = true; 50 size += 1; 51 lock.unlock(); 52 if (notify) rtr.trigger(); 53 } 54 55 bool tryPop(out T output) 56 { 57 lock.lock(); 58 while (size == 0 && !closed) { 59 lock.unlock(); 60 rtr.waitAndReset(); 61 lock.lock(); 62 } 63 if (size == 0 && closed) { 64 lock.unlock(); 65 return false; 66 } 67 bool notify = false; 68 move(store[fetch++], output); 69 if (fetch == length) fetch = 0; 70 if (size == length) notify = true; 71 size -= 1; 72 lock.unlock(); 73 if (notify) cts.trigger(); 74 return true; 75 } 76 77 bool readyToRead() { 78 lock.lock(); 79 scope(exit) lock.unlock(); 80 return size > 0; 81 } 82 83 bool empty() { 84 lock.lock(); 85 scope(exit) lock.unlock(); 86 if (closed && size == 0) return true; 87 return false; 88 } 89 90 void close() { 91 lock.lock(); 92 closed = true; 93 lock.unlock(); 94 cts.trigger(); 95 rtr.trigger(); 96 } 97 98 void retain() { 99 auto cnt = atomicFetchAdd(refCount, 1); 100 assert(cnt != 0); 101 } 102 103 // true if time to release 104 bool release() { 105 auto cnt = atomicFetchSub(refCount, 1); 106 if (cnt == 1) return true; 107 return false; 108 } 109 } 110 111 auto allocRingQueue(T, Event)(size_t capacity, Event cts, Event rtr){ 112 alias Q = RingQueue!(T,Event); 113 auto ptr = cast(Q*)malloc(Q.sizeof); 114 emplace(ptr, capacity, move(cts), move(rtr)); 115 return ptr; 116 } 117 118 void disposeRingQueue(T, Event)(RingQueue!(T, Event)* q) { 119 free(q.store); 120 free(q); 121 } 122 123 unittest { 124 import photon; 125 import std.exception; 126 auto cts = event(false); 127 auto rtr = event(false); 128 auto q = allocRingQueue!(int, Event)(2, move(cts), move(rtr)); 129 q.push(1); 130 q.push(2); 131 int result; 132 assert(q.tryPop(result) && result == 1); 133 assert(q.tryPop(result) && result == 2); 134 q.retain(); 135 assert(!q.release()); 136 assert(q.release()); 137 q.close(); 138 assert(!q.tryPop(result)); 139 assertThrown!ChannelClosed(q.push(3)); 140 disposeRingQueue(q); 141 } 142 143 unittest { 144 import photon; 145 auto cts = event(false); 146 auto rtr = event(false); 147 auto q = allocRingQueue!(string, Event)(1, move(cts), move(rtr)); 148 assert(!q.empty); 149 q.push("hello"); 150 string result; 151 assert(q.tryPop(result) && result == "hello"); 152 assert(!q.empty); 153 q.push("world"); 154 q.close(); 155 assert(q.tryPop(result) && result == "world"); 156 assert(!q.tryPop(result)); 157 assert(q.empty); 158 }