1 module photon.freebsd.core;
2 version(FreeBSD):
3 private:
4 
5 import std.stdio;
6 import std.string;
7 import std.format;
8 import std.exception;
9 import std.conv;
10 import std.array;
11 import core.thread;
12 import core.internal.spinlock;
13 import core.sys.posix.sys.types;
14 import core.sys.posix.sys.socket;
15 import core.sys.posix.poll;
16 import core.sys.posix.netinet.in_;
17 import core.sys.freebsd.unistd;
18 import core.sync.mutex;
19 import core.stdc.errno;
20 import core.atomic;
21 import core.sys.posix.stdlib: abort;
22 import core.sys.posix.fcntl;
23 import core.memory;
24 import core.sys.posix.sys.mman;
25 import core.sys.posix.pthread;
26 import core.sys.linux.sys.signalfd;
27 
28 import photon.freebsd.support;
29 import photon.freebsd.syscalls;
30 import photon.ds.common;
31 import photon.ds.intrusive_queue;
32 
33 // T becomes thread-local b/c it's stolen from shared resource
// T becomes thread-local b/c it's stolen from shared resource
auto steal(T)(ref shared T arg)
{
    // Spin until we atomically exchange the current value for null;
    // whoever wins the CAS owns the stolen value exclusively.
    while (true) {
        auto observed = atomicLoad(arg);
        if (cas(&arg, observed, cast(shared(T))null))
            return observed;
    }
}
41 
42 
/// Thin wrapper over an eventfd used as a cross-thread wakeup primitive.
shared struct RawEvent {
nothrow:
    this(int init) {
        fd = eventfd(init, 0);
    }

    /// Block until the event fires, consuming (resetting) its counter.
    void waitAndReset() {
        byte[8] buf = void;
        ssize_t ret;
        // Restart reads that were interrupted by signal delivery.
        do {
            ret = raw_read(fd, buf.ptr, 8);
        } while (ret < 0 && errno == EINTR);
        ret.checked("event reset");
    }

    /// Fire the event by adding 1 to the eventfd counter.
    void trigger() {
        union Counter {
            ulong cnt;
            ubyte[8] bytes;
        }
        Counter c;
        c.cnt = 1;
        ssize_t ret;
        // Restart writes that were interrupted by signal delivery.
        do {
            ret = raw_write(fd, c.bytes.ptr, 8);
        } while (ret < 0 && errno == EINTR);
        ret.checked("event trigger");
    }

    int fd;
}
74 
/// Wrapper around a POSIX per-process timer (timer_t).
struct Timer {
nothrow:
    private timer_t timer;

    /// Convert a millisecond count into a timespec.
    static void ms2ts(timespec *ts, ulong ms)
    {
        ts.tv_sec = ms / 1000;
        ts.tv_nsec = (ms % 1000) * 1000000;
    }

    /// One-shot arm: fire once after `timeout` milliseconds (no interval).
    void arm(int timeout) {
        itimerspec spec;
        ms2ts(&spec.it_value, timeout);
        spec.it_interval.tv_sec = 0;
        spec.it_interval.tv_nsec = 0;
        timer_settime(timer, 0, &spec, null);
    }

    /// Cancel any pending expiration — an all-zero itimerspec disarms.
    void disarm() {
        itimerspec zeroed;
        timer_settime(timer, 0, &zeroed, null);
    }

