1 module photon.linux.core;
2 version(linux):
3 private:
4 
5 import std.stdio;
6 import std.string;
7 import std.format;
8 import std.exception;
9 import std.conv;
10 import std.array;
11 import std.meta;
12 import std.random;
13 import core.thread;
14 import core.internal.spinlock;
15 import core.sys.posix.sys.types;
16 import core.sys.posix.sys.socket;
17 import core.sys.posix.poll;
18 import core.sys.posix.netinet.in_;
19 import core.sys.posix.unistd;
20 import core.sys.linux.epoll;
21 import core.sys.linux.timerfd;
22 import core.sys.linux.sys.eventfd;
23 import core.sync.mutex;
24 import core.stdc.errno;
25 import core.atomic;
26 import core.sys.posix.stdlib: abort;
27 import core.sys.posix.fcntl;
28 import core.memory;
29 import core.sys.posix.sys.mman;
30 import core.sys.posix.pthread;
31 import core.stdc.stdlib;
32 import core.sys.linux.sys.signalfd;
33 
34 import photon.linux.support;
35 import photon.linux.syscalls;
36 import photon.ds.common;
37 import photon.ds.intrusive_queue;
38 
/// Minimal eventfd wrapper used internally by the runtime (not fiber-aware:
/// reads/writes go through the raw, unhooked syscalls).
shared struct RawEvent {
nothrow:
    int fd; // underlying eventfd descriptor

    this(int init) {
        fd = eventfd(init, 0);
    }

    /// Block until the event fires, consuming (resetting) its counter.
    void waitAndReset() {
        byte[8] buffer = void;
        ssize_t ret = void;
        for (;;) {
            ret = raw_read(fd, buffer.ptr, 8);
            if (ret >= 0 || errno != EINTR) break; // retry only on EINTR
        }
        ret.checked("event reset");
    }

    /// Fire the event by adding 1 to the eventfd counter.
    void trigger() {
        union Counter {
            ulong count;
            ubyte[8] raw;
        }
        Counter c = void;
        c.count = 1;
        ssize_t ret = void;
        for (;;) {
            ret = raw_write(fd, c.raw.ptr, 8);
            if (ret >= 0 || errno != EINTR) break; // retry only on EINTR
        }
        ret.checked("event trigger");
    }
}
70 
/// A manual-reset style event backed by a non-blocking eventfd that is
/// registered with the photon event loop, so waiting parks the fiber
/// instead of blocking the thread.
public struct Event {
nothrow:
    private int evfd;

    private this(bool signaled) {
        evfd = eventfd(signaled ? 1 : 0, EFD_NONBLOCK);
        interceptFd!(Fcntl.noop)(evfd);
    }

    /// Underlying eventfd descriptor.
    int fd() { return evfd; }

    @disable this(this);

    /// Wait for the event to be triggered, then reset and return atomically
    void waitAndReset() {
        byte[8] bytes = void;
        ssize_t r;
        do {
            // goes through the hooked `read` below, parking the fiber
            r = read(evfd, bytes.ptr, bytes.sizeof);
        } while (r < 0 && errno == EINTR);
    }

    void waitAndReset() shared {
        this.unshared.waitAndReset();
    }

    /// Trigger the event.
    void trigger() { 
        union U {
            ulong cnt;
            ubyte[8] bytes;
        }
        U value;
        value.cnt = 1;
        ssize_t r;
        do {
            r = write(evfd, value.bytes.ptr, value.sizeof);
        } while(r < 0 && errno == EINTR);
    }

    void trigger() shared {
        this.unshared.trigger();
    }

    /// Free this event. Added for parity with Semaphore.dispose: previously
    /// there was no way to close the eventfd, leaking one fd per Event.
    void dispose() {
        close(evfd).checked;
    }

    void dispose() shared {
        this.unshared.dispose();
    }
}
116 
/// Create an `Event`; `triggered` selects whether it starts in the signaled state.
public auto event(bool triggered) {
    return Event(triggered);
}
121 
/// Counting semaphore backed by an EFD_SEMAPHORE eventfd registered with the
/// photon event loop; waiting parks the fiber instead of blocking the thread.
public struct Semaphore {
nothrow:
    private int evfd;
    ///
    this(int count) {
        evfd = eventfd(count, EFD_NONBLOCK | EFD_SEMAPHORE);
        interceptFd!(Fcntl.noop)(evfd);
    }

    /// Underlying eventfd descriptor.
    int fd() { return evfd; }

    @disable this(this);

    /// Block until the counter is positive, then decrement it by one.
    void wait() {
        ubyte[8] scratch = void;
        ssize_t ret = void;
        for (;;) {
            // goes through the hooked `read`, so the fiber parks in the event loop
            ret = read(evfd, scratch.ptr, scratch.sizeof);
            if (ret >= 0 || errno != EINTR) break; // retry only on EINTR
        }
    }

    void wait() shared {
        this.unshared.wait();
    }

    /// Add `count` to the counter, releasing up to `count` waiters.
    void trigger(int count) {
        union Counter {
            ulong value;
            ubyte[8] raw;
        }
        Counter c = void;
        c.value = count;
        ssize_t ret = void;
        for (;;) {
            ret = write(evfd, c.raw.ptr, c.sizeof);
            if (ret >= 0 || errno != EINTR) break; // retry only on EINTR
        }
    }

    void trigger(int count) shared {
        this.unshared.trigger(count);
    }

    /// Free this semaphore
    void dispose() {
        close(evfd).checked;
    }

    void dispose() shared {
        this.unshared.dispose();
    }
}
177 
/// Create a `Semaphore` with the given initial counter value.
public auto nothrow semaphore(int initialCount) {
    return Semaphore(initialCount);
}
182 
/// A fiber-friendly timer backed by a timerfd (see `timer()` factory which
/// registers the fd with the event loop).
public struct Timer {
nothrow:
    private int timerfd;


    /// Underlying timerfd descriptor.
    int fd() {
        return timerfd;
    }

