module photon.linux.core;
version(linux):
private:

import std.stdio;
import std.string;
import std.format;
import std.exception;
import std.conv;
import std.array;
import core.thread;
import core.internal.spinlock;
import core.sys.posix.sys.types;
import core.sys.posix.sys.socket;
import core.sys.posix.poll;
import core.sys.posix.netinet.in_;
import core.sys.posix.unistd;
import core.sys.linux.epoll;
import core.sys.linux.timerfd;
import core.sync.mutex;
import core.stdc.errno;
import core.atomic;
import core.sys.posix.stdlib: abort;
import core.sys.posix.fcntl;
import core.memory;
import core.sys.posix.sys.mman;
import core.sys.posix.pthread;
import core.sys.linux.sys.signalfd;

import photon.linux.support;
import photon.linux.syscalls;
import photon.ds.common;
import photon.ds.intrusive_queue;

/// Atomically take the current value out of `arg`, leaving `null` behind.
/// T becomes thread-local b/c it's stolen from shared resource.
auto steal(T)(ref shared T arg)
{
    for (;;) {
        auto v = atomicLoad(arg);
        if (cas(&arg, v, cast(shared(T))null)) return v;
    }
}

/// Thin wrapper over an eventfd(2) descriptor; used to park/wake scheduler threads.
shared struct RawEvent {
nothrow:
    this(int init) {
        fd = eventfd(init, 0);
    }

    /// Block until triggered, consuming (resetting) the eventfd counter.
    void waitAndReset() {
        byte[8] bytes = void;
        ssize_t r;
        do {
            r = raw_read(fd, bytes.ptr, 8);
        } while(r < 0 && errno == EINTR);
        r.checked("event reset");
    }

    /// Add 1 to the eventfd counter, waking at most one waiter.
    void trigger() {
        union U {
            ulong cnt;
            ubyte[8] bytes;
        }
        U value;
        value.cnt = 1;
        ssize_t r;
        do {
            r = raw_write(fd, value.bytes.ptr, 8);
        } while(r < 0 && errno == EINTR);
        r.checked("event trigger");
    }

    int fd;
}

/// One-shot timeout built on timerfd(2); used to implement poll() timeouts.
struct Timer {
nothrow:
    private int timerfd;

    int fd() {
        return timerfd;
    }

    /// Convert milliseconds to a timespec.
    static void ms2ts(timespec *ts, ulong ms)
    {
        ts.tv_sec = ms / 1000;
        ts.tv_nsec = (ms % 1000) * 1000000;
    }

    /// Arm a single (non-repeating) expiration `timeout` milliseconds from now.
    void arm(int timeout) {
        timespec ts_timeout;
        ms2ts(&ts_timeout, timeout); // convert milliseconds to timespec
        itimerspec its;
        its.it_value = ts_timeout;
        its.it_interval.tv_sec = 0;
        its.it_interval.tv_nsec = 0;
        timerfd_settime(timerfd, 0, &its, null);
    }

    /// Cancel any pending expiration (an all-zero itimerspec disarms).
    void disarm() {
        itimerspec its; // zeros
        timerfd_settime(timerfd, 0, &its, null);
    }

    void dispose() {
        close(timerfd).checked;
    }
}

/// Create a monotonic, non-blocking timerfd already registered with the event loop.
Timer timer() {
    int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC).checked;
    interceptFd!(Fcntl.noop)(timerfd);
    return Timer(timerfd);
}

enum EventState { ready, fibers };

/// Manual-reset event: fibers block in await() until reset() releases them all.
shared struct Event {
    SpinLock lock = SpinLock(SpinLock.Contention.brief);
    EventState state = EventState.ready;
    FiberExt awaiting;

    /// Mark ready and reschedule every fiber that was parked on this event.
    void reset() {
        lock.lock();
        auto f = awaiting.unshared;
        awaiting = null;
        state = EventState.ready;
        lock.unlock();
        // schedule outside the lock; `next` links form the wait list
        while(f !is null) {
            auto next = f.next;
            f.schedule();
            f = next;
        }
    }

    bool ready(){
        lock.lock();
        scope(exit) lock.unlock();
        return state == EventState.ready;
    }

    /// Park the current fiber on this event unless it is already ready.
    void await(){
        lock.lock();
        scope(exit) lock.unlock();
        if (state == EventState.ready) return;
        if (currentFiber !is null) {
            currentFiber.next = awaiting.unshared;
            awaiting = cast(shared)currentFiber;
            state = EventState.fibers;
        }
        else abort(); //TODO: threads
    }
}