    /// Release the kernel timer object.
    void dispose() {
        timer_delete(timer).checked;
    }
}
104 
/// Create a new monotonic POSIX timer.
/// Fix: per POSIX, timer_create returns a status code (0/-1) and stores
/// the timer id through its third argument. The old code passed `null`
/// there and used the *status* as the id — a leftover from Linux's
/// timerfd_create, which does return an fd. It also registered that
/// bogus value with interceptFd; a timer_t is not a file descriptor,
/// so that registration is dropped.
Timer timer() {
    timer_t t;
    timer_create(CLOCK_MONOTONIC, null, &t).checked("timer_create");
    return Timer(t);
}
110 
111 enum EventState { ready, fibers };
112 
/// A resettable event fibers can park on; protected by a brief spinlock.
shared struct Event {
    SpinLock lock = SpinLock(SpinLock.Contention.brief);
    EventState state = EventState.ready;
    FiberExt awaiting; // intrusive stack of parked fibers, linked via FiberExt.next

    /// Signal the event: mark it ready and schedule every parked fiber.
    void reset() {
        lock.lock();
        // Detach the wait list under the lock, then schedule outside it
        // to keep the critical section short.
        auto f = awaiting.unshared;
        awaiting = null;
        state = EventState.ready;
        lock.unlock();
        while(f !is null) {
            // Save the link first — schedule() may reuse f.next.
            auto next = f.next;
            f.schedule();
            f = next;
        }
    }

    /// True when the event is currently signaled.
    bool ready(){
        lock.lock();
        scope(exit) lock.unlock();
        return state == EventState.ready;
    }

    /// Park the current fiber on this event (no-op if already signaled).
    /// NOTE(review): this only enqueues — the caller presumably yields
    /// afterwards; confirm against call sites.
    void await(){
        lock.lock();
        scope(exit) lock.unlock();
        if (state == EventState.ready) return;
        if (currentFiber !is null) {
            // Push onto the intrusive wait stack.
            currentFiber.next = awaiting.unshared;
            awaiting = cast(shared)currentFiber;
            state = EventState.fibers;
        }
        else abort(); //TODO: threads
    }
}
149 
/// Stack-allocated node linking a parked fiber into a descriptor's wait list.
struct AwaitingFiber {
    shared FiberExt fiber;
    AwaitingFiber* next;

    /// Steal every fiber from this wait list and schedule it, recording
    /// `wakeFd` as the descriptor that caused the wakeup.
    void scheduleAll(int wakeFd) nothrow
    {
        auto w = &this;
        FiberExt head;
        // first process all AwaitingFibers since they are on stack
        do {
            auto fiber = steal(w.fiber);
            if (fiber) {
                fiber.unshared.next = head;
                head = fiber.unshared;
            }
            w = w.next;
        } while(w);
        while(head) {
            // Fix: save the link BEFORE scheduling — schedule() pushes onto
            // an intrusive queue that reuses `next` (cf. the same pattern in
            // schedulerEntry), so reading head.next afterwards walked a
            // corrupted list.
            auto next = head.next;
            logf("Waking with FD=%d", wakeFd);
            head.wakeFd = wakeFd;
            head.schedule();
            head = next;
        }
    }
}
175 
/// Fiber extended with an intrusive scheduling link and wakeup metadata.
class FiberExt : Fiber { 
    FiberExt next;      // intrusive link used by run queues and wait lists
    uint numScheduler;  // index into `scheds` this fiber is assigned to
    int wakeFd; // receives the fd that has woken us up

    enum PAGESIZE = 4096;
    
    /// Wrap a function in a fiber bound to scheduler `numSched`.
    this(void function() fn, uint numSched) nothrow {
        super(fn);
        numScheduler = numSched;
    }

    /// Wrap a delegate in a fiber bound to scheduler `numSched`.
    this(void delegate() dg, uint numSched) nothrow {
        super(dg);
        numScheduler = numSched;
    }