    /// Split a Duration into whole seconds + nanoseconds of a timespec.
    static void duration2ts(timespec *ts, Duration d) {
        auto total = d.total!"nsecs"();
        ts.tv_sec = total / 1_000_000_000;
        ts.tv_nsec = total % 1_000_000_000;
    }

    /// Arm as a one-shot timer expiring after *ts_timeout (interval zeroed).
    private void arm(const timespec *ts_timeout) {
        itimerspec its;
        its.it_value = *ts_timeout;
        its.it_interval.tv_sec = 0;
        its.it_interval.tv_nsec = 0;
        timerfd_settime(timerfd, 0, &its, null);
    }

    private void arm(Duration timeout) {
        timespec ts_timeout;
        duration2ts(&ts_timeout, timeout); //convert duration to timespec
        arm(&ts_timeout);
    }

    /// Cancel any pending expiration (all-zero itimerspec disarms a timerfd).
    private void disarm() {
        itimerspec its; // zeros
        timerfd_settime(timerfd, 0, &its, null);
    }

    /// Wait for the timer to trigger.
    /// NOTE(review): a zero duration/timespec disarms the timerfd rather than
    /// arming it, so the read below would block forever — confirm callers
    /// never pass zero.
    void wait(T)(T duration)
    if (is(T : const timespec*) || is(T : Duration)) {
        arm(duration);
        union U {
            ulong timeouts;
            ubyte[8] bytes;
        }
        U u = void;
        ssize_t r;
        do {
            // goes through the hooked `read`, parking the fiber until expiry
            r = read(timerfd, u.bytes.ptr, U.sizeof);
        } while (r < 0 && errno == EINTR);
        disarm();
    }

    /// Close the underlying timerfd.
    void dispose() { 
        close(timerfd).checked;
    }
}
238 
/// Allocate a new non-blocking monotonic timer registered with the event loop.
public nothrow Timer timer() {
    int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC).checked;
    interceptFd!(Fcntl.noop)(timerfd);
    return Timer(timerfd);
}
245 
/// Stack-allocated node that links a parked fiber into a descriptor's
/// wait list (see Descriptor.enqueueReader/enqueueWriter).
struct AwaitingFiber {
    shared FiberExt fiber;      // the parked fiber; null once claimed by a waker
    AwaitingFiber* next;        // next node in the wait list

    /// Wake every fiber on this wait list, recording `wakeFd` as the fd
    /// that caused the wakeup.
    void scheduleAll(int wakeFd) nothrow
    {
        auto w = &this;
        FiberExt head;
        // first process all AwaitingFibers since they are on stack
        // (steal presumably swaps the slot with null atomically — defined in
        // photon.ds.common; once claimed, the node's owner may resume and
        // invalidate the stack memory, hence the two-phase walk)
        do {
            auto fiber = steal(w.fiber);
            if (fiber) {
                fiber.unshared.next = head;
                head = fiber.unshared;
            }
            w = w.next;
        } while(w);
        // second phase: schedule the detached fibers
        while(head) {
            logf("Waking with FD=%d", wakeFd);
            head.wakeFd = wakeFd;
            auto next = head.next; // schedule uses .next pointer
            head.schedule();
            head = next;
        }
    }
}
272 
/// Fiber extended with scheduler bookkeeping: an intrusive run-queue link,
/// the index of its owning scheduler and the fd that last woke it.
class FiberExt : Fiber { 
    FiberExt next;        // intrusive link used by run queues / wake lists
    uint numScheduler;    // index into `scheds` of the owning scheduler
    int wakeFd; // receives the fd that has woken us up

    enum PAGESIZE = 4096;
    
    /// Fiber running `fn`, pinned to scheduler `numSched`.
    this(void function() fn, uint numSched) nothrow {
        super(fn);
        numScheduler = numSched;
    }

    /// Fiber running delegate `dg`, pinned to scheduler `numSched`.
    this(void delegate() dg, uint numSched) nothrow {
        super(dg);
        numScheduler = numSched;
    }

    /// Push this fiber onto its scheduler's run queue.
    void schedule() nothrow
    {
        scheds[numScheduler].queue.push(this);
    }
}
295 
FiberExt currentFiber; // thread-local: fiber currently running on this scheduler thread
shared RawEvent termination; // termination event, triggered once last fiber exits
shared pthread_t eventLoop; // event loop, runs outside of D runtime
shared int alive; // count of non-terminated Fibers scheduled

Timer[] timerPool; // thread-local pool of preallocated timers
302 
/// Borrow a timer from the thread-local pool, allocating a fresh one
/// when the pool is empty.
nothrow auto getSleepTimer() {
    Timer tm;
    if (timerPool.length > 0) {
        // reuse the most recently returned timer
        tm = timerPool[$ - 1];
        timerPool = timerPool[0 .. $ - 1];
    }
    else {
        tm = timer();
    }
    return tm;
}
313 
/// Return a timer to the thread-local pool for reuse by `getSleepTimer`.
nothrow void freeSleepTimer(Timer tm) {
    // pool was shrunk in getSleepTimer; reclaim capacity before appending
    timerPool.assumeSafeAppend();
    timerPool ~= tm;
}
318 
/// Delay fiber execution by `req` duration.
/// Borrows a pooled timer, sleeps on it (parking the fiber), then returns it.
public nothrow void delay(T)(T req)
if (is(T : const timespec*) || is(T : Duration)) {
    auto tm = getSleepTimer();
    tm.wait(req);
    freeSleepTimer(tm);
}
326 
/// True for types accepted by `awaitAny`: Event/Semaphore, by value or pointer.
enum isAwaitable(E) = is (E : Event) || is (E : Semaphore) 
    || is(E : Event*) || is(E : Semaphore*);
