1 module photon.macos.core;
2 version(OSX):
3 private:
4 
5 import std.stdio;
6 import std.string;
7 import std.format;
8 import std.exception;
9 import std.conv;
10 import std.array;
11 import core.thread;
12 import core.internal.spinlock;
13 import core.sys.posix.sys.types;
14 import core.sys.posix.sys.socket;
15 import core.sys.posix.poll;
16 import core.sys.posix.netinet.in_;
17 import core.sys.freebsd.unistd;
18 import core.sync.mutex;
19 import core.stdc.errno;
20 import core.stdc.signal;
21 import core.stdc.time;
22 import core.atomic;
23 import core.sys.posix.stdlib: abort;
24 import core.sys.posix.fcntl;
25 import core.memory;
26 import core.sys.posix.sys.mman;
27 import core.sys.posix.pthread;
28 
29 import photon.freebsd.support;
30 import photon.ds.common;
31 import photon.ds.intrusive_queue;
32 
alias quad_t = ulong;
alias off_t = long;
// Raw variadic syscall entry point provided by libSystem on Darwin.
extern(C) nothrow off_t __syscall(quad_t number, ...);
// kevent(2) binding; see <sys/event.h>.
extern(C) nothrow int kevent(int kq, const KEvent* changelist, int nchanges, const KEvent* eventlist, int nevent, timespec* tm);

// Mirror of the C `struct kevent`.
// NOTE(review): the `ext[4]` tail matches FreeBSD's struct kevent; Darwin's
// plain `struct kevent` has no `ext` member (that is kevent64_s, with ext[2]).
// Verify this layout against macOS <sys/event.h> before relying on kevent().
struct KEvent {
    size_t    ident;	     /*	identifier for this event */
    short     filter;	     /*	filter for event */
    ushort    flags;	     /*	action flags for kqueue	*/
    uint      fflags;	     /*	filter flag value */
    long      data;	     /*	filter data value */
    void*     udata;	     /*	opaque user data identifier */
    ulong[4]  ext;	     /*	extensions */
};
47 
// BSD syscall numbers passed to __syscall().
// NOTE(review): read/write/close/accept/connect/sendto/recvfrom/poll match
// Darwin's syscalls.master; SYS_GETTID (286) on Darwin is gettid(uid*, gid*),
// i.e. it returns a per-thread identity via out-pointers, NOT a thread id -
// confirm before using it as a tid (see gettid() at the bottom of this file).
enum SYS_READ = 3;
enum SYS_WRITE = 4;
enum SYS_ACCEPT = 30;
enum SYS_CONNECT = 98;
enum SYS_SENDTO = 133;
enum SYS_RECVFROM = 29;
enum SYS_CLOSE = 6;
enum SYS_GETTID = 286;
enum SYS_POLL = 230;
57 
/// Atomically exchange the shared slot's value for null and return it.
/// The returned value becomes effectively thread-local: it has been taken
/// out of the shared resource, so no other thread can observe it anymore.
auto steal(T)(ref shared T arg)
{
    while (true) {
        auto snapshot = atomicLoad(arg);
        immutable swapped = cas(&arg, snapshot, cast(shared(T))null);
        if (swapped)
            return snapshot;
    }
}
66 
67 
// Binary kernel-level event used to wake sleeping scheduler threads.
// NOTE(review): eventfd(2) is Linux-only and is not declared anywhere in this
// module - this macOS port still needs a replacement (a pipe pair or a kqueue
// EVFILT_USER event); confirm this even links on Darwin.
shared struct RawEvent {
nothrow:
    this(int init) {
        fd = eventfd(init, 0);
    }

    // Block until the event is triggered, consuming (resetting) it.
    // Retries the 8-byte read when interrupted by a signal (EINTR).
    void waitAndReset() {
        byte[8] bytes = void;
        ssize_t r;
        do {
            r = raw_read(fd, bytes.ptr, 8);
        } while(r < 0 && errno == EINTR);
        r.checked("event reset");
    }
    
    // Signal the event by writing a counter increment of 1.
    // Retries the write when interrupted by a signal (EINTR).
    void trigger() { 
        union U {
            ulong cnt;
            ubyte[8] bytes;
        }
        U value;
        value.cnt = 1;
        ssize_t r;
        do {
            r = raw_write(fd, value.bytes.ptr, 8);
        } while(r < 0 && errno == EINTR);
        r.checked("event trigger");
    }
    
    int fd;
}
99 /*
100 struct Timer {
101 nothrow:
102     private timer_t timer;
103 
104     static void ms2ts(timespec *ts, ulong ms)
105     {
106         ts.tv_sec = ms / 1000;
107         ts.tv_nsec = (ms % 1000) * 1000000;
108     }
109 
110     void arm(int timeout) {
111         timespec ts_timeout;
112         ms2ts(&ts_timeout, timeout); //convert miliseconds to timespec
113         itimerspec its;
114         its.it_value = ts_timeout;
115         its.it_interval.tv_sec = 0;
116         its.it_interval.tv_nsec = 0;
117         timer_settime(timer, 0, &its, null);
118     }
119 
120     void disarm() {
121         itimerspec its; // zeros
122         timer_settime(timer, 0, &its, null);
123     }
124 
125     void dispose() { 
126         timer_delete(timer).checked;
127     }
128 }
129 
130 Timer timer() {
131     int timerfd = timer_create(CLOCK_MONOTONIC, null, null).checked;
132     interceptFd!(Fcntl.noop)(timerfd);
133     return Timer(timerfd);
134 }
135 */
// Event life-cycle: `ready` - signaled / no waiters; `fibers` - at least one
// fiber is parked on the wait-list.
enum EventState { ready, fibers };