    /// Push this fiber onto its scheduler's run queue.
    void schedule() nothrow
    {
        scheds[numScheduler].queue.push(this);
    }
}
198 
199 FiberExt currentFiber; 
200 shared RawEvent termination; // termination event, triggered once last fiber exits
201 shared pthread_t eventLoop; // event loop, runs outside of D runtime
202 shared int alive; // count of non-terminated Fibers scheduled
203 
/// Per-scheduler state, padded to exactly one cache line (64 bytes,
/// enforced by the static assert below) to avoid false sharing.
struct SchedulerBlock {
    shared IntrusiveQueue!(FiberExt, RawEvent) queue; // run queue; RawEvent wakes the owning thread
    shared uint assigned; // fibers assigned here — load-balancing hint for go()
    size_t[2] padding;    // pad the struct out to 64 bytes
}
static assert(SchedulerBlock.sizeof == 64);
210 
211 package(photon) shared SchedulerBlock[] scheds;
212 
213 enum int MAX_EVENTS = 500;
214 enum int SIGNAL = 42;
215 
/// Mirror of FreeBSD's `struct kevent` (kevent(2), 64-bit layout with the
/// `ext` extensions field). Removed the stray trailing ';' — an empty
/// declaration, deprecated in D.
/// NOTE(review): unlike C, D puts types and functions in one namespace,
/// so this name shadows the kevent(2) syscall used elsewhere in this
/// module (processEventsEntry, interceptFd). Consider renaming the
/// struct (e.g. KEvent) and declaring the syscall binding explicitly.
struct kevent {
    size_t    ident;     /* identifier for this event */
    short     filter;    /* filter for event */
    ushort    flags;     /* action flags for kqueue */
    uint      fflags;    /* filter flag value */
    long      data;      /* filter data value */
    void*     udata;     /* opaque user data identifier */
    ulong[4]  ext;       /* extensions */
}
225 
/// Entry point of scheduler thread `n`: pin to a CPU, then drain the run
/// queue, resuming fibers until no live fibers remain, finally signaling
/// the shared termination event.
package(photon) void schedulerEntry(size_t n)
{
    int tid = gettid();
    cpu_set_t mask;
    CPU_SET(n, &mask);
    sched_setaffinity(tid, mask.sizeof, &mask).checked("sched_setaffinity");
    shared SchedulerBlock* sched = scheds.ptr + n;
    while (alive > 0) {
        sched.queue.event.waitAndReset();
        for(;;) {
            FiberExt f = sched.queue.drain();
            if (f is null) break; // drained an empty queue, time to sleep
            do {
                auto next = f.next; //save next, it will be reused on scheduling
                currentFiber = f;
                logf("Fiber %x started", cast(void*)f);
                try {
                    f.call();
                }
                catch (Exception e) {
                    // Fix: a throwing fiber still reaches TERM state and is
                    // counted by the check below; decrementing `alive` here
                    // as well double-counted the termination.
                    stderr.writeln(e);
                }
                if (f.state == FiberExt.State.TERM) {
                    logf("Fiber %s terminated", cast(void*)f);
                    atomicOp!"-="(alive, 1);
                }
                f = next;
            } while(f !is null);
        }
    }
    termination.trigger();
}
259 
/// Spawn a new fiber running `func`, placed on the lighter-loaded of two
/// randomly sampled schedulers (power-of-two-choices balancing).
public void go(void delegate() func) {
    import std.random;
    uint target = 0;
    if (scheds.length != 1) {
        // Sample two distinct schedulers; prefer the one with fewer
        // fibers already assigned.
        uint first = uniform!"[)"(0, cast(uint)scheds.length);
        uint second = uniform!"[)"(0, cast(uint)scheds.length-1);
        if (second == first) second = cast(uint)scheds.length-1;
        uint loadFirst = scheds[first].assigned;
        uint loadSecond = scheds[second].assigned;
        target = loadFirst < loadSecond ? first : second;
    }
    atomicOp!"+="(scheds[target].assigned, 1);
    atomicOp!"+="(alive, 1);
    auto fiber = new FiberExt(func, target);
    logf("Assigned %x -> %d scheduler", cast(void*)fiber, target);
    fiber.schedule();
}
279 
280 shared Descriptor[] descriptors;
281 shared int event_loop_fd;
282 shared int signal_loop_fd;
283 
// Read-side readiness of a descriptor (driven by universalSyscall):
// EMPTY     - last read would block; fibers should park.
// UNCERTAIN - may or may not have more data (e.g. after a full-length read).
// READING   - a fiber is currently performing the syscall.
// READY     - data believed to be available.
enum ReaderState: uint {
    EMPTY = 0,
    UNCERTAIN = 1,
    READING = 2,
    READY = 3
}
290 
// Write-side readiness of a descriptor, mirroring ReaderState:
// READY     - writes believed to proceed without blocking.
// UNCERTAIN - buffer state unknown (e.g. after a full-length write).
// WRITING   - a fiber is currently performing the syscall.
// FULL      - last write would block; fibers should park.
enum WriterState: uint {
    READY = 0,
    UNCERTAIN = 1,
    WRITING = 2,
    FULL = 3
}
297 
// Lifecycle of an intercepted fd:
// NOT_INITED   - never seen by interceptFd yet.
// INITIALIZING - first user is registering it with the event loop.
// NONBLOCKING  - registered, handled via readiness events.
// THREADPOOL   - handled by blocking passthrough (pool offload is a TODO
//                in universalSyscall).
enum DescriptorState: uint {
    NOT_INITED,
    INITIALIZING,
    NONBLOCKING,
    THREADPOOL
}
304 
305 // list of awaiting fibers
// list of awaiting fibers
/// Per-fd lock-free state: reader/writer readiness plus intrusive wait
/// lists of parked fibers, all manipulated with atomics/CAS.
shared struct Descriptor {
    ReaderState _readerState;   
    AwaitingFiber* _readerWaits;
    WriterState _writerState;
    AwaitingFiber* _writerWaits;
    DescriptorState state;
nothrow:
    ReaderState readerState()() {
        return atomicLoad(_readerState);
    }

    WriterState writerState()() {
        return atomicLoad(_writerState);
    }

    // try to change state & return whatever it happend to be in the end
    bool changeReader()(ReaderState from, ReaderState to) {
        return cas(&_readerState, from, to);
    }

    // ditto for writer
    bool changeWriter()(WriterState from, WriterState to) {
        return cas(&_writerState, from, to);
    }

    //
    shared(AwaitingFiber)* readWaiters()() {
        return atomicLoad(_readerWaits);
    }

    //
    shared(AwaitingFiber)* writeWaiters()(){
        return atomicLoad(_writerWaits);
    }

    // try to enqueue reader fiber given old head
    bool enqueueReader()(shared(AwaitingFiber)* fiber) {
        auto head = readWaiters;
        if (head == fiber) {
            return true; // TODO: HACK
        }
        // Push-front; CAS fails if another fiber raced us, caller retries.
        fiber.next = head;
        return cas(&_readerWaits, head, fiber);
    }

    // Detach the wait list, prune `fiber` from it, and try to reinstall.
    // NOTE(review): the CAS arguments look swapped — after steal() the
    // field is null, so cas(&_readerWaits, head, null) compares against
    // the OLD head instead of reinstalling the pruned list; expected
    // cas(&_readerWaits, null, head). Also a single-node list is dropped
    // entirely by the early return. Verify against the Linux original.
    void removeReader()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_readerWaits);
        if (head is null || head.next is null) return;
        head = removeFromList(head.unshared, fiber);
        cas(&_readerWaits, head, cast(shared(AwaitingFiber*))null);
    }

    // try to enqueue writer fiber given old head
    bool enqueueWriter()(shared(AwaitingFiber)* fiber) {
        auto head = writeWaiters;
        if (head == fiber) {
            return true; // TODO: HACK
        }
        fiber.next = head;
        return cas(&_writerWaits, head, fiber);
    }

    // Writer-side twin of removeReader.
    // NOTE(review): same suspected swapped-CAS issue as removeReader.
    void removeWriter()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_writerWaits);
        if (head is null || head.next is null) return;
        head = removeFromList(head.unshared, fiber);
        cas(&_writerWaits, head, cast(shared(AwaitingFiber*))null);
    }

    // try to schedule readers - if fails - someone added a reader, it's now his job to check state
    void scheduleReaders()(int wakeFd) {
        auto w = steal(_readerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }

    // try to schedule writers, ditto
    void scheduleWriters()(int wakeFd) {
        auto w = steal(_writerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }
}
387 
/// Signal handler: optionally dump tracing stats, then terminate
/// immediately via _exit (no D runtime shutdown) with status 9.
extern(C) void graceful_shutdown_on_signal(int, siginfo_t*, void*)
{
    version(photon_tracing) printStats();
    _exit(9);
}
393 
version(photon_tracing) 
/// Dump tracing statistics to stderr (fd 2) via the hooked write().
void printStats()
{
    // TODO: report on various events in eventloop/scheduler
    string report = "Tracing report:\n\n";
    write(2, report.ptr, report.length);
}
401 
402 shared int kq;
403 
/// Initialize the kqueue and start the native event-loop thread.
public void startloop()
{
    import core.cpuid;
    uint threads = threadsPerCPU; // TODO: unused here — scheds sizing appears unfinished in this port
    kq = kqueue();
    enforce(kq != -1);

    // Fix: pthread_create returns a STATUS code and stores the thread id
    // through its first argument; assigning that status into `eventLoop`
    // clobbered the id it had just written there.
    pthread_create(cast(pthread_t*)&eventLoop, null, &processEventsEntry, null)
        .checked("pthread_create");
}
413 
/// Block until the native event-loop thread exits.
package(photon) void stoploop()
{
    void* threadResult;
    pthread_join(eventLoop, &threadResult);
}
419 
/// Entry point of the native event-loop thread (spawned by startloop);
/// runs outside the D runtime and blocks in kevent() forever.
/// NOTE(review): `kevent` below is also the name of the struct declared
/// above — D has a single namespace for types and functions, so this
/// call likely does not resolve to the kevent(2) syscall; the struct
/// needs renaming or the syscall needs a distinct binding. Also nothing
/// here dispatches to scheduleReaders/scheduleWriters yet — port WIP.
extern(C) void* processEventsEntry(void*)
{
    kevent ke;
    for (;;) {
	if (kevent(kq, null, 0, &ke, 1, null) != -1) {
		logf("A write occured on the file");
	}
    }
}
429 
// How nonblocking behavior is obtained per syscall: explicit = set
// O_NONBLOCK via fcntl on first use; msg/sock = pass MSG_DONTWAIT /
// SOCK_NONBLOCK in the call's flags argument; noop = no flag handling.
enum Fcntl: int { explicit = 0, msg = MSG_DONTWAIT, sock = SOCK_NONBLOCK, noop = 0xFFFFF }
// Which state machine universalSyscall drives for the call.
enum SyscallKind { accept, read, write, connect }
432 
433 // intercept - a filter for file descriptor, changes flags and register on first use
// intercept - a filter for file descriptor, changes flags and register on first use
void interceptFd(Fcntl needsFcntl)(int fd) nothrow {
    logf("Hit interceptFD");
    if (fd < 0 || fd >= descriptors.length) return;
    if (cas(&descriptors[fd].state, DescriptorState.NOT_INITED, DescriptorState.INITIALIZING)) {
        logf("First use, registering fd = %s", fd);
        static if(needsFcntl == Fcntl.explicit) {
            int flags = fcntl(fd, F_GETFL, 0);
            fcntl(fd, F_SETFL, flags | O_NONBLOCK).checked;
            logf("Setting FCNTL. %x", cast(void*)currentFiber);
        }
        // Fix: EVFILT_READ/EVFILT_WRITE are distinct filter *values*, not
        // bit flags — OR-ing them produced a bogus filter. Register read
        // and write interest as two separate change entries.
        kevent[2] changes;
        changes[0].ident = fd;
        changes[0].filter = EVFILT_READ;
        changes[0].flags = EV_ADD | EV_ENABLE;
        changes[1].ident = fd;
        changes[1].filter = EVFILT_WRITE;
        changes[1].flags = EV_ADD | EV_ENABLE;
        timespec timeout;
        timeout.tv_nsec = 1000;
        // Fix: changes belong in the changelist (2nd/3rd args), not the
        // eventlist; and `&timespec` took the address of the TYPE — the
        // intended argument is `&timeout`. `checked` replaces `enforce`,
        // which throws and is illegal in this nothrow function.
        kevent(kq, changes.ptr, 2, null, 0, &timeout).checked("kevent register");
    }
}
453 
/// Forget an fd on close(): mark both sides ready so parked fibers are
/// released, then reset the lifecycle state for potential fd reuse.
void deregisterFd(int fd) nothrow {
    if (fd < 0 || fd >= descriptors.length) return;
    auto desc = descriptors.ptr + fd;
    atomicStore(desc._writerState, WriterState.READY);
    atomicStore(desc._readerState, ReaderState.EMPTY);
    desc.scheduleReaders(fd);
    desc.scheduleWriters(fd);
    atomicStore(desc.state, DescriptorState.NOT_INITED);
}
464 
/// Common shim behind every hooked syscall: pass through when not on a
/// fiber; otherwise drive the descriptor's reader/writer state machine,
/// parking the fiber (enqueue + yield) whenever the call would block and
/// retrying from L_start after wakeups or lost CAS races.
ssize_t universalSyscall(size_t ident, string name, SyscallKind kind, Fcntl fcntlStyle, ssize_t ERR, T...)
                        (int fd, T args) nothrow {
    if (currentFiber is null) {
        logf("%s PASSTHROUGH FD=%s", name, fd);
        return syscall(ident, fd, args).withErrorno;
    }
    else {
        logf("HOOKED %s FD=%d", name, fd);
        interceptFd!(fcntlStyle)(fd);
        shared(Descriptor)* descriptor = descriptors.ptr + fd;
        if (atomicLoad(descriptor.state) == DescriptorState.THREADPOOL) {
            logf("%s syscall THREADPOLL FD=%d", name, fd);
            //TODO: offload syscall to thread-pool
            return syscall(ident, fd, args).withErrorno;
        }
    L_start:
        shared AwaitingFiber await = AwaitingFiber(cast(shared)currentFiber, null);
        // set flags argument if able to avoid direct fcntl calls
        static if (fcntlStyle != Fcntl.explicit)
        {
            args[2] |= fcntlStyle;
        }
        //if (kind == SyscallKind.accept)
        // Fix: format string was "kind:s args:%s" — missing '%' dropped
        // `kind` from the log output.
        logf("kind:%s args:%s", kind, args);
        static if(kind == SyscallKind.accept || kind == SyscallKind.read) {
            auto state = descriptor.readerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (ReaderState) {
            case EMPTY:
                logf("EMPTY - enqueue reader");
                if (!descriptor.enqueueReader(&await)) goto L_start;
                // changed state to e.g. READY or UNCERTAIN in meantime, may need to reschedule
                if (descriptor.readerState != EMPTY) descriptor.scheduleReaders(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                descriptor.changeReader(UNCERTAIN, READING); // may became READY or READING
                goto case READING;
            case READY:
                descriptor.changeReader(READY, READING); // always succeeds if 1 fiber reads
                goto case READING;
            case READING:
                ssize_t resp = syscall(ident, fd, args);
                static if (kind == SyscallKind.accept) {
                    if (resp >= 0) // for accept we never know if we emptied the queue
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                }
                else static if (kind == SyscallKind.read) {
                    if (resp == args[1]) // length is 2nd in (buf, length, ...)
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if(resp >= 0)
                        descriptor.changeReader(READING, EMPTY);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                }
                else
                    static assert(0);
                return withErrorno(resp);
            }
        }
        else static if(kind == SyscallKind.write || kind == SyscallKind.connect) {
            //TODO: Handle short-write b/c of EWOULDBLOCK to apear as fully blocking?
            auto state = descriptor.writerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (WriterState) {
            case FULL:
                logf("FULL FD=%d Fiber %x", fd, cast(void*)currentFiber);
                if (!descriptor.enqueueWriter(&await)) goto L_start;
                // changed state to e.g. READY or UNCERTAIN in meantime, may need to reschedule
                if (descriptor.writerState != FULL) descriptor.scheduleWriters(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                logf("UNCERTAIN on FD=%d Fiber %x", fd, cast(void*)currentFiber);
                descriptor.changeWriter(UNCERTAIN, WRITING); // may became READY or WRITING
                goto case WRITING;
            case READY:
                descriptor.changeWriter(READY, WRITING); // always succeeds if 1 fiber writes
                goto case WRITING;
            case WRITING:
                ssize_t resp = syscall(ident, fd, args);
                static if (kind == SyscallKind.connect) {
                    if(resp >= 0) {
                        descriptor.changeWriter(WRITING, READY);
                    }
                    else if (resp == -ERR || resp == -EALREADY) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                    return withErrorno(resp);
                }
                else {
                    if (resp == args[1]) // (buf, len) args to syscall
                        descriptor.changeWriter(WRITING, UNCERTAIN);
                    else if(resp >= 0) {
                        logf("Short-write on FD=%d, become FULL", fd);
                        descriptor.changeWriter(WRITING, FULL);
                    }
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            logf("Sudden block on FD=%d, become FULL", fd);
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                    return withErrorno(resp);
                }
            }
        }
        assert(0);
    }
}
587 
588 // ======================================================================================
589 // SYSCALL warappers intercepts
590 // ======================================================================================
591 
/// Hooked read(2): routed through the common nonblocking shim.
extern(C) ssize_t read(int fd, void *buf, size_t count) nothrow
{
    alias hooked = universalSyscall!(SYS_READ, "READ", SyscallKind.read,
                                     Fcntl.explicit, EWOULDBLOCK);
    return hooked(fd, cast(size_t) buf, count);
}
597 
/// Hooked write(2).
/// Fix: added `nothrow` for consistency with read/recv/close —
/// universalSyscall itself is nothrow.
extern(C) ssize_t write(int fd, const void *buf, size_t count) nothrow
{
    return universalSyscall!(SYS_WRITE, "WRITE", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
        (fd, cast(size_t)buf, count);
}
603 
/// Hooked accept(2).
/// Fix: added `nothrow` for consistency with the other hooked wrappers.
extern(C) ssize_t accept(int sockfd, sockaddr *addr, socklen_t *addrlen) nothrow
{
    return universalSyscall!(SYS_ACCEPT, "accept", SyscallKind.accept, Fcntl.explicit, EWOULDBLOCK)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen);    
}
609 
/// Hooked accept4(2); SOCK_NONBLOCK is injected via Fcntl.sock into `flags`.
/// Fix: added `nothrow` for consistency with the other hooked wrappers.
extern(C) ssize_t accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags) nothrow
{
    return universalSyscall!(SYS_ACCEPT4, "accept4", SyscallKind.accept, Fcntl.sock, EWOULDBLOCK)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen, flags);
}
615 
/// Hooked connect(2).
/// Fix: added `nothrow` for consistency with the other hooked wrappers.
/// NOTE(review): C's connect takes `socklen_t addrlen` BY VALUE; this
/// declaration says pointer. For C callers the register contents pass
/// through unchanged, but the D-visible signature is misleading — verify
/// against in-project callers before changing it.
extern(C) ssize_t connect(int sockfd, const sockaddr *addr, socklen_t *addrlen) nothrow
{
    return universalSyscall!(SYS_CONNECT, "connect", SyscallKind.connect, Fcntl.explicit, EINPROGRESS)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen);
}
621 
/// Hooked sendto(2).
/// Fix: added `nothrow` for consistency with the other hooked wrappers.
extern(C) ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
                      const sockaddr *dest_addr, socklen_t addrlen) nothrow
{
    return universalSyscall!(SYS_SENDTO, "sendto", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
        (sockfd, cast(size_t) buf, len, flags, cast(size_t) dest_addr, cast(size_t) addrlen);
}
628 
/// Hooked recv(2), implemented as recvfrom with a throwaway source address.
/// Fix: return type was `size_t` — C's recv returns `ssize_t` (negative on
/// error), and recvfrom below already returns ssize_t.
extern(C) ssize_t recv(int sockfd, void *buf, size_t len, int flags) nothrow {
    sockaddr_in src_addr;
    src_addr.sin_family = AF_INET;
    src_addr.sin_port = 0;
    src_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    ssize_t addrlen = sockaddr_in.sizeof;
    return recvfrom(sockfd, buf, len, flags, cast(sockaddr*)&src_addr, &addrlen);   
}
637 
/// Hooked recvfrom(2); MSG_DONTWAIT is injected via Fcntl.msg.
extern(C) private ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
                        sockaddr *src_addr, ssize_t* addrlen) nothrow
{
    alias hooked = universalSyscall!(SYS_RECVFROM, "RECVFROM", SyscallKind.read,
                                     Fcntl.msg, EWOULDBLOCK);
    return hooked(sockfd, cast(size_t) buf, len, flags,
                  cast(size_t) src_addr, cast(size_t) addrlen);
}
644 
/// Hooked poll(2): first answers from the cached per-descriptor state,
/// falling back to the real poll only when a descriptor's state is
/// uncertain; otherwise parks the fiber on all polled descriptors plus a
/// timeout timer and re-checks after wakeup.
extern(C) private ssize_t poll(pollfd *fds, nfds_t nfds, int timeout)
{
    // Returns true (with `result` set) when an answer is available without
    // blocking; false means the caller must park and wait.
    bool nonBlockingCheck(ref ssize_t result) {
        bool uncertain;
    L_cacheloop:
        foreach (ref fd; fds[0..nfds]) {
            interceptFd!(Fcntl.explicit)(fd.fd);
            fd.revents = 0;
            auto descriptor = descriptors.ptr + fd.fd;
            if (fd.events & POLLIN) {
                auto state = descriptor.readerState;
                logf("Found event %d for reader in select", state);
                switch(state) with(ReaderState) {
                case READY:
                    fd.revents |=  POLLIN;
                    break;
                case EMPTY:
                    break;
                default:
                    // READING/UNCERTAIN: cache can't answer, bail out.
                    uncertain = true;
                    break L_cacheloop;
                }
            }
            if (fd.events & POLLOUT) {
                auto state = descriptor.writerState;
                logf("Found event %d for writer in select", state);
                switch(state) with(WriterState) {
                case READY:
                    fd.revents |= POLLOUT;
                    break;
                case FULL:
                    break;
                default:
                    uncertain = true;
                    break L_cacheloop;
                }
            }
        }
        // fallback to system poll call if descriptor state is uncertain
        if (uncertain) {
            logf("Fallback to system poll, descriptors have uncertain state");
            ssize_t p = raw_poll(fds, nfds, 0);
            if (p != 0) {
                result = p;
                logf("Raw poll returns %d", result);
                return true;
            }
        }
        else {
            // Compact entries with events to the front, count them.
            ssize_t j = 0;
            foreach (i; 0..nfds) {
                if (fds[i].revents) {
                    fds[j++] = fds[i];
                }
            }
            logf("Using our own event cache: %d events", j);
            if (j > 0) {
                result = cast(ssize_t)j;
                return true;
            }
        }
        return false;
    }
    if (currentFiber is null) {
        logf("POLL PASSTHROUGH!");
        return raw_poll(fds, nfds, timeout);
    }
    else {
        logf("HOOKED POLL %d fds timeout %d", nfds, timeout);
        // NOTE(review): nfds_t is unsigned, so `nfds < 0` can never be true.
        if (nfds < 0) return -EINVAL.withErrorno;
        if (nfds == 0) {
            if (timeout == 0) return 0;
            // Pure sleep: park on a one-shot timer only.
            shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
            Timer tm = timer();
            // NOTE(review): Timer has no `fd` member in this port — `tm.fd`
            // looks like a Linux timerfd leftover; confirm it compiles.
            descriptors[tm.fd].enqueueReader(&aw);
            scope(exit) tm.dispose();
            tm.arm(timeout);
            logf("Timer fd=%d", tm.fd);
            Fiber.yield();
            logf("Woke up after select %x. WakeFd=%d", cast(void*)currentFiber, currentFiber.wakeFd);
            return 0;
        }
        foreach(ref fd; fds[0..nfds]) {
            if (fd.fd < 0 || fd.fd >= descriptors.length) return -EBADF.withErrorno;
            fd.revents = 0;
        }
        ssize_t result = 0;
        if (timeout <= 0) return raw_poll(fds, nfds, timeout);
        if (nonBlockingCheck(result)) return result;
        // Park on every polled descriptor and on the timeout timer.
        shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].enqueueReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].enqueueWriter(&aw);
        }
        Timer tm = timer();
        scope(exit) tm.dispose();
        tm.arm(timeout);
        descriptors[tm.fd].enqueueReader(&aw);
        Fiber.yield();
        tm.disarm();
        // Unhook ourselves from everything we parked on.
        atomicStore(descriptors[tm.fd]._readerWaits, cast(shared(AwaitingFiber)*)null);
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].removeReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].removeWriter(&aw);
        }
        logf("Woke up after select %x. WakeFD=%d", cast(void*)currentFiber, currentFiber.wakeFd);
        // Woken by the timer means timeout: report no events.
        if (currentFiber.wakeFd == tm.fd) return 0;
        else {
            nonBlockingCheck(result);
            return result;
        }
    }
}
762 
/// Hooked close(2): release parked fibers and reset fd state before
/// performing the real close.
extern(C) private ssize_t close(int fd) nothrow
{
    logf("HOOKED CLOSE FD=%d", fd);
    deregisterFd(fd);
    auto rc = syscall(SYS_CLOSE, fd);
    return cast(int) withErrorno(rc);
}