1 module photon;
2 
3 import core.thread;
4 import core.lifetime;
5 import std.meta;
6 
7 import photon.ds.ring_queue;
8 
// Pull in the platform-specific core implementation.
// Version identifiers are case-sensitive and the compiler predefines
// `FreeBSD` (capital F); `freeBSD` is never set automatically, which made
// FreeBSD builds fall through to the static assert below.
version(Windows) public import photon.windows.core;
else version(linux) public import photon.linux.core;
else version(FreeBSD) public import photon.freebsd.core;
else version(OSX) public import photon.macos.core;
else static assert(false, "Target OS not supported by Photon yet!");
14 
/// Start the scheduler and run fibers until all of them have terminated.
void runFibers()
{
    // Going through a helper gives every thread's delegate its own copy of
    // the scheduler index; a lambda written directly inside the loop would
    // capture the loop variable itself rather than its current value.
    Thread launch(size_t schedIndex) {
        auto worker = new Thread(() => schedulerEntry(schedIndex));
        worker.start();
        return worker;
    }

    // Scheduler 0 runs on the calling thread, so one thread fewer than
    // there are schedulers is spawned.
    auto workers = new Thread[scheds.length-1];
    foreach (idx, ref slot; workers) {
        slot = launch(idx + 1);
    }
    schedulerEntry(0);
    foreach (worker; workers) {
        worker.join();
    }
    version(linux) stoploop();
}
32 
/++
    Reference-counted handle to a ring queue of `T`.

    The handle is an input range (`empty` / `front` / `popFront`) and an
    output range (`put`). Copying a handle retains the queue, destroying a
    handle releases it, and the queue is disposed once `release` reports
    that this was the last reference.

    Every handle carries its own queue pointer, look-ahead slot and `loaded`
    flag. These must be ordinary instance fields: a `__gshared` member is a
    static variable, so declaring them `__gshared` would make all
    `Channel!T` values of the same `T` alias a single queue.

    The `shared` overloads strip the qualifier and forward to the
    unqualified implementation.
    NOTE(review): this assumes RingQueue does its own synchronisation
    between producers and the consumer - confirm against photon.ds.ring_queue.
+/
struct Channel(T) {
    private RingQueue!(T, Event)* queue; // owned via retain/release
    T item;       // element fetched by `empty`, handed out by `front`
    bool loaded;  // true while `item` holds an element not yet popped

    /// Allocate a queue able to hold `capacity` elements.
    this(size_t capacity) {
        queue = allocRingQueue!T(capacity, event(false), event(false));
    }

    /// A copy is another reference to the same queue.
    this(this) {
        // A default-initialised handle has no queue to retain.
        if (queue) queue.retain();
    }

    /// Pointer to the underlying queue (read by `select`).
    @property RingQueue!(T, Event)* buf() {
        return queue;
    }

    /// ditto
    @property RingQueue!(T, Event)* buf() shared {
        return unshared().queue;
    }

    /// Push `value` into the queue.
    void put(T value) {
        queue.push(move(value));
    }

    /// ditto
    void put(T value) shared {
        unshared().put(move(value));
    }

    /// Close the underlying queue.
    void close() {
        queue.close();
    }

    /// ditto
    void close() shared {
        unshared().close();
    }

    /++
        Try to fetch the next element into the look-ahead slot.
        Returns: `false` if an element is available through `front`,
        `true` if `tryPop` on the queue did not deliver one.
    +/
    bool empty() {
        if (loaded) return false;
        loaded = queue.tryPop(item);
        return !loaded;
    }

    /// ditto
    bool empty() shared {
        return unshared().empty();
    }

    /// The element fetched by the last successful `empty` check.
    ref T front() {
        return item;
    }

    /// ditto
    ref T front() shared {
        return unshared().front();
    }

    /// Discard the current element so the next `empty` fetches a new one.
    void popFront() {
        loaded = false;
    }

    /// ditto
    void popFront() shared {
        unshared().popFront();
    }

    /// Drop this handle's reference; dispose the queue if it was the last.
    ~this() {
        if (queue is null) return;
        auto q = queue;
        // Clear first so a repeated destructor call cannot release twice.
        queue = null;
        if (q.release) {
            disposeRingQueue(q);
        }
    }

