module photon.linux.core;
version(linux):
private:

import std.stdio;
import std.string;
import std.format;
import std.exception;
import std.conv;
import std.array;
import std.meta;
import std.random;
import core.thread;
import core.internal.spinlock;
import core.sys.posix.sys.types;
import core.sys.posix.sys.socket;
import core.sys.posix.poll;
import core.sys.posix.netinet.in_;
import core.sys.posix.unistd;
import core.sys.linux.epoll;
import core.sys.linux.timerfd;
import core.sys.linux.sys.eventfd;
import core.sync.mutex;
import core.stdc.errno;
import core.atomic;
import core.sys.posix.stdlib: abort;
import core.sys.posix.fcntl;
import core.memory;
import core.sys.posix.sys.mman;
import core.sys.posix.pthread;
import core.stdc.stdlib;
import core.sys.linux.sys.signalfd;

import photon.linux.support;
import photon.linux.syscalls;
import photon.ds.common;
import photon.ds.intrusive_queue;

/// Thread-blocking eventfd wrapper used internally by the runtime
/// (scheduler wake-ups, termination). It goes through raw_read/raw_write,
/// bypassing the fiber-aware syscall hooks defined later in this module,
/// so waiting here really blocks the calling OS thread.
shared struct RawEvent {
nothrow:
    this(int init) {
        fd = eventfd(init, 0);
    }

    /// Block the calling thread until the counter is non-zero, then consume it.
    void waitAndReset() {
        byte[8] bytes = void;
        ssize_t r;
        do {
            // eventfd reads transfer exactly 8 bytes (the counter)
            r = raw_read(fd, bytes.ptr, 8);
        } while(r < 0 && errno == EINTR); // retry if interrupted by a signal
        r.checked("event reset");
    }

    /// Add 1 to the eventfd counter, waking a blocked waiter.
    void trigger() {
        union U {
            ulong cnt;
            ubyte[8] bytes;
        }
        U value;
        value.cnt = 1;
        ssize_t r;
        do {
            r = raw_write(fd, value.bytes.ptr, 8);
        } while(r < 0 && errno == EINTR);
        r.checked("event trigger");
    }

    int fd;
}

/// Fiber-aware binary event built on a non-blocking eventfd.
/// waitAndReset/trigger go through the hooked read/write wrappers at the
/// bottom of this module, so a waiting fiber is parked on the event loop
/// instead of blocking its scheduler thread.
public struct Event {
nothrow:
    private int evfd;

    private this(bool signaled) {
        evfd = eventfd(signaled ? 1 : 0, EFD_NONBLOCK);
        interceptFd!(Fcntl.noop)(evfd); // pre-register with the event loop
    }

    /// Underlying file descriptor (usable with awaitAny / poll).
    int fd() { return evfd; }

    @disable this(this);

    /// Wait for the event to be triggered, then reset and return atomically
    // NOTE(review): unlike RawEvent, the read result is not `checked` here -
    // a non-EINTR failure is silently ignored.
    void waitAndReset() {
        byte[8] bytes = void;
        ssize_t r;
        do {
            r = read(evfd, bytes.ptr, bytes.sizeof); // hooked read - parks the fiber
        } while (r < 0 && errno == EINTR);
    }

    /// ditto, shared overload
    void waitAndReset() shared {
        this.unshared.waitAndReset();
    }

    /// Trigger the event.
    void trigger() {
        union U {
            ulong cnt;
            ubyte[8] bytes;
        }
        U value;
        value.cnt = 1;
        ssize_t r;
        do {
            r = write(evfd, value.bytes.ptr, value.sizeof); // hooked write
        } while(r < 0 && errno == EINTR);
    }

    /// ditto, shared overload
    void trigger() shared {
        this.unshared.trigger();
    }
}

/// Create an Event, optionally already in the triggered state.
public auto event(bool triggered) {
    return Event(triggered);
}

/// Fiber-aware counting semaphore built on eventfd's EFD_SEMAPHORE mode:
/// each wait() consumes exactly 1 from the counter; trigger(n) adds n.
public struct Semaphore {
nothrow:
    private int evfd;
    /// Create with the given initial count.
    this(int count) {
        evfd = eventfd(count, EFD_NONBLOCK | EFD_SEMAPHORE);
        interceptFd!(Fcntl.noop)(evfd);
    }

    /// Underlying file descriptor (usable with awaitAny / poll).
    int fd() { return evfd; }

    @disable this(this);

    /// Decrement the count, parking the current fiber while it is zero.
    void wait() {
        ubyte[8] bytes = void;
        ssize_t r;
        do {
            // go through event loop
            r = read(evfd, bytes.ptr, bytes.sizeof);
        } while (r < 0 && errno == EINTR);
    }

    /// ditto, shared overload
    void wait() shared {
        this.unshared.wait();
    }

    /// Increment the count by `count`, waking up to that many waiters.
    void trigger(int count) {
        union U {
            ulong cnt;
            ubyte[8] bytes;
        }
        U value;
        value.cnt = count;
        ssize_t r;
        do {
            r = write(evfd, value.bytes.ptr, value.sizeof);
        } while(r < 0 && errno == EINTR);
    }

    /// ditto, shared overload
    void trigger(int count) shared {
        this.unshared.trigger(count);
    }

    /// Free this semaphore
    void dispose() {
        close(evfd).checked; // hooked close also deregisters the fd
    }

    /// ditto, shared overload
    void dispose() shared {
        this.unshared.dispose();
    }
}

