module photon.macos.core;
version(OSX):
private:

import std.stdio;
import std.string;
import std.format;
import std.exception;
import std.conv;
import std.array;
import core.thread;
import core.internal.spinlock;
import core.sys.posix.sys.types;
import core.sys.posix.sys.socket;
import core.sys.posix.poll;
import core.sys.posix.netinet.in_;
import core.sys.posix.unistd;
import core.sync.mutex;
import core.stdc.errno;
import core.stdc.signal;
import core.stdc.time;
import core.atomic;
import core.sys.posix.stdlib: abort;
import core.sys.posix.fcntl;
import core.memory;
import core.sys.posix.sys.mman;
import core.sys.posix.pthread;
import core.sys.darwin.sys.event;
import core.sys.darwin.mach.thread_act;

import photon.macos.support;
import photon.ds.common;
import photon.ds.intrusive_queue;
alias KEvent = kevent_t;
enum SYS_READ = 3;
enum SYS_WRITE = 4;
enum SYS_ACCEPT = 30;
enum SYS_CONNECT = 98;
enum SYS_SENDTO = 133;
enum SYS_RECVFROM = 29;
enum SYS_CLOSE = 6;
enum SYS_GETTID = 286;
enum SYS_POLL = 230;

struct thread;
alias thread_t = thread*;
extern(C) thread_t current_thread();

// T becomes thread-local b/c it's stolen from a shared resource
auto steal(T)(ref shared T arg)
{
    for (;;) {
        auto v = atomicLoad(arg);
        if (cas(&arg, v, cast(shared(T))null)) return v;
    }
}
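
/+
   Illustrative sketch (not part of the API): `steal` transfers ownership of a
   shared slot to exactly one caller by atomically swapping in null.
   ---
   shared FiberExt slot;          // published by another thread
   auto mine = steal(slot);       // `slot` is now null; we own `mine`
   if (mine) mine.unshared.schedule();
   ---
+/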

shared struct RawEvent {
nothrow:
    this(int dummy) {
        int[2] fds;
        pipe(fds).checked("event creation");
        this.fds = fds;
    }

    void waitAndReset() {
        byte[1] bytes = void;
        ssize_t r;
        do {
            r = raw_read(fds[0], bytes.ptr, 1);
        } while(r < 0 && errno == EINTR);
        r.checked("event reset");
    }

    void trigger() {
        ubyte[1] bytes;
        ssize_t r;
        do {
            r = raw_write(fds[1], bytes.ptr, 1);
        } while(r < 0 && errno == EINTR);
        r.checked("event trigger");
    }

    private int[2] fds;
}

shared size_t timerId;

struct Timer {
nothrow:
    this(size_t id) {
        this.id = id;
    }

    ///
    void wait(const timespec* ts) {
        wait(ts.tv_sec.seconds + ts.tv_nsec.nsecs);
    }

    ///
    void wait(Duration dur) {
        KEvent event;
        event.ident = id;
        event.filter = EVFILT_TIMER;
        event.flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
        event.fflags = 0;
        event.data = dur.total!"msecs";
        event.udata = cast(void*)(cast(size_t)cast(void*)currentFiber | 0x1);
        timespec timeout;
        timeout.tv_nsec = 1000;
        kevent(kq, &event, 1, null, 0, &timeout).checked("arming the timer");
        FiberExt.yield();
    }

    private void waitThread(const timespec* ts) {
        auto dur = ts.tv_sec.seconds + ts.tv_nsec.nsecs;
        KEvent event;
        event.ident = id;
        event.filter = EVFILT_TIMER;
        event.flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
        event.fflags = 0;
        event.data = dur.total!"msecs";
        event.udata = cast(void*)(cast(size_t)mach_thread_self() << 1);
        timespec timeout;
        timeout.tv_nsec = 1000;
        kevent(kq, &event, 1, null, 0, &timeout).checked("arming the timer for a thread");
    }

    private size_t id;
}

/// Allocate a timer
public nothrow auto timer() {
    return Timer(atomicFetchAdd(timerId, 1));
}
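
/+
   Usage sketch (assumes a running event loop started via startloop): arm a
   one-shot kqueue timer and park the calling fiber until it fires.
   ---
   auto tm = timer();
   tm.wait(50.msecs); // resumed by processEventsEntry via EVFILT_TIMER
   ---
+/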


public struct Event {
nothrow:
    @disable this(this);

    this(bool signaled) {
        int[2] fds;
        pipe(fds).checked;
        this.fds = fds; // assign before triggering so trigger() writes to our pipe
        if (signaled) trigger();
    }

    /// Wait for the event to be triggered, then reset and return atomically
    void waitAndReset() {
        byte[4096] bytes = void;
        ssize_t r;
        do {
            r = read(fds[0], bytes.ptr, bytes.sizeof);
        } while(r < 0 && errno == EINTR);
    }

    void waitAndReset() shared {
        this.unshared.waitAndReset();
    }

    /// Trigger the event.
    void trigger() {
        ubyte[1] bytes = void;
        ssize_t r;
        do {
            r = write(fds[1], bytes.ptr, 1);
        } while(r < 0 && errno == EINTR);
    }

    void trigger() shared {
        this.unshared.trigger();
    }

    ///
    void dispose() {
        close(fds[0]);
        close(fds[1]);
    }

    void dispose() shared {
        this.unshared.dispose();
    }

    private int[2] fds;
}

///
public nothrow auto event(bool signaled) {
    return Event(signaled);
}
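
/+
   Usage sketch: waitAndReset() blocks on the pipe's read end until another
   fiber calls trigger(), which writes a wake-up byte.
   ---
   auto ev = event(false);
   go({ ev.waitAndReset(); /* runs only after the trigger below */ });
   go({ ev.trigger(); });
   ---
+/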


public struct Semaphore {
nothrow:
    @disable this(this);

    this(int initial) {
        int[2] fds;
        pipe(fds).checked;
        this.fds = fds; // assign before triggering so trigger() writes to our pipe
        if (initial > 0) {
            trigger(initial);
        }
    }

    ///
    void wait() {
        byte[1] bytes = void;
        ssize_t r;
        do {
            r = read(fds[0], bytes.ptr, bytes.sizeof);
        } while(r < 0 && errno == EINTR);
    }

    ///
    void wait() shared {
        this.unshared.wait();
    }

    ///
    void trigger(int count) {
        ubyte[4096] bytes = void;
        ssize_t size = count > 4096 ? 4096 : count;
        ssize_t r;
        do {
            r = write(fds[1], bytes.ptr, size);
        } while(r < 0 && errno == EINTR);
    }

    ///
    void trigger(int count) shared {
        this.unshared.trigger(count);
    }

    ///
    void dispose() {
        close(fds[0]);
        close(fds[1]);
    }

    ///
    void dispose() shared {
        this.unshared.dispose();
    }

    private int[2] fds;
}

///
public nothrow auto semaphore(int initial) {
    return Semaphore(initial);
}
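
/+
   Usage sketch: each wait() consumes one byte from the pipe and each
   trigger(n) writes up to n bytes, releasing up to n blocked waiters.
   ---
   auto sem = semaphore(0);
   go({ sem.wait(); /* proceeds once a permit arrives */ });
   go({ sem.trigger(1); });
   ---
+/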

struct AwaitingFiber {
    shared FiberExt fiber;
    AwaitingFiber* next;

    void scheduleAll(int wakeFd) nothrow
    {
        auto w = &this;
        FiberExt head;
        // first process all AwaitingFibers since they are on stack
        do {
            auto fiber = steal(w.fiber);
            if (fiber) {
                fiber.unshared.next = head;
                head = fiber.unshared;
            }
            w = w.next;
        } while(w);
        while(head) {
            logf("Waking with FD=%d", wakeFd);
            head.wakeFd = wakeFd;
            auto next = head.next;
            head.schedule();
            head = next;
        }
    }
}

class FiberExt : Fiber {
    FiberExt next;
    uint numScheduler;
    int wakeFd; // receives the fd that woke us up

    enum PAGESIZE = 4096;

    this(void function() fn, uint numSched) nothrow {
        super(fn);
        numScheduler = numSched;
    }

    this(void delegate() dg, uint numSched) nothrow {
        super(dg);
        numScheduler = numSched;
    }

    void schedule() nothrow
    {
        scheds[numScheduler].queue.push(this);
    }
}

FiberExt currentFiber;
shared RawEvent termination; // termination event, triggered once the last fiber exits
shared pthread_t eventLoop; // event loop, runs outside of the D runtime
shared int alive; // count of non-terminated fibers scheduled

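// One block per scheduler thread; the padding plus the static assert below pin
// its size to a 64-byte cache line so neighboring schedulers don't false-share.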
struct SchedulerBlock {
    shared IntrusiveQueue!(FiberExt, RawEvent) queue;
    shared uint assigned;
    size_t[1] padding;
}
static assert(SchedulerBlock.sizeof == 64);

package(photon) shared SchedulerBlock[] scheds;

enum int MAX_EVENTS = 500;
enum int SIGNAL = 42;


package(photon) void schedulerEntry(size_t n)
{
    int tid = gettid();
    /*cpu_set_t mask;
    CPU_SET(n, &mask);
    sched_setaffinity(tid, mask.sizeof, &mask).checked("sched_setaffinity");
    */
    shared SchedulerBlock* sched = scheds.ptr + n;
    while (alive > 0) {
        sched.queue.event.waitAndReset();
        for (;;) {
            FiberExt f = sched.queue.drain();
            if (f is null) break; // drained an empty queue, time to sleep
            do {
                auto next = f.next; // save next, it will be reused on scheduling
                currentFiber = f;
                logf("Fiber %x started", cast(void*)f);
                try {
                    f.call();
                }
                catch (Exception e) {
                    stderr.writeln(e);
                    atomicOp!"-="(alive, 1);
                }
                if (f.state == FiberExt.State.TERM) {
                    logf("Fiber %s terminated", cast(void*)f);
                    atomicOp!"-="(alive, 1);
                }
                f = next;
            } while(f !is null);
        }
    }
    termination.trigger();
    foreach (ref s; scheds) {
        s.queue.event.trigger();
    }
}

/// Convenience overload for functions
public void go(void function() func) {
    go({ func(); });
}

/// Set up a fiber task to run on the Photon scheduler.
public void go(void delegate() func) {
    import std.random;
    uint choice;
    if (scheds.length == 1) choice = 0;
    else {
        uint a = uniform!"[)"(0, cast(uint)scheds.length);
        uint b = uniform!"[)"(0, cast(uint)scheds.length-1);
        if (a == b) b = cast(uint)scheds.length-1;
        uint loadA = scheds[a].assigned;
        uint loadB = scheds[b].assigned;
        if (loadA < loadB) choice = a;
        else choice = b;
    }
    atomicOp!"+="(scheds[choice].assigned, 1);
    atomicOp!"+="(alive, 1);
    auto f = new FiberExt(func, choice);
    logf("Assigned %x -> %d / %d scheduler", cast(void*)f, choice, scheds.length);
    f.schedule();
}
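
/+
   End-to-end sketch (assumes the rest of the photon runtime drives the
   schedulers, e.g. via schedulerEntry on worker threads):
   ---
   startloop();
   go({
       delay(10.msecs); // fiber-friendly sleep, see below
       writeln("hello from a fiber");
   });
   ---
+/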

shared Descriptor[] descriptors;
shared int event_loop_fd;
shared int signal_loop_fd;

enum ReaderState: uint {
    EMPTY = 0,
    UNCERTAIN = 1,
    READING = 2,
    READY = 3
}

enum WriterState: uint {
    READY = 0,
    UNCERTAIN = 1,
    WRITING = 2,
    FULL = 3
}

enum DescriptorState: uint {
    NOT_INITED,
    INITIALIZING,
    NONBLOCKING,
    THREADPOOL
}
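
// How the event loop and universalSyscall drive ReaderState
// (WriterState mirrors it, with FULL playing the role of EMPTY):
//   EMPTY     -> a read drained the fd; fibers queue up and wait for EVFILT_READ
//   UNCERTAIN -> an event arrived mid-read; readiness is unknown
//   READING   -> some fiber is currently in the syscall
//   READY     -> kqueue reported readiness; the next syscall should not block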

// per-descriptor metadata: reader/writer state plus lists of awaiting fibers
shared struct Descriptor {
    ReaderState _readerState;
    AwaitingFiber* _readerWaits;
    WriterState _writerState;
    AwaitingFiber* _writerWaits;
    DescriptorState state;
nothrow:
    ReaderState readerState()() {
        return atomicLoad(_readerState);
    }

    WriterState writerState()() {
        return atomicLoad(_writerState);
    }

    // try to change reader state; returns true if the transition happened
    bool changeReader()(ReaderState from, ReaderState to) {
        return cas(&_readerState, from, to);
    }

    // ditto for writer
    bool changeWriter()(WriterState from, WriterState to) {
        return cas(&_writerState, from, to);
    }

    //
    shared(AwaitingFiber)* readWaiters()() {
        return atomicLoad(_readerWaits);
    }

    //
    shared(AwaitingFiber)* writeWaiters()() {
        return atomicLoad(_writerWaits);
    }

    // try to enqueue reader fiber given old head
    bool enqueueReader()(shared(AwaitingFiber)* fiber) {
        auto head = readWaiters;
        if (head == fiber) {
            return true; // TODO: HACK
        }
        fiber.next = head;
        return cas(&_readerWaits, head, fiber);
    }

    void removeReader()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_readerWaits);
        if (head is null || head.next is null) return;
        head = removeFromList(head.unshared, fiber);
        // reinstall the remaining waiters, unless someone enqueued in the meantime
        cas(&_readerWaits, cast(shared(AwaitingFiber*))null, head);
    }

    // try to enqueue writer fiber given old head
    bool enqueueWriter()(shared(AwaitingFiber)* fiber) {
        auto head = writeWaiters;
        if (head == fiber) {
            return true; // TODO: HACK
        }
        fiber.next = head;
        return cas(&_writerWaits, head, fiber);
    }

    void removeWriter()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_writerWaits);
        if (head is null || head.next is null) return;
        head = removeFromList(head.unshared, fiber);
        // reinstall the remaining waiters, unless someone enqueued in the meantime
        cas(&_writerWaits, cast(shared(AwaitingFiber*))null, head);
    }

    // try to schedule readers - if it fails, someone added a reader and it's now their job to check state
    void scheduleReaders()(int wakeFd) {
        auto w = steal(_readerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }

    // try to schedule writers, ditto
    void scheduleWriters()(int wakeFd) {
        auto w = steal(_writerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }
}

extern(C) void graceful_shutdown_on_signal(int, siginfo_t*, void*)
{
    version(photon_tracing) printStats();
    _exit(9);
}

version(photon_tracing)
void printStats()
{
    // TODO: report on various events in eventloop/scheduler
    string msg = "Tracing report:\n\n";
    write(2, msg.ptr, msg.length);
}

shared int kq;
shared int eventLoopId;
__gshared ssize_t function (const timespec* req, const timespec* rem) libcNanosleep;
Timer[] timerPool; // thread-local pool of preallocated timers

nothrow auto getSleepTimer() {
    Timer tm;
    if (timerPool.length == 0) {
        tm = timer();
    } else {
        tm = timerPool[$-1];
        timerPool.length = timerPool.length - 1;
    }
    return tm;
}

nothrow void freeSleepTimer(Timer tm) {
    timerPool.assumeSafeAppend();
    timerPool ~= tm;
}

/// Delay fiber execution by `req` duration.
public nothrow void delay(T)(T req)
if (is(T : const timespec*) || is(T : Duration)) {
    auto tm = getSleepTimer();
    tm.wait(req);
    freeSleepTimer(tm);
}
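
/+
   Usage sketch: inside a fiber this parks only the fiber on a kqueue timer;
   plain threads go through the hooked nanosleep further below instead.
   ---
   go({ delay(100.msecs); });
   ---
+/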

public void startloop()
{
    int threads = cast(int)sysconf(_SC_NPROCESSORS_ONLN).checked;
    kq = kqueue();
    enforce(kq != -1);
    ssize_t fdMax = sysconf(_SC_OPEN_MAX).checked;
    descriptors = (cast(shared(Descriptor*)) mmap(null, fdMax * Descriptor.sizeof, PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE, -1, 0))[0..fdMax];
    scheds = new SchedulerBlock[threads];
    termination = RawEvent(0);
    foreach(ref sched; scheds) {
        sched.queue = IntrusiveQueue!(FiberExt, RawEvent)(RawEvent(0));
    }
    pthread_create(cast(pthread_t*)&eventLoop, null, &processEventsEntry, null);
}

package(photon) void stoploop()
{
    void* ret;
    pthread_join(cast(pthread_t)eventLoop, &ret);
}

extern(C) void* processEventsEntry(void*)
{
    KEvent[MAX_EVENTS] ke;
    for (;;) {
        int cnt = kevent(kq, null, 0, ke.ptr, MAX_EVENTS, null);
        enforce(cnt >= 0);
        for (int i = 0; i < cnt; i++) {
            auto fd = cast(int)ke[i].ident;
            auto filter = ke[i].filter;
            auto descriptor = descriptors.ptr + fd;
            if (filter == EVFILT_READ) {
                logf("Read event for fd=%d", fd);
                if (fd == termination.fds[0]) return null;
                auto state = descriptor.readerState;
                logf("read state = %d", state);
                final switch(state) with(ReaderState) {
                    case EMPTY:
                        logf("Trying to schedule readers");
                        descriptor.changeReader(EMPTY, READY);
                        descriptor.scheduleReaders(fd);
                        logf("Scheduled readers");
                        break;
                    case UNCERTAIN:
                        descriptor.changeReader(UNCERTAIN, READY);
                        break;
                    case READING:
                        if (!descriptor.changeReader(READING, UNCERTAIN)) {
                            if (descriptor.changeReader(EMPTY, UNCERTAIN)) // if it became empty - move to UNCERTAIN and wake readers
                                descriptor.scheduleReaders(fd);
                        }
                        break;
                    case READY:
                        descriptor.scheduleReaders(fd);
                        break;
                }
            }
            if (filter == EVFILT_WRITE) {
                logf("Write event for fd=%d", fd);
                auto state = descriptor.writerState;
                logf("write state = %d", state);
                final switch(state) with(WriterState) {
                    case FULL:
                        descriptor.changeWriter(FULL, READY);
                        descriptor.scheduleWriters(fd);
                        break;
                    case UNCERTAIN:
                        descriptor.changeWriter(UNCERTAIN, READY);
                        break;
                    case WRITING:
                        if (!descriptor.changeWriter(WRITING, UNCERTAIN)) {
                            if (descriptor.changeWriter(FULL, UNCERTAIN)) // if it became full - move to UNCERTAIN and wake writers
                                descriptor.scheduleWriters(fd);
                        }
                        break;
                    case READY:
                        descriptor.scheduleWriters(fd);
                        break;
                }
                logf("Awaits %x", cast(void*)descriptor.writeWaiters);
            }
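            // EVFILT_TIMER udata is tagged by its low bit: Timer.wait stores a
            // fiber as (cast(size_t)fiber | 0x1), Timer.waitThread stores a
            // suspended thread's mach port as (port << 1).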
            if (filter == EVFILT_TIMER) {
                size_t udata = cast(size_t)ke[i].udata;
                if (udata & 0x1) {
                    FiberExt fiber = cast(FiberExt)cast(void*)(udata & ~1);
                    fiber.schedule();
                }
                else {
                    auto thread = cast(thread_act_t)(udata >> 1);
                    thread_resume(thread);
                }
            }
        }
    }
}

enum Fcntl: int { explicit = 0, msg = MSG_DONTWAIT, noop = 0xFFFFF }
enum SyscallKind { accept, read, write, connect }

// intercept - a filter for file descriptors: changes flags and registers with kqueue on first use
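// Registration uses EV_CLEAR, i.e. edge-triggered delivery: each readiness edge
// is reported once and then cached in the Descriptor state machine above.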
void interceptFd(Fcntl needsFcntl)(int fd) nothrow {
    logf("Hit interceptFD");
    if (fd < 0 || fd >= descriptors.length) return;
    if (cas(&descriptors[fd].state, DescriptorState.NOT_INITED, DescriptorState.INITIALIZING)) {
        logf("First use, registering fd = %s", fd);
        static if(needsFcntl == Fcntl.explicit) {
            int flags = fcntl(fd, F_GETFL, 0);
            fcntl(fd, F_SETFL, flags | O_NONBLOCK).checked;
            logf("Setting FCNTL. %x", cast(void*)currentFiber);
        }
        KEvent[2] ke;
        ke[0].ident = fd;
        ke[1].ident = fd;
        ke[0].filter = EVFILT_READ;
        ke[1].filter = EVFILT_WRITE;
        ke[1].flags = ke[0].flags = EV_ADD | EV_ENABLE | EV_CLEAR;
        timespec timeout;
        timeout.tv_nsec = 1000;
        kevent(kq, ke.ptr, 2, null, 0, &timeout);
    }
}

void deregisterFd(int fd) nothrow {
    if(fd >= 0 && fd < descriptors.length) {
        auto descriptor = descriptors.ptr + fd;
        atomicStore(descriptor._writerState, WriterState.READY);
        atomicStore(descriptor._readerState, ReaderState.EMPTY);
        descriptor.scheduleReaders(fd);
        descriptor.scheduleWriters(fd);
        atomicStore(descriptor.state, DescriptorState.NOT_INITED);
    }
}

ssize_t universalSyscall(size_t ident, string name, SyscallKind kind, Fcntl fcntlStyle, ssize_t ERR, T...)
                        (int fd, T args) nothrow {
    if (currentFiber is null) {
        logf("%s PASSTHROUGH FD=%s", name, fd);
        return __syscall(ident, fd, args);
    }
    else {
        logf("HOOKED %s FD=%d", name, fd);
        interceptFd!(fcntlStyle)(fd);
        shared(Descriptor)* descriptor = descriptors.ptr + fd;
        if (atomicLoad(descriptor.state) == DescriptorState.THREADPOOL) {
            logf("%s syscall THREADPOOL FD=%d", name, fd);
            //TODO: offload syscall to thread-pool
            return __syscall(ident, fd, args);
        }
    L_start:
        shared AwaitingFiber await = AwaitingFiber(cast(shared)currentFiber, null);
        // set the flags argument when we can avoid direct fcntl calls
        static if (fcntlStyle != Fcntl.explicit)
        {
            args[2] |= fcntlStyle;
        }
        //if (kind == SyscallKind.accept)
        logf("kind:%s args:%s", kind, args);
        static if(kind == SyscallKind.accept || kind == SyscallKind.read) {
            auto state = descriptor.readerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (ReaderState) {
            case EMPTY:
                logf("EMPTY - enqueue reader");
                if (!descriptor.enqueueReader(&await)) goto L_start;
                // changed state to e.g. READY or UNCERTAIN in the meantime, may need to reschedule
                if (descriptor.readerState != EMPTY) descriptor.scheduleReaders(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                descriptor.changeReader(UNCERTAIN, READING); // may become READY or READING
                goto case READING;
            case READY:
                descriptor.changeReader(READY, READING); // always succeeds if 1 fiber reads
                goto case READING;
            case READING:
                ssize_t resp = __syscall(ident, fd, args);
                static if (kind == SyscallKind.accept) {
                    if (resp >= 0) // for accept we never know if we emptied the queue
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if (errno == ERR || errno == EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in the meantime
                    }
                }
                else static if (kind == SyscallKind.read) {
                    if (resp == args[1]) // length is 2nd in (buf, length, ...)
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if(resp >= 0)
                        descriptor.changeReader(READING, EMPTY);
                    else if (errno == ERR || errno == EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in the meantime
                    }
                }
                else
                    static assert(0);
                return resp;
            }
        }
        else static if(kind == SyscallKind.write || kind == SyscallKind.connect) {
            //TODO: handle short-write b/c of EWOULDBLOCK to appear as fully blocking?
            auto state = descriptor.writerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (WriterState) {
            case FULL:
                logf("FULL FD=%d Fiber %x", fd, cast(void*)currentFiber);
                if (!descriptor.enqueueWriter(&await)) goto L_start;
                // changed state to e.g. READY or UNCERTAIN in the meantime, may need to reschedule
                if (descriptor.writerState != FULL) descriptor.scheduleWriters(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                logf("UNCERTAIN on FD=%d Fiber %x", fd, cast(void*)currentFiber);
                descriptor.changeWriter(UNCERTAIN, WRITING); // may become READY or WRITING
                goto case WRITING;
            case READY:
                descriptor.changeWriter(READY, WRITING); // always succeeds if 1 fiber writes
                goto case WRITING;
            case WRITING:
                ssize_t resp = __syscall(ident, fd, args);
                static if (kind == SyscallKind.connect) {
                    if(resp >= 0) {
                        descriptor.changeWriter(WRITING, READY);
                    }
                    else if (errno == ERR || errno == EALREADY) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in the meantime
                    }
                    return resp;
                }
                else {
                    if (resp == args[1]) // (buf, len) args to syscall
                        descriptor.changeWriter(WRITING, UNCERTAIN);
                    else if(resp >= 0) {
                        logf("Short-write on FD=%d, become FULL", fd);
                        descriptor.changeWriter(WRITING, FULL);
                    }
                    else if (errno == ERR || errno == EAGAIN) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            logf("Sudden block on FD=%d, become FULL", fd);
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in the meantime
                    }
                    return resp;
                }
            }
        }
        assert(0);
    }
}

// ======================================================================================
// SYSCALL wrappers / libc intercepts
// ======================================================================================
nothrow:
extern(C) ssize_t read(int fd, void *buf, size_t count) nothrow
{
    return universalSyscall!(SYS_READ, "read", SyscallKind.read, Fcntl.explicit, EWOULDBLOCK)
        (fd, cast(size_t)buf, count);
}

extern(C) ssize_t write(int fd, const void *buf, size_t count)
{
    return universalSyscall!(SYS_WRITE, "write", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
        (fd, cast(size_t)buf, count);
}

extern(C) ssize_t accept(int sockfd, sockaddr *addr, socklen_t *addrlen)
{
    return universalSyscall!(SYS_ACCEPT, "accept", SyscallKind.accept, Fcntl.explicit, EWOULDBLOCK)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen);
}

extern(C) ssize_t connect(int sockfd, const sockaddr *addr, socklen_t addrlen)
{
    return universalSyscall!(SYS_CONNECT, "connect", SyscallKind.connect, Fcntl.explicit, EINPROGRESS)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen);
}

extern(C) ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
                      const sockaddr *dest_addr, socklen_t addrlen)
{
    return universalSyscall!(SYS_SENDTO, "sendto", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
        (sockfd, cast(size_t) buf, len, flags, cast(size_t) dest_addr, cast(size_t) addrlen);
}

extern(C) ssize_t recv(int sockfd, void *buf, size_t len, int flags) nothrow {
    sockaddr_in src_addr;
    src_addr.sin_family = AF_INET;
    src_addr.sin_port = 0;
    src_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    socklen_t addrlen = sockaddr_in.sizeof;
    return recvfrom(sockfd, buf, len, flags, cast(sockaddr*)&src_addr, &addrlen);
}

extern(C) private ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
                        sockaddr *src_addr, socklen_t* addrlen) nothrow
{
    return universalSyscall!(SYS_RECVFROM, "recvfrom", SyscallKind.read, Fcntl.msg, EWOULDBLOCK)
        (sockfd, cast(size_t)buf, len, flags, cast(size_t)src_addr, cast(size_t)addrlen);
}

extern(C) private ssize_t poll(pollfd *fds, nfds_t nfds, int timeout)
{
    bool nonBlockingCheck(ref ssize_t result) {
        bool uncertain;
    L_cacheloop:
        foreach (ref fd; fds[0..nfds]) {
            interceptFd!(Fcntl.explicit)(fd.fd);
            fd.revents = 0;
            auto descriptor = descriptors.ptr + fd.fd;
            if (fd.events & POLLIN) {
                auto state = descriptor.readerState;
                logf("Found event %d for reader in poll", state);
                switch(state) with(ReaderState) {
                case READY:
                    fd.revents |= POLLIN;
                    break;
                case EMPTY:
                    break;
                default:
                    uncertain = true;
                    break L_cacheloop;
                }
            }
            if (fd.events & POLLOUT) {
                auto state = descriptor.writerState;
                logf("Found event %d for writer in poll", state);
                switch(state) with(WriterState) {
                case READY:
                    fd.revents |= POLLOUT;
                    break;
                case FULL:
                    break;
                default:
                    uncertain = true;
                    break L_cacheloop;
                }
            }
        }
        // fall back to the system poll call if descriptor state is uncertain
        if (uncertain) {
            logf("Fallback to system poll, descriptors have uncertain state");
            ssize_t p = raw_poll(fds, nfds, 0);
            if (p != 0) {
                result = p;
                logf("Raw poll returns %d", result);
                return true;
            }
        }
        else {
            ssize_t j = 0;
            foreach (i; 0..nfds) {
                if (fds[i].revents) {
                    fds[j++] = fds[i];
                }
            }
            logf("Using our own event cache: %d events", j);
            if (j > 0) {
                result = cast(ssize_t)j;
                return true;
            }
        }
        return false;
    }
    if (currentFiber is null) {
        logf("POLL PASSTHROUGH!");
        return raw_poll(fds, nfds, timeout);
    }
    else {
        logf("HOOKED POLL %d fds timeout %d", nfds, timeout);
        if (nfds == 0) {
            if (timeout == 0) return 0;
            delay(timeout.msecs); // nothing to poll - just sleep; a bare yield would never be rescheduled
            logf("Woke up after empty poll %x", cast(void*)currentFiber);
            return 0;
        }
        foreach(ref fd; fds[0..nfds]) {
            if (fd.fd < 0 || fd.fd >= descriptors.length) {
                errno = EBADF;
                return -1;
            }
            fd.revents = 0;
        }
        ssize_t result = 0;
        if (timeout <= 0) return raw_poll(fds, nfds, timeout);
        if (nonBlockingCheck(result)) return result;
        shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].enqueueReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].enqueueWriter(&aw);
        }
        Fiber.yield();
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].removeReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].removeWriter(&aw);
        }
        nonBlockingCheck(result);
        return result;
    }
}

extern(C) private ssize_t nanosleep(const timespec* req, const timespec* rem) {
    if (currentFiber !is null) {
        delay(req);
        return 0;
    } else {
        auto timer = getSleepTimer();
        timer.waitThread(req);
        thread_suspend(mach_thread_self());
        freeSleepTimer(timer); // resumed by the event loop; return the timer to this thread's pool
        return 0;
    }
}

extern(C) private ssize_t close(int fd) nothrow
{
    logf("HOOKED CLOSE FD=%d", fd);
    deregisterFd(fd);
    return cast(int)__syscall(SYS_CLOSE, fd);
}


int gettid()
{
    return cast(int)__syscall(SYS_GETTID);
}

ssize_t raw_read(int fd, void *buf, size_t count) nothrow {
    logf("Raw read on FD=%d", fd);
    return __syscall(SYS_READ, fd, cast(ssize_t) buf, cast(ssize_t) count);
}

ssize_t raw_write(int fd, const void *buf, size_t count) nothrow
{
    logf("Raw write on FD=%d", fd);
    return __syscall(SYS_WRITE, fd, cast(size_t) buf, count);
}

ssize_t raw_poll(pollfd *fds, nfds_t nfds, int timeout)
{
    logf("Raw poll");
    return __syscall(SYS_POLL, cast(size_t)fds, cast(size_t) nfds, timeout);
}