/// Stack-allocated node linking a parked fiber into a descriptor's wait list.
struct AwaitingFiber {
    shared FiberExt fiber;
    AwaitingFiber* next;

    /// Wake every fiber on this list, recording the fd that triggered the wakeup.
    void scheduleAll(int wakeFd) nothrow
    {
        auto w = &this;
        FiberExt head;
        // first process all AwaitingFibers since they are on stack
        do {
            auto fiber = steal(w.fiber);
            if (fiber) {
                fiber.unshared.next = head;
                head = fiber.unshared;
            }
            w = w.next;
        } while(w);
        while(head) {
            logf("Waking with FD=%d", wakeFd);
            head.wakeFd = wakeFd;
            head.schedule();
            head = head.next;
        }
    }
}

/// Fiber extended with scheduler affinity and an intrusive run-queue link.
class FiberExt : Fiber {
    FiberExt next;           // intrusive link for run queues / wait lists
    uint numScheduler;       // index of the scheduler this fiber is pinned to
    int wakeFd;              // receives fd that woke us up

    enum PAGESIZE = 4096;

    this(void function() fn, uint numSched) nothrow {
        super(fn);
        numScheduler = numSched;
    }

    this(void delegate() dg, uint numSched) nothrow {
        super(dg);
        numScheduler = numSched;
    }

    /// Push onto the owning scheduler's queue (also wakes that scheduler).
    void schedule() nothrow
    {
        scheds[numScheduler].queue.push(this);
    }
}

FiberExt currentFiber;
shared RawEvent termination; // termination event, triggered once last fiber exits
shared pthread_t eventLoop;  // event loop, runs outside of D runtime
shared int alive;            // count of non-terminated Fibers scheduled

/// Per-thread scheduler state, padded to one cache line.
struct SchedulerBlock {
    shared IntrusiveQueue!(FiberExt, RawEvent) queue;
    shared uint assigned;
    size_t[2] padding;
}
static assert(SchedulerBlock.sizeof == 64);

package(photon) shared SchedulerBlock[] scheds;

enum int MAX_EVENTS = 500;
enum int SIGNAL = 42;

/// Body of one scheduler thread: pin to CPU `n`, then run fibers until none remain.
package(photon) void schedulerEntry(size_t n)
{
    int tid = gettid();
    cpu_set_t mask; // zero-initialized by D
    CPU_SET(n, &mask);
    sched_setaffinity(tid, mask.sizeof, &mask).checked("sched_setaffinity");
    shared SchedulerBlock* sched = scheds.ptr + n;
    while (alive > 0) {
        sched.queue.event.waitAndReset();
        for(;;) {
            FiberExt f = sched.queue.drain();
            if (f is null) break; // drained an empty queue, time to sleep
            do {
                auto next = f.next; //save next, it will be reused on scheduling
                currentFiber = f;
                logf("Fiber %x started", cast(void*)f);
                try {
                    f.call();
                }
                catch (Exception e) {
                    stderr.writeln(e);
                }
                // A fiber that exited via exception is also in TERM state, so
                // counting termination only here avoids decrementing `alive`
                // twice for a crashing fiber.
                if (f.state == FiberExt.State.TERM) {
                    logf("Fiber %s terminated", cast(void*)f);
                    atomicOp!"-="(alive, 1);
                }
                f = next;
            } while(f !is null);
        }
    }
    termination.trigger();
}

