module photon.linux.core;
version(linux):
private:

import std.stdio;
import std.string;
import std.format;
import std.exception;
import std.conv;
import std.array;
import core.thread;
import core.internal.spinlock;
import core.sys.posix.sys.types;
import core.sys.posix.sys.socket;
import core.sys.posix.poll;
import core.sys.posix.netinet.in_;
import core.sys.posix.unistd;
import core.sys.linux.epoll;
import core.sys.linux.timerfd;
import core.sync.mutex;
import core.stdc.errno;
import core.atomic;
import core.sys.posix.stdlib: abort;
import core.sys.posix.fcntl;
import core.memory;
import core.sys.posix.sys.mman;
import core.sys.posix.pthread;
import core.sys.linux.sys.signalfd;

import photon.linux.support;
import photon.linux.syscalls;
import photon.ds.common;
import photon.ds.intrusive_queue;

// T becomes thread-local b/c it's stolen from shared resource
auto steal(T)(ref shared T arg)
{
    for (;;) {
        auto v = atomicLoad(arg);
        if(cas(&arg, v, cast(shared(T))null)) return v;
    }
}
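
/*
   Minimal usage sketch (illustrative only, names are hypothetical): steal()
   atomically takes ownership of a shared slot, leaving null behind, so exactly
   one caller ends up with the value.

       shared FiberExt parked;          // some shared single-slot mailbox
       auto mine = steal(parked);       // now thread-local; parked is null
       if (mine !is null) mine.schedule();
*/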


shared struct RawEvent {
nothrow:
    this(int init) {
        fd = eventfd(init, 0);
    }

    void waitAndReset() {
        byte[8] bytes = void;
        ssize_t r;
        do {
            r = raw_read(fd, bytes.ptr, 8);
        } while(r < 0 && errno == EINTR);
        r.checked("event reset");
    }

    void trigger() {
        union U {
            ulong cnt;
            ubyte[8] bytes;
        }
        U value;
        value.cnt = 1;
        ssize_t r;
        do {
            r = raw_write(fd, value.bytes.ptr, 8);
        } while(r < 0 && errno == EINTR);
        r.checked("event trigger");
    }

    int fd;
}
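
/*
   Illustrative use of RawEvent as a cross-thread wakeup primitive (sketch):

       shared RawEvent wakeup = RawEvent(0);
       // consumer side:
       wakeup.waitAndReset();   // blocks until the eventfd counter becomes non-zero
       // producer side:
       wakeup.trigger();        // writes 1 to the eventfd, waking the consumer
*/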

struct Timer {
nothrow:
    private int timerfd;


    int fd() {
        return timerfd;
    }

    static void ms2ts(timespec *ts, ulong ms)
    {
        ts.tv_sec = ms / 1000;
        ts.tv_nsec = (ms % 1000) * 1000000;
    }

    void arm(int timeout) {
        timespec ts_timeout;
        ms2ts(&ts_timeout, timeout); // convert milliseconds to timespec
        itimerspec its;
        its.it_value = ts_timeout;
        its.it_interval.tv_sec = 0;
        its.it_interval.tv_nsec = 0;
        timerfd_settime(timerfd, 0, &its, null);
    }

    void disarm() {
        itimerspec its; // zeros
        timerfd_settime(timerfd, 0, &its, null);
    }

    void dispose() {
        close(timerfd).checked;
    }
}

Timer timer() {
    int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC).checked;
    interceptFd!(Fcntl.noop)(timerfd);
    return Timer(timerfd);
}
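
/*
   Sketch of one-shot timer usage (tm.fd is registered with epoll like any other
   fd; see the poll() hook below for the real call site):

       Timer tm = timer();
       scope(exit) tm.dispose();
       tm.arm(500);             // fire once, 500 ms from now
       // ... park the fiber until EPOLLIN arrives on tm.fd ...
       tm.disarm();
*/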

enum EventState { ready, fibers }

shared struct Event {
    SpinLock lock = SpinLock(SpinLock.Contention.brief);
    EventState state = EventState.ready;
    FiberExt awaiting;

    void reset() {
        lock.lock();
        auto f = awaiting.unshared;
        awaiting = null;
        state = EventState.ready;
        lock.unlock();
        while(f !is null) {
            auto next = f.next;
            f.schedule();
            f = next;
        }
    }

    bool ready(){
        lock.lock();
        scope(exit) lock.unlock();
        return state == EventState.ready;
    }

    void await(){
        lock.lock();
        scope(exit) lock.unlock();
        if (state == EventState.ready) return;
        if (currentFiber !is null) {
            currentFiber.next = awaiting.unshared;
            awaiting = cast(shared)currentFiber;
            state = EventState.fibers;
        }
        else abort(); //TODO: threads
    }
}
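
/*
   Hedged sketch of the Event protocol as implemented above: await() returns
   immediately while the event is ready, otherwise it links currentFiber into the
   wait list; reset() marks the event ready again and schedules every linked fiber.

       shared Event ev;
       ev.await();              // register the current fiber unless ev is ready
       // ... from another fiber, once the condition holds ...
       ev.reset();              // wake all fibers registered so far
*/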

struct AwaitingFiber {
    shared FiberExt fiber;
    AwaitingFiber* next;

    void scheduleAll(int wakeFd) nothrow
    {
        auto w = &this;
        FiberExt head;
        // first collect fibers from all AwaitingFibers, since those live on the callers' stacks
        do {
            auto fiber = steal(w.fiber);
            if (fiber) {
                fiber.unshared.next = head;
                head = fiber.unshared;
            }
            w = w.next;
        } while(w);
        while(head) {
            logf("Waking with FD=%d", wakeFd);
            head.wakeFd = wakeFd;
            head.schedule();
            head = head.next;
        }
    }
}

class FiberExt : Fiber {
    FiberExt next;
    uint numScheduler;
    int wakeFd; // receives the fd that woke us up

    enum PAGESIZE = 4096;

    this(void function() fn, uint numSched) nothrow {
        super(fn);
        numScheduler = numSched;
    }

    this(void delegate() dg, uint numSched) nothrow {
        super(dg);
        numScheduler = numSched;
    }

    void schedule() nothrow
    {
        scheds[numScheduler].queue.push(this);
    }
}

FiberExt currentFiber;
shared RawEvent termination; // termination event, triggered once last fiber exits
shared pthread_t eventLoop; // event loop, runs outside of D runtime
shared int alive; // count of non-terminated Fibers scheduled

struct SchedulerBlock {
    shared IntrusiveQueue!(FiberExt, RawEvent) queue;
    shared uint assigned;
    size_t[2] padding;
}
static assert(SchedulerBlock.sizeof == 64);

package(photon) shared SchedulerBlock[] scheds;

enum int MAX_EVENTS = 500;
enum int SIGNAL = 42;

package(photon) void schedulerEntry(size_t n)
{
    int tid = gettid();
    cpu_set_t mask;
    CPU_SET(n, &mask);
    sched_setaffinity(tid, mask.sizeof, &mask).checked("sched_setaffinity");
    shared SchedulerBlock* sched = scheds.ptr + n;
    while (alive > 0) {
        sched.queue.event.waitAndReset();
        for(;;) {
            FiberExt f = sched.queue.drain();
            if (f is null) break; // drained an empty queue, time to sleep
            do {
                auto next = f.next; //save next, it will be reused on scheduling
                currentFiber = f;
                logf("Fiber %x started", cast(void*)f);
                try {
                    f.call();
                }
                catch (Exception e) {
                    stderr.writeln(e);
                    atomicOp!"-="(alive, 1);
                }
                if (f.state == FiberExt.State.TERM) {
                    logf("Fiber %s terminated", cast(void*)f);
                    atomicOp!"-="(alive, 1);
                }
                f = next;
            } while(f !is null);
        }
    }
    termination.trigger();
}

public void go(void delegate() func) {
    import std.random;
    uint choice;
    if (scheds.length == 1) choice = 0;
    else {
        uint a = uniform!"[)"(0, cast(uint)scheds.length);
        uint b = uniform!"[)"(0, cast(uint)scheds.length-1);
        if (a == b) b = cast(uint)scheds.length-1;
        uint loadA = scheds[a].assigned;
        uint loadB = scheds[b].assigned;
        if (loadA < loadB) choice = a;
        else choice = b;
    }
    atomicOp!"+="(scheds[choice].assigned, 1);
    atomicOp!"+="(alive, 1);
    auto f = new FiberExt(func, choice);
    logf("Assigned %x -> %d scheduler", cast(void*)f, choice);
    f.schedule();
}
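
/*
   Typical call pattern (sketch; assumes the surrounding photon package has already
   called startloop() and started one thread per SchedulerBlock running schedulerEntry):

       startloop();
       go({
           // runs as a fiber on the less loaded of two randomly picked schedulers;
           // blocking syscalls below (read/write/accept/...) become fiber-blocking
           logf("hello from a fiber");
       });
*/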

shared Descriptor[] descriptors;
shared int event_loop_fd;
shared int signal_loop_fd;

enum ReaderState: uint {
    EMPTY = 0,
    UNCERTAIN = 1,
    READING = 2,
    READY = 3
}

enum WriterState: uint {
    READY = 0,
    UNCERTAIN = 1,
    WRITING = 2,
    FULL = 3
}

enum DescriptorState: uint {
    NOT_INITED,
    INITIALIZING,
    NONBLOCKING,
    THREADPOOL
}
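
/*
   Informal overview of how these states are driven (see processEventsEntry and
   universalSyscall below): on EPOLLIN the event loop moves a reader EMPTY -> READY
   and wakes waiters, while a reading fiber moves READY/UNCERTAIN -> READING and,
   depending on the syscall result, goes back to UNCERTAIN (buffer possibly not
   drained), EMPTY (EAGAIN or a short read), or retries. Writers mirror this with
   READY/UNCERTAIN/WRITING/FULL driven by EPOLLOUT and short writes.
*/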

// per-descriptor state: reader/writer state machines plus lists of awaiting fibers
shared struct Descriptor {
    ReaderState _readerState;
    AwaitingFiber* _readerWaits;
    WriterState _writerState;
    AwaitingFiber* _writerWaits;
    DescriptorState state;
nothrow:
    ReaderState readerState()() {
        return atomicLoad(_readerState);
    }

    WriterState writerState()() {
        return atomicLoad(_writerState);
    }

    // try to change the reader state; returns true if the CAS succeeded
    bool changeReader()(ReaderState from, ReaderState to) {
        return cas(&_readerState, from, to);
    }

    // ditto for writer
    bool changeWriter()(WriterState from, WriterState to) {
        return cas(&_writerState, from, to);
    }

    // current head of the reader wait list
    shared(AwaitingFiber)* readWaiters()() {
        return atomicLoad(_readerWaits);
    }

    // current head of the writer wait list
    shared(AwaitingFiber)* writeWaiters()(){
        return atomicLoad(_writerWaits);
    }

    // try to enqueue reader fiber given old head
    bool enqueueReader()(shared(AwaitingFiber)* fiber) {
        auto head = readWaiters;
        if (head == fiber) {
            return true; // TODO: HACK
        }
        fiber.next = head;
        return cas(&_readerWaits, head, fiber);
    }

    void removeReader()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_readerWaits);
        if (head is null || head.next is null) return;
        head = removeFromList(head.unshared, fiber);
        cas(&_readerWaits, head, cast(shared(AwaitingFiber*))null);
    }

    // try to enqueue writer fiber given old head
    bool enqueueWriter()(shared(AwaitingFiber)* fiber) {
        auto head = writeWaiters;
        if (head == fiber) {
            return true; // TODO: HACK
        }
        fiber.next = head;
        return cas(&_writerWaits, head, fiber);
    }

    void removeWriter()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_writerWaits);
        if (head is null || head.next is null) return;
        head = removeFromList(head.unshared, fiber);
        cas(&_writerWaits, head, cast(shared(AwaitingFiber*))null);
    }

    // try to schedule readers; if it fails, someone added a reader and it's now their job to check the state
    void scheduleReaders()(int wakeFd) {
        auto w = steal(_readerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }

    // try to schedule writers, ditto
    void scheduleWriters()(int wakeFd) {
        auto w = steal(_writerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }
}
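
/*
   Protocol sketch for the lock-free wait lists above: a blocked fiber publishes a
   stack-allocated AwaitingFiber via enqueueReader/enqueueWriter (a CAS push onto a
   Treiber-style stack), and the event loop wakes everyone at once by steal()-ing
   the whole list in scheduleReaders/scheduleWriters. A failed enqueue CAS means
   the head changed underneath the caller, which simply retries its syscall loop.
*/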

extern(C) void graceful_shutdown_on_signal(int, siginfo_t*, void*)
{
    version(photon_tracing) printStats();
    _exit(9);
}

version(photon_tracing)
void printStats()
{
    // TODO: report on various events in eventloop/scheduler
    string msg = "Tracing report:\n\n";
    write(2, msg.ptr, msg.length);
}

public void startloop()
{
    import core.cpuid;
    uint threads = threadsPerCPU;

    event_loop_fd = cast(int)epoll_create1(0).checked("ERROR: Failed to create event-loop!");
    // use an RT signal; block it so the default disposition (termination) never fires
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, SIGNAL);
    pthread_sigmask(SIG_BLOCK, &mask, null).checked;
    signal_loop_fd = cast(int)signalfd(-1, &mask, 0).checked("ERROR: Failed to create signalfd!");

    epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = signal_loop_fd;
    epoll_ctl(event_loop_fd, EPOLL_CTL_ADD, signal_loop_fd, &event).checked;

    termination = RawEvent(0);
    event.events = EPOLLIN;
    event.data.fd = termination.fd;
    epoll_ctl(event_loop_fd, EPOLL_CTL_ADD, termination.fd, &event).checked;

    {

        sigaction_t action;
        action.sa_sigaction = &graceful_shutdown_on_signal;
        sigaction(SIGTERM, &action, null).checked;
    }

    ssize_t fdMax = sysconf(_SC_OPEN_MAX).checked;
    descriptors = (cast(shared(Descriptor*)) mmap(null, fdMax * Descriptor.sizeof, PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE, -1, 0))[0..fdMax];
    scheds = new SchedulerBlock[threads];
    foreach(ref sched; scheds) {
        sched.queue = IntrusiveQueue!(FiberExt, RawEvent)(RawEvent(0));
    }
    // pthread_create returns a status code; the thread id is written through &eventLoop
    pthread_create(cast(pthread_t*)&eventLoop, null, &processEventsEntry, null);
}

package(photon) void stoploop()
{
    void* ret;
    pthread_join(eventLoop, &ret);
}

extern(C) void* processEventsEntry(void*)
{
    for (;;) {
        epoll_event[MAX_EVENTS] events = void;
        signalfd_siginfo[20] fdsi = void;
        int r;
        do {
            r = epoll_wait(event_loop_fd, events.ptr, MAX_EVENTS, -1);
        } while (r < 0 && errno == EINTR);
        checked(r);
        for (int n = 0; n < r; n++) {
            int fd = events[n].data.fd;
            if (fd == termination.fd) {
                foreach(ref s; scheds) s.queue.event.trigger();
                return null;
            }
            else if (fd == signal_loop_fd) {
                logf("Intercepted our aio SIGNAL");
                ssize_t r2 = raw_read(signal_loop_fd, &fdsi, fdsi.sizeof);
                logf("aio events = %d", r2 / signalfd_siginfo.sizeof);
                if (r2 % signalfd_siginfo.sizeof != 0)
                    checked(r2, "ERROR: failed read on signalfd");

                for(int i = 0; i < r2 / signalfd_siginfo.sizeof; i++) { //TODO: stress test multiple signals
                    logf("Processing aio event idx = %d", i);
                    if (fdsi[i].ssi_signo == SIGNAL) {
                        logf("HIT our SIGNAL");
                        auto fiber = cast(FiberExt)cast(void*)fdsi[i].ssi_ptr;
                        fiber.schedule();
                    }
                }
            }
            else {
                auto descriptor = descriptors.ptr + fd;
                if (descriptor.state == DescriptorState.NONBLOCKING) {
                    if (events[n].events & EPOLLIN) {
                        logf("Read event for fd=%d", fd);
                        auto state = descriptor.readerState;
                        logf("read state = %d", state);
                        final switch(state) with(ReaderState) {
                            case EMPTY:
                                logf("Trying to schedule readers");
                                descriptor.changeReader(EMPTY, READY);
                                descriptor.scheduleReaders(fd);
                                logf("Scheduled readers");
                                break;
                            case UNCERTAIN:
                                descriptor.changeReader(UNCERTAIN, READY);
                                break;
                            case READING:
                                if (!descriptor.changeReader(READING, UNCERTAIN)) {
                                    if (descriptor.changeReader(EMPTY, UNCERTAIN)) // if it became EMPTY, move to UNCERTAIN and wake readers
                                        descriptor.scheduleReaders(fd);
                                }
                                break;
                            case READY:
                                descriptor.scheduleReaders(fd);
                                break;
                        }
                        logf("Awaits %x", cast(void*)descriptor.readWaiters);
                    }
                    if (events[n].events & EPOLLOUT) {
                        logf("Write event for fd=%d", fd);
                        auto state = descriptor.writerState;
                        logf("write state = %d", state);
                        final switch(state) with(WriterState) {
                            case FULL:
                                descriptor.changeWriter(FULL, READY);
                                descriptor.scheduleWriters(fd);
                                break;
                            case UNCERTAIN:
                                descriptor.changeWriter(UNCERTAIN, READY);
                                break;
                            case WRITING:
                                if (!descriptor.changeWriter(WRITING, UNCERTAIN)) {
                                    if (descriptor.changeWriter(FULL, UNCERTAIN)) // if it became FULL, move to UNCERTAIN and wake writers
                                        descriptor.scheduleWriters(fd);
                                }
                                break;
                            case READY:
                                descriptor.scheduleWriters(fd);
                                break;
                        }
                        logf("Awaits %x", cast(void*)descriptor.writeWaiters);
                    }
                }
            }
        }
    }
}

enum Fcntl: int { explicit = 0, msg = MSG_DONTWAIT, sock = SOCK_NONBLOCK, noop = 0xFFFFF }
enum SyscallKind { accept, read, write, connect }

// intercept - a filter for file descriptors; changes flags and registers the fd on first use
void interceptFd(Fcntl needsFcntl)(int fd) nothrow {
    logf("Hit interceptFD");
    if (fd < 0 || fd >= descriptors.length) return;
    if (cas(&descriptors[fd].state, DescriptorState.NOT_INITED, DescriptorState.INITIALIZING)) {
        logf("First use, registering fd = %s", fd);
        static if(needsFcntl == Fcntl.explicit) {
            int flags = fcntl(fd, F_GETFL, 0);
            fcntl(fd, F_SETFL, flags | O_NONBLOCK).checked;
            logf("Setting FCNTL. %x", cast(void*)currentFiber);
        }
        epoll_event event;
        event.events = EPOLLIN | EPOLLOUT | EPOLLET;
        event.data.fd = fd;
        if (epoll_ctl(event_loop_fd, EPOLL_CTL_ADD, fd, &event) < 0 && errno == EPERM) {
            logf("isSocket = false FD = %s", fd);
            descriptors[fd].state = DescriptorState.THREADPOOL;
        }
        else {
            logf("isSocket = true FD = %s", fd);
            descriptors[fd].state = DescriptorState.NONBLOCKING;
        }
    }
}

void deregisterFd(int fd) nothrow {
    if(fd >= 0 && fd < descriptors.length) {
        auto descriptor = descriptors.ptr + fd;
        atomicStore(descriptor._writerState, WriterState.READY);
        atomicStore(descriptor._readerState, ReaderState.EMPTY);
        descriptor.scheduleReaders(fd);
        descriptor.scheduleWriters(fd);
        atomicStore(descriptor.state, DescriptorState.NOT_INITED);
    }
}

ssize_t universalSyscall(size_t ident, string name, SyscallKind kind, Fcntl fcntlStyle, ssize_t ERR, T...)
                        (int fd, T args) nothrow {
    if (currentFiber is null) {
        logf("%s PASSTHROUGH FD=%s", name, fd);
        return syscall(ident, fd, args).withErrorno;
    }
    else {
        logf("HOOKED %s FD=%d", name, fd);
        interceptFd!(fcntlStyle)(fd);
        shared(Descriptor)* descriptor = descriptors.ptr + fd;
        if (atomicLoad(descriptor.state) == DescriptorState.THREADPOOL) {
            logf("%s syscall THREADPOOL FD=%d", name, fd);
            //TODO: offload syscall to thread-pool
            return syscall(ident, fd, args).withErrorno;
        }
    L_start:
        shared AwaitingFiber await = AwaitingFiber(cast(shared)currentFiber, null);
        // set the flags argument when possible, to avoid direct fcntl calls
        static if (fcntlStyle != Fcntl.explicit)
        {
            args[2] |= fcntlStyle;
        }
        //if (kind == SyscallKind.accept)
        logf("kind:%s args:%s", kind, args);
        static if(kind == SyscallKind.accept || kind == SyscallKind.read) {
            auto state = descriptor.readerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (ReaderState) {
            case EMPTY:
                logf("EMPTY - enqueue reader");
                if (!descriptor.enqueueReader(&await)) goto L_start;
                // the state changed (e.g. to READY or UNCERTAIN) in the meantime; may need to reschedule
                if (descriptor.readerState != EMPTY) descriptor.scheduleReaders(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                descriptor.changeReader(UNCERTAIN, READING); // may have become READY or READING
                goto case READING;
            case READY:
                descriptor.changeReader(READY, READING); // always succeeds if 1 fiber reads
                goto case READING;
            case READING:
                ssize_t resp = syscall(ident, fd, args);
                static if (kind == SyscallKind.accept) {
                    if (resp >= 0) // for accept we never know if we emptied the queue
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                }
                else static if (kind == SyscallKind.read) {
                    if (resp == args[1]) // length is 2nd in (buf, length, ...)
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if(resp >= 0)
                        descriptor.changeReader(READING, EMPTY);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                }
                else
                    static assert(0);
                return withErrorno(resp);
            }
        }
        else static if(kind == SyscallKind.write || kind == SyscallKind.connect) {
            //TODO: handle a short write caused by EWOULDBLOCK so that it appears fully blocking?
            auto state = descriptor.writerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (WriterState) {
            case FULL:
                logf("FULL FD=%d Fiber %x", fd, cast(void*)currentFiber);
                if (!descriptor.enqueueWriter(&await)) goto L_start;
                // the state changed (e.g. to READY or UNCERTAIN) in the meantime; may need to reschedule
                if (descriptor.writerState != FULL) descriptor.scheduleWriters(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                logf("UNCERTAIN on FD=%d Fiber %x", fd, cast(void*)currentFiber);
                descriptor.changeWriter(UNCERTAIN, WRITING); // may have become READY or WRITING
                goto case WRITING;
            case READY:
                descriptor.changeWriter(READY, WRITING); // always succeeds if 1 fiber writes
                goto case WRITING;
            case WRITING:
                ssize_t resp = syscall(ident, fd, args);
                static if (kind == SyscallKind.connect) {
                    if(resp >= 0) {
                        descriptor.changeWriter(WRITING, READY);
                    }
                    else if (resp == -ERR || resp == -EALREADY) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                    return withErrorno(resp);
                }
                else {
                    if (resp == args[1]) // (buf, len) args to syscall
                        descriptor.changeWriter(WRITING, UNCERTAIN);
                    else if(resp >= 0) {
                        logf("Short-write on FD=%d, become FULL", fd);
                        descriptor.changeWriter(WRITING, FULL);
                    }
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            logf("Sudden block on FD=%d, become FULL", fd);
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                    return withErrorno(resp);
                }
            }
        }
        assert(0);
    }
}

class Channel(T) {
    private T[] buf;      // buffered items
    private size_t size;  // capacity before put() considers the buffer full
    private Event ev;     // wakes fibers parked on an empty or full buffer

    this(size_t size) {
        this.size = size;
    }

    void put(T item) {
        if (buf.length == size) {
            ev.await();
        }
        if (buf.length == 0) ev.reset();
        buf ~= item;
    }
    auto front() {
        if (buf.length == 0) {
            ev.await();
        }
        if (buf.length == size) ev.reset();
        return buf[$-1];
    }
    void popFront() {
        buf = buf[0..$-1];
        ev.reset();
        buf.assumeSafeAppend();
    }
    bool empty() {
        return buf.length == 0;
    }
}

Channel!T channel(T)(size_t size) {
    return new Channel!T(size);
}
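
/*
   Illustrative use of the factory above (sketch; put() and front() may park the
   calling fiber via Event when the buffer is full or empty, respectively):

       auto ch = channel!int(64);   // bounded buffer of capacity 64
       ch.put(42);
       int v = ch.front;
       ch.popFront();
*/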

// ======================================================================================
// SYSCALL wrapper intercepts
// ======================================================================================

extern(C) ssize_t read(int fd, void *buf, size_t count) nothrow
{
    return universalSyscall!(SYS_READ, "READ", SyscallKind.read, Fcntl.explicit, EWOULDBLOCK)
        (fd, cast(size_t)buf, count);
}

extern(C) ssize_t write(int fd, const void *buf, size_t count)
{
    return universalSyscall!(SYS_WRITE, "WRITE", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
        (fd, cast(size_t)buf, count);
}

extern(C) ssize_t accept(int sockfd, sockaddr *addr, socklen_t *addrlen)
{
    return universalSyscall!(SYS_ACCEPT, "accept", SyscallKind.accept, Fcntl.explicit, EWOULDBLOCK)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen);
}

extern(C) ssize_t accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags)
{
    return universalSyscall!(SYS_ACCEPT4, "accept4", SyscallKind.accept, Fcntl.sock, EWOULDBLOCK)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen, flags);
}

extern(C) ssize_t connect(int sockfd, const sockaddr *addr, socklen_t addrlen)
{
    return universalSyscall!(SYS_CONNECT, "connect", SyscallKind.connect, Fcntl.explicit, EINPROGRESS)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen);
}

extern(C) ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
                      const sockaddr *dest_addr, socklen_t addrlen)
{
    return universalSyscall!(SYS_SENDTO, "sendto", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
        (sockfd, cast(size_t) buf, len, flags, cast(size_t) dest_addr, cast(size_t) addrlen);
}

extern(C) ssize_t recv(int sockfd, void *buf, size_t len, int flags) nothrow {
    sockaddr_in src_addr;
    src_addr.sin_family = AF_INET;
    src_addr.sin_port = 0;
    src_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    socklen_t addrlen = sockaddr_in.sizeof;
    return recvfrom(sockfd, buf, len, flags, cast(sockaddr*)&src_addr, &addrlen);
}

extern(C) private ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
                        sockaddr *src_addr, socklen_t* addrlen) nothrow
{
    return universalSyscall!(SYS_RECVFROM, "RECVFROM", SyscallKind.read, Fcntl.msg, EWOULDBLOCK)
        (sockfd, cast(size_t)buf, len, flags, cast(size_t)src_addr, cast(size_t)addrlen);
}
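
/*
   The poll() hook below works in three steps (overview of the code that follows):
   1. nonBlockingCheck() answers from the cached reader/writer states when every
      descriptor's state is known, falling back to a zero-timeout raw_poll otherwise;
   2. if nothing is ready, the fiber enqueues itself on every polled descriptor plus
      a one-shot Timer armed with the caller's timeout, then yields;
   3. after wake-up it removes itself from the wait lists and re-checks, returning 0
      when the wake-up came from the timer fd.
*/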

extern(C) private ssize_t poll(pollfd *fds, nfds_t nfds, int timeout)
{
    bool nonBlockingCheck(ref ssize_t result) {
        bool uncertain;
    L_cacheloop:
        foreach (ref fd; fds[0..nfds]) {
            interceptFd!(Fcntl.explicit)(fd.fd);
            fd.revents = 0;
            auto descriptor = descriptors.ptr + fd.fd;
            if (fd.events & POLLIN) {
                auto state = descriptor.readerState;
                logf("Found event %d for reader in select", state);
                switch(state) with(ReaderState) {
                case READY:
                    fd.revents |= POLLIN;
                    break;
                case EMPTY:
                    break;
                default:
                    uncertain = true;
                    break L_cacheloop;
                }
            }
            if (fd.events & POLLOUT) {
                auto state = descriptor.writerState;
                logf("Found event %d for writer in select", state);
                switch(state) with(WriterState) {
                case READY:
                    fd.revents |= POLLOUT;
                    break;
                case FULL:
                    break;
                default:
                    uncertain = true;
                    break L_cacheloop;
                }
            }
        }
        // fallback to system poll call if descriptor state is uncertain
        if (uncertain) {
            logf("Fallback to system poll, descriptors have uncertain state");
            ssize_t p = raw_poll(fds, nfds, 0);
            if (p != 0) {
                result = p;
                logf("Raw poll returns %d", result);
                return true;
            }
        }
        else {
            ssize_t j = 0;
            foreach (i; 0..nfds) {
                if (fds[i].revents) {
                    fds[j++] = fds[i];
                }
            }
            logf("Using our own event cache: %d events", j);
            if (j > 0) {
                result = cast(ssize_t)j;
                return true;
            }
        }
        return false;
    }
    if (currentFiber is null) {
        logf("POLL PASSTHROUGH!");
        return raw_poll(fds, nfds, timeout);
    }
    else {
        logf("HOOKED POLL %d fds timeout %d", nfds, timeout);
        if (nfds < 0) return -EINVAL.withErrorno;
        if (nfds == 0) {
            if (timeout == 0) return 0;
            shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
            Timer tm = timer();
            descriptors[tm.fd].enqueueReader(&aw);
            scope(exit) tm.dispose();
            tm.arm(timeout);
            logf("Timer fd=%d", tm.fd);
            Fiber.yield();
            logf("Woke up after select %x. WakeFd=%d", cast(void*)currentFiber, currentFiber.wakeFd);
            return 0;
        }
        foreach(ref fd; fds[0..nfds]) {
            if (fd.fd < 0 || fd.fd >= descriptors.length) return -EBADF.withErrorno;
            fd.revents = 0;
        }
        ssize_t result = 0;
        if (timeout <= 0) return raw_poll(fds, nfds, timeout);
        if (nonBlockingCheck(result)) return result;
        shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].enqueueReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].enqueueWriter(&aw);
        }
        Timer tm = timer();
        scope(exit) tm.dispose();
        tm.arm(timeout);
        descriptors[tm.fd].enqueueReader(&aw);
        Fiber.yield();
        tm.disarm();
        atomicStore(descriptors[tm.fd]._readerWaits, cast(shared(AwaitingFiber)*)null);
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].removeReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].removeWriter(&aw);
        }
        logf("Woke up after select %x. WakeFD=%d", cast(void*)currentFiber, currentFiber.wakeFd);
        if (currentFiber.wakeFd == tm.fd) return 0;
        else {
            nonBlockingCheck(result);
            return result;
        }
    }
}

extern(C) private ssize_t close(int fd) nothrow
{
    logf("HOOKED CLOSE FD=%d", fd);
    deregisterFd(fd);
    return cast(int)withErrorno(syscall(SYS_CLOSE, fd));
}