/// Create a Semaphore with the given initial count.
public auto nothrow semaphore(int initialCount) {
    return Semaphore(initialCount);
}
/// A fiber-friendly timer backed by a timerfd.
public struct Timer {
nothrow:
    private int timerfd;

    /// Underlying timerfd descriptor.
    int fd() {
        return timerfd;
    }

    /// Convert a Duration into a timespec (whole seconds + nanosecond remainder).
    static void duration2ts(timespec *ts, Duration d) {
        auto total = d.total!"nsecs"();
        ts.tv_sec = total / 1_000_000_000;
        ts.tv_nsec = total % 1_000_000_000;
    }

    // Arm as a one-shot timer (the interval part is zeroed).
    private void arm(const timespec *ts_timeout) {
        itimerspec its;
        its.it_value = *ts_timeout;
        its.it_interval.tv_sec = 0;
        its.it_interval.tv_nsec = 0;
        timerfd_settime(timerfd, 0, &its, null);
    }

    // ditto, taking a Duration
    private void arm(Duration timeout) {
        timespec ts_timeout;
        duration2ts(&ts_timeout, timeout); //convert duration to timespec
        arm(&ts_timeout);
    }

    // Stop the timer: settime with an all-zero itimerspec disarms a timerfd.
    private void disarm() {
        itimerspec its; // zeros
        timerfd_settime(timerfd, 0, &its, null);
    }

    /// Wait for the timer to trigger.
    void wait(T)(T duration)
    if (is(T : const timespec*) || is(T : Duration)) {
        arm(duration);
        union U {
            ulong timeouts;
            ubyte[8] bytes;
        }
        U u = void;
        ssize_t r;
        do {
            r = read(timerfd, u.bytes.ptr, U.sizeof); // hooked read - parks the fiber
        } while (r < 0 && errno == EINTR);
        disarm();
    }

    /// Close the underlying timerfd.
    void dispose() {
        close(timerfd).checked;
    }
}

/// Create a Timer backed by a non-blocking CLOCK_MONOTONIC timerfd,
/// pre-registered with the event loop.
public nothrow Timer timer() {
    int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC).checked;
    interceptFd!(Fcntl.noop)(timerfd);
    return Timer(timerfd);
}

// Stack-allocated node of a descriptor's intrusive wait list:
// one parked fiber plus a link to the next waiting node.
struct AwaitingFiber {
    shared FiberExt fiber;
    AwaitingFiber* next;

    /// Atomically steal every fiber from this wait list and schedule them,
    /// recording `wakeFd` (the fd that caused the wake-up) into each fiber.
    void scheduleAll(int wakeFd) nothrow
    {
        auto w = &this;
        FiberExt head;
        // first process all AwaitingFibers since they are on stack
        do {
            auto fiber = steal(w.fiber);
            if (fiber) {
                fiber.unshared.next = head;
                head = fiber.unshared;
            }
            w = w.next;
        } while(w);
        while(head) {
            logf("Waking with FD=%d", wakeFd);
            head.wakeFd = wakeFd;
            auto next = head.next; // schedule uses .next pointer
            head.schedule();
            head = next;
        }
    }
}

/// Fiber extended with scheduler affinity, an intrusive queue link, and
/// the file descriptor that last woke it.
class FiberExt : Fiber {
    FiberExt next;     // intrusive link used by run queues and wake lists
    uint numScheduler; // index into `scheds` this fiber is pinned to
    int wakeFd;        // receives fd that woke us up

    enum PAGESIZE = 4096;

    this(void function() fn, uint numSched) nothrow {
        super(fn);
        numScheduler = numSched;
    }

    this(void delegate() dg, uint numSched) nothrow {
        super(dg);
        numScheduler = numSched;
    }

    /// Push this fiber onto its scheduler's run queue (wakes that scheduler).
    void schedule() nothrow
    {
        scheds[numScheduler].queue.push(this);
    }
}

FiberExt currentFiber; // thread-local: the fiber currently running on this thread
shared RawEvent termination; // termination event, triggered once last fiber exits
shared pthread_t eventLoop; // event loop, runs outside of D runtime
shared int alive; // count of non-terminated Fibers scheduled

Timer[] timerPool; // thread-local pool of preallocated timers

/// Take a timer from the thread-local pool, creating one if the pool is empty.
nothrow auto getSleepTimer() {
    Timer tm;
    if (timerPool.length == 0) {
        tm = timer();
    } else {
        tm = timerPool[$-1];
        timerPool.length = timerPool.length-1;
    }
    return tm;
}

/// Return a timer to the thread-local pool for reuse.
nothrow void freeSleepTimer(Timer tm) {
    timerPool.assumeSafeAppend();
    timerPool ~= tm;
}

/// Delay fiber execution by `req` duration.
public nothrow void delay(T)(T req)
if (is(T : const timespec*) || is(T : Duration)) {
    auto tm = getSleepTimer();
    tm.wait(req);
    freeSleepTimer(tm);
}

/// True for the eventfd-backed awaitable types, by value or by pointer.
enum isAwaitable(E) = is (E : Event) || is (E : Semaphore)
    || is(E : Event*) || is(E : Semaphore*);