/// Spawn a new fiber running `func`, assigned by power-of-two-choices balancing.
public void go(void delegate() func) {
    import std.random;
    uint choice;
    if (scheds.length == 1) choice = 0;
    else {
        // pick two distinct schedulers, keep the less loaded one
        uint a = uniform!"[)"(0, cast(uint)scheds.length);
        uint b = uniform!"[)"(0, cast(uint)scheds.length-1);
        if (a == b) b = cast(uint)scheds.length-1;
        uint loadA = scheds[a].assigned;
        uint loadB = scheds[b].assigned;
        if (loadA < loadB) choice = a;
        else choice = b;
    }
    atomicOp!"+="(scheds[choice].assigned, 1);
    atomicOp!"+="(alive, 1);
    auto f = new FiberExt(func, choice);
    logf("Assigned %x -> %d scheduler", cast(void*)f, choice);
    f.schedule();
}

shared Descriptor[] descriptors;
shared int event_loop_fd;
shared int signal_loop_fd;

enum ReaderState: uint {
    EMPTY = 0,
    UNCERTAIN = 1,
    READING = 2,
    READY = 3
}

enum WriterState: uint {
    READY = 0,
    UNCERTAIN = 1,
    WRITING = 2,
    FULL = 3
}

enum DescriptorState: uint {
    NOT_INITED,
    INITIALIZING,
    NONBLOCKING,
    THREADPOOL
}

// Per-fd readiness state machine plus lists of awaiting fibers.
shared struct Descriptor {
    ReaderState _readerState;
    AwaitingFiber* _readerWaits;
    WriterState _writerState;
    AwaitingFiber* _writerWaits;
    DescriptorState state;
nothrow:
    ReaderState readerState()() {
        return atomicLoad(_readerState);
    }

    WriterState writerState()() {
        return atomicLoad(_writerState);
    }

    // try to change state & return whether the transition happened
    bool changeReader()(ReaderState from, ReaderState to) {
        return cas(&_readerState, from, to);
    }

    // ditto for writer
    bool changeWriter()(WriterState from, WriterState to) {
        return cas(&_writerState, from, to);
    }

    shared(AwaitingFiber)* readWaiters()() {
        return atomicLoad(_readerWaits);
    }

    shared(AwaitingFiber)* writeWaiters()(){
        return atomicLoad(_writerWaits);
    }

    // try to enqueue reader fiber given old head
    bool enqueueReader()(shared(AwaitingFiber)* fiber) {
        auto head = readWaiters;
        if (head == fiber) {
            return true; // TODO: HACK
        }
        fiber.next = head;
        return cas(&_readerWaits, head, fiber);
    }

    void removeReader()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_readerWaits);
        // NOTE(review): a stolen single-node list is dropped here even when it
        // is not `fiber`, and the CAS below compares against the filtered head
        // rather than publishing it — verify intended lock-free protocol.
        if (head is null || head.next is null) return;
        head = removeFromList(head.unshared, fiber);
        cas(&_readerWaits, head, cast(shared(AwaitingFiber*))null);
    }

    // try to enqueue writer fiber given old head
    bool enqueueWriter()(shared(AwaitingFiber)* fiber) {
        auto head = writeWaiters;
        if (head == fiber) {
            return true; // TODO: HACK
        }
        fiber.next = head;
        return cas(&_writerWaits, head, fiber);
    }

    void removeWriter()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_writerWaits);
        // NOTE(review): same protocol concerns as removeReader above.
        if (head is null || head.next is null) return;
        head = removeFromList(head.unshared, fiber);
        cas(&_writerWaits, head, cast(shared(AwaitingFiber*))null);
    }

    // try to schedule readers - if fails - someone added a reader, it's now his job to check state
    void scheduleReaders()(int wakeFd) {
        auto w = steal(_readerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }

    // try to schedule writers, ditto
    void scheduleWriters()(int wakeFd) {
        auto w = steal(_writerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }
}

extern(C) void graceful_shutdown_on_signal(int, siginfo_t*, void*)
{
    version(photon_tracing) printStats();
    _exit(9);
}

version(photon_tracing)
void printStats()
{
    // TODO: report on various events in eventloop/scheduler
    string msg = "Tracing report:\n\n";
    write(2, msg.ptr, msg.length);
}

/// Initialize photon: epoll/signalfd setup, descriptor table, one scheduler
/// block per hardware thread, and the native event-loop thread.
public void startloop()
{
    import core.cpuid;
    uint threads = threadsPerCPU;

    event_loop_fd = cast(int)epoll_create1(0).checked("ERROR: Failed to create event-loop!");
    // use RT signals, disable default termination on signal received
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, SIGNAL);
    pthread_sigmask(SIG_BLOCK, &mask, null).checked;
    signal_loop_fd = cast(int)signalfd(-1, &mask, 0).checked("ERROR: Failed to create signalfd!");

    epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = signal_loop_fd;
    epoll_ctl(event_loop_fd, EPOLL_CTL_ADD, signal_loop_fd, &event).checked;

    termination = RawEvent(0);
    event.events = EPOLLIN;
    event.data.fd = termination.fd;
    epoll_ctl(event_loop_fd, EPOLL_CTL_ADD, termination.fd, &event).checked;

    {
        sigaction_t action;
        action.sa_sigaction = &graceful_shutdown_on_signal;
        sigaction(SIGTERM, &action, null).checked;
    }

    ssize_t fdMax = sysconf(_SC_OPEN_MAX).checked;
    // one Descriptor slot per possible fd
    void* table = mmap(null, fdMax * Descriptor.sizeof, PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE, -1, 0);
    if (table == MAP_FAILED) abort(); // cannot run without the descriptor table
    descriptors = (cast(shared(Descriptor*)) table)[0..fdMax];
    scheds = new SchedulerBlock[threads];
    foreach(ref sched; scheds) {
        sched.queue = IntrusiveQueue!(FiberExt, RawEvent)(RawEvent(0));
    }
    // pthread_create stores the thread id through its out-parameter and
    // returns an error code; assigning that return value to `eventLoop`
    // (as before) clobbered the id with 0, breaking the later pthread_join.
    pthread_create(cast(pthread_t*)&eventLoop, null, &processEventsEntry, null);
}

