1 module photon.macos.core; 2 version(OSX): 3 private: 4 5 import std.stdio; 6 import std.string; 7 import std.format; 8 import std.exception; 9 import std.conv; 10 import std.array; 11 import core.thread; 12 import core.internal.spinlock; 13 import core.sys.posix.sys.types; 14 import core.sys.posix.sys.socket; 15 import core.sys.posix.poll; 16 import core.sys.posix.netinet.in_; 17 import core.sys.freebsd.unistd; 18 import core.sync.mutex; 19 import core.stdc.errno; 20 import core.stdc.signal; 21 import core.stdc.time; 22 import core.atomic; 23 import core.sys.posix.stdlib: abort; 24 import core.sys.posix.fcntl; 25 import core.memory; 26 import core.sys.posix.sys.mman; 27 import core.sys.posix.pthread; 28 import core.sys.darwin.sys.event; 29 import core.sys.darwin.mach.thread_act; 30 31 import photon.macos.support; 32 import photon.ds.common; 33 import photon.ds.intrusive_queue; 34 35 alias KEvent = kevent_t; 36 enum SYS_READ = 3; 37 enum SYS_WRITE = 4; 38 enum SYS_ACCEPT = 30; 39 enum SYS_CONNECT = 98; 40 enum SYS_SENDTO = 133; 41 enum SYS_RECVFROM = 29; 42 enum SYS_CLOSE = 6; 43 enum SYS_GETTID = 286; 44 enum SYS_POLL = 230; 45 46 struct thread; 47 alias thread_t = thread*; 48 extern(C) thread_t current_thread(); 49 50 // T becomes thread-local b/c it's stolen from shared resource 51 auto steal(T)(ref shared T arg) 52 { 53 for (;;) { 54 auto v = atomicLoad(arg); 55 if(cas(&arg, v, cast(shared(T))null)) return v; 56 } 57 } 58 59 shared struct RawEvent { 60 nothrow: 61 this(int dummy) { 62 int[2] fds; 63 pipe(fds).checked("event creation"); 64 this.fds = fds; 65 } 66 67 void waitAndReset() { 68 byte[1] bytes = void; 69 ssize_t r; 70 do { 71 r = raw_read(fds[0], bytes.ptr, 1); 72 } while(r < 0 && errno == EINTR); 73 r.checked("event reset"); 74 } 75 76 void trigger() { 77 ubyte[1] bytes; 78 ssize_t r; 79 do { 80 r = raw_write(fds[1], bytes.ptr, 1); 81 } while(r < 0 && errno == EINTR); 82 r.checked("event trigger"); 83 } 84 85 private int[2] fds; 86 } 87 88 shared size_t timerId; 89 90 struct Timer { 91 nothrow: 92 this(size_t id) { 93 this.id = id; 94 } 95 96 /// 97 void wait(const timespec* ts) { 98 wait(ts.tv_sec.seconds + ts.tv_nsec.nsecs); 99 } 100 101 /// 102 void wait(Duration dur) { 103 KEvent event; 104 event.ident = id; 105 event.filter = EVFILT_TIMER; 106 event.flags = EV_ADD | EV_ENABLE | EV_ONESHOT; 107 event.fflags = 0; 108 event.data = dur.total!"msecs"; 109 event.udata = cast(void*)(cast(size_t)cast(void*)currentFiber | 0x1); 110 timespec timeout; 111 timeout.tv_nsec = 1000; 112 kevent(kq, &event, 1, null, 0, &timeout).checked("arming the timer"); 113 FiberExt.yield(); 114 } 115 116 private void waitThread(const timespec* ts) { 117 auto dur = ts.tv_sec.seconds + ts.tv_nsec.nsecs; 118 KEvent event; 119 event.ident = id; 120 event.filter = EVFILT_TIMER; 121 event.flags = EV_ADD | EV_ENABLE | EV_ONESHOT; 122 event.fflags = 0; 123 event.data = dur.total!"msecs"; 124 event.udata = cast(void*)(cast(size_t)mach_thread_self() << 1); 125 timespec timeout; 126 timeout.tv_nsec = 1000; 127 kevent(kq, &event, 1, null, 0, &timeout).checked("arming the timer for a thread"); 128 } 129 130 private size_t id; 131 } 132 133 /// Allocate a timer 134 public nothrow auto timer() { 135 return Timer(atomicFetchAdd(timerId, 1)); 136 } 137 138 139 public struct Event { 140 nothrow: 141 @disable this(this); 142 143 this(bool signaled) { 144 int[2] fds; 145 pipe(fds).checked; 146 if (signaled) trigger(); 147 this.fds = fds; 148 } 149 150 /// Wait for the event to be triggered, then reset and return atomically 151 void waitAndReset() { 152 byte[4096] bytes = void; 153 ssize_t r; 154 do { 155 r = read(fds[0], bytes.ptr, bytes.sizeof); 156 } while(r < 0 && errno == EINTR); 157 } 158 159 void waitAndReset() shared { 160 this.unshared.waitAndReset(); 161 } 162 163 /// Trigger the event. 164 void trigger() { 165 ubyte[1] bytes = void; 166 ssize_t r; 167 do { 168 r = write(fds[1], bytes.ptr, 1); 169 } while(r < 0 && errno == EINTR); 170 } 171 172 void trigger() shared { 173 this.unshared.trigger(); 174 } 175 176 /// 177 void dispose() { 178 close(fds[0]); 179 close(fds[1]); 180 } 181 182 void dispose() shared { 183 this.unshared.dispose(); 184 } 185 186 private int[2] fds; 187 } 188 189 /// 190 public nothrow auto event(bool signaled) { 191 return Event(signaled); 192 } 193 194 195 public struct Semaphore { 196 nothrow: 197 @disable this(this); 198 199 this(int initial) { 200 int[2] fds; 201 pipe(fds).checked; 202 if (initial > 0) { 203 trigger(initial); 204 } 205 this.fds = fds; 206 } 207 208 /// 209 void wait() { 210 byte[1] bytes = void; 211 ssize_t r; 212 do { 213 r = read(fds[0], bytes.ptr, bytes.sizeof); 214 } while(r < 0 && errno == EINTR); 215 } 216 217 /// 218 void wait() shared { 219 this.unshared.wait(); 220 } 221 222 /// 223 void trigger(int count) { 224 ubyte[4096] bytes = void; 225 ssize_t size = count > 4096 ? 4096 : count; 226 ssize_t r; 227 do { 228 r = write(fds[1], bytes.ptr, size); 229 } while(r < 0 && errno == EINTR); 230 } 231 232 /// 233 void trigger(int count) shared { 234 this.unshared.trigger(count); 235 } 236 237 /// 238 void dispose() { 239 close(fds[0]); 240 close(fds[1]); 241 } 242 243 /// 244 void dispose() shared { 245 this.unshared.dispose(); 246 } 247 248 private int[2] fds; 249 } 250 251 /// 252 public nothrow auto semaphore(int initial) { 253 return Semaphore(initial); 254 } 255 256 struct AwaitingFiber { 257 shared FiberExt fiber; 258 AwaitingFiber* next; 259 260 void scheduleAll(int wakeFd) nothrow 261 { 262 auto w = &this; 263 FiberExt head; 264 // first process all AwaitingFibers since they are on stack 265 do { 266 auto fiber = steal(w.fiber); 267 if (fiber) { 268 fiber.unshared.next = head; 269 head = fiber.unshared; 270 } 271 w = w.next; 272 } while(w); 273 while(head) { 274 logf("Waking with FD=%d", wakeFd); 275 head.wakeFd = wakeFd; 276 auto next = head.next; 277 head.schedule(); 278 head = next; 279 } 280 } 281 } 282 283 class FiberExt : Fiber { 284 FiberExt next; 285 uint numScheduler; 286 int wakeFd; // recieves fd that woken us up 287 288 enum PAGESIZE = 4096; 289 290 this(void function() fn, uint numSched) nothrow { 291 super(fn); 292 numScheduler = numSched; 293 } 294 295 this(void delegate() dg, uint numSched) nothrow { 296 super(dg); 297 numScheduler = numSched; 298 } 299 300 void schedule() nothrow 301 { 302 scheds[numScheduler].queue.push(this); 303 } 304 } 305 306 FiberExt currentFiber; 307 shared RawEvent termination; // termination event, triggered once last fiber exits 308 shared pthread_t eventLoop; // event loop, runs outside of D runtime 309 shared int alive; // count of non-terminated Fibers scheduled 310 311 struct SchedulerBlock { 312 shared IntrusiveQueue!(FiberExt, RawEvent) queue; 313 shared uint assigned; 314 size_t[1] padding; 315 } 316 static assert(SchedulerBlock.sizeof == 64); 317 318 package(photon) shared SchedulerBlock[] scheds; 319 320 enum int MAX_EVENTS = 500; 321 enum int SIGNAL = 42; 322 323 324 package(photon) void schedulerEntry(size_t n) 325 { 326 int tid = gettid(); 327 /*cpu_set_t mask; 328 CPU_SET(n, &mask); 329 sched_setaffinity(tid, mask.sizeof, &mask).checked("sched_setaffinity"); 330 */ 331 shared SchedulerBlock* sched = scheds.ptr + n; 332 while (alive > 0) { 333 sched.queue.event.waitAndReset(); 334 for(;;) { 335 FiberExt f = sched.queue.drain(); 336 if (f is null) break; // drained an empty queue, time to sleep 337 do { 338 auto next = f.next; //save next, it will be reused on scheduling 339 currentFiber = f; 340 logf("Fiber %x started", cast(void*)f); 341 try { 342 f.call(); 343 } 344 catch (Exception e) { 345 stderr.writeln(e); 346 atomicOp!"-="(alive, 1); 347 } 348 if (f.state == FiberExt.State.TERM) { 349 logf("Fiber %s terminated", cast(void*)f); 350 atomicOp!"-="(alive, 1); 351 } 352 f = next; 353 } while(f !is null); 354 } 355 } 356 termination.trigger(); 357 foreach (ref s; scheds) { 358 s.queue.event.trigger(); 359 } 360 } 361 362 /// Convenience overload for functions 363 public void go(void function() func) { 364 go({ func(); }); 365 } 366 367 /// Setup a fiber task to run on the Photon scheduler. 368 public void go(void delegate() func) { 369 import std.random; 370 uint choice; 371 if (scheds.length == 1) choice = 0; 372 else { 373 uint a = uniform!"[)"(0, cast(uint)scheds.length); 374 uint b = uniform!"[)"(0, cast(uint)scheds.length-1); 375 if (a == b) b = cast(uint)scheds.length-1; 376 uint loadA = scheds[a].assigned; 377 uint loadB = scheds[b].assigned; 378 if (loadA < loadB) choice = a; 379 else choice = b; 380 } 381 atomicOp!"+="(scheds[choice].assigned, 1); 382 atomicOp!"+="(alive, 1); 383 auto f = new FiberExt(func, choice); 384 logf("Assigned %x -> %d / %d scheduler", cast(void*)f, choice, scheds.length); 385 f.schedule(); 386 } 387 388 shared Descriptor[] descriptors; 389 shared int event_loop_fd; 390 shared int signal_loop_fd; 391 392 enum ReaderState: uint { 393 EMPTY = 0, 394 UNCERTAIN = 1, 395 READING = 2, 396 READY = 3 397 } 398 399 enum WriterState: uint { 400 READY = 0, 401 UNCERTAIN = 1, 402 WRITING = 2, 403 FULL = 3 404 } 405 406 enum DescriptorState: uint { 407 NOT_INITED, 408 INITIALIZING, 409 NONBLOCKING, 410 THREADPOOL 411 } 412 413 // list of awaiting fibers 414 shared struct Descriptor { 415 ReaderState _readerState; 416 AwaitingFiber* _readerWaits; 417 WriterState _writerState; 418 AwaitingFiber* _writerWaits; 419 DescriptorState state; 420 nothrow: 421 ReaderState readerState()() { 422 return atomicLoad(_readerState); 423 } 424 425 WriterState writerState()() { 426 return atomicLoad(_writerState); 427 } 428 429 // try to change state & return whatever it happend to be in the end 430 bool changeReader()(ReaderState from, ReaderState to) { 431 return cas(&_readerState, from, to); 432 } 433 434 // ditto for writer 435 bool changeWriter()(WriterState from, WriterState to) { 436 return cas(&_writerState, from, to); 437 } 438 439 // 440 shared(AwaitingFiber)* readWaiters()() { 441 return atomicLoad(_readerWaits); 442 } 443 444 // 445 shared(AwaitingFiber)* writeWaiters()(){ 446 return atomicLoad(_writerWaits); 447 } 448 449 // try to enqueue reader fiber given old head 450 bool enqueueReader()(shared(AwaitingFiber)* fiber) { 451 auto head = readWaiters; 452 if (head == fiber) { 453 return true; // TODO: HACK 454 } 455 fiber.next = head; 456 return cas(&_readerWaits, head, fiber); 457 } 458 459 void removeReader()(shared(AwaitingFiber)* fiber) { 460 auto head = steal(_readerWaits); 461 if (head is null || head.next is null) return; 462 head = removeFromList(head.unshared, fiber); 463 cas(&_readerWaits, head, cast(shared(AwaitingFiber*))null); 464 } 465 466 // try to enqueue writer fiber given old head 467 bool enqueueWriter()(shared(AwaitingFiber)* fiber) { 468 auto head = writeWaiters; 469 if (head == fiber) { 470 return true; // TODO: HACK 471 } 472 fiber.next = head; 473 return cas(&_writerWaits, head, fiber); 474 } 475 476 void removeWriter()(shared(AwaitingFiber)* fiber) { 477 auto head = steal(_writerWaits); 478 if (head is null || head.next is null) return; 479 head = removeFromList(head.unshared, fiber); 480 cas(&_writerWaits, head, cast(shared(AwaitingFiber*))null); 481 } 482 483 // try to schedule readers - if fails - someone added a reader, it's now his job to check state 484 void scheduleReaders()(int wakeFd) { 485 auto w = steal(_readerWaits); 486 if (w) w.unshared.scheduleAll(wakeFd); 487 } 488 489 // try to schedule writers, ditto 490 void scheduleWriters()(int wakeFd) { 491 auto w = steal(_writerWaits); 492 if (w) w.unshared.scheduleAll(wakeFd); 493 } 494 } 495 496 extern(C) void graceful_shutdown_on_signal(int, siginfo_t*, void*) 497 { 498 version(photon_tracing) printStats(); 499 _exit(9); 500 } 501 502 version(photon_tracing) 503 void printStats() 504 { 505 // TODO: report on various events in eventloop/scheduler 506 string msg = "Tracing report:\n\n"; 507 write(2, msg.ptr, msg.length); 508 } 509 510 shared int kq; 511 shared int eventLoopId; 512 __gshared ssize_t function (const timespec* req, const timespec* rem) libcNanosleep; 513 Timer[] timerPool; // thread-local pool of preallocated timers 514 515 nothrow auto getSleepTimer() { 516 Timer tm; 517 if (timerPool.length == 0) { 518 tm = timer(); 519 } else { 520 tm = timerPool[$-1]; 521 timerPool.length = timerPool.length-1; 522 } 523 return tm; 524 } 525 526 nothrow void freeSleepTimer(Timer tm) { 527 timerPool.assumeSafeAppend(); 528 timerPool ~= tm; 529 } 530 531 /// Delay fiber execution by `req` duration. 532 public nothrow void delay(T)(T req) 533 if (is(T : const timespec*) || is(T : Duration)) { 534 auto tm = getSleepTimer(); 535 tm.wait(req); 536 freeSleepTimer(tm); 537 } 538 539 public void startloop() 540 { 541 int threads = cast(int)sysconf(_SC_NPROCESSORS_ONLN).checked; 542 kq = kqueue(); 543 enforce(kq != -1); 544 ssize_t fdMax = sysconf(_SC_OPEN_MAX).checked; 545 descriptors = (cast(shared(Descriptor*)) mmap(null, fdMax * Descriptor.sizeof, PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE, -1, 0))[0..fdMax]; 546 scheds = new SchedulerBlock[threads]; 547 termination = RawEvent(0); 548 foreach(ref sched; scheds) { 549 sched.queue = IntrusiveQueue!(FiberExt, RawEvent)(RawEvent(0)); 550 } 551 pthread_create(cast(pthread_t*)&eventLoop, null, &processEventsEntry, null); 552 } 553 554 package(photon) void stoploop() 555 { 556 void* ret; 557 pthread_join(cast(pthread_t)eventLoop, &ret); 558 } 559 560 extern(C) void* processEventsEntry(void*) 561 { 562 KEvent[MAX_EVENTS] ke; 563 for (;;) { 564 int cnt = kevent(kq, null, 0, ke.ptr, MAX_EVENTS, null); 565 enforce(cnt >= 0); 566 for (int i = 0; i < cnt; i++) { 567 auto fd = cast(int)ke[i].ident; 568 auto filter = ke[i].filter; 569 auto descriptor = descriptors.ptr + fd; 570 if (filter == EVFILT_READ) { 571 logf("Read event for fd=%d", fd); 572 if (fd == termination.fds[0]) return null; 573 auto state = descriptor.readerState; 574 logf("read state = %d", state); 575 final switch(descriptor.readerState) with(ReaderState) { 576 case EMPTY: 577 logf("Trying to schedule readers"); 578 descriptor.changeReader(EMPTY, READY); 579 descriptor.scheduleReaders(fd); 580 logf("Scheduled readers"); 581 break; 582 case UNCERTAIN: 583 descriptor.changeReader(UNCERTAIN, READY); 584 break; 585 case READING: 586 if (!descriptor.changeReader(READING, UNCERTAIN)) { 587 if (descriptor.changeReader(EMPTY, UNCERTAIN)) // if became empty - move to UNCERTAIN and wake readers 588 descriptor.scheduleReaders(fd); 589 } 590 break; 591 case READY: 592 descriptor.scheduleReaders(fd); 593 break; 594 } 595 } 596 if (filter == EVFILT_WRITE) { 597 logf("Write event for fd=%d", fd); 598 auto state = descriptor.writerState; 599 logf("write state = %d", state); 600 final switch(state) with(WriterState) { 601 case FULL: 602 descriptor.changeWriter(FULL, READY); 603 descriptor.scheduleWriters(fd); 604 break; 605 case UNCERTAIN: 606 descriptor.changeWriter(UNCERTAIN, READY); 607 break; 608 case WRITING: 609 if (!descriptor.changeWriter(WRITING, UNCERTAIN)) { 610 if (descriptor.changeWriter(FULL, UNCERTAIN)) // if became empty - move to UNCERTAIN and wake writers 611 descriptor.scheduleWriters(fd); 612 } 613 break; 614 case READY: 615 descriptor.scheduleWriters(fd); 616 break; 617 } 618 logf("Awaits %x", cast(void*)descriptor.writeWaiters); 619 } 620 if (filter == EVFILT_TIMER) { 621 size_t udata = cast(size_t)ke[i].udata; 622 if (udata & 0x1) { 623 auto ptr = udata & ~1; 624 FiberExt fiber = *cast(FiberExt*)&ptr; 625 fiber.schedule(); 626 } 627 else { 628 auto thread = cast(thread_act_t)udata >> 1; 629 thread_resume(thread); 630 } 631 } 632 descriptors[ke[i].ident].scheduleWriters(cast(int)ke[i].ident); 633 } 634 } 635 } 636 637 enum Fcntl: int { explicit = 0, msg = MSG_DONTWAIT, noop = 0xFFFFF } 638 enum SyscallKind { accept, read, write, connect } 639 640 // intercept - a filter for file descriptor, changes flags and register on first use 641 void interceptFd(Fcntl needsFcntl)(int fd) nothrow { 642 logf("Hit interceptFD"); 643 if (fd < 0 || fd >= descriptors.length) return; 644 if (cas(&descriptors[fd].state, DescriptorState.NOT_INITED, DescriptorState.INITIALIZING)) { 645 logf("First use, registering fd = %s", fd); 646 static if(needsFcntl == Fcntl.explicit) { 647 int flags = fcntl(fd, F_GETFL, 0); 648 fcntl(fd, F_SETFL, flags | O_NONBLOCK).checked; 649 logf("Setting FCNTL. %x", cast(void*)currentFiber); 650 } 651 KEvent[2] ke; 652 ke[0].ident = fd; 653 ke[1].ident = fd; 654 ke[0].filter = EVFILT_READ; 655 ke[1].filter = EVFILT_WRITE; 656 ke[1].flags = ke[0].flags = EV_ADD | EV_ENABLE | EV_CLEAR; 657 timespec timeout; 658 timeout.tv_nsec = 1000; 659 kevent(kq, ke.ptr, 2, null, 0, &timeout); 660 } 661 } 662 663 void deregisterFd(int fd) nothrow { 664 if(fd >= 0 && fd < descriptors.length) { 665 auto descriptor = descriptors.ptr + fd; 666 atomicStore(descriptor._writerState, WriterState.READY); 667 atomicStore(descriptor._readerState, ReaderState.EMPTY); 668 descriptor.scheduleReaders(fd); 669 descriptor.scheduleWriters(fd); 670 atomicStore(descriptor.state, DescriptorState.NOT_INITED); 671 } 672 } 673 674 ssize_t universalSyscall(size_t ident, string name, SyscallKind kind, Fcntl fcntlStyle, ssize_t ERR, T...) 675 (int fd, T args) nothrow { 676 if (currentFiber is null) { 677 logf("%s PASSTHROUGH FD=%s", name, fd); 678 return __syscall(ident, fd, args); 679 } 680 else { 681 logf("HOOKED %s FD=%d", name, fd); 682 interceptFd!(fcntlStyle)(fd); 683 shared(Descriptor)* descriptor = descriptors.ptr + fd; 684 if (atomicLoad(descriptor.state) == DescriptorState.THREADPOOL) { 685 logf("%s syscall THREADPOLL FD=%d", name, fd); 686 //TODO: offload syscall to thread-pool 687 return __syscall(ident, fd, args); 688 } 689 L_start: 690 shared AwaitingFiber await = AwaitingFiber(cast(shared)currentFiber, null); 691 // set flags argument if able to avoid direct fcntl calls 692 static if (fcntlStyle != Fcntl.explicit) 693 { 694 args[2] |= fcntlStyle; 695 } 696 //if (kind == SyscallKind.accept) 697 logf("kind:s args:%s", kind, args); 698 static if(kind == SyscallKind.accept || kind == SyscallKind.read) { 699 auto state = descriptor.readerState; 700 logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber); 701 final switch (state) with (ReaderState) { 702 case EMPTY: 703 logf("EMPTY - enqueue reader"); 704 if (!descriptor.enqueueReader(&await)) goto L_start; 705 // changed state to e.g. READY or UNCERTAIN in meantime, may need to reschedule 706 if (descriptor.readerState != EMPTY) descriptor.scheduleReaders(fd); 707 FiberExt.yield(); 708 goto L_start; 709 case UNCERTAIN: 710 descriptor.changeReader(UNCERTAIN, READING); // may became READY or READING 711 goto case READING; 712 case READY: 713 descriptor.changeReader(READY, READING); // always succeeds if 1 fiber reads 714 goto case READING; 715 case READING: 716 ssize_t resp = __syscall(ident, fd, args); 717 static if (kind == SyscallKind.accept) { 718 if (resp >= 0) // for accept we never know if we emptied the queue 719 descriptor.changeReader(READING, UNCERTAIN); 720 else if (errno == ERR || errno == EAGAIN) { 721 if (descriptor.changeReader(READING, EMPTY)) 722 goto case EMPTY; 723 goto L_start; // became UNCERTAIN or READY in meantime 724 } 725 } 726 else static if (kind == SyscallKind.read) { 727 if (resp == args[1]) // length is 2nd in (buf, length, ...) 728 descriptor.changeReader(READING, UNCERTAIN); 729 else if(resp >= 0) 730 descriptor.changeReader(READING, EMPTY); 731 else if (errno == ERR || errno == EAGAIN) { 732 if (descriptor.changeReader(READING, EMPTY)) 733 goto case EMPTY; 734 goto L_start; // became UNCERTAIN or READY in meantime 735 } 736 } 737 else 738 static assert(0); 739 return resp; 740 } 741 } 742 else static if(kind == SyscallKind.write || kind == SyscallKind.connect) { 743 //TODO: Handle short-write b/c of EWOULDBLOCK to apear as fully blocking? 744 auto state = descriptor.writerState; 745 logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber); 746 final switch (state) with (WriterState) { 747 case FULL: 748 logf("FULL FD=%d Fiber %x", fd, cast(void*)currentFiber); 749 if (!descriptor.enqueueWriter(&await)) goto L_start; 750 // changed state to e.g. READY or UNCERTAIN in meantime, may need to reschedule 751 if (descriptor.writerState != FULL) descriptor.scheduleWriters(fd); 752 FiberExt.yield(); 753 goto L_start; 754 case UNCERTAIN: 755 logf("UNCERTAIN on FD=%d Fiber %x", fd, cast(void*)currentFiber); 756 descriptor.changeWriter(UNCERTAIN, WRITING); // may became READY or WRITING 757 goto case WRITING; 758 case READY: 759 descriptor.changeWriter(READY, WRITING); // always succeeds if 1 fiber writes 760 goto case WRITING; 761 case WRITING: 762 ssize_t resp = __syscall(ident, fd, args); 763 static if (kind == SyscallKind.connect) { 764 if(resp >= 0) { 765 descriptor.changeWriter(WRITING, READY); 766 } 767 else if (errno == ERR || errno == EALREADY) { 768 if (descriptor.changeWriter(WRITING, FULL)) { 769 goto case FULL; 770 } 771 goto L_start; // became UNCERTAIN or READY in meantime 772 } 773 return resp; 774 } 775 else { 776 if (resp == args[1]) // (buf, len) args to syscall 777 descriptor.changeWriter(WRITING, UNCERTAIN); 778 else if(resp >= 0) { 779 logf("Short-write on FD=%d, become FULL", fd); 780 descriptor.changeWriter(WRITING, FULL); 781 } 782 else if (errno == ERR || errno == EAGAIN) { 783 if (descriptor.changeWriter(WRITING, FULL)) { 784 logf("Sudden block on FD=%d, become FULL", fd); 785 goto case FULL; 786 } 787 goto L_start; // became UNCERTAIN or READY in meantime 788 } 789 return resp; 790 } 791 } 792 } 793 assert(0); 794 } 795 } 796 797 // ====================================================================================== 798 // SYSCALL warappers intercepts 799 // ====================================================================================== 800 nothrow: 801 extern(C) ssize_t read(int fd, void *buf, size_t count) nothrow 802 { 803 return universalSyscall!(SYS_READ, "read", SyscallKind.read, Fcntl.explicit, EWOULDBLOCK) 804 (fd, cast(size_t)buf, count); 805 } 806 807 extern(C) ssize_t write(int fd, const void *buf, size_t count) 808 { 809 return universalSyscall!(SYS_WRITE, "write", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK) 810 (fd, cast(size_t)buf, count); 811 } 812 813 extern(C) ssize_t accept(int sockfd, sockaddr *addr, socklen_t *addrlen) 814 { 815 return universalSyscall!(SYS_ACCEPT, "accept", SyscallKind.accept, Fcntl.explicit, EWOULDBLOCK) 816 (sockfd, cast(size_t) addr, cast(size_t) addrlen); 817 } 818 819 extern(C) ssize_t connect(int sockfd, const sockaddr *addr, socklen_t *addrlen) 820 { 821 return universalSyscall!(SYS_CONNECT, "connect", SyscallKind.connect, Fcntl.explicit, EINPROGRESS) 822 (sockfd, cast(size_t) addr, cast(size_t) addrlen); 823 } 824 825 extern(C) ssize_t sendto(int sockfd, const void *buf, size_t len, int flags, 826 const sockaddr *dest_addr, socklen_t addrlen) 827 { 828 return universalSyscall!(SYS_SENDTO, "sendto", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK) 829 (sockfd, cast(size_t) buf, len, flags, cast(size_t) dest_addr, cast(size_t) addrlen); 830 } 831 832 extern(C) size_t recv(int sockfd, void *buf, size_t len, int flags) nothrow { 833 sockaddr_in src_addr; 834 src_addr.sin_family = AF_INET; 835 src_addr.sin_port = 0; 836 src_addr.sin_addr.s_addr = htonl(INADDR_ANY); 837 ssize_t addrlen = sockaddr_in.sizeof; 838 return recvfrom(sockfd, buf, len, flags, cast(sockaddr*)&src_addr, &addrlen); 839 } 840 841 extern(C) private ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, 842 sockaddr *src_addr, ssize_t* addrlen) nothrow 843 { 844 return universalSyscall!(SYS_RECVFROM, "recvfrom", SyscallKind.read, Fcntl.msg, EWOULDBLOCK) 845 (sockfd, cast(size_t)buf, len, flags, cast(size_t)src_addr, cast(size_t)addrlen); 846 } 847 848 extern(C) private ssize_t poll(pollfd *fds, nfds_t nfds, int timeout) 849 { 850 bool nonBlockingCheck(ref ssize_t result) { 851 bool uncertain; 852 L_cacheloop: 853 foreach (ref fd; fds[0..nfds]) { 854 interceptFd!(Fcntl.explicit)(fd.fd); 855 fd.revents = 0; 856 auto descriptor = descriptors.ptr + fd.fd; 857 if (fd.events & POLLIN) { 858 auto state = descriptor.readerState; 859 logf("Found event %d for reader in select", state); 860 switch(state) with(ReaderState) { 861 case READY: 862 fd.revents |= POLLIN; 863 break; 864 case EMPTY: 865 break; 866 default: 867 uncertain = true; 868 break L_cacheloop; 869 } 870 } 871 if (fd.events & POLLOUT) { 872 auto state = descriptor.writerState; 873 logf("Found event %d for writer in select", state); 874 switch(state) with(WriterState) { 875 case READY: 876 fd.revents |= POLLOUT; 877 break; 878 case FULL: 879 break; 880 default: 881 uncertain = true; 882 break L_cacheloop; 883 } 884 } 885 } 886 // fallback to system poll call if descriptor state is uncertain 887 if (uncertain) { 888 logf("Fallback to system poll, descriptors have uncertain state"); 889 ssize_t p = raw_poll(fds, nfds, 0); 890 if (p != 0) { 891 result = p; 892 logf("Raw poll returns %d", result); 893 return true; 894 } 895 } 896 else { 897 ssize_t j = 0; 898 foreach (i; 0..nfds) { 899 if (fds[i].revents) { 900 fds[j++] = fds[i]; 901 } 902 } 903 logf("Using our own event cache: %d events", j); 904 if (j > 0) { 905 result = cast(ssize_t)j; 906 return true; 907 } 908 } 909 return false; 910 } 911 if (currentFiber is null) { 912 logf("POLL PASSTHROUGH!"); 913 return raw_poll(fds, nfds, timeout); 914 } 915 else { 916 logf("HOOKED POLL %d fds timeout %d", nfds, timeout); 917 if (nfds < 0) { 918 errno = EINVAL; 919 return -1; 920 } 921 if (nfds == 0) { 922 if (timeout == 0) return 0; 923 shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber); 924 Fiber.yield(); 925 logf("Woke up after select %x. WakeFd=%d", cast(void*)currentFiber, currentFiber.wakeFd); 926 return 0; 927 } 928 foreach(ref fd; fds[0..nfds]) { 929 if (fd.fd < 0 || fd.fd >= descriptors.length) { 930 errno = EBADF; 931 return -1; 932 } 933 fd.revents = 0; 934 } 935 ssize_t result = 0; 936 if (timeout <= 0) return raw_poll(fds, nfds, timeout); 937 if (nonBlockingCheck(result)) return result; 938 shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber); 939 foreach (i; 0..nfds) { 940 if (fds[i].events & POLLIN) 941 descriptors[fds[i].fd].enqueueReader(&aw); 942 else if(fds[i].events & POLLOUT) 943 descriptors[fds[i].fd].enqueueWriter(&aw); 944 } 945 Fiber.yield(); 946 foreach (i; 0..nfds) { 947 if (fds[i].events & POLLIN) 948 descriptors[fds[i].fd].removeReader(&aw); 949 else if(fds[i].events & POLLOUT) 950 descriptors[fds[i].fd].removeWriter(&aw); 951 } 952 nonBlockingCheck(result); 953 return result; 954 } 955 } 956 957 extern(C) private ssize_t nanosleep(const timespec* req, const timespec* rem) { 958 if (currentFiber !is null) { 959 delay(req); 960 return 0; 961 } else { 962 auto timer = getSleepTimer(); 963 timer.waitThread(req); 964 thread_suspend(mach_thread_self()); 965 return 0; 966 } 967 } 968 969 extern(C) private ssize_t close(int fd) nothrow 970 { 971 logf("HOOKED CLOSE FD=%d", fd); 972 deregisterFd(fd); 973 return cast(int)__syscall(SYS_CLOSE, fd); 974 } 975 976 977 int gettid() 978 { 979 return cast(int)__syscall(SYS_GETTID); 980 } 981 982 ssize_t raw_read(int fd, void *buf, size_t count) nothrow { 983 logf("Raw read on FD=%d", fd); 984 return __syscall(SYS_READ, fd, cast(ssize_t) buf, cast(ssize_t) count); 985 } 986 987 ssize_t raw_write(int fd, const void *buf, size_t count) nothrow 988 { 989 logf("Raw write on FD=%d", fd); 990 return __syscall(SYS_WRITE, fd, cast(size_t) buf, count); 991 } 992 993 ssize_t raw_poll(pollfd *fds, nfds_t nfds, int timeout) 994 { 995 logf("Raw poll"); 996 return __syscall(SYS_POLL, cast(size_t)fds, cast(size_t) nfds, timeout); 997 }