/// Park the current fiber until any of `args` is signaled; returns the index
/// of the first ready awaitable and consumes its wake-up (one eventfd read).
public size_t awaitAny(Awaitable...)(auto ref Awaitable args)
if (allSatisfy!(isAwaitable, Awaitable)) {
    pollfd* fds = cast(pollfd*)calloc(args.length, pollfd.sizeof);
    scope(exit) free(fds);
    foreach (i, ref arg; args) {
        fds[i].fd = arg.fd;
        fds[i].events = POLL_IN;
    }
    int resp;
    do {
        resp = poll(fds, args.length, -1); // hooked poll - parks the fiber
    } while (resp < 0 && errno == EINTR);
    foreach (idx, ref fd; fds[0..args.length]) {
        if (fd.revents & POLL_IN) {
            ubyte[8] tmp;
            read(fds[idx].fd, tmp.ptr, tmp.sizeof); // consume the eventfd counter
            return idx;
        }
    }
    assert(0); // poll reported readiness, so some revents bit must be set
}

/// ditto, for a runtime slice of awaitables.
// NOTE(review): near-duplicate of the variadic overload above - candidate for
// a shared private implementation.
public size_t awaitAny(Awaitable)(Awaitable[] args)
if (allSatisfy!(isAwaitable, Awaitable)) {
    pollfd* fds = cast(pollfd*)calloc(args.length, pollfd.sizeof);
    scope(exit) free(fds);
    foreach (i, ref arg; args) {
        fds[i].fd = arg.fd;
        fds[i].events = POLL_IN;
    }
    ssize_t resp;
    do {
        resp = poll(fds, args.length, -1);
    } while (resp < 0 && errno == EINTR);
    foreach (idx, ref fd; fds[0..args.length]) {
        if (fd.revents & POLL_IN) {
            ubyte[8] tmp;
            read(fds[idx].fd, tmp.ptr, tmp.sizeof);
            return idx;
        }
    }
    assert(0);
}

/// Per-scheduler state: a shared intrusive run queue (with its RawEvent used
/// as a wake-up) and a load counter, padded to one 64-byte cache line.
struct SchedulerBlock {
    shared IntrusiveQueue!(FiberExt, RawEvent) queue;
    shared uint assigned;
    size_t[2] padding;
}
static assert(SchedulerBlock.sizeof == 64);

package(photon) shared SchedulerBlock[] scheds;

enum int MAX_EVENTS = 500; // max epoll events fetched per epoll_wait call
enum int SIGNAL = 42;      // RT signal used for AIO wake-ups, delivered via signalfd

/// Main loop of scheduler thread `n`: pin to CPU `n`, then repeatedly drain the
/// run queue and run fibers until the global `alive` count drops to zero.
package(photon) void schedulerEntry(size_t n)
{
    int tid = gettid();
    cpu_set_t mask;
    CPU_SET(n, &mask); // mask is default(zero)-initialized by D, so only CPU n is set
    sched_setaffinity(tid, mask.sizeof, &mask).checked("sched_setaffinity");
    shared SchedulerBlock* sched = scheds.ptr + n;
    while (alive > 0) {
        sched.queue.event.waitAndReset(); // sleep until something is pushed
        for(;;) {
            FiberExt f = sched.queue.drain();
            if (f is null) break; // drained an empty queue, time to sleep
            do {
                auto next = f.next; //save next, it will be reused on scheduling
                currentFiber = f;
                logf("Fiber %x started", cast(void*)f);
                try {
                    f.call();
                }
                catch (Exception e) {
                    stderr.writeln(e);
                    atomicOp!"-="(alive, 1);
                }
                if (f.state == FiberExt.State.TERM) {
                    logf("Fiber %s terminated", cast(void*)f);
                    atomicOp!"-="(alive, 1);
                }
                f = next;
            } while(f !is null);
        }
    }
    termination.trigger(); // tell the event loop to exit
    foreach (ref s; scheds) {
        s.queue.event.trigger(); // and nudge sibling schedulers awake so they exit too
    }
}

/// Convenience overload for functions
public void go(void function() func) {
    go({ func(); });
}

/// Setup a fiber task to run on the Photon scheduler.
/// The scheduler is chosen by power-of-two-choices: sample two distinct
/// schedulers and pick the one with the smaller `assigned` count.
public void go(void delegate() func) {
    uint choice;
    if (scheds.length == 1) choice = 0;
    else {
        uint a = uniform!"[)"(0, cast(uint)scheds.length);
        uint b = uniform!"[)"(0, cast(uint)scheds.length-1);
        if (a == b) b = cast(uint)scheds.length-1; // remap collision onto the excluded last slot
        uint loadA = scheds[a].assigned;
        uint loadB = scheds[b].assigned;
        if (loadA < loadB) choice = a;
        else choice = b;
    }
    // NOTE(review): `assigned` is never decremented, so it tracks the total
    // number of fibers ever assigned, not the current load.
    atomicOp!"+="(scheds[choice].assigned, 1);
    atomicOp!"+="(alive, 1);
    auto f = new FiberExt(func, choice);
    logf("Assigned %x -> %d scheduler", cast(void*)f, choice);
    f.schedule();
}

shared Descriptor[] descriptors; // one slot per possible fd, indexed by the fd value
shared int event_loop_fd;  // the epoll instance
shared int signal_loop_fd; // signalfd receiving SIGNAL (AIO wake-ups)

/// Read-readiness state machine of a descriptor.
enum ReaderState: uint {
    EMPTY = 0,     // a read would block
    UNCERTAIN = 1, // readiness unknown, must probe with a syscall
    READING = 2,   // a fiber is currently in a read-style syscall
    READY = 3      // data is available
}

