1 module photon.ds.ring_queue;
2 
3 import core.atomic;
4 import core.internal.spinlock;
5 import core.stdc.stdlib;
6 import core.lifetime;
7 
8 import photon.exceptions;
9 
10 
11 struct RingQueue(T, Event)
12 {
13     T* store;
14     size_t length;
15     size_t fetch, insert, size;
16     Event cts, rtr; // clear to send, ready to recieve
17     bool closed;
18     shared size_t refCount;
19     AlignedSpinLock lock;
20     
21     this(size_t capacity, Event cts, Event rtr)
22     {
23         store = cast(T*)malloc(T.sizeof * capacity);
24         length = capacity;
25         size = 0;
26         fetch = insert = 0;
27         this.cts = move(cts);
28         this.rtr = move(rtr);
29         closed = false;
30         refCount = 1;
31         lock = AlignedSpinLock(SpinLock.Contention.brief);
32     }
33 
34     void push(T ctx)
35     {
36         lock.lock();
37         while (size == length) {
38             lock.unlock();
39             cts.waitAndReset();
40             lock.lock();
41         }
42         if (closed) {
43             lock.unlock();
44             throw new ChannelClosed();
45         }
46         bool notify = false;
47         move(ctx, store[insert++]);
48         if (insert == length) insert = 0;
49         if (size == 0) notify = true;
50         size += 1;
51         lock.unlock();
52         if (notify) rtr.trigger();
53     }
54 
55     bool tryPop(out T output)
56     {
57         lock.lock();
58         while (size == 0 && !closed) {
59             lock.unlock();
60             rtr.waitAndReset();
61             lock.lock();
62         }
63         if (size == 0 && closed) {
64             lock.unlock();
65             return false;
66         }
67         bool notify = false;
68         move(store[fetch++], output);
69         if (fetch == length) fetch = 0;
70         if (size == length) notify = true;
71         size -= 1;
72         lock.unlock();
73         if (notify) cts.trigger();
74         return true;
75     }
76 
77     bool readyToRead() {
78         lock.lock();
79         scope(exit) lock.unlock();
80         return size > 0;
81     }
82 
83     bool empty() {
84         lock.lock();
85         scope(exit) lock.unlock();
86         if (closed && size == 0) return true;
87         return false;
88     }
89 
90     void close() {
91         lock.lock();
92         closed = true;
93         lock.unlock();
94         cts.trigger();
95         rtr.trigger();
96     }
97 
98     void retain() {
99         auto cnt = atomicFetchAdd(refCount, 1);
100         assert(cnt != 0);
101     }
102 
103     // true if time to release
104     bool release() {
105         auto cnt = atomicFetchSub(refCount, 1);
106         if (cnt == 1) return true;
107         return false;
108     }
109 }
110 
111 auto allocRingQueue(T, Event)(size_t capacity, Event cts, Event rtr){
112     alias Q = RingQueue!(T,Event);
113     auto ptr = cast(Q*)malloc(Q.sizeof);
114     emplace(ptr, capacity, move(cts), move(rtr));
115     return ptr;
116 }
117 
118 void disposeRingQueue(T, Event)(RingQueue!(T, Event)* q) {
119     free(q.store);
120     free(q);
121 }
122 
123 unittest {
124     import photon;
125     import std.exception;
126     auto cts = event(false);
127     auto rtr = event(false);
128     auto q = allocRingQueue!(int, Event)(2, move(cts), move(rtr));
129     q.push(1);
130     q.push(2);
131     int result;
132     assert(q.tryPop(result) && result == 1);
133     assert(q.tryPop(result) && result == 2);
134     q.retain();
135     assert(!q.release());
136     assert(q.release());
137     q.close();
138     assert(!q.tryPop(result));
139     assertThrown!ChannelClosed(q.push(3));
140     disposeRingQueue(q);
141 }
142 
143 unittest {
144     import photon;
145     auto cts = event(false);
146     auto rtr = event(false);
147     auto q = allocRingQueue!(string, Event)(1, move(cts), move(rtr));
148     assert(!q.empty);
149     q.push("hello");
150     string result;
151     assert(q.tryPop(result) && result == "hello");
152     assert(!q.empty);
153     q.push("world");
154     q.close();
155     assert(q.tryPop(result) && result == "world");
156     assert(!q.tryPop(result));
157     assert(q.empty);
158 }