module photon.windows.core;
version(Windows):
private:

import core.sys.windows.core;
import core.sys.windows.winsock2;
import core.atomic;
import core.thread;
import core.internal.spinlock;
import std.exception;
import std.windows.syserror;
import core.stdc.stdlib;
import std.random;
import std.stdio;
import std.traits;
import std.meta;

import rewind.map;

import photon.ds.common;
import photon.ds.intrusive_queue;
import photon.windows.support;

shared struct RawEvent {
nothrow:
    this(bool signaled) {
        ev = cast(shared(HANDLE))CreateEventA(null, FALSE, signaled, null);
        assert(ev != null, "Failed to create RawEvent");
    }

    void waitAndReset() {
        auto ret = WaitForSingleObject(cast(HANDLE)ev, INFINITE);
        assert(ret == WAIT_OBJECT_0, "Failed while waiting on event");
    }

    void trigger() {
        auto ret = SetEvent(cast(HANDLE)ev);
        assert(ret != 0);
    }

    HANDLE ev;
}

struct MultiAwait
{
    int n;
    void delegate() trigger;
    MultiAwaitBox* box;
}

struct MultiAwaitBox {
    shared size_t refCount;
    shared FiberExt fiber;
}

extern(Windows) VOID waitCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WAIT Wait, TP_WAIT_RESULT WaitResult) {
    auto fiber = cast(FiberExt)Context;
    fiber.schedule();
}

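// Callback used by awaitAny: all of the registered waits share one MultiAwaitBox.
// The first callback to fire steals the fiber pointer and schedules it; a callback
// that loses the race finds the box already empty and re-triggers its awaitable,
// so the signal it consumed is not lost. The box is reference-counted and freed
// by whichever callback runs last.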
extern(Windows) VOID waitAnyCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WAIT Wait, TP_WAIT_RESULT WaitResult) {
    auto await = cast(MultiAwait*)Context;
    auto fiber = cast()steal(await.box.fiber);
    if (fiber) {
        logf("AwaitAny callback waking up on object %d", await.n);
        fiber.wakeUpObject = await.n;
        fiber.schedule();
    }
    else {
        logf("AwaitAny callback - triggering awaitable again");
        await.trigger();
    }
    auto cnt = atomicFetchSub(await.box.refCount, 1);
    if (cnt == 1) free(await.box);
    free(await);
    CloseThreadpoolWait(Wait);
}

/// Event object
public struct Event {

    @disable this(this);

    this(bool signaled) {
        ev = cast(HANDLE)CreateEventA(null, FALSE, signaled, null);
        assert(ev != null, "Failed to create Event");
    }

    /// Wait for the event to be triggered, then reset and return atomically
    void waitAndReset() {
        auto wait = CreateThreadpoolWait(&waitCallback, cast(void*)currentFiber, &environ);
        wenforce(wait != null, "Failed to create threadpool wait object");
        SetThreadpoolWait(wait, cast(HANDLE)ev, null);
        FiberExt.yield();
        CloseThreadpoolWait(wait);
    }

    ///
    void waitAndReset() shared {
        this.unshared.waitAndReset();
    }

    private void registerForWaitAny(int n, MultiAwaitBox* box) {
        auto context = cast(MultiAwait*)calloc(1, MultiAwait.sizeof);
        context.box = box;
        context.n = n;
        context.trigger = cast(void delegate())&this.trigger;
        auto wait = CreateThreadpoolWait(&waitAnyCallback, cast(void*)context, &environ);
        wenforce(wait != null, "Failed to create threadpool wait object");
        SetThreadpoolWait(wait, cast(HANDLE)ev, null);
    }

    private void registerForWaitAny(int n, MultiAwaitBox* box) shared {
        this.unshared.registerForWaitAny(n, box);
    }

    /// Trigger the event.
    void trigger() {
        auto ret = SetEvent(cast(HANDLE)ev);
        assert(ret != 0);
    }

    void trigger() shared {
        this.unshared.trigger();
    }

private:
    HANDLE ev;
}

///
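/// A minimal usage sketch (both fibers are spawned with `go`; waiting
/// suspends only the calling fiber, not the OS thread):
/// ---
/// auto ev = event(false);
/// go({ ev.waitAndReset(); }); // suspends until triggered
/// go({ ev.trigger(); });      // wakes the waiter
/// ---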
public Event event(bool signaled) {
    return Event(signaled);
}

/// Semaphore object
public struct Semaphore {
    @disable this(this);

    this(int count) {
        // cap the max count at 4096 (the MacOS pipe limit)
        sem = cast(HANDLE)CreateSemaphoreA(null, count, 4096, null);
        assert(sem != null, "Failed to create semaphore");
    }

    this(int count) shared {
        // cap the max count at 4096 (the MacOS pipe limit)
        sem = cast(shared(HANDLE))CreateSemaphoreA(null, count, 4096, null);
        assert(sem != null, "Failed to create semaphore");
    }

    ///
    void wait() {
        auto wait = CreateThreadpoolWait(&waitCallback, cast(void*)currentFiber, &environ);
        wenforce(wait != null, "Failed to create threadpool wait object");
        SetThreadpoolWait(wait, cast(HANDLE)sem, null);
        FiberExt.yield();
        CloseThreadpoolWait(wait);
    }

    void wait() shared {
        this.unshared.wait();
    }

    private void registerForWaitAny(int n, MultiAwaitBox* box) {
        auto context = cast(MultiAwait*)calloc(1, MultiAwait.sizeof);
        context.box = box;
        context.n = n;
        context.trigger = { this.trigger(1); };
        auto wait = CreateThreadpoolWait(&waitAnyCallback, cast(void*)context, &environ);
        wenforce(wait != null, "Failed to create threadpool wait object");
        SetThreadpoolWait(wait, cast(HANDLE)sem, null);
    }

    private void registerForWaitAny(int n, MultiAwaitBox* box) shared {
        this.unshared.registerForWaitAny(n, box);
    }

    ///
    void trigger(int count) {
        auto ret = ReleaseSemaphore(cast(HANDLE)sem, count, null);
        assert(ret);
    }

    ///
    void trigger(int count) shared {
        this.unshared.trigger(count);
    }

    ///
    void dispose() {
        CloseHandle(cast(HANDLE)sem);
    }

    ///
    void dispose() shared {
        this.unshared.dispose();
    }

private:
    HANDLE sem;
}

///
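/// A minimal usage sketch (both calls must happen on photon fibers):
/// ---
/// auto sem = semaphore(0);
/// go({ sem.wait(); });     // suspends until a permit is available
/// go({ sem.trigger(1); }); // releases one permit
/// ---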
public auto semaphore(int count) {
    return Semaphore(count);
}

extern(Windows) VOID timerCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_TIMER Timer) {
    FiberExt fiber = cast(FiberExt)Context;
    fiber.schedule();
}

///
struct Timer {
    void wait(Duration dur) {
        auto timer = CreateThreadpoolTimer(&timerCallback, cast(void*)currentFiber, &environ);
        wenforce(timer != null, "Failed to create threadpool timer");
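        // SetThreadpoolTimer interprets a negative due time as an interval
        // relative to now, in 100 ns (hnsecs) units.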
        FILETIME time;
        long hnsecs = -dur.total!"hnsecs";
        time.dwHighDateTime = cast(DWORD)(hnsecs >> 32);
        time.dwLowDateTime = hnsecs & 0xFFFF_FFFF;
        SetThreadpoolTimer(timer, &time, 0, 0);
        FiberExt.yield();
        CloseThreadpoolTimer(timer);
    }
}

///
public auto timer() {
    return Timer();
}

///
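/// Suspend the current fiber for (at least) the requested duration.
/// A minimal sketch (must run on a photon fiber):
/// ---
/// go({
///     delay(100.msecs); // only this fiber sleeps; the thread keeps running others
/// });
/// ---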
public void delay(Duration req) {
    auto tm = Timer(); // Stateless on Windows
    tm.wait(req);
}

///
enum isAwaitable(E) = is(E : Event) || is(E : Semaphore)
    || is(E : Event*) || is(E : Semaphore*);

///
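/// Wait until any one of the awaitables fires and return the index of the
/// one that woke the fiber. A minimal sketch (must run on a photon fiber):
/// ---
/// auto ev = event(false);
/// auto sem = semaphore(0);
/// go({ sem.trigger(1); });
/// size_t n = awaitAny(ev, sem); // n == 1: the semaphore fired
/// ---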
public size_t awaitAny(Awaitable...)(auto ref Awaitable args)
if (allSatisfy!(isAwaitable, Awaitable)) {
    auto box = cast(MultiAwaitBox*)calloc(1, MultiAwaitBox.sizeof);
    box.refCount = args.length;
    box.fiber = cast(shared)currentFiber;
    foreach (int i, ref v; args) {
        v.registerForWaitAny(i, box);
    }
    FiberExt.yield();
    return currentFiber.wakeUpObject;
}

/// ditto
public size_t awaitAny(Awaitable)(Awaitable[] args)
if (allSatisfy!(isAwaitable, Awaitable)) {
    auto box = cast(MultiAwaitBox*)calloc(1, MultiAwaitBox.sizeof);
    box.refCount = args.length;
    box.fiber = cast(shared)currentFiber;
    foreach (int i, ref v; args) {
        v.registerForWaitAny(i, box);
    }
    FiberExt.yield();
    return currentFiber.wakeUpObject;
}

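// Per-scheduler state, padded to exactly one cache line (64 bytes)
// so that neighboring schedulers do not false-share.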
struct SchedulerBlock {
    shared IntrusiveQueue!(FiberExt, RawEvent) queue;
    shared uint assigned;
    size_t[1] padding;
}
static assert(SchedulerBlock.sizeof == 64);

class FiberExt : Fiber {
    FiberExt next;
    uint numScheduler;
    int bytesTransfered;
    int wakeUpObject;

    enum PAGESIZE = 4096;

    this(void function() fn, uint numSched) nothrow {
        super(fn);
        numScheduler = numSched;
    }

    this(void delegate() dg, uint numSched) nothrow {
        super(dg);
        numScheduler = numSched;
    }

    void schedule() nothrow
    {
        scheds[numScheduler].queue.push(this);
    }
}

package(photon) shared SchedulerBlock[] scheds;

enum MAX_THREADPOOL_SIZE = 100;
FiberExt currentFiber;
__gshared Map!(SOCKET, FiberExt) ioWaiters = new Map!(SOCKET, FiberExt); // mapping of sockets to awaiting fibers
__gshared RawEvent termination; // termination event, triggered once the last fiber exits
__gshared HANDLE iocp; // I/O completion port
__gshared PTP_POOL threadPool; // for synchronous syscalls
__gshared TP_CALLBACK_ENVIRON_V3 environ; // callback environment for the pool
shared int alive; // count of non-terminated fibers scheduled

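/// Initialize photon's Windows backend: one scheduler block per logical CPU,
/// a Win32 threadpool for waits/timers/blocking calls, the termination event,
/// the I/O completion port and the thread that runs `eventLoop` to service it.
/// A rough sketch of the intended flow (the scheduler threads themselves are
/// normally spawned by photon's platform-independent entry point, which runs
/// the package-level `schedulerEntry(n)` on each of them):
/// ---
/// startloop();             // set up schedulers, threadpool and IOCP
/// go({ delay(1.msecs); }); // queue fiber tasks
/// ---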
public void startloop() {
    SYSTEM_INFO info;
    GetSystemInfo(&info);
    // TODO: handle NUMA case
    uint threads = info.dwNumberOfProcessors;
    scheds = new SchedulerBlock[threads];
    foreach(ref sched; scheds) {
        sched.queue = IntrusiveQueue!(FiberExt, RawEvent)(RawEvent(0));
    }
    threadPool = CreateThreadpool(null);
    wenforce(threadPool != null, "Failed to create threadpool");
    SetThreadpoolThreadMaximum(threadPool, MAX_THREADPOOL_SIZE);
    wenforce(SetThreadpoolThreadMinimum(threadPool, 1) == TRUE, "Failed to set threadpool minimum size");
    InitializeThreadpoolEnvironment(&environ);
    SetThreadpoolCallbackPool(&environ, threadPool);

    termination = RawEvent(false);
    iocp = CreateIoCompletionPort(cast(HANDLE)INVALID_HANDLE_VALUE, null, 0, 1);
    wenforce(iocp != null, "Failed to create IO Completion Port");
    wenforce(CreateThread(null, 0, &eventLoop, null, 0, null) != null, "Failed to start event loop");
}

/// Convenience overload for functions
public void go(void function() func) {
    go({ func(); });
}

/// Setup a fiber task to run on the Photon scheduler.
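/// A minimal sketch:
/// ---
/// foreach (i; 0 .. 100)
///     go({ delay(10.msecs); }); // 100 concurrent fibers, spread over schedulers
/// ---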
public void go(void delegate() func) {
    import std.random;
    uint choice;
    if (scheds.length == 1) choice = 0;
    else {
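        // "Power of two choices": sample two distinct schedulers at random
        // and enqueue on the one with fewer fibers assigned.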
        uint a = uniform!"[)"(0, cast(uint)scheds.length);
        uint b = uniform!"[)"(0, cast(uint)scheds.length-1);
        if (a == b) b = cast(uint)scheds.length-1;
        uint loadA = scheds[a].assigned;
        uint loadB = scheds[b].assigned;
        if (loadA < loadB) choice = a;
        else choice = b;
    }
    atomicOp!"+="(scheds[choice].assigned, 1);
    atomicOp!"+="(alive, 1);
    auto f = new FiberExt(func, choice);
    logf("Assigned %x -> scheduler %d", cast(void*)f, choice);
    f.schedule();
}

package(photon) void schedulerEntry(size_t n)
{
    // TODO: handle NUMA case
    wenforce(SetThreadAffinityMask(GetCurrentThread(), 1L<<n), "failed to set affinity");
    shared SchedulerBlock* sched = scheds.ptr + n;
    while (alive > 0) {
        sched.queue.event.waitAndReset();
        for (;;) {
            FiberExt f = sched.queue.drain();
            if (f is null) break; // drained an empty queue, time to sleep
            do {
                auto next = f.next; // save next, it will be reused on scheduling
                currentFiber = f;
                logf("Fiber %x started", cast(void*)f);
                try {
                    f.call();
                }
                catch (Exception e) {
                    stderr.writeln(e);
                    atomicOp!"-="(alive, 1);
                }
                if (f.state == FiberExt.State.TERM) {
                    logf("Fiber %x terminated", cast(void*)f);
                    atomicOp!"-="(alive, 1);
                }
                f = next;
            } while (f !is null);
        }
    }
    termination.trigger();
    foreach (ref s; scheds) {
        s.queue.event.trigger();
    }
}

enum int MAX_COMPLETIONS = 500;

extern(Windows) uint eventLoop(void* param) {
    HANDLE[2] events;
    events[0] = iocp;
    events[1] = cast(HANDLE)termination.ev;
    logf("Started event loop! IOCP = %x termination = %x", iocp, termination.ev);
    for (;;) {
        auto ret = WaitForMultipleObjects(2, events.ptr, FALSE, INFINITE);
        logf("Got signalled in event loop %d", ret);
        if (ret == WAIT_OBJECT_0) { // iocp
            OVERLAPPED_ENTRY[MAX_COMPLETIONS] entries = void;
            uint count = 0;
            while (GetQueuedCompletionStatusEx(iocp, entries.ptr, MAX_COMPLETIONS, &count, 0, FALSE)) {
                logf("Dequeued I/O events=%d", count);
                foreach (e; entries[0..count]) {
                    SOCKET sock = cast(SOCKET)e.lpCompletionKey;
                    FiberExt fiber = ioWaiters[sock];
                    fiber.bytesTransfered = cast(int)e.dwNumberOfBytesTransferred;
                    fiber.schedule();
                }
                if (count < MAX_COMPLETIONS) break;
            }
        }
        else if (ret == WAIT_OBJECT_0 + 1) { // termination
            break;
        }
        else {
            logf("Failed to wait for multiple objects: %x", ret);
            break;
        }
    }
    ExitThread(0);
    return 0;
}


// ===========================================================================
// INTERCEPTS
// ===========================================================================
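// The functions below shadow the corresponding Winsock symbols, so code written
// against blocking socket APIs transparently suspends the calling fiber instead
// of the OS thread: sockets are created in overlapped mode and registered with
// the IOCP, accept() is shipped off to the threadpool, and recv()/send() yield
// until the completion (or an immediate result) arrives.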

extern(Windows) SOCKET socket(int af, int type, int protocol) {
    logf("Intercepted socket!");
    SOCKET s = WSASocketW(af, type, protocol, null, 0, WSA_FLAG_OVERLAPPED);
    registerSocket(s);
    return s;
}

struct AcceptState {
    SOCKET socket;
    sockaddr* addr;
    LPINT addrlen;
    FiberExt fiber;
}

extern(Windows) VOID acceptJob(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
    AcceptState* state = cast(AcceptState*)Context;
    logf("Started threadpool job");
    SOCKET resp = WSAAccept(state.socket, state.addr, state.addrlen, null, 0);
    if (resp != INVALID_SOCKET) {
        registerSocket(resp);
    }
    state.socket = resp;
    state.fiber.schedule();
}

extern(Windows) SOCKET accept(SOCKET s, sockaddr* addr, LPINT addrlen) {
    logf("Intercepted accept!");
    AcceptState state;
    state.socket = s;
    state.addr = addr;
    state.addrlen = addrlen;
    state.fiber = currentFiber;
    PTP_WORK work = CreateThreadpoolWork(&acceptJob, &state, &environ);
    wenforce(work != null, "Failed to create work for threadpool");
    SubmitThreadpoolWork(work);
    FiberExt.yield();
    CloseThreadpoolWork(work);
    return state.socket;
}

void registerSocket(SOCKET s) {
    HANDLE port = iocp;
    wenforce(CreateIoCompletionPort(cast(void*)s, port, cast(size_t)s, 0) == port, "failed to register I/O completion");
}

extern(Windows) int recv(SOCKET s, void* buf, int len, int flags) {
    OVERLAPPED overlapped;
    WSABUF wsabuf = WSABUF(cast(uint)len, buf);
    ioWaiters[s] = currentFiber;
    uint received = 0;
    int ret = WSARecv(s, &wsabuf, 1, &received, cast(uint*)&flags, cast(LPWSAOVERLAPPED)&overlapped, null);
    logf("Got recv %d", ret);
    if (ret >= 0) {
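        // Completed synchronously. The completion packet is still queued to the
        // IOCP (FILE_SKIP_COMPLETION_PORT_ON_SUCCESS is not set), so yield and
        // let the event loop reschedule this fiber.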
        FiberExt.yield();
        return received;
    }
    else {
        auto lastError = GetLastError();
        logf("Last error = %d", lastError);
        if (lastError == ERROR_IO_PENDING) {
            FiberExt.yield();
            return currentFiber.bytesTransfered;
        }
        else
            return ret;
    }
}

extern(Windows) int send(SOCKET s, void* buf, int len, int flags) {
    OVERLAPPED overlapped;
    WSABUF wsabuf = WSABUF(cast(uint)len, buf);
    ioWaiters[s] = currentFiber;
    uint sent = 0;
    int ret = WSASend(s, &wsabuf, 1, &sent, flags, cast(LPWSAOVERLAPPED)&overlapped, null);
    logf("Got send %d", ret);
    if (ret >= 0) {
        FiberExt.yield();
        return sent;
    }
    else {
        auto lastError = GetLastError();
        logf("Last error = %d", lastError);
        if (lastError == ERROR_IO_PENDING) {
            FiberExt.yield();
            return currentFiber.bytesTransfered;
        }
        else
            return ret;
    }
}


extern(Windows) void Sleep(DWORD dwMilliseconds) {
    if (currentFiber !is null) {
        auto tm = timer();
        tm.wait(dwMilliseconds.msecs);
    } else {
        SleepEx(dwMilliseconds, FALSE);
    }
}