/// Write-readiness state machine of a descriptor.
enum WriterState: uint {
    READY = 0,     // a write can proceed
    UNCERTAIN = 1, // readiness unknown, must probe with a syscall
    WRITING = 2,   // a fiber is currently in a write-style syscall
    FULL = 3       // a write would block
}

/// Registration state of a descriptor slot.
enum DescriptorState: uint {
    NOT_INITED,   // never seen by interceptFd
    INITIALIZING, // the first user is registering it right now
    NONBLOCKING,  // registered with epoll (sockets, pipes, eventfds, ...)
THREADPOOL 474 } 475 476 // list of awaiting fibers 477 shared struct Descriptor { 478 ReaderState _readerState; 479 AwaitingFiber* _readerWaits; 480 WriterState _writerState; 481 AwaitingFiber* _writerWaits; 482 DescriptorState state; 483 nothrow: 484 ReaderState readerState()() { 485 return atomicLoad(_readerState); 486 } 487 488 WriterState writerState()() { 489 return atomicLoad(_writerState); 490 } 491 492 // try to change state & return whatever it happend to be in the end 493 bool changeReader()(ReaderState from, ReaderState to) { 494 return cas(&_readerState, from, to); 495 } 496 497 // ditto for writer 498 bool changeWriter()(WriterState from, WriterState to) { 499 return cas(&_writerState, from, to); 500 } 501 502 // 503 shared(AwaitingFiber)* readWaiters()() { 504 return atomicLoad(_readerWaits); 505 } 506 507 // 508 shared(AwaitingFiber)* writeWaiters()(){ 509 return atomicLoad(_writerWaits); 510 } 511 512 // try to enqueue reader fiber given old head 513 bool enqueueReader()(shared(AwaitingFiber)* fiber) { 514 auto head = readWaiters; 515 if (head == fiber) { 516 return true; // TODO: HACK 517 } 518 fiber.next = head; 519 return cas(&_readerWaits, head, fiber); 520 } 521 522 void removeReader()(shared(AwaitingFiber)* fiber) { 523 auto head = steal(_readerWaits); 524 if (head is null || head.next is null) return; 525 head = removeFromList(head.unshared, fiber); 526 cas(&_readerWaits, head, cast(shared(AwaitingFiber*))null); 527 } 528 529 // try to enqueue writer fiber given old head 530 bool enqueueWriter()(shared(AwaitingFiber)* fiber) { 531 auto head = writeWaiters; 532 if (head == fiber) { 533 return true; // TODO: HACK 534 } 535 fiber.next = head; 536 return cas(&_writerWaits, head, fiber); 537 } 538 539 void removeWriter()(shared(AwaitingFiber)* fiber) { 540 auto head = steal(_writerWaits); 541 if (head is null || head.next is null) return; 542 head = removeFromList(head.unshared, fiber); 543 cas(&_writerWaits, head, 
cast(shared(AwaitingFiber*))null); 544 } 545 546 // try to schedule readers - if fails - someone added a reader, it's now his job to check state 547 void scheduleReaders()(int wakeFd) { 548 auto w = steal(_readerWaits); 549 if (w) w.unshared.scheduleAll(wakeFd); 550 } 551 552 // try to schedule writers, ditto 553 void scheduleWriters()(int wakeFd) { 554 auto w = steal(_writerWaits); 555 if (w) w.unshared.scheduleAll(wakeFd); 556 } 557 } 558 559 extern(C) void graceful_shutdown_on_signal(int, siginfo_t*, void*) 560 { 561 version(photon_tracing) printStats(); 562 _exit(9); 563 } 564 565 version(photon_tracing) 566 void printStats() 567 { 568 // TODO: report on various events in eventloop/scheduler 569 string msg = "Tracing report:\n\n"; 570 write(2, msg.ptr, msg.length); 571 } 572 573 public void startloop() 574 { 575 import core.cpuid; 576 uint threads = threadsPerCPU; 577 578 event_loop_fd = cast(int)epoll_create1(0).checked("ERROR: Failed to create event-loop!"); 579 // use RT signals, disable default termination on signal received 580 sigset_t mask; 581 sigemptyset(&mask); 582 sigaddset(&mask, SIGNAL); 583 pthread_sigmask(SIG_BLOCK, &mask, null).checked; 584 signal_loop_fd = cast(int)signalfd(-1, &mask, 0).checked("ERROR: Failed to create signalfd!"); 585 586 epoll_event event; 587 event.events = EPOLLIN; 588 event.data.fd = signal_loop_fd; 589 epoll_ctl(event_loop_fd, EPOLL_CTL_ADD, signal_loop_fd, &event).checked; 590 591 termination = RawEvent(0); 592 event.events = EPOLLIN; 593 event.data.fd = termination.fd; 594 epoll_ctl(event_loop_fd, EPOLL_CTL_ADD, termination.fd, &event).checked; 595 596 { 597 598 sigaction_t action; 599 action.sa_sigaction = &graceful_shutdown_on_signal; 600 sigaction(SIGTERM, &action, null).checked; 601 } 602 603 ssize_t fdMax = sysconf(_SC_OPEN_MAX).checked; 604 descriptors = (cast(shared(Descriptor*)) mmap(null, fdMax * Descriptor.sizeof, PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE, -1, 0))[0..fdMax]; 605 scheds = new 
SchedulerBlock[threads];
    foreach(ref sched; scheds) {
        sched.queue = IntrusiveQueue!(FiberExt, RawEvent)(RawEvent(0));
    }
    // NOTE(review): pthread_create's int return value is assigned over the very
    // pthread_t it just wrote through &eventLoop, clobbering the thread handle;
    // pthread_join in stoploop then joins a bogus id. Should likely be
    // `pthread_create(cast(pthread_t*)&eventLoop, ...).checked;` with no assignment.
    eventLoop = pthread_create(cast(pthread_t*)&eventLoop, null, &processEventsEntry, null);
}