package(photon) void stoploop()
{
    void* ret;
    pthread_join(eventLoop, &ret);
}

/// Native (non-D-runtime) event loop: translate epoll readiness into fiber wakeups.
extern(C) void* processEventsEntry(void*)
{
    for (;;) {
        epoll_event[MAX_EVENTS] events = void;
        signalfd_siginfo[20] fdsi = void;
        int r;
        do {
            r = epoll_wait(event_loop_fd, events.ptr, MAX_EVENTS, -1);
        } while (r < 0 && errno == EINTR);
        checked(r);
        for (int n = 0; n < r; n++) {
            int fd = events[n].data.fd;
            if (fd == termination.fd) {
                // last fiber gone: wake every scheduler so they can exit
                foreach(ref s; scheds) s.queue.event.trigger();
                return null;
            }
            else if (fd == signal_loop_fd) {
                logf("Intercepted our aio SIGNAL");
                ssize_t r2 = raw_read(signal_loop_fd, &fdsi, fdsi.sizeof);
                logf("aio events = %d", r2 / signalfd_siginfo.sizeof);
                if (r2 % signalfd_siginfo.sizeof != 0)
                    checked(r2, "ERROR: failed read on signalfd");

                for(int i = 0; i < r2 / signalfd_siginfo.sizeof; i++) { //TODO: stress test multiple signals
                    logf("Processing aio event idx = %d", i);
                    if (fdsi[i].ssi_signo == SIGNAL) {
                        logf("HIT our SIGNAL");
                        auto fiber = cast(FiberExt)cast(void*)fdsi[i].ssi_ptr;
                        fiber.schedule();
                    }
                }
            }
            else {
                auto descriptor = descriptors.ptr + fd;
                if (descriptor.state == DescriptorState.NONBLOCKING) {
                    if (events[n].events & EPOLLIN) {
                        logf("Read event for fd=%d", fd);
                        auto state = descriptor.readerState;
                        logf("read state = %d", state);
                        final switch(state) with(ReaderState) {
                        case EMPTY:
                            logf("Trying to schedule readers");
                            descriptor.changeReader(EMPTY, READY);
                            descriptor.scheduleReaders(fd);
                            logf("Scheduled readers");
                            break;
                        case UNCERTAIN:
                            descriptor.changeReader(UNCERTAIN, READY);
                            break;
                        case READING:
                            if (!descriptor.changeReader(READING, UNCERTAIN)) {
                                if (descriptor.changeReader(EMPTY, UNCERTAIN)) // if became empty - move to UNCERTAIN and wake readers
                                    descriptor.scheduleReaders(fd);
                            }
                            break;
                        case READY:
                            descriptor.scheduleReaders(fd);
                            break;
                        }
                        logf("Awaits %x", cast(void*)descriptor.readWaiters);
                    }
                    if (events[n].events & EPOLLOUT) {
                        logf("Write event for fd=%d", fd);
                        auto state = descriptor.writerState;
                        logf("write state = %d", state);
                        final switch(state) with(WriterState) {
                        case FULL:
                            descriptor.changeWriter(FULL, READY);
                            descriptor.scheduleWriters(fd);
                            break;
                        case UNCERTAIN:
                            descriptor.changeWriter(UNCERTAIN, READY);
                            break;
                        case WRITING:
                            if (!descriptor.changeWriter(WRITING, UNCERTAIN)) {
                                if (descriptor.changeWriter(FULL, UNCERTAIN)) // if became full - move to UNCERTAIN and wake writers
                                    descriptor.scheduleWriters(fd);
                            }
                            break;
                        case READY:
                            descriptor.scheduleWriters(fd);
                            break;
                        }
                        logf("Awaits %x", cast(void*)descriptor.writeWaiters);
                    }
                }
            }
        }
    }
}

