module photon.macos.core;
version(OSX):
private:

import std.stdio;
import std.string;
import std.format;
import std.exception;
import std.conv;
import std.array;
import core.thread;
import core.internal.spinlock;
import core.sys.posix.sys.types;
import core.sys.posix.sys.socket;
import core.sys.posix.poll;
import core.sys.posix.netinet.in_;
import core.sys.posix.unistd;
import core.sys.posix.signal;
import core.sync.mutex;
import core.stdc.errno;
import core.stdc.signal;
import core.stdc.time;
import core.atomic;
import core.sys.posix.stdlib: abort;
import core.sys.posix.fcntl;
import core.memory;
import core.sys.posix.sys.mman;
import core.sys.posix.pthread;

import photon.macos.support;
import photon.ds.common;
import photon.ds.intrusive_queue;

alias quad_t = ulong;
alias off_t = long;
extern(C) nothrow off_t __syscall(quad_t number, ...);
extern(C) nothrow int kqueue();
extern(C) nothrow int kevent(int kq, const KEvent* changelist, int nchanges,
    KEvent* eventlist, int nevents, const timespec* timeout);

// matches the layout of macOS's plain `struct kevent`
// (the ext fields belong to kevent64/kevent_qos variants)
struct KEvent {
    size_t ident;       /* identifier for this event */
    short  filter;      /* filter for event */
    ushort flags;       /* action flags for kqueue */
    uint   fflags;      /* filter flag value */
    long   data;        /* filter data value */
    void*  udata;       /* opaque user data identifier */
}

// kqueue constants from <sys/event.h>
enum short EVFILT_READ  = -1;
enum short EVFILT_WRITE = -2;
enum short EVFILT_USER  = -10;

enum ushort EV_ADD    = 0x0001;
enum ushort EV_DELETE = 0x0002;
enum ushort EV_ENABLE = 0x0004;
enum ushort EV_CLEAR  = 0x0020;

enum uint NOTE_TRIGGER = 0x01000000;

// XNU (BSD) syscall numbers
enum SYS_READ = 3;
enum SYS_WRITE = 4;
enum SYS_ACCEPT = 30;
enum SYS_CONNECT = 98;
enum SYS_SENDTO = 133;
enum SYS_RECVFROM = 29;
enum SYS_CLOSE = 6;
enum SYS_GETTID = 286;
enum SYS_POLL = 230;

// the result becomes thread-local b/c it's atomically stolen
// (swapped with null) from the shared slot
auto steal(T)(ref shared T arg)
{
    for (;;) {
        auto v = atomicLoad(arg);
        if (cas(&arg, v, cast(shared(T))null)) return v;
    }
}
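
// A quick sketch of the ownership transfer `steal` provides: after the call the
// shared slot is empty and the caller holds the old value exclusively.
// (Test-only illustration; `slot` and `x` are hypothetical names.)
unittest {
    __gshared int x = 42;
    shared(int*) slot = cast(shared(int*)) &x;
    auto taken = steal(slot);
    assert(taken !is null);
    assert(atomicLoad(slot) is null);
}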
shared struct RawEvent {
nothrow:
    // macOS has no eventfd(2); this is a sketch of a substitute built from a
    // private kqueue plus an EVFILT_USER event, keeping the single-fd layout.
    this(int init) {
        fd = kqueue();
        KEvent ke;
        ke.filter = EVFILT_USER;
        ke.flags = EV_ADD | EV_CLEAR;
        kevent(fd, &ke, 1, null, 0, null).checked("event create");
        if (init > 0) trigger();
    }

    void waitAndReset() {
        KEvent ke;
        int r;
        do {
            r = kevent(fd, null, 0, &ke, 1, null); // EV_CLEAR auto-resets the event
        } while(r < 0 && errno == EINTR);
        r.checked("event reset");
    }

    void trigger() {
        KEvent ke;
        ke.filter = EVFILT_USER;
        ke.fflags = NOTE_TRIGGER;
        int r;
        do {
            r = kevent(fd, &ke, 1, null, 0, null);
        } while(r < 0 && errno == EINTR);
        r.checked("event trigger");
    }

    int fd;
}
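
// Minimal self-check of the eventfd emulation above: with EV_CLEAR a pending
// NOTE_TRIGGER is consumed by the first wait. (Sketch; assumes the kqueue-based
// RawEvent defined above.)
unittest {
    auto ev = RawEvent(0);
    ev.trigger();
    ev.waitAndReset(); // returns immediately - the event is already pending
    close(ev.fd);
}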
/*
struct Timer {
nothrow:
    private timer_t timer;

    static void ms2ts(timespec *ts, ulong ms)
    {
        ts.tv_sec = ms / 1000;
        ts.tv_nsec = (ms % 1000) * 1000000;
    }

    void arm(int timeout) {
        timespec ts_timeout;
        ms2ts(&ts_timeout, timeout); // convert milliseconds to timespec
        itimerspec its;
        its.it_value = ts_timeout;
        its.it_interval.tv_sec = 0;
        its.it_interval.tv_nsec = 0;
        timer_settime(timer, 0, &its, null);
    }

    void disarm() {
        itimerspec its; // zeros
        timer_settime(timer, 0, &its, null);
    }

    void dispose() {
        timer_delete(timer).checked;
    }
}

Timer timer() {
    int timerfd = timer_create(CLOCK_MONOTONIC, null, null).checked;
    interceptFd!(Fcntl.noop)(timerfd);
    return Timer(timerfd);
}
*/
enum EventState { ready, fibers }

shared struct Event {
    SpinLock lock = SpinLock(SpinLock.Contention.brief);
    EventState state = EventState.ready;
    FiberExt awaiting;

    void reset() {
        lock.lock();
        auto f = awaiting.unshared;
        awaiting = null;
        state = EventState.ready;
        lock.unlock();
        while (f !is null) {
            auto next = f.next;
            f.schedule();
            f = next;
        }
    }

    bool ready() {
        lock.lock();
        scope(exit) lock.unlock();
        return state == EventState.ready;
    }

    void await() {
        lock.lock();
        scope(exit) lock.unlock();
        if (state == EventState.ready) return;
        if (currentFiber !is null) {
            currentFiber.next = awaiting.unshared;
            awaiting = cast(shared)currentFiber;
            state = EventState.fibers;
        }
        else abort(); // TODO: support bare threads as waiters
    }
}
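
// An Event starts out ready, so await() must return immediately and reset()
// with no waiters is a no-op. (Sketch exercising the happy path only.)
unittest {
    shared Event ev;
    assert(ev.ready);
    ev.await();  // returns right away: state is EventState.ready
    ev.reset();  // no fibers queued, nothing to schedule
    assert(ev.ready);
}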
struct AwaitingFiber {
    shared FiberExt fiber;
    AwaitingFiber* next;

    void scheduleAll(int wakeFd) nothrow
    {
        auto w = &this;
        FiberExt head;
        // first collect all AwaitingFibers since they live on the waiters' stacks
        do {
            auto fiber = steal(w.fiber);
            if (fiber) {
                fiber.unshared.next = head;
                head = fiber.unshared;
            }
            w = w.next;
        } while(w);
        while(head) {
            logf("Waking with FD=%d", wakeFd);
            auto next = head.next; // save next - schedule() reuses the next pointer
            head.wakeFd = wakeFd;
            head.schedule();
            head = next;
        }
    }
}

class FiberExt : Fiber {
    FiberExt next;
    uint numScheduler;
    int wakeFd; // receives the fd that woke us up

    enum PAGESIZE = 4096;

    this(void function() fn, uint numSched) nothrow {
        super(fn);
        numScheduler = numSched;
    }

    this(void delegate() dg, uint numSched) nothrow {
        super(dg);
        numScheduler = numSched;
    }

    void schedule() nothrow
    {
        scheds[numScheduler].queue.push(this);
    }
}

FiberExt currentFiber;       // thread-local, one per scheduler thread
shared RawEvent termination; // termination event, triggered once the last fiber exits
shared pthread_t eventLoop;  // event loop thread, runs outside of the D runtime
shared int alive;            // count of non-terminated fibers scheduled

struct SchedulerBlock {
    shared IntrusiveQueue!(FiberExt, RawEvent) queue;
    shared uint assigned;
    size_t[2] padding; // pad to a full cache line
}
static assert(SchedulerBlock.sizeof == 64);

package(photon) shared SchedulerBlock[] scheds;

enum int MAX_EVENTS = 500;
enum int SIGNAL = 42;

package(photon) void schedulerEntry(size_t n)
{
    // Note: macOS has no sched_setaffinity; CPU pinning (THREAD_AFFINITY_POLICY)
    // is advisory-only on this platform, so the scheduler thread is left unpinned.
    shared SchedulerBlock* sched = scheds.ptr + n;
    while (alive > 0) {
        sched.queue.event.waitAndReset();
        for(;;) {
            FiberExt f = sched.queue.drain();
            if (f is null) break; // drained an empty queue, time to sleep
            do {
                auto next = f.next; //save next, it will be reused on scheduling
                currentFiber = f;
                logf("Fiber %x started", cast(void*)f);
                try {
                    f.call();
                }
                catch (Exception e) {
                    stderr.writeln(e);
                    // the fiber is in TERM state after an uncaught exception;
                    // the accounting below handles the decrement
                }
                if (f.state == FiberExt.State.TERM) {
                    logf("Fiber %s terminated", cast(void*)f);
                    atomicOp!"-="(alive, 1);
                }
                f = next;
            } while(f !is null);
        }
    }
    termination.trigger();
}

public void go(void delegate() func) {
    import std.random;
    uint choice;
    if (scheds.length == 1) choice = 0;
    else {
        // power of two choices: sample two distinct schedulers, pick the lighter one
        uint a = uniform!"[)"(0, cast(uint)scheds.length);
        uint b = uniform!"[)"(0, cast(uint)scheds.length-1);
        if (a == b) b = cast(uint)scheds.length-1;
        uint loadA = scheds[a].assigned;
        uint loadB = scheds[b].assigned;
        if (loadA < loadB) choice = a;
        else choice = b;
    }
    atomicOp!"+="(scheds[choice].assigned, 1);
    atomicOp!"+="(alive, 1);
    auto f = new FiberExt(func, choice);
    logf("Assigned %x -> %d scheduler", cast(void*)f, choice);
    f.schedule();
}
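
// `go` picks a scheduler via the "power of two random choices" heuristic:
// sample two distinct queues and take the lighter one, which keeps load
// balanced with O(1) work per spawn. A self-contained sketch of the same
// selection rule (the `assigned` array here is hypothetical):
unittest {
    import std.random;
    uint[4] assigned = [5, 1, 7, 3];
    uint a = uniform!"[)"(0, cast(uint)assigned.length);
    uint b = uniform!"[)"(0, cast(uint)assigned.length - 1);
    if (a == b) b = cast(uint)assigned.length - 1;
    uint choice = assigned[a] < assigned[b] ? a : b;
    // the winner is never more loaded than either sampled queue
    assert(assigned[choice] <= assigned[a] && assigned[choice] <= assigned[b]);
}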
shared Descriptor[] descriptors;
shared int event_loop_fd;
shared int signal_loop_fd;

enum ReaderState: uint {
    EMPTY = 0,
    UNCERTAIN = 1,
    READING = 2,
    READY = 3
}

enum WriterState: uint {
    READY = 0,
    UNCERTAIN = 1,
    WRITING = 2,
    FULL = 3
}

enum DescriptorState: uint {
    NOT_INITED,
    INITIALIZING,
    NONBLOCKING,
    THREADPOOL
}

// per-fd state plus the lists of awaiting fibers
shared struct Descriptor {
    ReaderState _readerState;
    AwaitingFiber* _readerWaits;
    WriterState _writerState;
    AwaitingFiber* _writerWaits;
    DescriptorState state;
nothrow:
    ReaderState readerState()() {
        return atomicLoad(_readerState);
    }

    WriterState writerState()() {
        return atomicLoad(_writerState);
    }

    // try to change the reader state; true if the CAS succeeded
    bool changeReader()(ReaderState from, ReaderState to) {
        return cas(&_readerState, from, to);
    }

    // ditto for the writer state
    bool changeWriter()(WriterState from, WriterState to) {
        return cas(&_writerState, from, to);
    }

    // current head of the reader wait list
    shared(AwaitingFiber)* readWaiters()() {
        return atomicLoad(_readerWaits);
    }

    // current head of the writer wait list
    shared(AwaitingFiber)* writeWaiters()(){
        return atomicLoad(_writerWaits);
    }

    // try to enqueue a reader fiber given the old head
    bool enqueueReader()(shared(AwaitingFiber)* fiber) {
        auto head = readWaiters;
        if (head == fiber) {
            return true; // TODO: HACK
        }
        fiber.next = head;
        return cas(&_readerWaits, head, fiber);
    }

    void removeReader()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_readerWaits);
        if (head is null || head.next is null) return;
        head = removeFromList(head.unshared, fiber);
        cas(&_readerWaits, head, cast(shared(AwaitingFiber*))null);
    }

    // try to enqueue a writer fiber given the old head
    bool enqueueWriter()(shared(AwaitingFiber)* fiber) {
        auto head = writeWaiters;
        if (head == fiber) {
            return true; // TODO: HACK
        }
        fiber.next = head;
        return cas(&_writerWaits, head, fiber);
    }

    void removeWriter()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_writerWaits);
        if (head is null || head.next is null) return;
        head = removeFromList(head.unshared, fiber);
        cas(&_writerWaits, head, cast(shared(AwaitingFiber*))null);
    }

    // try to schedule readers - if the steal fails, someone added a reader and
    // it's now their job to check the state
    void scheduleReaders()(int wakeFd) {
        auto w = steal(_readerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }

    // try to schedule writers, ditto
    void scheduleWriters()(int wakeFd) {
        auto w = steal(_writerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }
}
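
// The lock-free transitions above only ever move between explicitly named
// states: a CAS from the wrong state must fail and leave the descriptor intact.
unittest {
    shared Descriptor d;
    assert(d.readerState == ReaderState.EMPTY);
    assert(!d.changeReader(ReaderState.READY, ReaderState.READING)); // wrong `from`
    assert(d.readerState == ReaderState.EMPTY);
    assert(d.changeReader(ReaderState.EMPTY, ReaderState.UNCERTAIN));
    assert(d.readerState == ReaderState.UNCERTAIN);
}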
extern(C) void graceful_shutdown_on_signal(int, siginfo_t*, void*)
{
    version(photon_tracing) printStats();
    _exit(9);
}

version(photon_tracing)
void printStats()
{
    // TODO: report on various events in the event loop/scheduler
    string msg = "Tracing report:\n\n";
    write(2, msg.ptr, msg.length);
}

shared int kq;
shared int eventLoopId;

public void startloop()
{
    import core.cpuid;
    import core.sys.posix.sys.resource;
    uint threads = threadsPerCPU;
    kq = kqueue();
    enforce(kq != -1, "kqueue() failed");
    // Assumption: mirror the Linux port's setup - a descriptor slot per fd
    // allowed by RLIMIT_NOFILE and one scheduler block per hardware thread.
    rlimit lim;
    getrlimit(RLIMIT_NOFILE, &lim);
    descriptors = new shared(Descriptor)[lim.rlim_cur];
    termination = RawEvent(0);
    scheds = new SchedulerBlock[threads];
    foreach (ref sched; scheds)
        sched.queue = IntrusiveQueue!(FiberExt, RawEvent)(RawEvent(0));
    pthread_create(cast(pthread_t*)&eventLoop, null, &processEventsEntry, null);
}

package(photon) void stoploop()
{
    // TODO: the event loop currently runs forever; consider a shutdown event
    void* ret;
    pthread_join(cast(pthread_t)eventLoop, &ret);
}

extern(C) void* processEventsEntry(void*)
{
    KEvent[MAX_EVENTS] ke;
    for (;;) {
        int cnt = kevent(kq, null, 0, ke.ptr, MAX_EVENTS, null);
        if (cnt < 0 && errno == EINTR) continue;
        enforce(cnt >= 0);
        for (int i = 0; i < cnt; i++) {
            int fd = cast(int)ke[i].ident;
            auto descriptor = descriptors.ptr + fd;
            // sketch: mirrors deregisterFd's wakeup sequence
            if (ke[i].filter == EVFILT_READ) {
                // data arrived - mark readable and wake all waiting readers
                atomicStore(descriptor._readerState, ReaderState.READY);
                descriptor.scheduleReaders(fd);
            } else if (ke[i].filter == EVFILT_WRITE) {
                // buffer drained - mark writable and wake all waiting writers
                atomicStore(descriptor._writerState, WriterState.READY);
                descriptor.scheduleWriters(fd);
            }
        }
    }
}

// macOS has no SOCK_NONBLOCK, so the Linux port's `sock` style is omitted
enum Fcntl: int { explicit = 0, msg = MSG_DONTWAIT, noop = 0xFFFFF }
enum SyscallKind { accept, read, write, connect }

// intercept - a filter for the file descriptor: sets O_NONBLOCK and registers
// it with kqueue on first use
void interceptFd(Fcntl needsFcntl)(int fd) nothrow {
    logf("Hit interceptFD");
    if (fd < 0 || fd >= descriptors.length) return;
    if (cas(&descriptors[fd].state, DescriptorState.NOT_INITED, DescriptorState.INITIALIZING)) {
        logf("First use, registering fd = %s", fd);
        static if(needsFcntl == Fcntl.explicit) {
            int flags = fcntl(fd, F_GETFL, 0);
            fcntl(fd, F_SETFL, flags | O_NONBLOCK).checked;
            logf("Setting FCNTL. %x", cast(void*)currentFiber);
        }
        // kqueue takes one change entry per filter; filters cannot be OR-ed together
        KEvent[2] ke;
        ke[0].ident = fd;
        ke[0].filter = EVFILT_READ;
        ke[0].flags = EV_ADD | EV_ENABLE;
        ke[1].ident = fd;
        ke[1].filter = EVFILT_WRITE;
        ke[1].flags = EV_ADD | EV_ENABLE;
        timespec timeout;
        timeout.tv_nsec = 1000;
        kevent(kq, ke.ptr, 2, null, 0, &timeout);
    }
}

void deregisterFd(int fd) nothrow {
    if(fd >= 0 && fd < descriptors.length) {
        auto descriptor = descriptors.ptr + fd;
        atomicStore(descriptor._writerState, WriterState.READY);
        atomicStore(descriptor._readerState, ReaderState.EMPTY);
        descriptor.scheduleReaders(fd);
        descriptor.scheduleWriters(fd);
        atomicStore(descriptor.state, DescriptorState.NOT_INITED);
    }
}

ssize_t universalSyscall(size_t ident, string name, SyscallKind kind, Fcntl fcntlStyle, ssize_t ERR, T...)
                        (int fd, T args) nothrow {
    if (currentFiber is null) {
        logf("%s PASSTHROUGH FD=%s", name, fd);
        return __syscall(ident, fd, args).withErrorno;
    }
    else {
        logf("HOOKED %s FD=%d", name, fd);
        interceptFd!(fcntlStyle)(fd);
        shared(Descriptor)* descriptor = descriptors.ptr + fd;
        if (atomicLoad(descriptor.state) == DescriptorState.THREADPOOL) {
            logf("%s syscall THREADPOOL FD=%d", name, fd);
            //TODO: offload the syscall to the thread pool
            return __syscall(ident, fd, args).withErrorno;
        }
    L_start:
        shared AwaitingFiber await = AwaitingFiber(cast(shared)currentFiber, null);
        // set the flags argument if we can avoid direct fcntl calls
        static if (fcntlStyle != Fcntl.explicit)
        {
            args[2] |= fcntlStyle;
        }
        logf("kind:%s args:%s", kind, args);
        static if(kind == SyscallKind.accept || kind == SyscallKind.read) {
            auto state = descriptor.readerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (ReaderState) {
            case EMPTY:
                logf("EMPTY - enqueue reader");
                if (!descriptor.enqueueReader(&await)) goto L_start;
                // state changed to e.g. READY or UNCERTAIN in the meantime, may need to reschedule
                if (descriptor.readerState != EMPTY) descriptor.scheduleReaders(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                descriptor.changeReader(UNCERTAIN, READING); // may have become READY or READING
                goto case READING;
            case READY:
                descriptor.changeReader(READY, READING); // always succeeds if only 1 fiber reads
                goto case READING;
            case READING:
                ssize_t resp = __syscall(ident, fd, args);
                static if (kind == SyscallKind.accept) {
                    if (resp >= 0) // for accept we never know if we emptied the queue
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in the meantime
                    }
                }
                else static if (kind == SyscallKind.read) {
                    if (resp == args[1]) // length is 2nd in (buf, length, ...)
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if(resp >= 0)
                        descriptor.changeReader(READING, EMPTY);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in the meantime
                    }
                }
                else
                    static assert(0);
                return withErrorno(resp);
            }
        }
        else static if(kind == SyscallKind.write || kind == SyscallKind.connect) {
            //TODO: handle short writes caused by EWOULDBLOCK so they appear fully blocking?
            auto state = descriptor.writerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (WriterState) {
            case FULL:
                logf("FULL FD=%d Fiber %x", fd, cast(void*)currentFiber);
                if (!descriptor.enqueueWriter(&await)) goto L_start;
                // state changed to e.g. READY or UNCERTAIN in the meantime, may need to reschedule
                if (descriptor.writerState != FULL) descriptor.scheduleWriters(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                logf("UNCERTAIN on FD=%d Fiber %x", fd, cast(void*)currentFiber);
                descriptor.changeWriter(UNCERTAIN, WRITING); // may have become READY or WRITING
                goto case WRITING;
            case READY:
                descriptor.changeWriter(READY, WRITING); // always succeeds if only 1 fiber writes
                goto case WRITING;
            case WRITING:
                ssize_t resp = __syscall(ident, fd, args);
                static if (kind == SyscallKind.connect) {
                    if(resp >= 0) {
                        descriptor.changeWriter(WRITING, READY);
                    }
                    else if (resp == -ERR || resp == -EALREADY) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in the meantime
                    }
                    return withErrorno(resp);
                }
                else {
                    if (resp == args[1]) // (buf, len) args to the syscall
                        descriptor.changeWriter(WRITING, UNCERTAIN);
                    else if(resp >= 0) {
                        logf("Short-write on FD=%d, become FULL", fd);
                        descriptor.changeWriter(WRITING, FULL);
                    }
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            logf("Sudden block on FD=%d, become FULL", fd);
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in the meantime
                    }
                    return withErrorno(resp);
                }
            }
        }
        assert(0);
    }
}
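
// The branches above compare against -ERR, i.e. __syscall is used in the
// negative-errno convention; withErrorno (from photon.macos.support - its exact
// behavior is assumed here) translates that into the libc convention of
// setting errno and returning -1. A hedged illustration of that contract:
unittest {
    ssize_t raw = -EBADF;        // what a failing __syscall hands back
    auto rc = withErrorno(raw);  // assumed to set errno and normalize to -1
    assert(rc == -1);
    assert(errno == EBADF);
}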
// ======================================================================================
// Syscall wrappers / intercepts
// ======================================================================================

extern(C) ssize_t read(int fd, void *buf, size_t count) nothrow
{
    return universalSyscall!(SYS_READ, "READ", SyscallKind.read, Fcntl.explicit, EWOULDBLOCK)
        (fd, cast(size_t)buf, count);
}

extern(C) ssize_t write(int fd, const void *buf, size_t count)
{
    return universalSyscall!(SYS_WRITE, "WRITE", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
        (fd, cast(size_t)buf, count);
}

extern(C) ssize_t accept(int sockfd, sockaddr *addr, socklen_t *addrlen)
{
    return universalSyscall!(SYS_ACCEPT, "accept", SyscallKind.accept, Fcntl.explicit, EWOULDBLOCK)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen);
}

extern(C) ssize_t connect(int sockfd, const sockaddr *addr, socklen_t addrlen)
{
    return universalSyscall!(SYS_CONNECT, "connect", SyscallKind.connect, Fcntl.explicit, EINPROGRESS)
        (sockfd, cast(size_t) addr, cast(size_t) addrlen);
}

extern(C) ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
                         const sockaddr *dest_addr, socklen_t addrlen)
{
    return universalSyscall!(SYS_SENDTO, "sendto", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
        (sockfd, cast(size_t) buf, len, flags, cast(size_t) dest_addr, cast(size_t) addrlen);
}

extern(C) ssize_t recv(int sockfd, void *buf, size_t len, int flags) nothrow {
    sockaddr_in src_addr;
    src_addr.sin_family = AF_INET;
    src_addr.sin_port = 0;
    src_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    socklen_t addrlen = sockaddr_in.sizeof;
    return recvfrom(sockfd, buf, len, flags, cast(sockaddr*)&src_addr, &addrlen);
}

extern(C) private ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
                                   sockaddr *src_addr, socklen_t* addrlen) nothrow
{
    return universalSyscall!(SYS_RECVFROM, "RECVFROM", SyscallKind.read, Fcntl.msg, EWOULDBLOCK)
        (sockfd, cast(size_t)buf, len, flags, cast(size_t)src_addr, cast(size_t)addrlen);
}

extern(C) private ssize_t poll(pollfd *fds, nfds_t nfds, int timeout)
{
    bool nonBlockingCheck(ref ssize_t result) {
        bool uncertain;
    L_cacheloop:
        foreach (ref fd; fds[0..nfds]) {
            interceptFd!(Fcntl.explicit)(fd.fd);
            fd.revents = 0;
            auto descriptor = descriptors.ptr + fd.fd;
            if (fd.events & POLLIN) {
                auto state = descriptor.readerState;
                logf("Found event %d for reader in select", state);
                switch(state) with(ReaderState) {
                case READY:
                    fd.revents |= POLLIN;
                    break;
                case EMPTY:
                    break;
                default:
                    uncertain = true;
                    break L_cacheloop;
                }
            }
            if (fd.events & POLLOUT) {
                auto state = descriptor.writerState;
                logf("Found event %d for writer in select", state);
                switch(state) with(WriterState) {
                case READY:
                    fd.revents |= POLLOUT;
                    break;
                case FULL:
                    break;
                default:
                    uncertain = true;
                    break L_cacheloop;
                }
            }
        }
        // fall back to the system poll call if descriptor state is uncertain
        if (uncertain) {
            logf("Fallback to system poll, descriptors have uncertain state");
            ssize_t p = raw_poll(fds, nfds, 0);
            if (p != 0) {
                result = p;
                logf("Raw poll returns %d", result);
                return true;
            }
        }
        else {
            ssize_t j = 0;
            foreach (i; 0..nfds) {
                if (fds[i].revents) {
                    fds[j++] = fds[i];
                }
            }
            logf("Using our own event cache: %d events", j);
            if (j > 0) {
                result = cast(ssize_t)j;
                return true;
            }
        }
        return false;
    }
    if (currentFiber is null) {
        logf("POLL PASSTHROUGH!");
        return raw_poll(fds, nfds, timeout);
    }
    else {
        logf("HOOKED POLL %d fds timeout %d", nfds, timeout);
        if (nfds == 0) {
            if (timeout == 0) return 0;
            shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
            Fiber.yield();
            logf("Woke up after select %x. WakeFd=%d", cast(void*)currentFiber, currentFiber.wakeFd);
            return 0;
        }
        foreach(ref fd; fds[0..nfds]) {
            if (fd.fd < 0 || fd.fd >= descriptors.length) return -EBADF.withErrorno;
            fd.revents = 0;
        }
        ssize_t result = 0;
        if (timeout <= 0) return raw_poll(fds, nfds, timeout);
        if (nonBlockingCheck(result)) return result;
        shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].enqueueReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].enqueueWriter(&aw);
        }
        Fiber.yield();
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].removeReader(&aw);
            else if(fds[i].events & POLLOUT)
                descriptors[fds[i].fd].removeWriter(&aw);
        }
        nonBlockingCheck(result);
        return result;
    }
}

extern(C) private ssize_t close(int fd) nothrow
{
    logf("HOOKED CLOSE FD=%d", fd);
    deregisterFd(fd);
    return cast(int)withErrorno(__syscall(SYS_CLOSE, fd));
}
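
// When no fiber is running, the wrappers above fall through to the raw
// syscalls, so plain pipe I/O must behave exactly like libc. (Sketch of the
// passthrough path.)
unittest {
    import core.sys.posix.unistd : pipe;
    int[2] fds;
    pipe(fds).checked("pipe");
    ubyte[5] msg = ['h', 'e', 'l', 'l', 'o'];
    assert(write(fds[1], msg.ptr, msg.length) == msg.length);
    ubyte[5] buf;
    assert(read(fds[0], buf.ptr, buf.length) == msg.length);
    assert(buf == msg);
    close(fds[0]);
    close(fds[1]);
}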
int gettid()
{
    return cast(int)__syscall(SYS_GETTID);
}

ssize_t raw_read(int fd, void *buf, size_t count) nothrow {
    logf("Raw read on FD=%d", fd);
    return __syscall(SYS_READ, fd, cast(size_t) buf, cast(size_t) count).withErrorno;
}

ssize_t raw_write(int fd, const void *buf, size_t count) nothrow
{
    logf("Raw write on FD=%d", fd);
    return __syscall(SYS_WRITE, fd, cast(size_t) buf, count).withErrorno;
}

ssize_t raw_poll(pollfd *fds, nfds_t nfds, int timeout)
{
    logf("Raw poll");
    return __syscall(SYS_POLL, cast(size_t)fds, cast(size_t) nfds, timeout).withErrorno;
}
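
// Sanity sketch for the raw gates: after raw_write puts a byte into a pipe,
// raw_poll must report it readable without blocking.
unittest {
    import core.sys.posix.unistd : pipe;
    int[2] fds;
    pipe(fds).checked("pipe");
    assert(raw_write(fds[1], "x".ptr, 1) == 1);
    pollfd pfd;
    pfd.fd = fds[0];
    pfd.events = POLLIN;
    assert(raw_poll(&pfd, 1, 0) == 1);
    assert(pfd.revents & POLLIN);
    close(fds[0]);
    close(fds[1]);
}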