/// Join the event-loop thread; call after all fibers have terminated.
package(photon) void stoploop()
{
    void* ret;
    pthread_join(eventLoop, &ret);
}

/// Event-loop thread body (runs outside the D runtime): waits on epoll and
/// translates fd readiness, AIO signals and the termination event into
/// fiber wake-ups via the per-descriptor state machines.
extern(C) void* processEventsEntry(void*)
{
    for (;;) {
        epoll_event[MAX_EVENTS] events = void;
        signalfd_siginfo[20] fdsi = void;
        int r;
        do {
            r = epoll_wait(event_loop_fd, events.ptr, MAX_EVENTS, -1);
        } while (r < 0 && errno == EINTR);
        checked(r);
        for (int n = 0; n < r; n++) {
            int fd = events[n].data.fd;
            if (fd == termination.fd) {
                // all fibers done: nudge every scheduler awake, then exit this thread
                foreach(ref s; scheds) s.queue.event.trigger();
                return null;
            }
            else if (fd == signal_loop_fd) {
                logf("Intercepted our aio SIGNAL");
                ssize_t r2 = raw_read(signal_loop_fd, &fdsi, fdsi.sizeof);
                logf("aio events = %d", r2 / signalfd_siginfo.sizeof);
                if (r2 % signalfd_siginfo.sizeof != 0)
                    checked(r2, "ERROR: failed read on signalfd");

                for(int i = 0; i < r2 / signalfd_siginfo.sizeof; i++) { //TODO: stress test multiple signals
                    logf("Processing aio event idx = %d", i);
                    if (fdsi[i].ssi_signo == SIGNAL) {
                        logf("HIT our SIGNAL");
                        // ssi_ptr carries the fiber pointer smuggled through the signal
                        auto fiber = cast(FiberExt)cast(void*)fdsi[i].ssi_ptr;
                        fiber.schedule();
                    }
                }
            }
            else {
                auto descriptor = descriptors.ptr + fd;
                if (descriptor.state == DescriptorState.NONBLOCKING) {
                    if (events[n].events & EPOLLIN) {
                        // Advance the reader state machine toward READY, waking
                        // parked fibers where the transition requires it.
                        logf("Read event for fd=%d", fd);
                        auto state = descriptor.readerState;
                        logf("read state = %d", state);
                        final switch(state) with(ReaderState) {
                            case EMPTY:
                                logf("Trying to schedule readers");
                                descriptor.changeReader(EMPTY, READY);
                                descriptor.scheduleReaders(fd);
                                logf("Scheduled readers");
                                break;
                            case UNCERTAIN:
                                descriptor.changeReader(UNCERTAIN, READY);
                                break;
                            case READING:
                                if (!descriptor.changeReader(READING, UNCERTAIN)) {
                                    if (descriptor.changeReader(EMPTY, UNCERTAIN)) // if became empty - move to UNCERTAIN and wake readers
                                        descriptor.scheduleReaders(fd);
                                }
                                break;
                            case READY:
                                descriptor.scheduleReaders(fd);
                                break;
                        }
                        logf("Awaits %x", cast(void*)descriptor.readWaiters);
                    }
                    if (events[n].events & EPOLLOUT) {
                        // Mirror image of the above for the writer state machine.
                        logf("Write event for fd=%d", fd);
                        auto state = descriptor.writerState;
                        logf("write state = %d", state);
                        final switch(state) with(WriterState) {
                            case FULL:
                                descriptor.changeWriter(FULL, READY);
                                descriptor.scheduleWriters(fd);
                                break;
                            case UNCERTAIN:
                                descriptor.changeWriter(UNCERTAIN, READY);
                                break;
                            case WRITING:
                                if (!descriptor.changeWriter(WRITING, UNCERTAIN)) {
                                    if (descriptor.changeWriter(FULL, UNCERTAIN)) // if became empty - move to UNCERTAIN and wake writers
                                        descriptor.scheduleWriters(fd);
                                }
                                break;
                            case READY:
                                descriptor.scheduleWriters(fd);
                                break;
                        }
                        logf("Awaits %x", cast(void*)descriptor.writeWaiters);
                    }
                }
            }
        }
    }
}

// How each intercepted syscall family applies the non-blocking flag:
// explicit = via fcntl(O_NONBLOCK); msg/sock = via a flags argument; noop = nothing.
enum Fcntl: int { explicit = 0, msg = MSG_DONTWAIT, sock = SOCK_NONBLOCK, noop = 0xFFFFF }
enum SyscallKind { accept, read, write, connect }