enum Fcntl: int { explicit = 0, msg = MSG_DONTWAIT, sock = SOCK_NONBLOCK, noop = 0xFFFFF }
enum SyscallKind { accept, read, write, connect }

// intercept - a filter for file descriptor, changes flags and register on first use
void interceptFd(Fcntl needsFcntl)(int fd) nothrow {
    logf("Hit interceptFD");
    if (fd < 0 || fd >= descriptors.length) return;
    if (cas(&descriptors[fd].state, DescriptorState.NOT_INITED, DescriptorState.INITIALIZING)) {
        logf("First use, registering fd = %s", fd);
        static if(needsFcntl == Fcntl.explicit) {
            int flags = fcntl(fd, F_GETFL, 0);
            fcntl(fd, F_SETFL, flags | O_NONBLOCK).checked;
            logf("Setting FCNTL. %x", cast(void*)currentFiber);
        }
        epoll_event event;
        event.events = EPOLLIN | EPOLLOUT | EPOLLET;
        event.data.fd = fd;
        // EPERM from epoll_ctl means the fd does not support epoll (e.g. a
        // regular file) — such fds fall back to the thread-pool path.
        if (epoll_ctl(event_loop_fd, EPOLL_CTL_ADD, fd, &event) < 0 && errno == EPERM) {
            logf("isSocket = false FD = %s", fd);
            descriptors[fd].state = DescriptorState.THREADPOOL;
        }
        else {
            logf("isSocket = true FD = %s", fd);
            descriptors[fd].state = DescriptorState.NONBLOCKING;
        }
    }
}

/// Wake everything parked on `fd` and return its slot to the uninitialized state.
void deregisterFd(int fd) nothrow {
    if(fd >= 0 && fd < descriptors.length) {
        auto descriptor = descriptors.ptr + fd;
        atomicStore(descriptor._writerState, WriterState.READY);
        atomicStore(descriptor._readerState, ReaderState.EMPTY);
        descriptor.scheduleReaders(fd);
        descriptor.scheduleWriters(fd);
        atomicStore(descriptor.state, DescriptorState.NOT_INITED);
    }
}