330 
/// Wait until any of the given awaitables (Event/Semaphore) fires and
/// return the index of the first ready one; its eventfd counter is consumed.
public size_t awaitAny(Awaitable...)(auto ref Awaitable args) 
if (allSatisfy!(isAwaitable, Awaitable)) {
    pollfd* fds = cast(pollfd*)calloc(args.length, pollfd.sizeof);
    scope(exit) free(fds);
    foreach (i, ref arg; args) {
        fds[i].fd = arg.fd;
        fds[i].events = POLL_IN;
    }
    int resp;
    do {
        resp = poll(fds, args.length, -1); 
    } while (resp < 0 && errno == EINTR);
    // previously a non-EINTR poll failure fell through to assert(0); fail loudly instead
    resp.checked("awaitAny: poll");
    foreach (idx, ref fd; fds[0..args.length]) {
        if (fd.revents & POLL_IN) {
            ubyte[8] tmp;
            // drain the eventfd counter; result intentionally ignored
            cast(void) read(fds[idx].fd, tmp.ptr, tmp.sizeof);
            return idx;
        }
    }
    assert(0, "poll reported readiness but no awaitable was ready");
}
353 
/// Array overload of `awaitAny`: wait until any element fires and return
/// its index; the ready element's eventfd counter is consumed.
public size_t awaitAny(Awaitable)(Awaitable[] args) 
if (allSatisfy!(isAwaitable, Awaitable)) {
    pollfd* fds = cast(pollfd*)calloc(args.length, pollfd.sizeof);
    scope(exit) free(fds);
    foreach (i, ref arg; args) {
        fds[i].fd = arg.fd;
        fds[i].events = POLL_IN;
    }
    int resp; // poll(2) returns int; was ssize_t — now consistent with the variadic overload
    do {
        resp = poll(fds, args.length, -1); 
    } while (resp < 0 && errno == EINTR);
    // previously a non-EINTR poll failure fell through to assert(0); fail loudly instead
    resp.checked("awaitAny: poll");
    foreach (idx, ref fd; fds[0..args.length]) {
        if (fd.revents & POLL_IN) {
            ubyte[8] tmp;
            // drain the eventfd counter; result intentionally ignored
            cast(void) read(fds[idx].fd, tmp.ptr, tmp.sizeof);
            return idx;
        }
    }
    assert(0, "poll reported readiness but no awaitable was ready");
}
376 
/// Per-thread scheduler state: a run queue plus a load counter, padded to
/// exactly one 64-byte cache line (enforced below) to avoid false sharing.
struct SchedulerBlock {
    shared IntrusiveQueue!(FiberExt, RawEvent) queue; // fibers ready to run
    shared uint assigned; // number of fibers assigned here (load metric for `go`)
    size_t[2] padding; // pad the struct to 64 bytes
}
static assert(SchedulerBlock.sizeof == 64);
383 
package(photon) shared SchedulerBlock[] scheds; // one block per scheduler thread

enum int MAX_EVENTS = 500; // max epoll events fetched per epoll_wait call
enum int SIGNAL = 42; // RT signal: event loop schedules the fiber carried in ssi_ptr on receipt
388 
/// Main loop of scheduler thread `n`: pins itself to CPU `n`, then drains
/// its run queue, running fibers until no live fibers remain, and finally
/// signals `termination` and wakes the sibling schedulers so they exit too.
package(photon) void schedulerEntry(size_t n)
{
    int tid = gettid();
    cpu_set_t mask; // zero-initialized by D, so no CPU_ZERO needed
    CPU_SET(n, &mask);
    sched_setaffinity(tid, mask.sizeof, &mask).checked("sched_setaffinity");
    shared SchedulerBlock* sched = scheds.ptr + n;
    while (alive > 0) {
        sched.queue.event.waitAndReset();
        for(;;) {
            FiberExt f = sched.queue.drain();
            if (f is null) break; // drained an empty queue, time to sleep
            do {
                auto next = f.next; //save next, it will be reused on scheduling
                currentFiber = f;
                logf("Fiber %x started", cast(void*)f);
                try {
                    f.call();
                }
                catch (Exception e) {
                    stderr.writeln(e);
                    // BUG FIX: do not decrement `alive` here — a fiber that
                    // threw is left in TERM state by Fiber.call, so the
                    // single decrement below already accounts for it
                    // (previously it was decremented twice).
                }
                if (f.state == FiberExt.State.TERM) {
                    logf("Fiber %s terminated", cast(void*)f);
                    atomicOp!"-="(alive, 1);
                }
                f = next;
            } while(f !is null);
        }
    }
    termination.trigger();
    foreach (ref s; scheds) {
        s.queue.event.trigger();
    }
}
425 
/// Convenience overload for functions
public void go(void function() func) {
    // wrap the function pointer in a delegate and reuse the delegate overload
    go({ func(); });
}
430 
/// Setup a fiber task to run on the Photon scheduler.
public void go(void delegate() func) {
    uint choice = 0;
    if (scheds.length != 1) {
        // power-of-two-choices balancing: pick two distinct schedulers
        // at random and take the less loaded one
        immutable total = cast(uint)scheds.length;
        uint a = uniform!"[)"(0, total);
        uint b = uniform!"[)"(0, total - 1);
        if (a == b) b = total - 1; // b < total-1 here, so a != b afterwards
        uint loadA = scheds[a].assigned;
        uint loadB = scheds[b].assigned;
        choice = loadA < loadB ? a : b;
    }
    atomicOp!"+="(scheds[choice].assigned, 1);
    atomicOp!"+="(alive, 1);
    auto f = new FiberExt(func, choice);
    logf("Assigned %x -> %d scheduler", cast(void*)f, choice);
    f.schedule();
}
450 
shared Descriptor[] descriptors; // per-fd state table, sized to _SC_OPEN_MAX in startloop
shared int event_loop_fd; // epoll instance driven by the event-loop thread
shared int signal_loop_fd; // signalfd receiving the RT SIGNAL used for aio completions
454 
// Read-side readiness of a descriptor as tracked by the event loop.
enum ReaderState: uint {
    EMPTY = 0,      // known empty: readers must park until EPOLLIN arrives
    UNCERTAIN = 1,  // data may have arrived while a reader was mid-syscall
    READING = 2,    // a fiber is currently performing the read syscall
    READY = 3       // data available, read may proceed
}