// intercept - a filter for file descriptor, changes flags and register on first use
void interceptFd(Fcntl needsFcntl)(int fd) nothrow {
    logf("Hit interceptFD");
    if (fd < 0 || fd >= descriptors.length) return;
    // only the first caller wins the INITIALIZING transition and registers
    if (cas(&descriptors[fd].state, DescriptorState.NOT_INITED, DescriptorState.INITIALIZING)) {
        logf("First use, registering fd = %s", fd);
        static if(needsFcntl == Fcntl.explicit) {
            int flags = fcntl(fd, F_GETFL, 0);
            fcntl(fd, F_SETFL, flags | O_NONBLOCK).checked;
            logf("Setting FCNTL.
%x", cast(void*)currentFiber);
        }
        epoll_event event;
        event.events = EPOLLIN | EPOLLOUT | EPOLLET; // edge-triggered, both directions
        event.data.fd = fd;
        if (epoll_ctl(event_loop_fd, EPOLL_CTL_ADD, fd, &event) < 0 && errno == EPERM) {
            // epoll refuses regular files with EPERM - route them to the thread-pool path
            logf("isSocket = false FD = %s", fd);
            descriptors[fd].state = DescriptorState.THREADPOOL;
        }
        else {
            logf("isSocket = true FD = %s", fd);
            descriptors[fd].state = DescriptorState.NONBLOCKING;
        }
    }
}

// Reset a descriptor slot on close(): force both state machines to their
// "won't block" values, wake any parked waiters, and mark the slot unused.
void deregisterFd(int fd) nothrow {
    if(fd >= 0 && fd < descriptors.length) {
        auto descriptor = descriptors.ptr + fd;
        atomicStore(descriptor._writerState, WriterState.READY);
        atomicStore(descriptor._readerState, ReaderState.EMPTY);
        descriptor.scheduleReaders(fd);
        descriptor.scheduleWriters(fd);
        atomicStore(descriptor.state, DescriptorState.NOT_INITED);
    }
}

/// Core of every intercepted syscall. Outside a fiber it passes straight
/// through to the kernel. Inside a fiber it drives the descriptor's
/// reader/writer CAS state machine: probe the syscall when state allows,
/// park the fiber on the wait list when the fd would block, and retry from
/// L_start whenever a CAS race indicates the state moved underneath us.
/// `ERR` is the errno (negated by the raw syscall) that means "would block"
/// for this call (EWOULDBLOCK or EINPROGRESS).
ssize_t universalSyscall(size_t ident, string name, SyscallKind kind, Fcntl fcntlStyle, ssize_t ERR, T...)
                        (int fd, T args) nothrow {
    if (currentFiber is null) {
        logf("%s PASSTHROUGH FD=%s", name, fd);
        return syscall(ident, fd, args).withErrorno;
    }
    else {
        logf("HOOKED %s FD=%d", name, fd);
        interceptFd!(fcntlStyle)(fd);
        shared(Descriptor)* descriptor = descriptors.ptr + fd;
        if (atomicLoad(descriptor.state) == DescriptorState.THREADPOOL) {
            logf("%s syscall THREADPOLL FD=%d", name, fd);
            //TODO: offload syscall to thread-pool
            return syscall(ident, fd, args).withErrorno;
        }
    L_start:
        // stack-allocated wait-list node for the current fiber
        shared AwaitingFiber await = AwaitingFiber(cast(shared)currentFiber, null);
        // set flags argument if able to avoid direct fcntl calls
        static if (fcntlStyle != Fcntl.explicit)
        {
            // for msg/sock-style calls the 3rd trailing argument is the flags word
            args[2] |= fcntlStyle;
        }
        //if (kind == SyscallKind.accept)
        // NOTE(review): format string likely meant "kind:%s"
        logf("kind:s args:%s", kind, args);
        static if(kind == SyscallKind.accept || kind == SyscallKind.read) {
            auto state = descriptor.readerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (ReaderState) {
            case EMPTY:
                logf("EMPTY - enqueue reader");
                if (!descriptor.enqueueReader(&await)) goto L_start;
                // changed state to e.g. READY or UNCERTAIN in meantime, may need to reschedule
                if (descriptor.readerState != EMPTY) descriptor.scheduleReaders(fd);
                FiberExt.yield(); // parked; resumed by the event loop
                goto L_start;
            case UNCERTAIN:
                descriptor.changeReader(UNCERTAIN, READING); // may became READY or READING
                goto case READING;
            case READY:
                descriptor.changeReader(READY, READING); // always succeeds if 1 fiber reads
                goto case READING;
            case READING:
                ssize_t resp = syscall(ident, fd, args);
                static if (kind == SyscallKind.accept) {
                    if (resp >= 0) // for accept we never know if we emptied the queue
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                }
                else static if (kind == SyscallKind.read) {
                    if (resp == args[1]) // length is 2nd in (buf, length, ...)
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if(resp >= 0)
                        descriptor.changeReader(READING, EMPTY);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                }
                else
                    static assert(0);
                return withErrorno(resp);
            }
        }
        else static if(kind == SyscallKind.write || kind == SyscallKind.connect) {
            //TODO: Handle short-write b/c of EWOULDBLOCK to apear as fully blocking?
            auto state = descriptor.writerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (WriterState) {
            case FULL:
                logf("FULL FD=%d Fiber %x", fd, cast(void*)currentFiber);
                if (!descriptor.enqueueWriter(&await)) goto L_start;
                // changed state to e.g. READY or UNCERTAIN in meantime, may need to reschedule
                if (descriptor.writerState != FULL) descriptor.scheduleWriters(fd);
                FiberExt.yield(); // parked; resumed by the event loop
                goto L_start;
            case UNCERTAIN:
                logf("UNCERTAIN on FD=%d Fiber %x", fd, cast(void*)currentFiber);
                descriptor.changeWriter(UNCERTAIN, WRITING); // may became READY or WRITING
                goto case WRITING;
            case READY:
                descriptor.changeWriter(READY, WRITING); // always succeeds if 1 fiber writes
                goto case WRITING;
            case WRITING:
                ssize_t resp = syscall(ident, fd, args);
                static if (kind == SyscallKind.connect) {
                    if(resp >= 0) {
                        descriptor.changeWriter(WRITING, READY);
                    }
                    else if (resp == -ERR || resp == -EALREADY) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                    return withErrorno(resp);
                }
                else {
                    if (resp == args[1]) // (buf, len) args to syscall
                        descriptor.changeWriter(WRITING, UNCERTAIN);
                    else if(resp >= 0) {
                        logf("Short-write on FD=%d, become FULL", fd);
                        descriptor.changeWriter(WRITING, FULL);
                    }
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            logf("Sudden block on FD=%d, become FULL", fd);
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                    return withErrorno(resp);
                }
            }
        }
        assert(0);
    }
}

/// Event-backed bounded buffer.
// NOTE(review): this class appears unfinished and cannot work as written:
// Event has no `await`/`reset` methods, nothing ever calls `trigger`, and
// front/popFront operate on the tail of `buf` (LIFO, not FIFO). Verify before use.
class Channel(T) {
    private T[] buf;
    private size_t size;
    private Event ev;
    void put(T item) {
        if (buf.length == size) {
            ev.await();
        }
        if (buf.length == 0) ev.reset();
        buf ~= item;
    }
    auto front() {
        if (buf.length == 0) {
            ev.await();
        }
        if (buf.length == size) ev.reset();
        return buf[$-1]; // NOTE(review): returns the newest item (LIFO)
    }
    void popFront() {
        buf = buf[0..$-1]; // NOTE(review): drops from the tail, not the front
        ev.reset();
        buf.assumeSafeAppend();
    }
    bool empty() {
        return buf.length == 0;
    }
}

/// Create a Channel with the given capacity.
// NOTE(review): does not compile as written - the template argument `!T` is
// missing and Channel declares no constructor taking (T[], size_t, Event).
Channel!T channel(T)(size_t size) {
    return new Channel(new T[size], size, Event());
}

// ======================================================================================
// SYSCALL wrappers intercepts
// ======================================================================================
nothrow:

/// Hooked libc read(2): parks the calling fiber instead of blocking.
extern(C) ssize_t read(int fd, void *buf, size_t count) nothrow
{
    return universalSyscall!(SYS_READ, "READ", SyscallKind.read, Fcntl.explicit, EWOULDBLOCK)
                            (fd, cast(size_t)buf, count);
}

/// Hooked libc write(2).
extern(C) ssize_t write(int fd, const void *buf, size_t count)
{
    return universalSyscall!(SYS_WRITE, "WRITE", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
                            (fd, cast(size_t)buf, count);
}

/// Hooked accept(2).
extern(C) ssize_t accept(int sockfd, sockaddr *addr, socklen_t *addrlen)
{
    return universalSyscall!(SYS_ACCEPT, "accept", SyscallKind.accept, Fcntl.explicit, EWOULDBLOCK)
                            (sockfd, cast(size_t) addr, cast(size_t) addrlen);
}

/// Hooked accept4(2); the SOCK_NONBLOCK flag is OR-ed into `flags` (args[2]).
extern(C) ssize_t accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags)
{
    return universalSyscall!(SYS_ACCEPT4, "accept4", SyscallKind.accept, Fcntl.sock, EWOULDBLOCK)
                            (sockfd, cast(size_t) addr, cast(size_t) addrlen, flags);
}

/// Hooked connect(2); EINPROGRESS is the "would block" errno here.
// NOTE(review): libc connect takes socklen_t by value, not socklen_t*; the
// forwarded register value is the same either way, but the signature is misleading.
extern(C) ssize_t connect(int sockfd, const sockaddr *addr, socklen_t *addrlen)
{
    return universalSyscall!(SYS_CONNECT, "connect", SyscallKind.connect, Fcntl.explicit, EINPROGRESS)
                            (sockfd, cast(size_t) addr, cast(size_t) addrlen);
}

/// Hooked sendto(2).
extern(C) ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
                         const sockaddr *dest_addr, socklen_t addrlen)
{
    return universalSyscall!(SYS_SENDTO, "sendto", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
                            (sockfd, cast(size_t) buf, len, flags, cast(size_t) dest_addr, cast(size_t) addrlen);
}

/// Hooked recv(2), implemented on top of recvfrom with a scratch source address.
// NOTE(review): declared as returning size_t, while libc recv returns ssize_t.
extern(C) size_t recv(int sockfd, void *buf, size_t len, int flags) nothrow {
    sockaddr_in src_addr;
    src_addr.sin_family = AF_INET;
    src_addr.sin_port = 0;
    src_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    ssize_t addrlen = sockaddr_in.sizeof;
    return recvfrom(sockfd, buf, len, flags, cast(sockaddr*)&src_addr, &addrlen);
}

/// Hooked recvfrom(2); MSG_DONTWAIT is OR-ed into `flags` (args[2]).
extern(C) private ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
                                   sockaddr *src_addr, ssize_t* addrlen) nothrow
{
    return universalSyscall!(SYS_RECVFROM, "RECVFROM", SyscallKind.read, Fcntl.msg, EWOULDBLOCK)
                            (sockfd, cast(size_t)buf, len, flags, cast(size_t)src_addr, cast(size_t)addrlen);
}