/// Core of every intercepted syscall: run the raw syscall when possible and
/// park the calling fiber on the descriptor's wait list when it would block.
/// `ERR` is the errno value that signals "would block" for this call.
ssize_t universalSyscall(size_t ident, string name, SyscallKind kind, Fcntl fcntlStyle, ssize_t ERR, T...)
                        (int fd, T args) nothrow {
    if (currentFiber is null) {
        logf("%s PASSTHROUGH FD=%s", name, fd);
        return syscall(ident, fd, args).withErrorno;
    }
    else {
        logf("HOOKED %s FD=%d", name, fd);
        interceptFd!(fcntlStyle)(fd);
        shared(Descriptor)* descriptor = descriptors.ptr + fd;
        if (atomicLoad(descriptor.state) == DescriptorState.THREADPOOL) {
            logf("%s syscall THREADPOLL FD=%d", name, fd);
            //TODO: offload syscall to thread-pool
            return syscall(ident, fd, args).withErrorno;
        }
    L_start:
        shared AwaitingFiber await = AwaitingFiber(cast(shared)currentFiber, null);
        // set flags argument if able to avoid direct fcntl calls
        static if (fcntlStyle != Fcntl.explicit)
        {
            args[2] |= fcntlStyle;
        }
        //if (kind == SyscallKind.accept)
        logf("kind:%s args:%s", kind, args);
        static if(kind == SyscallKind.accept || kind == SyscallKind.read) {
            auto state = descriptor.readerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (ReaderState) {
            case EMPTY:
                logf("EMPTY - enqueue reader");
                if (!descriptor.enqueueReader(&await)) goto L_start;
                // changed state to e.g. READY or UNCERTAIN in meantime, may need to reschedule
                if (descriptor.readerState != EMPTY) descriptor.scheduleReaders(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                descriptor.changeReader(UNCERTAIN, READING); // may become READY or READING
                goto case READING;
            case READY:
                descriptor.changeReader(READY, READING); // always succeeds if 1 fiber reads
                goto case READING;
            case READING:
                ssize_t resp = syscall(ident, fd, args);
                static if (kind == SyscallKind.accept) {
                    if (resp >= 0) // for accept we never know if we emptied the queue
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                }
                else static if (kind == SyscallKind.read) {
                    if (resp == args[1]) // length is 2nd in (buf, length, ...)
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if(resp >= 0)
                        descriptor.changeReader(READING, EMPTY);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                }
                else
                    static assert(0);
                return withErrorno(resp);
            }
        }
        else static if(kind == SyscallKind.write || kind == SyscallKind.connect) {
            //TODO: Handle short-write b/c of EWOULDBLOCK to appear as fully blocking?
            auto state = descriptor.writerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (WriterState) {
            case FULL:
                logf("FULL FD=%d Fiber %x", fd, cast(void*)currentFiber);
                if (!descriptor.enqueueWriter(&await)) goto L_start;
                // changed state to e.g. READY or UNCERTAIN in meantime, may need to reschedule
                if (descriptor.writerState != FULL) descriptor.scheduleWriters(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                logf("UNCERTAIN on FD=%d Fiber %x", fd, cast(void*)currentFiber);
                descriptor.changeWriter(UNCERTAIN, WRITING); // may become READY or WRITING
                goto case WRITING;
            case READY:
                descriptor.changeWriter(READY, WRITING); // always succeeds if 1 fiber writes
                goto case WRITING;
            case WRITING:
                ssize_t resp = syscall(ident, fd, args);
                static if (kind == SyscallKind.connect) {
                    if(resp >= 0) {
                        descriptor.changeWriter(WRITING, READY);
                    }
                    else if (resp == -ERR || resp == -EALREADY) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                    return withErrorno(resp);
                }
                else {
                    if (resp == args[1]) // (buf, len) args to syscall
                        descriptor.changeWriter(WRITING, UNCERTAIN);
                    else if(resp >= 0) {
                        logf("Short-write on FD=%d, become FULL", fd);
                        descriptor.changeWriter(WRITING, FULL);
                    }
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            logf("Sudden block on FD=%d, become FULL", fd);
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in meantime
                    }
                    return withErrorno(resp);
                }
            }
        }
        assert(0);
    }
}

/// Bounded LIFO channel of up to `size` items (NOTE(review): despite the
/// front/popFront names, items are taken from the back of the buffer).
class Channel(T) {
    private T[] buf;
    private size_t size;
    private Event ev;

    this(size_t capacity) {
        size = capacity; // buf starts empty and grows up to `size` items
    }

    void put(T item) {
        if (buf.length == size) {
            ev.await();
        }
        if (buf.length == 0) ev.reset();
        buf ~= item;
    }
    auto front() {
        if (buf.length == 0) {
            ev.await();
        }
        if (buf.length == size) ev.reset();
        return buf[$-1];
    }
    void popFront() {
        buf = buf[0..$-1];
        ev.reset();
        buf.assumeSafeAppend();
    }
    bool empty() {
        return buf.length == 0;
    }
}

/// Create a channel holding at most `size` items. (Previous version called a
/// non-existent constructor and pre-filled the buffer, making it look full.)
Channel!T channel(T)(size_t size) {
    return new Channel!T(size);
}

// ======================================================================================
// SYSCALL wrappers intercepts
// ======================================================================================

extern(C) ssize_t read(int fd, void *buf, size_t count) nothrow
{
    return universalSyscall!(SYS_READ, "READ", SyscallKind.read, Fcntl.explicit, EWOULDBLOCK)
        (fd, cast(size_t)buf, count);
}

extern(C) ssize_t write(int fd, const void *buf, size_t count)
{
    return universalSyscall!(SYS_WRITE, "WRITE", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
        (fd, cast(size_t)buf, count);
}

extern(C) ssize_t accept(int sockfd, sockaddr *addr, socklen_t *addrlen)
{
    return universalSyscall!(SYS_ACCEPT, "accept", SyscallKind.accept, Fcntl.explicit, EWOULDBLOCK)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen);
}

extern(C) ssize_t accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags)
{
    return universalSyscall!(SYS_ACCEPT4, "accept4", SyscallKind.accept, Fcntl.sock, EWOULDBLOCK)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen, flags);
}