// Spin-lock protected event used to park fibers until it is signaled.
shared struct Event {
    SpinLock lock = SpinLock(SpinLock.Contention.brief);
    EventState state = EventState.ready;
    FiberExt awaiting; // intrusive wait-list, linked through FiberExt.next

    // Signal the event: detach the whole wait-list under the lock, flip the
    // state back to `ready`, then reschedule the parked fibers outside the
    // lock (schedule() pushes onto scheduler queues and must not spin here).
    void reset() {
        lock.lock();
        auto f = awaiting.unshared;
        awaiting = null;
        state = EventState.ready;
        lock.unlock();
        while(f !is null) {
            auto next = f.next;
            f.schedule();
            f = next;
        }
    }

    // True when the event is in the signaled state (no fibers parked).
    bool ready(){
        lock.lock();
        scope(exit) lock.unlock();
        return state == EventState.ready;
    }

    // Push the current fiber onto the wait-list unless already signaled.
    // NOTE(review): this only enqueues - it does not yield; the caller is
    // expected to yield afterwards. Plain threads are unsupported (abort).
    void await(){
        lock.lock();
        scope(exit) lock.unlock();
        if (state == EventState.ready) return;
        if (currentFiber !is null) {
            currentFiber.next = awaiting.unshared;
            awaiting = cast(shared)currentFiber;
            state = EventState.fibers;
        }
        else abort(); //TODO: threads
    }
}
174 
// One node of an on-stack wait-list: the fiber parked on an fd plus a link
// to the next waiting node.
struct AwaitingFiber {
    shared FiberExt fiber;
    AwaitingFiber* next;

    /// Steal every fiber from this wait-list and reschedule them, recording
    /// `wakeFd` as the descriptor that caused the wake-up.
    void scheduleAll(int wakeFd) nothrow
    {
        auto w = &this;
        FiberExt head;
        // First collect the fibers into a local list - the AwaitingFiber
        // nodes live on the stacks of the waiting fibers and must not be
        // touched once their owners have been rescheduled.
        do {
            auto fiber = steal(w.fiber);
            if (fiber) {
                fiber.unshared.next = head;
                head = fiber.unshared;
            }
            w = w.next;
        } while(w);
        while(head) {
            // BUGFIX: read `next` BEFORE schedule() - once the fiber is
            // published to a scheduler queue another thread may resume it
            // and reuse its `next` link, so reading it afterwards races.
            auto next = head.next;
            logf("Waking with FD=%d", wakeFd);
            head.wakeFd = wakeFd;
            head.schedule();
            head = next;
        }
    }
}
200 
// Fiber extended with scheduler bookkeeping: an intrusive queue link, the
// scheduler it is pinned to, and the fd that last woke it.
class FiberExt : Fiber { 
    FiberExt next;          // intrusive link used by queues and wait-lists
    uint numScheduler;      // index into `scheds` this fiber is assigned to
    int wakeFd; // receives fd that has woken us up