/// Hooked poll(2): first answers from the cached per-descriptor readiness
/// state; only when that is inconclusive does it fall back to the kernel or
/// park the fiber on all the polled descriptors (plus an optional timeout timer).
extern(C) private ssize_t poll(pollfd *fds, nfds_t nfds, int timeout)
{
    // Try to answer from the descriptor cache without blocking.
    // Returns true (with `result` set) when a definitive answer was produced.
    nothrow bool nonBlockingCheck(ref ssize_t result) {
        bool uncertain;
    L_cacheloop:
        foreach (ref fd; fds[0..nfds]) {
            interceptFd!(Fcntl.explicit)(fd.fd);
            fd.revents = 0;
            auto descriptor = descriptors.ptr + fd.fd;
            if (fd.events & POLLIN) {
                auto state = descriptor.readerState;
                logf("Found event %d for reader in select", state);
                switch(state) with(ReaderState) {
                case READY:
                    fd.revents |= POLLIN;
                    break;
                case EMPTY:
                    break;
                default:
                    uncertain = true;
                    break L_cacheloop;
                }
            }
            if (fd.events & POLLOUT) {
                auto state = descriptor.writerState;
                logf("Found event %d for writer in select", state);
                switch(state) with(WriterState) {
                case READY:
                    fd.revents |= POLLOUT;
                    break;
                case FULL:
                    break;
                default:
                    uncertain = true;
                    break L_cacheloop;
                }
            }
        }
        // fallback to system poll call if descriptor state is uncertain
        if (uncertain) {
            logf("Fallback to system poll, descriptors have uncertain state");
            ssize_t p = raw_poll(fds, nfds, 0); // zero timeout: just probe
            if (p != 0) {
                result = p;
                logf("Raw poll returns %d", result);
                return true;
            }
        }
        else {
            // compact the ready entries to the front and count them
            ssize_t j = 0;
            foreach (i; 0..nfds) {
                if (fds[i].revents) {
                    fds[j++] = fds[i];
                }
            }
            logf("Using our own event cache: %d events", j);
            if (j > 0) {
                result = cast(ssize_t)j;
                return true;
            }
        }
        return false;
    }
    if (currentFiber is null) {
        logf("POLL PASSTHROUGH!");
        return raw_poll(fds, nfds, timeout);
    }
    else {
        logf("HOOKED POLL %d fds timeout %d", nfds, timeout);
        // NOTE(review): nfds_t is unsigned, so `nfds < 0` is always false -
        // this EINVAL branch is dead code.
        if (nfds < 0) return -EINVAL.withErrorno;
        if (nfds == 0) {
            // pure sleep: no fds, just arm a timer and park
            if (timeout == 0) return 0;
            shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
            Timer tm = timer();
            descriptors[tm.fd].enqueueReader(&aw);
            scope(exit) tm.dispose();
            tm.arm(timeout.msecs);
            logf("Timer fd=%d", tm.fd);
            Fiber.yield();
            logf("Woke up after select %x. WakeFd=%d", cast(void*)currentFiber, currentFiber.wakeFd);
            return 0;
        }
        foreach(ref fd; fds[0..nfds]) {
            if (fd.fd < 0 || fd.fd >= descriptors.length) return -EBADF.withErrorno;
            fd.revents = 0;
        }
        ssize_t result = 0;
        if (nonBlockingCheck(result)) return result;
        // nothing ready: enqueue this fiber on every polled descriptor
        shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].enqueueReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].enqueueWriter(&aw);
        }
        int timeoutFd = -1;
        if (timeout > 0) {
            Timer tm = timer();
            timeoutFd = tm.fd;
            scope(exit) tm.dispose();
            tm.arm(timeout.msecs);
            descriptors[tm.fd].enqueueReader(&aw);
            Fiber.yield();
            tm.disarm();
            atomicStore(descriptors[tm.fd]._readerWaits, cast(shared(AwaitingFiber)*)null);
        }
        else {
            // NOTE(review): reached for timeout == 0 as well (when nothing was
            // ready above), which then blocks indefinitely - POSIX poll with a
            // zero timeout must return immediately.
            Fiber.yield();
        }
        // woke up: unhook from every descriptor we enqueued on
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].removeReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].removeWriter(&aw);
        }
        logf("Woke up after select %x. WakeFD=%d", cast(void*)currentFiber, currentFiber.wakeFd);
        if (currentFiber.wakeFd == timeoutFd) return 0;
        else {
            nonBlockingCheck(result);
            return result;
        }
    }
}

/// Hooked nanosleep(2): delays the fiber via a pooled timer.
/// `rem` is ignored on the fiber path (the sleep is never cut short by EINTR there).
extern(C) private ssize_t nanosleep(const timespec* req, const timespec* rem) {
    if (currentFiber !is null) {
        delay(req);
        return 0;
    } else {
        return syscall(SYS_NANOSLEEP, cast(size_t)req, cast(size_t)rem).withErrorno;
    }
}

/// Hooked close(2): wakes any waiters on the fd and resets its descriptor slot
/// before the kernel close.
extern(C) private ssize_t close(int fd) nothrow
{
    logf("HOOKED CLOSE FD=%d", fd);
    deregisterFd(fd);
    return cast(int)withErrorno(syscall(SYS_CLOSE, fd));
}