// NOTE: connect(2) takes its length BY VALUE; the previous declaration took
// `socklen_t*`, which misread the caller's length argument as a pointer.
extern(C) ssize_t connect(int sockfd, const sockaddr *addr, socklen_t addrlen)
{
    return universalSyscall!(SYS_CONNECT, "connect", SyscallKind.connect, Fcntl.explicit, EINPROGRESS)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen);
}

extern(C) ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
                         const sockaddr *dest_addr, socklen_t addrlen)
{
    return universalSyscall!(SYS_SENDTO, "sendto", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
        (sockfd, cast(size_t) buf, len, flags, cast(size_t) dest_addr, cast(size_t) addrlen);
}

// recv(2) returns ssize_t (negative on error); returning size_t hid failures.
extern(C) ssize_t recv(int sockfd, void *buf, size_t len, int flags) nothrow {
    sockaddr_in src_addr;
    src_addr.sin_family = AF_INET;
    src_addr.sin_port = 0;
    src_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    ssize_t addrlen = sockaddr_in.sizeof;
    return recvfrom(sockfd, buf, len, flags, cast(sockaddr*)&src_addr, &addrlen);
}

extern(C) private ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
                                   sockaddr *src_addr, ssize_t* addrlen) nothrow
{
    return universalSyscall!(SYS_RECVFROM, "RECVFROM", SyscallKind.read, Fcntl.msg, EWOULDBLOCK)
        (sockfd, cast(size_t)buf, len, flags, cast(size_t)src_addr, cast(size_t)addrlen);
}