    enum PAGESIZE = 4096;
    
    this(void function() fn, uint numSched) nothrow {
        super(fn);
        numScheduler = numSched;
    }

    this(void delegate() dg, uint numSched) nothrow {
        super(dg);
        numScheduler = numSched;
    }

    // Push this fiber onto its scheduler's run queue.
    void schedule() nothrow
    {
        scheds[numScheduler].queue.push(this);
    }
}
223 
FiberExt currentFiber; // the fiber running on this thread (D globals are TLS by default)
shared RawEvent termination; // termination event, triggered once last fiber exits
shared pthread_t eventLoop; // event loop, runs outside of D runtime
shared int alive; // count of non-terminated Fibers scheduled

// Per-scheduler-thread state: a run queue (with its wake-up event) plus a
// load counter used by go() for two-choices balancing.
struct SchedulerBlock {
    shared IntrusiveQueue!(FiberExt, RawEvent) queue;
    shared uint assigned;
    size_t[2] padding; // pad the struct to one cache line (see assert below)
}
static assert(SchedulerBlock.sizeof == 64);

package(photon) shared SchedulerBlock[] scheds;

enum int MAX_EVENTS = 500;
enum int SIGNAL = 42;

// kqueue(2) - create the kernel event queue.
extern(C) int kqueue();
242 
/// Scheduler thread entry: run fibers from queue `n` until no fiber is alive.
package(photon) void schedulerEntry(size_t n)
{
    // NOTE(review): the Linux port pinned this thread to CPU n via
    // cpu_set_t/sched_setaffinity, which do not exist on macOS (affinity
    // hints would go through Mach thread_policy_set) - left unpinned here.
    shared SchedulerBlock* sched = scheds.ptr + n;
    while (alive > 0) {
        sched.queue.event.waitAndReset();
        for(;;) {
            FiberExt f = sched.queue.drain();
            if (f is null) break; // drained an empty queue, time to sleep
            do {
                auto next = f.next; //save next, it will be reused on scheduling
                currentFiber = f;
                logf("Fiber %x started", cast(void*)f);
                bool crashed = false;
                try {
                    f.call();
                }
                catch (Exception e) {
                    stderr.writeln(e);
                    crashed = true;
                }
                // A fiber that escaped via exception is also in TERM state;
                // decrement `alive` exactly once either way (the previous
                // code decremented twice for a crashing fiber).
                if (crashed || f.state == FiberExt.State.TERM) {
                    logf("Fiber %s terminated", cast(void*)f);
                    atomicOp!"-="(alive, 1);
                }
                f = next;
            } while(f !is null);
        }
    }
    termination.trigger();
}
276 
/// Spawn `func` as a new fiber on the least-loaded of two randomly chosen
/// schedulers ("power of two choices" balancing).
public void go(void delegate() func) {
    import std.random;
    uint target = 0;
    if (scheds.length != 1) {
        // Pick two distinct scheduler indices at random.
        uint first = uniform!"[)"(0, cast(uint)scheds.length);
        uint second = uniform!"[)"(0, cast(uint)scheds.length-1);
        if (first == second) second = cast(uint)scheds.length-1;
        uint loadFirst = scheds[first].assigned;
        uint loadSecond = scheds[second].assigned;
        target = loadFirst < loadSecond ? first : second;
    }
    atomicOp!"+="(scheds[target].assigned, 1);
    atomicOp!"+="(alive, 1);
    auto fib = new FiberExt(func, target);
    logf("Assigned %x -> %d scheduler", cast(void*)fib, target);
    fib.schedule();
}
296 
shared Descriptor[] descriptors; // indexed by fd; NOTE(review): never allocated in this module - verify init elsewhere
shared int event_loop_fd;
shared int signal_loop_fd;

// Read-side readiness as last observed by this library (not the kernel).
enum ReaderState: uint {
    EMPTY = 0,      // read would block
    UNCERTAIN = 1,  // unknown - must try the syscall
    READING = 2,    // a fiber is currently in the syscall
    READY = 3       // data believed available
}

// Write-side readiness, mirror of ReaderState.
enum WriterState: uint {
    READY = 0,      // write believed possible
    UNCERTAIN = 1,  // unknown - must try the syscall
    WRITING = 2,    // a fiber is currently in the syscall
    FULL = 3        // write would block
}