    // View of this handle without the `shared` qualifier, used by the
    // `shared` overloads to reach the unqualified implementation.
    private ref Channel unshared() shared {
        return *cast(Channel*)&this;
    }
}
95 
/++
    Create a ref-counted channel that is safe to share between multiple fibers.
    In essence it's a multiple-producer, single-consumer queue that implements
    the `OutputRange` and `InputRange` concepts.

    Params:
        capacity = passed to the `Channel!T` constructor (defaults to 1)
    Returns: the freshly constructed channel, typed as `shared(Channel!T)`.
+/
auto channel(T)(size_t capacity = 1) {
    alias Chan = Channel!T;
    return cast(shared(Chan))Chan(capacity);
}
104 
unittest {
    import std.algorithm;
    import std.range.primitives, std.traits;

    // Channel must model both range concepts.
    static assert(isInputRange!(Channel!int));
    static assert(isInputRange!(Unqual!(shared Channel!int)));
    static assert(isOutputRange!(shared Channel!int, int));

    // Fill a channel with 0..9, close it and drain it through `sum`.
    auto numbers = channel!int(10);
    foreach (value; 0..10)
        numbers.put(value);
    numbers.close();
    auto total = numbers.sum;
    assert(total == 45);
}
120 
121 
122 
/++
    Multiplex between multiple channels: executes the handler attached to the
    first channel that becomes ready to read, then returns.

    Arguments are interleaved as `channel, handler, channel, handler, ...`.
    The index returned by `awaitAny` over the channels' `rtr` events picks
    the channel; if its buffer does not report `readyToRead` the wait is
    repeated.
+/
void select(Args...)(auto ref Args args)
if (allSatisfy!(isChannel, Even!Args) && allSatisfy!(isHandler, Odd!Args)) {
    enum pairs = args.length/2;
    void delegate()[pairs] handlers = void;
    Event*[pairs] events = void;
    // Split the interleaved arguments into parallel event/handler tables.
    static foreach (k; 0 .. pairs) {
        events[k] = &args[2*k].buf.rtr;
        handlers[k] = args[2*k + 1];
    }
    while (true) {
        auto ready = awaitAny(events[]);
    L_dispatch:
        switch(ready) {
            static foreach (k; 0 .. pairs) {
                case k:
                    // Not readable after all: leave the switch and wait again.
                    if (!args[2*k].buf.readyToRead())
                        break L_dispatch;
                    handlers[k]();
                    return;
            }
            default:
                assert(0);
        }
    }
}
154 
/// Trait that is `true` when `T` is an instantiation of `Channel`.
template isChannel(T) {
    enum bool isChannel = is(T == Channel!(V), V);
}
157 
/// Trait that is `true` when `T` is a delegate type usable as a `select`
/// handler, i.e. one that implicitly converts to `void delegate()`.
/// Delegates carrying extra attributes (`nothrow`, `@safe`, `pure`, `@nogc`),
/// as inferred for many lambdas, are accepted; an exact-type comparison
/// would reject them.
enum isHandler(T) = is(T == delegate) && is(T : void delegate());
159 
// Selects the elements at even positions (0, 2, 4, ...) of a sequence whose
// length must be even.
private template Even(T...) {
    static assert(T.length % 2 == 0);
    static if (T.length == 0) {
        // Nothing left to pick from.
        alias Even = AliasSeq!();
    }
    else {
        // Keep the first element of the leading pair, recurse past the pair.
        alias remaining = T[2..$];
        alias Even = AliasSeq!(T[0], Even!(remaining));
    }
}
169 
// Selects the elements at odd positions (1, 3, 5, ...) of a sequence whose
// length must be even.
private template Odd(T...) {
    static assert(T.length % 2 == 0);
    static if (T.length == 0) {
        // Nothing left to pick from.
        alias Odd = AliasSeq!();
    }
    else {
        // Keep the second element of the leading pair, recurse past the pair.
        alias remaining = T[2..$];
        alias Odd = AliasSeq!(T[1], Odd!(remaining));
    }
}
179 
unittest {
    // Channel detection, with and without the `shared` qualifier.
    static assert(isChannel!(Channel!int));
    static assert(isChannel!(shared Channel!int));

    // Even/Odd pick alternating elements of an interleaved sequence.
    static assert(Odd!(1, 2, 3, 4) == AliasSeq!(2, 4));
    static assert(Even!(1, 2, 3, 4) == AliasSeq!(1, 3));
}