/// Intercepted poll(2): serve from cached descriptor state when possible,
/// otherwise park the fiber on each polled fd plus a timerfd for the timeout.
extern(C) private ssize_t poll(pollfd *fds, nfds_t nfds, int timeout)
{
    // Answer the poll from our per-fd state machines; returns true (and sets
    // `result`) when a definite answer is available without blocking.
    bool nonBlockingCheck(ref ssize_t result) {
        bool uncertain;
    L_cacheloop:
        foreach (ref fd; fds[0..nfds]) {
            interceptFd!(Fcntl.explicit)(fd.fd);
            fd.revents = 0;
            auto descriptor = descriptors.ptr + fd.fd;
            if (fd.events & POLLIN) {
                auto state = descriptor.readerState;
                logf("Found event %d for reader in select", state);
                switch(state) with(ReaderState) {
                case READY:
                    fd.revents |= POLLIN;
                    break;
                case EMPTY:
                    break;
                default:
                    uncertain = true;
                    break L_cacheloop;
                }
            }
            if (fd.events & POLLOUT) {
                auto state = descriptor.writerState;
                logf("Found event %d for writer in select", state);
                switch(state) with(WriterState) {
                case READY:
                    fd.revents |= POLLOUT;
                    break;
                case FULL:
                    break;
                default:
                    uncertain = true;
                    break L_cacheloop;
                }
            }
        }
        // fallback to system poll call if descriptor state is uncertain
        if (uncertain) {
            logf("Fallback to system poll, descriptors have uncertain state");
            ssize_t p = raw_poll(fds, nfds, 0);
            if (p != 0) {
                result = p;
                logf("Raw poll returns %d", result);
                return true;
            }
        }
        else {
            ssize_t j = 0;
            foreach (i; 0..nfds) {
                if (fds[i].revents) {
                    fds[j++] = fds[i];
                }
            }
            logf("Using our own event cache: %d events", j);
            if (j > 0) {
                result = cast(ssize_t)j;
                return true;
            }
        }
        return false;
    }
    if (currentFiber is null) {
        logf("POLL PASSTHROUGH!");
        return raw_poll(fds, nfds, timeout);
    }
    else {
        logf("HOOKED POLL %d fds timeout %d", nfds, timeout);
        if (nfds < 0) return -EINVAL.withErrorno; // NOTE(review): nfds_t is unsigned; defensive only
        if (nfds == 0) {
            // pure sleep: park on a timerfd
            if (timeout == 0) return 0;
            shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
            Timer tm = timer();
            descriptors[tm.fd].enqueueReader(&aw);
            scope(exit) tm.dispose();
            tm.arm(timeout);
            logf("Timer fd=%d", tm.fd);
            Fiber.yield();
            logf("Woke up after select %x. WakeFd=%d", cast(void*)currentFiber, currentFiber.wakeFd);
            return 0;
        }
        foreach(ref fd; fds[0..nfds]) {
            if (fd.fd < 0 || fd.fd >= descriptors.length) return -EBADF.withErrorno;
            fd.revents = 0;
        }
        ssize_t result = 0;
        if (timeout <= 0) return raw_poll(fds, nfds, timeout);
        if (nonBlockingCheck(result)) return result;
        shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].enqueueReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].enqueueWriter(&aw);
        }
        Timer tm = timer();
        scope(exit) tm.dispose();
        tm.arm(timeout);
        descriptors[tm.fd].enqueueReader(&aw);
        Fiber.yield();
        tm.disarm();
        atomicStore(descriptors[tm.fd]._readerWaits, cast(shared(AwaitingFiber)*)null);
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].removeReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].removeWriter(&aw);
        }
        logf("Woke up after select %x. WakeFD=%d", cast(void*)currentFiber, currentFiber.wakeFd);
        if (currentFiber.wakeFd == tm.fd) return 0; // timed out
        else {
            nonBlockingCheck(result);
            return result;
        }
    }
}

extern(C) private ssize_t close(int fd) nothrow
{
    logf("HOOKED CLOSE FD=%d", fd);
    deregisterFd(fd);
    return cast(int)withErrorno(syscall(SYS_CLOSE, fd));
}