1 module photon; 2 3 import core.thread; 4 import core.lifetime; 5 import std.meta; 6 7 import photon.ds.ring_queue; 8 9 version(Windows) public import photon.windows.core; 10 else version(linux) public import photon.linux.core; 11 else version(freeBSD) public import photon.freebsd.core; 12 else version(OSX) public import photon.macos.core; 13 else static assert(false, "Target OS not supported by Photon yet!"); 14 15 /// Start sheduler and run fibers until all are terminated. 16 void runFibers() 17 { 18 Thread runThread(size_t n){ // damned D lexical capture "semantics" 19 auto t = new Thread(() => schedulerEntry(n)); 20 t.start(); 21 return t; 22 } 23 Thread[] threads = new Thread[scheds.length-1]; 24 foreach (i; 0..threads.length){ 25 threads[i] = runThread(i+1); 26 } 27 schedulerEntry(0); 28 foreach (t; threads) 29 t.join(); 30 version(linux) stoploop(); 31 } 32 33 struct Channel(T) { 34 __gshared RingQueue!(T, Event)* buf; 35 __gshared T item; 36 bool loaded; 37 38 this(size_t capacity) { 39 buf = allocRingQueue!T(capacity, event(false), event(false)); 40 } 41 42 this(this) { 43 buf.retain(); 44 } 45 46 void put(T value) { 47 buf.push(move(value)); 48 } 49 50 void put(T value) shared { 51 buf.push(move(value)); 52 } 53 54 void close() shared { 55 buf.close(); 56 } 57 58 bool empty() { 59 if (loaded) return false; 60 loaded = buf.tryPop(item); 61 return !loaded; 62 } 63 64 bool empty() shared { 65 if (loaded) return false; 66 loaded = buf.tryPop(item); 67 return !loaded; 68 } 69 70 ref T front() { 71 return cast()item; 72 } 73 74 ref T front() shared { 75 return item; 76 } 77 78 void popFront() { 79 loaded = false; 80 } 81 82 void popFront() shared { 83 loaded = false; 84 } 85 86 ~this() { 87 if (buf) { 88 if (buf.release) { 89 disposeRingQueue(buf); 90 buf = null; 91 } 92 } 93 } 94 } 95 96 /++ 97 Create ref-counted channel that is safe to shared between multiple fibers. 98 In essence it's a multiple producer single consumer queue, that implements 99 `OutputRange` and `InputRange` concepts. 100 +/ 101 auto channel(T)(size_t capacity = 1) { 102 return cast(shared)Channel!T(capacity); 103 } 104 105 unittest { 106 import std.range.primitives, std.traits; 107 import std.algorithm; 108 static assert(isInputRange!(Channel!int)); 109 static assert(isInputRange!(Unqual!(shared Channel!int))); 110 static assert(isOutputRange!(shared Channel!int, int)); 111 // 112 auto ch = channel!int(10); 113 foreach (i; 0..10){ 114 ch.put(i); 115 } 116 ch.close(); 117 auto sum = ch.sum; 118 assert(sum == 45); 119 } 120 121 122 123 /++ 124 Multiplex between multiple channels, executes a lambda attached to the first 125 channel that becomes ready to read. 126 +/ 127 void select(Args...)(auto ref Args args) 128 if (allSatisfy!(isChannel, Even!Args) && allSatisfy!(isHandler, Odd!Args)) { 129 void delegate()[args.length/2] handlers = void; 130 Event*[args.length/2] events = void; 131 static foreach (i, v; args) { 132 static if(i % 2 == 0) { 133 events[i/2] = &v.buf.rtr; 134 } 135 else { 136 handlers[i/2] = v; 137 } 138 } 139 for (;;) { 140 auto n = awaitAny(events[]); 141 L_dispatch: 142 switch(n) { 143 static foreach (i, channel; Even!(args)) { 144 case i: 145 if (channel.buf.readyToRead()) 146 return handlers[n](); 147 break L_dispatch; 148 } 149 default: 150 assert(0); 151 } 152 } 153 } 154 155 /// Trait for testing if a type is Channel 156 enum isChannel(T) = is(T == Channel!(V), V); 157 158 enum isHandler(T) = is(T == void delegate()); 159 160 private template Even(T...) { 161 static assert(T.length % 2 == 0); 162 static if (T.length > 0) { 163 alias Even = AliasSeq!(T[0], Even!(T[2..$])); 164 } 165 else { 166 alias Even = AliasSeq!(); 167 } 168 } 169 170 private template Odd(T...) { 171 static assert(T.length % 2 == 0); 172 static if (T.length > 0) { 173 alias Odd = AliasSeq!(T[1], Odd!(T[2..$])); 174 } 175 else { 176 alias Odd = AliasSeq!(); 177 } 178 } 179 180 unittest { 181 static assert(Even!(1, 2, 3, 4) == AliasSeq!(1, 3)); 182 static assert(Odd!(1, 2, 3, 4) == AliasSeq!(2, 4)); 183 static assert(isChannel!(Channel!int)); 184 static assert(isChannel!(shared Channel!int)); 185 }