// Registration lifecycle of an fd with the event loop.
enum DescriptorState: uint {
    NOT_INITED,     // first use will register it (see interceptFd)
    INITIALIZING,   // registration in progress
    NONBLOCKING,    // registered with O_NONBLOCK + kqueue
    THREADPOOL      // fall back to offloading blocking calls
}
321 
// Per-fd lock-free state: reader/writer readiness plus intrusive lists of
// fibers awaiting each direction.
shared struct Descriptor {
    ReaderState _readerState;   
    AwaitingFiber* _readerWaits;
    WriterState _writerState;
    AwaitingFiber* _writerWaits;
    DescriptorState state;
nothrow:
    ReaderState readerState()() {
        return atomicLoad(_readerState);
    }

    WriterState writerState()() {
        return atomicLoad(_writerState);
    }

    // try to change state & return whatever it happened to be in the end
    bool changeReader()(ReaderState from, ReaderState to) {
        return cas(&_readerState, from, to);
    }

    // ditto for writer
    bool changeWriter()(WriterState from, WriterState to) {
        return cas(&_writerState, from, to);
    }

    //
    shared(AwaitingFiber)* readWaiters()() {
        return atomicLoad(_readerWaits);
    }

    //
    shared(AwaitingFiber)* writeWaiters()(){
        return atomicLoad(_writerWaits);
    }

    // try to enqueue reader fiber given old head
    bool enqueueReader()(shared(AwaitingFiber)* fiber) {
        auto head = readWaiters;
        if (head == fiber) {
            return true; // TODO: HACK - re-enqueue of the same node is a no-op
        }
        fiber.next = head;
        return cas(&_readerWaits, head, fiber);
    }

    // Remove `fiber` from the reader wait-list, keeping the other waiters.
    void removeReader()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_readerWaits);
        if (head is null) return;
        head = removeFromList(head.unshared, fiber);
        if (head is null) return;
        // The slot is null after steal(); re-publish the surviving waiters
        // only if nobody enqueued in the meantime. core.atomic.cas takes
        // (here, ifThis, writeThis) - the previous code had expected/new
        // swapped and also dropped a sole surviving waiter via an early-out.
        cas(&_readerWaits, cast(shared(AwaitingFiber*))null, head);
    }

    // try to enqueue writer fiber given old head
    bool enqueueWriter()(shared(AwaitingFiber)* fiber) {
        auto head = writeWaiters;
        if (head == fiber) {
            return true; // TODO: HACK - re-enqueue of the same node is a no-op
        }
        fiber.next = head;
        return cas(&_writerWaits, head, fiber);
    }

    // Remove `fiber` from the writer wait-list, keeping the other waiters.
    void removeWriter()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_writerWaits);
        if (head is null) return;
        head = removeFromList(head.unshared, fiber);
        if (head is null) return;
        // Same restore-if-still-empty protocol as removeReader (see above).
        cas(&_writerWaits, cast(shared(AwaitingFiber*))null, head);
    }

    // try to schedule readers - if fails - someone added a reader, it's now his job to check state
    void scheduleReaders()(int wakeFd) {
        auto w = steal(_readerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }

    // try to schedule writers, ditto
    void scheduleWriters()(int wakeFd) {
        auto w = steal(_writerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }
}
404 
// Signal handler: dump tracing stats (when built with photon_tracing) and
// hard-exit without running D shutdown (async-signal-safe _exit).
extern(C) void graceful_shutdown_on_signal(int, siginfo_t*, void*)
{
    version(photon_tracing) printStats();
    _exit(9);
}
410 
version(photon_tracing) 
// Emit the tracing report straight to stderr (fd 2) via the hooked write().
void printStats()
{
    // TODO: report on various events in eventloop/scheduler
    string msg = "Tracing report:\n\n";
    write(2, msg.ptr, msg.length);
}
418 
shared int kq; // the global kqueue used for all fd readiness events
shared int eventLoopId;

// Create the kqueue and start the event-loop pthread (outside the D runtime).
public void startloop()
{
    import core.cpuid;
    // NOTE(review): `threads` is computed but unused, and neither `scheds`
    // nor `descriptors` is allocated here (the Linux port does this in its
    // startloop) - verify initialization happens elsewhere.
    uint threads = threadsPerCPU;
    kq = kqueue();
    enforce(kq != -1);
    
    // NOTE(review): pthread_create's return code is not checked.
    pthread_create(cast(pthread_t*)&eventLoop, null, &processEventsEntry, null);
}
431 
// Block until the kqueue event-loop thread terminates.
package(photon) void stoploop()
{
    void* threadResult;
    pthread_join(cast(pthread_t)eventLoop, &threadResult);
}
437 
// kqueue filter identifiers from <sys/event.h> (note: identifiers, NOT bit flags).
enum EVFILT_READ = -1;
enum EVFILT_WRITE = -2;
440 
// Event-loop body: blocks in kevent() and wakes fibers parked on ready fds.
// Runs on a raw pthread created outside the D runtime, so it must not throw.
extern(C) void* processEventsEntry(void*)
{
    KEvent[MAX_EVENTS] ke;
    for (;;) {
        int cnt = kevent(kq, null, 0, ke.ptr, MAX_EVENTS, null);
        if (cnt < 0) {
            if (errno == EINTR) continue; // interrupted by a signal - retry
            abort(); // cannot throw here (non-D thread); previous enforce() was unsafe
        }
        for (int i = 0; i < cnt; i++) {
            auto fd = cast(int) ke[i].ident;
            // Wake every fiber waiting on the direction that became ready
            // (the previous version dropped all events - bodies were
            // commented out, so no fiber was ever woken by the loop).
            if (ke[i].filter == EVFILT_READ) {
                descriptors[fd].scheduleReaders(fd);
            } else if (ke[i].filter == EVFILT_WRITE) {
                descriptors[fd].scheduleWriters(fd);
            }
        }
    }
}
456 
// How non-blocking mode is obtained per syscall family: explicit fcntl(),
// a per-call MSG_DONTWAIT flag, a SOCK_NONBLOCK creation flag, or nothing.
// NOTE(review): SOCK_NONBLOCK is a Linux/FreeBSD extension and is not
// defined on macOS - verify this binding actually compiles/behaves there.
enum Fcntl: int { explicit = 0, msg = MSG_DONTWAIT, sock = SOCK_NONBLOCK, noop = 0xFFFFF }
enum SyscallKind { accept, read, write, connect }
459 
// intercept - a filter for file descriptor, changes flags and register on first use
void interceptFd(Fcntl needsFcntl)(int fd) nothrow {
    logf("Hit interceptFD");
    if (fd < 0 || fd >= descriptors.length) return;
    if (cas(&descriptors[fd].state, DescriptorState.NOT_INITED, DescriptorState.INITIALIZING)) {
        logf("First use, registering fd = %s", fd);
        static if(needsFcntl == Fcntl.explicit) {
            int flags = fcntl(fd, F_GETFL, 0);
            fcntl(fd, F_SETFL, flags | O_NONBLOCK).checked;
            logf("Setting FCNTL. %x", cast(void*)currentFiber);
        }
        // kqueue filters are identifiers, not bit flags: `EVFILT_READ |
        // EVFILT_WRITE` == (-1)|(-2) == -1, so the old code only ever
        // registered the read filter. Submit one change per filter instead.
        KEvent[2] ke;
        ke[0].ident = fd;
        ke[0].filter = EVFILT_READ;
        ke[0].flags = 0x0001 | 0x0004; // EV_ADD | EV_ENABLE
        ke[1].ident = fd;
        ke[1].filter = EVFILT_WRITE;
        ke[1].flags = 0x0001 | 0x0004; // EV_ADD | EV_ENABLE
        timespec timeout;
        timeout.tv_nsec = 1000;
        kevent(kq, ke.ptr, 2, null, 0, &timeout);
    }
}
480 
// Forget everything this library knows about `fd` (called on close()):
// mark both directions operational so nobody parks anew, wake every fiber
// already parked on it, then reset the registration state.
void deregisterFd(int fd) nothrow {
    if (fd < 0 || fd >= descriptors.length) return;
    auto desc = descriptors.ptr + fd;
    atomicStore(desc._writerState, WriterState.READY);
    atomicStore(desc._readerState, ReaderState.EMPTY);
    desc.scheduleReaders(fd);
    desc.scheduleWriters(fd);
    atomicStore(desc.state, DescriptorState.NOT_INITED);
}
491 
/// Core of every hooked syscall: run `ident` on `fd` non-blockingly, parking
/// the current fiber on the descriptor's wait-list when the kernel reports
/// it would block (ERR/EAGAIN), and maintaining the reader/writer readiness
/// state machine. Falls back to a plain blocking call when not on a fiber.
ssize_t universalSyscall(size_t ident, string name, SyscallKind kind, Fcntl fcntlStyle, ssize_t ERR, T...)
                        (int fd, T args) nothrow {
    if (currentFiber is null) {
        logf("%s PASSTHROUGH FD=%s", name, fd);
        return __syscall(ident, fd, args).withErrorno;
    }
    else {
        logf("HOOKED %s FD=%d", name, fd);
        interceptFd!(fcntlStyle)(fd);
        shared(Descriptor)* descriptor = descriptors.ptr + fd;
        if (atomicLoad(descriptor.state) == DescriptorState.THREADPOOL) {
            logf("%s syscall THREADPOLL FD=%d", name, fd);
            //TODO: offload syscall to thread-pool
            return __syscall(ident, fd, args).withErrorno;
        }
    L_start:
        shared AwaitingFiber await = AwaitingFiber(cast(shared)currentFiber, null);
        // set flags argument if able to avoid direct fcntl calls
        static if (fcntlStyle != Fcntl.explicit)
        {
            args[2] |= fcntlStyle;
        }
        //if (kind == SyscallKind.accept)
        // BUGFIX: format string was "kind:s args:%s" (missing '%').
        logf("kind:%s args:%s", kind, args);
        static if(kind == SyscallKind.accept || kind == SyscallKind.read) {
            auto state = descriptor.readerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (ReaderState) {
            case EMPTY:
                logf("EMPTY - enqueue reader");
                if (!descriptor.enqueueReader(&await)) goto L_start;
                // changed state to e.g. READY or UNCERTAIN in meantime, may need to reschedule
                if (descriptor.readerState != EMPTY) descriptor.scheduleReaders(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                descriptor.changeReader(UNCERTAIN, READING); // may become READY or READING
                goto case READING;
            case READY:
                descriptor.changeReader(READY, READING); // always succeeds if 1 fiber reads
                goto case READING;
            case READING:
                ssize_t resp = __syscall(ident, fd, args);
                static if (kind == SyscallKind.accept) {
                    if (resp >= 0) // for accept we never know if we emptied the queue
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                }
                else static if (kind == SyscallKind.read) {
                    if (resp == args[1]) // length is 2nd in (buf, length, ...)
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if(resp >= 0)
                        descriptor.changeReader(READING, EMPTY);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                }
                else
                    static assert(0);
                return withErrorno(resp);
            }
        }
        else static if(kind == SyscallKind.write || kind == SyscallKind.connect) {
            //TODO: Handle short-write b/c of EWOULDBLOCK to appear as fully blocking?
            auto state = descriptor.writerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (WriterState) {
            case FULL:
                logf("FULL FD=%d Fiber %x", fd, cast(void*)currentFiber);
                if (!descriptor.enqueueWriter(&await)) goto L_start;
                // changed state to e.g. READY or UNCERTAIN in meantime, may need to reschedule
                if (descriptor.writerState != FULL) descriptor.scheduleWriters(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                logf("UNCERTAIN on FD=%d Fiber %x", fd, cast(void*)currentFiber);
                descriptor.changeWriter(UNCERTAIN, WRITING); // may become READY or WRITING
                goto case WRITING;
            case READY:
                descriptor.changeWriter(READY, WRITING); // always succeeds if 1 fiber writes
                goto case WRITING;
            case WRITING:
                ssize_t resp = __syscall(ident, fd, args);
                static if (kind == SyscallKind.connect) {
                    if(resp >= 0) {
                        descriptor.changeWriter(WRITING, READY);
                    }
                    else if (resp == -ERR || resp == -EALREADY) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                    return withErrorno(resp);
                }
                else {
                    if (resp == args[1]) // (buf, len) args to syscall
                        descriptor.changeWriter(WRITING, UNCERTAIN);
                    else if(resp >= 0) {
                        logf("Short-write on FD=%d, become FULL", fd);
                        descriptor.changeWriter(WRITING, FULL);
                    }
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            logf("Sudden block on FD=%d, become FULL", fd);
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                    return withErrorno(resp);
                }
            }
        }
        assert(0);
    }
}
614 
615 // ======================================================================================
// SYSCALL wrapper intercepts
617 // ======================================================================================
618 
// Hooked read(2): non-blocking, parks the fiber while the fd is empty.
extern(C) ssize_t read(int fd, void *buf, size_t count) nothrow
{
    return universalSyscall!(SYS_READ, "READ", SyscallKind.read, Fcntl.explicit, EWOULDBLOCK)
        (fd, cast(size_t)buf, count);
}
624 
// Hooked write(2): non-blocking, parks the fiber while the fd is full.
extern(C) ssize_t write(int fd, const void *buf, size_t count)
{
    return universalSyscall!(SYS_WRITE, "WRITE", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
        (fd, cast(size_t)buf, count);
}
630 
// Hooked accept(2): parks the fiber until a connection is pending.
extern(C) ssize_t accept(int sockfd, sockaddr *addr, socklen_t *addrlen)
{
    return universalSyscall!(SYS_ACCEPT, "accept", SyscallKind.accept, Fcntl.explicit, EWOULDBLOCK)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen);    
}
636 
// Hooked connect(2): parks the fiber while the connection is in progress.
// BUGFIX: POSIX connect takes the address length BY VALUE (socklen_t), not
// via pointer - the previous `socklen_t*` signature mismatched the libc ABI
// this function interposes.
extern(C) ssize_t connect(int sockfd, const sockaddr *addr, socklen_t addrlen)
{
    return universalSyscall!(SYS_CONNECT, "connect", SyscallKind.connect, Fcntl.explicit, EINPROGRESS)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen);
}
642 
// Hooked sendto(2): parks the fiber while the socket buffer is full.
extern(C) ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
                      const sockaddr *dest_addr, socklen_t addrlen)
{
    return universalSyscall!(SYS_SENDTO, "sendto", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
        (sockfd, cast(size_t) buf, len, flags, cast(size_t) dest_addr, cast(size_t) addrlen);
}
649 
// Hooked recv(2), implemented on top of the hooked recvfrom with a dummy
// source address.
// NOTE(review): returns size_t where C recv returns ssize_t, and passes a
// 64-bit ssize_t addrlen where the kernel writes a 32-bit socklen_t - same
// ABI quirk as the private recvfrom below; verify on Darwin.
extern(C) size_t recv(int sockfd, void *buf, size_t len, int flags) nothrow {
    sockaddr_in src_addr;
    src_addr.sin_family = AF_INET;
    src_addr.sin_port = 0;
    src_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    ssize_t addrlen = sockaddr_in.sizeof;
    return recvfrom(sockfd, buf, len, flags, cast(sockaddr*)&src_addr, &addrlen);   
}
658 
// Hooked recvfrom(2): parks the fiber while no datagram/data is available.
// NOTE(review): `addrlen` is declared ssize_t* but libc callers pass a
// socklen_t* (32-bit) - a pointee-size ABI mismatch; verify before changing
// together with recv() above.
extern(C) private ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
                        sockaddr *src_addr, ssize_t* addrlen) nothrow
{
    return universalSyscall!(SYS_RECVFROM, "RECVFROM", SyscallKind.read, Fcntl.msg, EWOULDBLOCK)
        (sockfd, cast(size_t)buf, len, flags, cast(size_t)src_addr, cast(size_t)addrlen);
}
665 
// Hooked poll(2): first answers from the library's cached reader/writer
// states, falling back to the real poll when a descriptor's state is
// uncertain; otherwise parks the fiber on every polled fd and re-checks
// after wake-up.
extern(C) private ssize_t poll(pollfd *fds, nfds_t nfds, int timeout)
{
    // Fill fds[].revents from cached state; returns true (with `result` set)
    // when at least one event is known without blocking.
    bool nonBlockingCheck(ref ssize_t result) {
        bool uncertain;
    L_cacheloop:
        foreach (ref fd; fds[0..nfds]) {
            interceptFd!(Fcntl.explicit)(fd.fd);
            fd.revents = 0;
            auto descriptor = descriptors.ptr + fd.fd;
            if (fd.events & POLLIN) {
                auto state = descriptor.readerState;
                logf("Found event %d for reader in select", state);
                switch(state) with(ReaderState) {
                case READY:
                    fd.revents |=  POLLIN;
                    break;
                case EMPTY:
                    break;
                default:
                    uncertain = true;
                    break L_cacheloop;
                }
            }
            if (fd.events & POLLOUT) {
                auto state = descriptor.writerState;
                logf("Found event %d for writer in select", state);
                switch(state) with(WriterState) {
                case READY:
                    fd.revents |= POLLOUT;
                    break;
                case FULL:
                    break;
                default:
                    uncertain = true;
                    break L_cacheloop;
                }
            }
        }
        // fallback to system poll call if descriptor state is uncertain
        if (uncertain) {
            logf("Fallback to system poll, descriptors have uncertain state");
            ssize_t p = raw_poll(fds, nfds, 0);
            if (p != 0) {
                result = p;
                logf("Raw poll returns %d", result);
                return true;
            }
        }
        else {
            // compact the ready entries to the front, as poll(2) semantics allow
            ssize_t j = 0;
            foreach (i; 0..nfds) {
                if (fds[i].revents) {
                    fds[j++] = fds[i];
                }
            }
            logf("Using our own event cache: %d events", j);
            if (j > 0) {
                result = cast(ssize_t)j;
                return true;
            }
        }
        return false;
    }
    if (currentFiber is null) {
        logf("POLL PASSTHROUGH!");
        return raw_poll(fds, nfds, timeout);
    }
    else {
        logf("HOOKED POLL %d fds timeout %d", nfds, timeout);
        // NOTE(review): nfds_t is unsigned, so this check can never fire.
        if (nfds < 0) return -EINVAL.withErrorno;
        if (nfds == 0) {
            if (timeout == 0) return 0;
            // NOTE(review): `aw` is never enqueued anywhere and no timer
            // exists in this port (Timer is commented out above), so this
            // yield has nothing scheduled to wake it - confirm intended.
            shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
            Fiber.yield();
            logf("Woke up after select %x. WakeFd=%d", cast(void*)currentFiber, currentFiber.wakeFd);
            return 0;
        }
        foreach(ref fd; fds[0..nfds]) {
            if (fd.fd < 0 || fd.fd >= descriptors.length) return -EBADF.withErrorno;
            fd.revents = 0;
        }
        ssize_t result = 0;
        if (timeout <= 0) return raw_poll(fds, nfds, timeout);
        if (nonBlockingCheck(result)) return result;
        // Park on every polled descriptor, wait for any wake-up, then
        // unregister and recompute results from the cache.
        shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].enqueueReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].enqueueWriter(&aw);
        }
        Fiber.yield();
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].removeReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].removeWriter(&aw);
        }
        nonBlockingCheck(result);
        return result;
    }
}
768 
// Hooked close(2): wake and detach everything parked on the fd, then close it.
extern(C) private ssize_t close(int fd) nothrow
{
    logf("HOOKED CLOSE FD=%d", fd);
    deregisterFd(fd);
    auto rc = __syscall(SYS_CLOSE, fd);
    return cast(int) rc.withErrorno;
}
775 
776 
// Returns the result of Darwin syscall 286 as a thread id.
// NOTE(review): in Darwin's syscalls.master, 286 is gettid(uint64_t* uidp,
// uint64_t* gidp) - a per-thread identity call, not a Linux-style tid;
// verify what this actually returns on macOS.
int gettid()
{
    return cast(int)__syscall(SYS_GETTID);
}
781 
// Unhooked read: issue SYS_READ directly, bypassing the fiber machinery.
ssize_t raw_read(int fd, void *buf, size_t count) nothrow {
    logf("Raw read on FD=%d", fd);
    return __syscall(SYS_READ, fd, cast(ssize_t) buf, cast(ssize_t) count).withErrorno;
}
786 
// Unhooked write: issue SYS_WRITE directly, bypassing the fiber machinery.
ssize_t raw_write(int fd, const void *buf, size_t count) nothrow
{
    logf("Raw write on FD=%d", fd);
    auto rc = __syscall(SYS_WRITE, fd, cast(size_t) buf, count);
    return rc.withErrorno;
}
792 
// Unhooked poll: issue SYS_POLL directly, bypassing the fiber machinery.
ssize_t raw_poll(pollfd *fds, nfds_t nfds, int timeout)
{
    logf("Raw poll");
    auto rc = __syscall(SYS_POLL, cast(size_t) fds, cast(size_t) nfds, timeout);
    return rc.withErrorno;
}