// Write-side readiness of a descriptor, mirror image of ReaderState.
enum WriterState: uint {
    READY = 0,      // buffer space available, write may proceed
    UNCERTAIN = 1,  // space may have appeared while a writer was mid-syscall
    WRITING = 2,    // a fiber is currently performing the write syscall
    FULL = 3        // known full: writers must park until EPOLLOUT arrives
}

// Registration status of a descriptor with the event loop.
enum DescriptorState: uint {
    NOT_INITED,   // never seen by interceptFd
    INITIALIZING, // interceptFd in progress (CAS guard)
    NONBLOCKING,  // registered with epoll, handled via readiness states
    THREADPOOL    // epoll refused it (EPERM); to be offloaded to a thread pool
}
475 
// Per-fd state: read/write readiness machines plus lock-free wait lists
// of fibers parked on this descriptor.
shared struct Descriptor {
    ReaderState _readerState;   
    AwaitingFiber* _readerWaits;  // head of the reader wait list
    WriterState _writerState;
    AwaitingFiber* _writerWaits;  // head of the writer wait list
    DescriptorState state;        // registration status (see DescriptorState)
nothrow:
    ReaderState readerState()() {
        return atomicLoad(_readerState);
    }

    WriterState writerState()() {
        return atomicLoad(_writerState);
    }

    // try to change state & return whether the transition happened
    bool changeReader()(ReaderState from, ReaderState to) {
        return cas(&_readerState, from, to);
    }

    // ditto for writer
    bool changeWriter()(WriterState from, WriterState to) {
        return cas(&_writerState, from, to);
    }

    // current head of the reader wait list
    shared(AwaitingFiber)* readWaiters()() {
        return atomicLoad(_readerWaits);
    }

    // current head of the writer wait list
    shared(AwaitingFiber)* writeWaiters()(){
        return atomicLoad(_writerWaits);
    }

    // try to enqueue reader fiber given old head
    bool enqueueReader()(shared(AwaitingFiber)* fiber) {
        auto head = readWaiters;
        if (head == fiber) {
            return true; // TODO: HACK
        }
        fiber.next = head;
        return cas(&_readerWaits, head, fiber);
    }

    // NOTE(review): this steals the whole list (leaving null), drops it
    // entirely when it has at most one node, and the final cas compares
    // against the pre-removal `head` even though the slot now holds null —
    // it looks like the intent was to reinstall the filtered list; confirm
    // against photon.ds.common.steal semantics.
    void removeReader()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_readerWaits);
        if (head is null || head.next is null) return;
        head = removeFromList(head.unshared, fiber);
        cas(&_readerWaits, head, cast(shared(AwaitingFiber*))null);
    }

    // try to enqueue writer fiber given old head
    bool enqueueWriter()(shared(AwaitingFiber)* fiber) {
        auto head = writeWaiters;
        if (head == fiber) {
            return true; // TODO: HACK
        }
        fiber.next = head;
        return cas(&_writerWaits, head, fiber);
    }

    // NOTE(review): same suspicious steal/cas pattern as removeReader above.
    void removeWriter()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_writerWaits);
        if (head is null || head.next is null) return;
        head = removeFromList(head.unshared, fiber);
        cas(&_writerWaits, head, cast(shared(AwaitingFiber*))null);
    }

    // try to schedule readers - if fails - someone added a reader, it's now his job to check state
    void scheduleReaders()(int wakeFd) {
        auto w = steal(_readerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }

    // try to schedule writers, ditto
    void scheduleWriters()(int wakeFd) {
        auto w = steal(_writerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }
}
558 
/// SIGTERM handler: optionally dump tracing stats, then terminate at once
/// via async-signal-safe _exit (skips D runtime shutdown on purpose).
extern(C) void graceful_shutdown_on_signal(int, siginfo_t*, void*)
{
    version(photon_tracing) printStats();
    _exit(9);
}
564 
version(photon_tracing) 
/// Emit the tracing report to stderr (fd 2) using the raw write wrapper —
/// called from a signal handler, so no buffered I/O.
void printStats()
{
    // TODO: report on various events in eventloop/scheduler
    string msg = "Tracing report:\n\n";
    write(2, msg.ptr, msg.length);
}
572 
/// Initialize the photon runtime: create the epoll instance and signalfd,
/// install the SIGTERM handler, allocate the per-fd descriptor table and
/// per-thread schedulers, and launch the event-loop thread.
public void startloop()
{
    import core.cpuid;
    uint threads = threadsPerCPU;

    event_loop_fd = cast(int)epoll_create1(0).checked("ERROR: Failed to create event-loop!");
    // use RT signals, disable default termination on signal received
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, SIGNAL);
    pthread_sigmask(SIG_BLOCK, &mask, null).checked;
    signal_loop_fd = cast(int)signalfd(-1, &mask, 0).checked("ERROR: Failed to create signalfd!");

    epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = signal_loop_fd;
    epoll_ctl(event_loop_fd, EPOLL_CTL_ADD, signal_loop_fd, &event).checked;

    termination = RawEvent(0);
    event.events = EPOLLIN;
    event.data.fd = termination.fd;
    epoll_ctl(event_loop_fd, EPOLL_CTL_ADD, termination.fd, &event).checked;

    {
        sigaction_t action;
        // BUG FIX: SA_SIGINFO is required when installing the 3-argument
        // sa_sigaction handler; previously sa_flags was left at 0.
        action.sa_flags = SA_SIGINFO;
        action.sa_sigaction = &graceful_shutdown_on_signal;
        sigaction(SIGTERM, &action, null).checked;
    }

    ssize_t fdMax = sysconf(_SC_OPEN_MAX).checked;
    descriptors = (cast(shared(Descriptor*)) mmap(null, fdMax * Descriptor.sizeof, PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE, -1, 0))[0..fdMax];
    scheds = new SchedulerBlock[threads];
    foreach(ref sched; scheds) {
        sched.queue = IntrusiveQueue!(FiberExt, RawEvent)(RawEvent(0));
    }
    // BUG FIX: pthread_create's int return value was previously assigned to
    // `eventLoop`, clobbering the thread handle written through the pointer;
    // only check the return code instead.
    pthread_create(cast(pthread_t*)&eventLoop, null, &processEventsEntry, null).checked("pthread_create");
}
611 
/// Join the event-loop thread (it exits once `termination` has fired).
package(photon) void stoploop()
{
    void* ret;
    pthread_join(eventLoop, &ret);
}
617 
/// Entry point of the event-loop thread (runs outside the D runtime).
/// Dispatches epoll readiness into the per-descriptor state machines and
/// wakes parked fibers, until the `termination` event fires.
extern(C) void* processEventsEntry(void*)
{
    for (;;) {
        epoll_event[MAX_EVENTS] events = void;
        signalfd_siginfo[20] fdsi = void;
        int r;
        do {
            r = epoll_wait(event_loop_fd, events.ptr, MAX_EVENTS, -1);
        } while (r < 0 && errno == EINTR);
        checked(r);
        for (int n = 0; n < r; n++) {
            int fd = events[n].data.fd;
            if (fd == termination.fd) {
                // shutdown requested: wake all schedulers and exit this thread
                foreach(ref s; scheds) s.queue.event.trigger();
                return null;
            }
            else if (fd == signal_loop_fd) {
                logf("Intercepted our aio SIGNAL");
                ssize_t r2 = raw_read(signal_loop_fd, &fdsi, fdsi.sizeof);
                logf("aio events = %d", r2 / signalfd_siginfo.sizeof);
                if (r2 % signalfd_siginfo.sizeof != 0)
                    checked(r2, "ERROR: failed read on signalfd");

                for(int i = 0; i < r2 / signalfd_siginfo.sizeof; i++) { //TODO: stress test multiple signals
                    logf("Processing aio event idx = %d", i);
                    if (fdsi[i].ssi_signo == SIGNAL) {
                        logf("HIT our SIGNAL");
                        // ssi_ptr carries the fiber that issued the aio request
                        auto fiber = cast(FiberExt)cast(void*)fdsi[i].ssi_ptr;
                        fiber.schedule();
                    }
                }
            }
            else {
                auto descriptor = descriptors.ptr + fd;
                if (descriptor.state == DescriptorState.NONBLOCKING) {
                    if (events[n].events & EPOLLIN) {
                        logf("Read event for fd=%d", fd);
                        auto state = descriptor.readerState;
                        logf("read state = %d", state);
                        final switch(state) with(ReaderState) { 
                            case EMPTY:
                                logf("Trying to schedule readers");
                                descriptor.changeReader(EMPTY, READY);
                                descriptor.scheduleReaders(fd);
                                logf("Scheduled readers");
                                break;
                            case UNCERTAIN:
                                descriptor.changeReader(UNCERTAIN, READY);
                                break;
                            case READING:
                                // a fiber is mid-read; mark arrival as UNCERTAIN
                                if (!descriptor.changeReader(READING, UNCERTAIN)) {
                                    if (descriptor.changeReader(EMPTY, UNCERTAIN)) // if became empty - move to UNCERTAIN and wake readers
                                        descriptor.scheduleReaders(fd);
                                }
                                break;
                            case READY:
                                descriptor.scheduleReaders(fd);
                                break;
                        }
                        logf("Awaits %x", cast(void*)descriptor.readWaiters);
                    }
                    if (events[n].events & EPOLLOUT) {
                        logf("Write event for fd=%d", fd);
                        auto state = descriptor.writerState;
                        logf("write state = %d", state);
                        final switch(state) with(WriterState) { 
                            case FULL:
                                descriptor.changeWriter(FULL, READY);
                                descriptor.scheduleWriters(fd);
                                break;
                            case UNCERTAIN:
                                descriptor.changeWriter(UNCERTAIN, READY);
                                break;
                            case WRITING:
                                // a fiber is mid-write; mark freed space as UNCERTAIN
                                if (!descriptor.changeWriter(WRITING, UNCERTAIN)) {
                                    if (descriptor.changeWriter(FULL, UNCERTAIN)) // if became empty - move to UNCERTAIN and wake writers
                                        descriptor.scheduleWriters(fd);
                                }
                                break;
                            case READY:
                                descriptor.scheduleWriters(fd);
                                break;
                        }
                        logf("Awaits %x", cast(void*)descriptor.writeWaiters);
                    }
                }
            }
        }
    }
}
708 
// How non-blocking mode is established for an fd: explicit fcntl(O_NONBLOCK),
// a per-call flag OR-ed into the syscall arguments (MSG_DONTWAIT / SOCK_NONBLOCK),
// or nothing at all (noop — the fd is created non-blocking already).
enum Fcntl: int { explicit = 0, msg = MSG_DONTWAIT, sock = SOCK_NONBLOCK, noop = 0xFFFFF }
// Which readiness state machine (reader or writer) a hooked syscall drives.
enum SyscallKind { accept, read, write, connect }
711 
// intercept - a filter for file descriptor, changes flags and registers on first use;
// CAS on `state` guarantees one-time registration even under races
void interceptFd(Fcntl needsFcntl)(int fd) nothrow {
    logf("Hit interceptFD");
    if (fd < 0 || fd >= descriptors.length) return;
    if (cas(&descriptors[fd].state, DescriptorState.NOT_INITED, DescriptorState.INITIALIZING)) {
        logf("First use, registering fd = %s", fd);
        static if(needsFcntl == Fcntl.explicit) {
            int flags = fcntl(fd, F_GETFL, 0);
            fcntl(fd, F_SETFL, flags | O_NONBLOCK).checked;
            logf("Setting FCNTL. %x", cast(void*)currentFiber);
        }
        epoll_event event;
        event.events = EPOLLIN | EPOLLOUT | EPOLLET;
        event.data.fd = fd;
        if (epoll_ctl(event_loop_fd, EPOLL_CTL_ADD, fd, &event) < 0 && errno == EPERM) {
            // EPERM: epoll does not support this fd kind (e.g. regular file)
            logf("isSocket = false FD = %s", fd);
            // CONSISTENCY FIX: use atomicStore for the shared state field,
            // matching deregisterFd and the CAS above (was a plain assignment)
            atomicStore(descriptors[fd].state, DescriptorState.THREADPOOL);
        }
        else {
            logf("isSocket = true FD = %s", fd);
            atomicStore(descriptors[fd].state, DescriptorState.NONBLOCKING);
        }
    }
}
736 
/// Drop fd bookkeeping on close: mark both directions as if ready so no
/// fiber stays parked on a dead descriptor, wake all waiters, and return
/// the slot to the unregistered state.
void deregisterFd(int fd) nothrow {
    if (fd < 0 || fd >= descriptors.length) return;
    auto desc = descriptors.ptr + fd;
    atomicStore(desc._writerState, WriterState.READY);
    atomicStore(desc._readerState, ReaderState.EMPTY);
    desc.scheduleReaders(fd);
    desc.scheduleWriters(fd);
    atomicStore(desc.state, DescriptorState.NOT_INITED);
}
747 
/// Core hook shared by all intercepted syscalls. Outside a fiber the call
/// passes straight through. On a fiber, the fd is registered with the event
/// loop and the call is driven by the descriptor's reader/writer state
/// machine: when the fd would block, the fiber enqueues itself and yields;
/// the event loop reschedules it on readiness.
ssize_t universalSyscall(size_t ident, string name, SyscallKind kind, Fcntl fcntlStyle, ssize_t ERR, T...)
                        (int fd, T args) nothrow {
    if (currentFiber is null) {
        logf("%s PASSTHROUGH FD=%s", name, fd);
        return syscall(ident, fd, args).withErrorno;
    }
    else {
        logf("HOOKED %s FD=%d", name, fd);
        interceptFd!(fcntlStyle)(fd);
        shared(Descriptor)* descriptor = descriptors.ptr + fd;
        if (atomicLoad(descriptor.state) == DescriptorState.THREADPOOL) {
            logf("%s syscall THREADPOLL FD=%d", name, fd);
            //TODO: offload syscall to thread-pool
            return syscall(ident, fd, args).withErrorno;
        }
    L_start:
        // stack-allocated wait-list node; valid until this fiber resumes
        shared AwaitingFiber await = AwaitingFiber(cast(shared)currentFiber, null);
        // set flags argument if able to avoid direct fcntl calls
        static if (fcntlStyle != Fcntl.explicit)
        {
            args[2] |= fcntlStyle;
        }
        //if (kind == SyscallKind.accept)
        logf("kind:%s args:%s", kind, args); // BUG FIX: format string was "kind:s" (missing '%'), leaving an orphan argument
        static if(kind == SyscallKind.accept || kind == SyscallKind.read) {
            auto state = descriptor.readerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (ReaderState) {
            case EMPTY:
                logf("EMPTY - enqueue reader");
                if (!descriptor.enqueueReader(&await)) goto L_start;
                // changed state to e.g. READY or UNCERTAIN in meantime, may need to reschedule
                if (descriptor.readerState != EMPTY) descriptor.scheduleReaders(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                descriptor.changeReader(UNCERTAIN, READING); // may became READY or READING
                goto case READING;
            case READY:
                descriptor.changeReader(READY, READING); // always succeeds if 1 fiber reads
                goto case READING;
            case READING:
                ssize_t resp = syscall(ident, fd, args);
                static if (kind == SyscallKind.accept) {
                    if (resp >= 0) // for accept we never know if we emptied the queue
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                }
                else static if (kind == SyscallKind.read) {
                    if (resp == args[1]) // length is 2nd in (buf, length, ...)
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if(resp >= 0)
                        descriptor.changeReader(READING, EMPTY);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                }
                else
                    static assert(0);
                return withErrorno(resp);
            }
        }
        else static if(kind == SyscallKind.write || kind == SyscallKind.connect) {
            //TODO: Handle short-write b/c of EWOULDBLOCK to apear as fully blocking?
            auto state = descriptor.writerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (WriterState) {
            case FULL:
                logf("FULL FD=%d Fiber %x", fd, cast(void*)currentFiber);
                if (!descriptor.enqueueWriter(&await)) goto L_start;
                // changed state to e.g. READY or UNCERTAIN in meantime, may need to reschedule
                if (descriptor.writerState != FULL) descriptor.scheduleWriters(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                logf("UNCERTAIN on FD=%d Fiber %x", fd, cast(void*)currentFiber);
                descriptor.changeWriter(UNCERTAIN, WRITING); // may became READY or WRITING
                goto case WRITING;
            case READY:
                descriptor.changeWriter(READY, WRITING); // always succeeds if 1 fiber writes
                goto case WRITING;
            case WRITING:
                ssize_t resp = syscall(ident, fd, args);
                static if (kind == SyscallKind.connect) {
                    if(resp >= 0) {
                        descriptor.changeWriter(WRITING, READY);
                    }
                    else if (resp == -ERR || resp == -EALREADY) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                    return withErrorno(resp);
                }
                else {
                    if (resp == args[1]) // (buf, len) args to syscall
                        descriptor.changeWriter(WRITING, UNCERTAIN);
                    else if(resp >= 0) {
                        logf("Short-write on FD=%d, become FULL", fd);
                        descriptor.changeWriter(WRITING, FULL);
                    }
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            logf("Sudden block on FD=%d, become FULL", fd);
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                    return withErrorno(resp);
                }
            }
        }
        assert(0);
    }
}
870 
/// Fixed-capacity FIFO channel for passing values between fibers.
/// NOTE(review): the previous implementation called non-existent
/// `Event.await`/`Event.reset`, never triggered the event (so waiters could
/// not wake), and popped from the tail while `front` read the tail (LIFO).
/// Rewritten against the actual Event API with FIFO order; same public
/// surface (put/front/popFront/empty).
class Channel(T) {
    private T[] buf;     // buffered items, oldest first
    private size_t size; // capacity
    private Event ev;    // wakes fibers parked in put()/front()

    /// Channel holding at most `size` items.
    this(size_t size) {
        this.size = size;
        this.ev = event(false);
    }

    /// Append an item, blocking the fiber while the channel is full.
    void put(T item) {
        while (buf.length == size) ev.waitAndReset();
        buf ~= item;
        ev.trigger(); // wake a consumer parked in front()
    }

    /// Return the oldest item, blocking the fiber while the channel is empty.
    auto front() {
        while (buf.length == 0) ev.waitAndReset();
        return buf[0];
    }

    /// Drop the oldest item (precondition: not empty).
    void popFront() {
        buf = buf[1 .. $];
        ev.trigger(); // wake a producer parked in put()
    }

    /// True when no items are buffered.
    bool empty() {
        return buf.length == 0;
    }
}
898 
/// Create a `Channel!T` with capacity `size`.
/// BUG FIX: previously `new Channel(...)` omitted the template argument and
/// called a constructor that does not exist.
Channel!T channel(T)(size_t size) {
    return new Channel!T(size);
}
902 
903 // ======================================================================================
// SYSCALL wrapper intercepts
905 // ======================================================================================
906 nothrow:
907 
/// Hooked read(2): parks the calling fiber instead of blocking the thread.
extern(C) ssize_t read(int fd, void *buf, size_t count) nothrow
{
    return universalSyscall!(SYS_READ, "READ", SyscallKind.read, Fcntl.explicit, EWOULDBLOCK)
        (fd, cast(size_t)buf, count);
}
913 
/// Override of C `write`, routed through universalSyscall instead of libc
/// so a would-block write can cooperate with the fiber scheduler.
extern(C) ssize_t write(int fd, const void *buf, size_t count)
{
    alias writeImpl = universalSyscall!(SYS_WRITE, "WRITE", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK);
    return writeImpl(fd, cast(size_t) buf, count);
}
919 
/// Override of C `accept`, routed through universalSyscall so that a
/// would-block accept suspends the current fiber instead of the thread.
extern(C) ssize_t accept(int sockfd, sockaddr *addr, socklen_t *addrlen)
{
    alias acceptImpl = universalSyscall!(SYS_ACCEPT, "accept", SyscallKind.accept, Fcntl.explicit, EWOULDBLOCK);
    return acceptImpl(sockfd, cast(size_t) addr, cast(size_t) addrlen);
}
925 
/// Override of C `accept4`; like accept() above but forwards the
/// caller-supplied flags (note the Fcntl.sock registration mode).
extern(C) ssize_t accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags)
{
    alias accept4Impl = universalSyscall!(SYS_ACCEPT4, "accept4", SyscallKind.accept, Fcntl.sock, EWOULDBLOCK);
    return accept4Impl(sockfd, cast(size_t) addr, cast(size_t) addrlen, flags);
}
931 
/// Override of C `connect`; a non-blocking connect that returns EINPROGRESS
/// is handled by universalSyscall (SyscallKind.connect).
///
/// FIX: the third parameter was declared `socklen_t *addrlen`, but POSIX
/// `connect` passes the address length BY VALUE. The old signature only
/// worked by accident: the caller's length value landed in the parameter
/// register and was forwarded to the kernel untouched. The corrected
/// prototype forwards exactly the same value.
extern(C) ssize_t connect(int sockfd, const sockaddr *addr, socklen_t addrlen)
{
    return universalSyscall!(SYS_CONNECT, "connect", SyscallKind.connect, Fcntl.explicit, EINPROGRESS)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen);
}
937 
/// Override of C `sendto`, routed through universalSyscall so a would-block
/// send suspends the current fiber instead of the thread.
extern(C) ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
                      const sockaddr *dest_addr, socklen_t addrlen)
{
    alias sendtoImpl = universalSyscall!(SYS_SENDTO, "sendto", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK);
    return sendtoImpl(sockfd, cast(size_t) buf, len, flags, cast(size_t) dest_addr, cast(size_t) addrlen);
}
944 
/// C `recv` shim implemented on top of the hooked recvfrom below, supplying
/// a scratch source-address buffer.
/// NOTE(review): POSIX recv returns ssize_t, not size_t — the bit pattern is
/// identical on this ABI, but a -1 error appears as a huge unsigned value to
/// D-side callers; confirm intended.
extern(C) size_t recv(int sockfd, void *buf, size_t len, int flags) nothrow {
    sockaddr_in src_addr;
    src_addr.sin_family = AF_INET;
    src_addr.sin_port = 0;
    src_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    // NOTE(review): declared ssize_t (8 bytes) to match the recvfrom shim's
    // prototype, while the kernel writes a 32-bit socklen_t through the
    // pointer — this relies on little-endian layout; confirm.
    ssize_t addrlen = sockaddr_in.sizeof;
    return recvfrom(sockfd, buf, len, flags, cast(sockaddr*)&src_addr, &addrlen);   
}
953 
/// Override of C `recvfrom`, routed through universalSyscall so a would-block
/// receive suspends the current fiber instead of the thread.
/// NOTE(review): POSIX declares `addrlen` as socklen_t* (32-bit on Linux),
/// not ssize_t*. The pointer is only forwarded to the kernel here, so it
/// works, but the C-visible prototype is wrong and recv() above compensates
/// with an oversized local — confirm and align both.
extern(C) private ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
                        sockaddr *src_addr, ssize_t* addrlen) nothrow
{
    return universalSyscall!(SYS_RECVFROM, "RECVFROM", SyscallKind.read, Fcntl.msg, EWOULDBLOCK)
        (sockfd, cast(size_t)buf, len, flags, cast(size_t)src_addr, cast(size_t)addrlen);
}
960 
/// Fiber-aware override of C `poll`: answers from cached descriptor state
/// when possible, otherwise parks the current fiber on every descriptor of
/// interest (plus an optional timerfd for the timeout) until one fires.
extern(C) private ssize_t poll(pollfd *fds, nfds_t nfds, int timeout)
{
    // Try to resolve the request without blocking, from the cached
    // reader/writer state of each descriptor. Returns true and sets
    // `result` when an answer is available; false means the caller
    // must actually wait.
    nothrow bool nonBlockingCheck(ref ssize_t result) {
        bool uncertain;
    L_cacheloop:
        foreach (ref fd; fds[0..nfds]) {
            // Make sure the fd is registered with the event loop first.
            interceptFd!(Fcntl.explicit)(fd.fd);
            fd.revents = 0;
            auto descriptor = descriptors.ptr + fd.fd;
            if (fd.events & POLLIN) {
                auto state = descriptor.readerState;
                logf("Found event %d for reader in select", state);
                switch(state) with(ReaderState) {
                case READY:
                    fd.revents |=  POLLIN;
                    break;
                case EMPTY:
                    break;
                default:
                    // Neither READY nor EMPTY — the cache cannot answer;
                    // abandon the scan and fall back to a real poll below.
                    uncertain = true;
                    break L_cacheloop;
                }
            }
            if (fd.events & POLLOUT) {
                auto state = descriptor.writerState;
                logf("Found event %d for writer in select", state);
                switch(state) with(WriterState) {
                case READY:
                    fd.revents |= POLLOUT;
                    break;
                case FULL:
                    break;
                default:
                    // Indeterminate writer state — same fallback as above.
                    uncertain = true;
                    break L_cacheloop;
                }
            }
        }
        // fallback to system poll call if descriptor state is uncertain
        if (uncertain) {
            logf("Fallback to system poll, descriptors have uncertain state");
            // Zero timeout: harvest currently-ready fds without blocking.
            ssize_t p = raw_poll(fds, nfds, 0);
            if (p != 0) {
                result = p;
                logf("Raw poll returns %d", result);
                return true;
            }
        }
        else {
            // Compact entries with nonzero revents to the front, counting
            // them in j.
            // NOTE(review): this reorders the caller's pollfd array; real
            // poll() fills revents in place without moving entries —
            // confirm callers tolerate the reordering.
            ssize_t j = 0;
            foreach (i; 0..nfds) {
                if (fds[i].revents) {
                    fds[j++] = fds[i];
                }
            }
            logf("Using our own event cache: %d events", j);
            if (j > 0) {
                result = cast(ssize_t)j;
                return true;
            }
        }
        return false;
    }
    if (currentFiber is null) {
        // Plain thread, no fiber context — nothing to schedule around.
        logf("POLL PASSTHROUGH!");
        return raw_poll(fds, nfds, timeout);
    }
    else {
        logf("HOOKED POLL %d fds timeout %d", nfds, timeout);
        if (nfds < 0) return -EINVAL.withErrorno;
        if (nfds == 0) {
            // poll() with no fds is a portable sleep idiom: arm a timer,
            // park the fiber, and return 0 (no fds ready) on wakeup.
            if (timeout == 0) return 0;
            shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
            Timer tm = timer();
            descriptors[tm.fd].enqueueReader(&aw);
            scope(exit) tm.dispose();
            tm.arm(timeout.msecs);
            logf("Timer fd=%d", tm.fd);
            Fiber.yield();
            logf("Woke up after select %x. WakeFd=%d", cast(void*)currentFiber, currentFiber.wakeFd);
            return 0;
        }
        // Validate fds and clear stale revents before any waiting.
        foreach(ref fd; fds[0..nfds]) {
            if (fd.fd < 0 || fd.fd >= descriptors.length) return -EBADF.withErrorno;
            fd.revents = 0;
        }
        ssize_t result = 0;
        if (nonBlockingCheck(result)) return result;
        // Nothing ready yet — subscribe this fiber to every fd of interest.
        shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].enqueueReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].enqueueWriter(&aw);
        }
        int timeoutFd = -1;
        if (timeout > 0) {
            // Also arm a timerfd so the fiber is woken when the timeout
            // expires; its fd identifies a timeout wakeup below.
            Timer tm = timer();
            timeoutFd = tm.fd;
            scope(exit) tm.dispose();
            tm.arm(timeout.msecs);
            descriptors[tm.fd].enqueueReader(&aw);
            Fiber.yield();
            tm.disarm();
            // Clear the timer descriptor's reader slot after waking.
            atomicStore(descriptors[tm.fd]._readerWaits, cast(shared(AwaitingFiber)*)null);
        }
        else {
            Fiber.yield();
        }
        // Unsubscribe from every wait queue we joined above; only the one
        // that fired removed us on its own.
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].removeReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].removeWriter(&aw);
        }
        logf("Woke up after select %x. WakeFD=%d", cast(void*)currentFiber, currentFiber.wakeFd);
        // Woken by the timer fd => timed out with no ready descriptors.
        if (currentFiber.wakeFd == timeoutFd) return 0;
        else {
            // Re-scan to fill revents for whichever fd(s) became ready.
            nonBlockingCheck(result);
            return result;
        }
    }
}
1084 
/// Override of C `nanosleep`: on a fiber, sleep cooperatively via the
/// scheduler's delay(); on a plain thread, fall through to the raw syscall.
extern(C) private ssize_t nanosleep(const timespec* req, const timespec* rem) {
    if (currentFiber is null)
        return syscall(SYS_NANOSLEEP, cast(size_t)req, cast(size_t)rem).withErrorno;
    delay(req);
    return 0;
}
1093 
/// Override of C `close`: drops the descriptor from the event-loop
/// bookkeeping before handing the fd back to the kernel.
extern(C) private ssize_t close(int fd) nothrow
{
    logf("HOOKED CLOSE FD=%d", fd);
    deregisterFd(fd);
    auto rc = syscall(SYS_CLOSE, fd);
    return cast(int) withErrorno(rc);
}