module photon.freebsd.core;
version(FreeBSD):
private:

import std.stdio;
import std.string;
import std.format;
import std.exception;
import std.conv;
import std.array;
import core.thread;
import core.internal.spinlock;
import core.sys.posix.sys.types;
import core.sys.posix.sys.socket;
import core.sys.posix.poll;
import core.sys.posix.netinet.in_;
import core.sys.freebsd.unistd;
import core.sync.mutex;
import core.stdc.errno;
import core.atomic;
import core.sys.posix.stdlib: abort;
import core.sys.posix.fcntl;
import core.memory;
import core.sys.posix.sys.mman;
import core.sys.posix.pthread;

import photon.freebsd.support;
import photon.freebsd.syscalls;
import photon.ds.common;
import photon.ds.intrusive_queue;

// T becomes thread-local because it's stolen from the shared resource
auto steal(T)(ref shared T arg)
{
    for (;;) {
        auto v = atomicLoad(arg);
        if (cas(&arg, v, cast(shared(T))null)) return v;
    }
}


shared struct RawEvent {
nothrow:
    this(int init) {
        fd = eventfd(init, 0);
    }

    void waitAndReset() {
        byte[8] bytes = void;
        ssize_t r;
        do {
            r = raw_read(fd, bytes.ptr, 8);
        } while (r < 0 && errno == EINTR);
        r.checked("event reset");
    }

    void trigger() {
        union U {
            ulong cnt;
            ubyte[8] bytes;
        }
        U value;
        value.cnt = 1;
        ssize_t r;
        do {
            r = raw_write(fd, value.bytes.ptr, 8);
        } while (r < 0 && errno == EINTR);
        r.checked("event trigger");
    }

    int fd;
}

struct Timer {
nothrow:
    private timer_t timer;

    static void ms2ts(timespec* ts, ulong ms)
    {
        ts.tv_sec = ms / 1000;
        ts.tv_nsec = (ms % 1000) * 1_000_000;
    }

    void arm(int timeout) {
        timespec ts_timeout;
        ms2ts(&ts_timeout, timeout); // convert milliseconds to timespec
        itimerspec its;
        its.it_value = ts_timeout;
        its.it_interval.tv_sec = 0;
        its.it_interval.tv_nsec = 0;
        timer_settime(timer, 0, &its, null);
    }

    void disarm() {
        itimerspec its; // zeros
        timer_settime(timer, 0, &its, null);
    }

    void dispose() {
        timer_delete(timer).checked;
    }
}

Timer timer() {
    // TODO: timer_create() returns a status code and hands the timer back via its
    // third argument - it does not yield a pollable fd. The fd-style use here
    // (and Timer.fd in poll() below) is a leftover from the Linux timerfd port
    // and needs a kqueue EVFILT_TIMER rework.
    int timerfd = timer_create(CLOCK_MONOTONIC, null, null).checked;
    interceptFd!(Fcntl.noop)(timerfd);
    return Timer(timerfd);
}

enum EventState { ready, fibers }

shared struct Event {
    SpinLock lock = SpinLock(SpinLock.Contention.brief);
    EventState state = EventState.ready;
    FiberExt awaiting;

    // wake up all awaiting fibers and mark the event as ready
    void reset() {
        lock.lock();
        auto f = awaiting.unshared;
        awaiting = null;
        state = EventState.ready;
        lock.unlock();
        while (f !is null) {
            auto next = f.next;
            f.schedule();
            f = next;
        }
    }

    bool ready() {
        lock.lock();
        scope(exit) lock.unlock();
        return state == EventState.ready;
    }

    // park the current fiber until the next reset(), unless already ready
    void await() {
        lock.lock();
        scope(exit) lock.unlock();
        if (state == EventState.ready) return;
        if (currentFiber !is null) {
            currentFiber.next = awaiting.unshared;
            awaiting = cast(shared)currentFiber;
            state = EventState.fibers;
        }
        else abort(); // TODO: threads
    }
}
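/+
   Illustrative sketch of the Event protocol (assumes a running photon scheduler;
   the fiber bodies are hypothetical and not part of this module):

   shared Event ev;
   go({
       ev.await();     // parks this fiber unless ev is already ready
       // ...resumes after some other fiber calls ev.reset()
   });
   go({
       ev.reset();     // wakes every fiber parked in ev.await()
   });
+/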
// singly-linked list of fibers blocked on the same resource
struct AwaitingFiber {
    shared FiberExt fiber;
    AwaitingFiber* next;

    void scheduleAll(int wakeFd) nothrow
    {
        auto w = &this;
        FiberExt head;
        // first collect the fibers into a local list, since the
        // AwaitingFiber nodes live on their owners' stacks
        do {
            auto fiber = steal(w.fiber);
            if (fiber) {
                fiber.unshared.next = head;
                head = fiber.unshared;
            }
            w = w.next;
        } while (w);
        while (head) {
            logf("Waking with FD=%d", wakeFd);
            head.wakeFd = wakeFd;
            head.schedule();
            head = head.next;
        }
    }
}

class FiberExt : Fiber {
    FiberExt next;
    uint numScheduler;
    int wakeFd; // receives the fd that woke us up

    enum PAGESIZE = 4096;

    this(void function() fn, uint numSched) nothrow {
        super(fn);
        numScheduler = numSched;
    }

    this(void delegate() dg, uint numSched) nothrow {
        super(dg);
        numScheduler = numSched;
    }

    void schedule() nothrow
    {
        scheds[numScheduler].queue.push(this);
    }
}

FiberExt currentFiber;
shared RawEvent termination; // termination event, triggered once the last fiber exits
shared pthread_t eventLoop;  // event loop, runs outside of the D runtime
shared int alive;            // count of non-terminated fibers scheduled

struct SchedulerBlock {
    shared IntrusiveQueue!(FiberExt, RawEvent) queue;
    shared uint assigned;
    size_t[2] padding; // pad to a full cache line
}
static assert(SchedulerBlock.sizeof == 64);

package(photon) shared SchedulerBlock[] scheds;

enum int MAX_EVENTS = 500;
enum int SIGNAL = 42;

// kevent layout as in FreeBSD's <sys/event.h>; named kevent_t so it does not
// shadow the kevent(2) call (cf. core.sys.freebsd.sys.event)
struct kevent_t {
    size_t   ident;  /* identifier for this event */
    short    filter; /* filter for event */
    ushort   flags;  /* action flags for kqueue */
    uint     fflags; /* filter flag value */
    long     data;   /* filter data value */
    void*    udata;  /* opaque user data identifier */
    ulong[4] ext;    /* extensions */
}

package(photon) void schedulerEntry(size_t n)
{
    // TODO: gettid/cpu_set_t/sched_setaffinity are Linux APIs carried over from
    // the Linux port; FreeBSD pins threads with cpuset_setaffinity(2).
    int tid = gettid();
    cpu_set_t mask;
    CPU_SET(n, &mask);
    sched_setaffinity(tid, mask.sizeof, &mask).checked("sched_setaffinity");
    shared SchedulerBlock* sched = scheds.ptr + n;
    while (alive > 0) {
        sched.queue.event.waitAndReset();
        for (;;) {
            FiberExt f = sched.queue.drain();
            if (f is null) break; // drained an empty queue, time to sleep
            do {
                auto next = f.next; // save next, it will be reused on scheduling
                currentFiber = f;
                logf("Fiber %x started", cast(void*)f);
                try {
                    f.call();
                }
                catch (Exception e) {
                    stderr.writeln(e);
                    atomicOp!"-="(alive, 1);
                }
                if (f.state == FiberExt.State.TERM) {
                    logf("Fiber %s terminated", cast(void*)f);
                    atomicOp!"-="(alive, 1);
                }
                f = next;
            } while (f !is null);
        }
    }
    termination.trigger();
}

public void go(void delegate() func) {
    import std.random;
    // power-of-two-choices load balancing: sample two distinct schedulers
    // and pick the one with fewer assigned fibers
    uint choice;
    if (scheds.length == 1) choice = 0;
    else {
        uint a = uniform!"[)"(0, cast(uint)scheds.length);
        uint b = uniform!"[)"(0, cast(uint)scheds.length - 1);
        if (a == b) b = cast(uint)scheds.length - 1;
        uint loadA = scheds[a].assigned;
        uint loadB = scheds[b].assigned;
        if (loadA < loadB) choice = a;
        else choice = b;
    }
    atomicOp!"+="(scheds[choice].assigned, 1);
    atomicOp!"+="(alive, 1);
    auto f = new FiberExt(func, choice);
    logf("Assigned %x -> %d scheduler", cast(void*)f, choice);
    f.schedule();
}
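/+
   Typical use of the public API (sketch; assumes the package-level bootstrap
   that allocates scheds/descriptors and runs schedulerEntry on worker threads):

   void main() {
       startloop();           // create the kqueue and the event-loop thread
       go({
           // runs on one of the schedulers; blocking syscalls made here are
           // intercepted by the wrappers at the bottom of this module
       });
       // photon's runtime then drives the schedulers until `alive` drops to 0
   }
+/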
shared Descriptor[] descriptors;
shared int event_loop_fd;
shared int signal_loop_fd;

enum ReaderState: uint {
    EMPTY = 0,
    UNCERTAIN = 1,
    READING = 2,
    READY = 3
}

enum WriterState: uint {
    READY = 0,
    UNCERTAIN = 1,
    WRITING = 2,
    FULL = 3
}

enum DescriptorState: uint {
    NOT_INITED,
    INITIALIZING,
    NONBLOCKING,
    THREADPOOL
}

// per-fd state plus the lists of awaiting fibers
shared struct Descriptor {
    ReaderState _readerState;
    AwaitingFiber* _readerWaits;
    WriterState _writerState;
    AwaitingFiber* _writerWaits;
    DescriptorState state;
nothrow:
    ReaderState readerState()() {
        return atomicLoad(_readerState);
    }

    WriterState writerState()() {
        return atomicLoad(_writerState);
    }

    // try to change the reader state; returns true if the CAS succeeded
    bool changeReader()(ReaderState from, ReaderState to) {
        return cas(&_readerState, from, to);
    }

    // ditto for writer
    bool changeWriter()(WriterState from, WriterState to) {
        return cas(&_writerState, from, to);
    }

    // current head of the reader wait list
    shared(AwaitingFiber)* readWaiters()() {
        return atomicLoad(_readerWaits);
    }

    // current head of the writer wait list
    shared(AwaitingFiber)* writeWaiters()() {
        return atomicLoad(_writerWaits);
    }

    // try to enqueue a reader fiber given the old head
    bool enqueueReader()(shared(AwaitingFiber)* fiber) {
        auto head = readWaiters;
        if (head == fiber) {
            return true; // TODO: HACK
        }
        fiber.next = head;
        return cas(&_readerWaits, head, fiber);
    }

    void removeReader()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_readerWaits);
        // empty, or a single node assumed to be `fiber` itself - nothing to put back
        if (head is null || head.next is null) return;
        auto pruned = removeFromList(head.unshared, fiber);
        // put the pruned list back if no new waiter appeared in the meantime
        cas(&_readerWaits, cast(shared(AwaitingFiber)*)null, cast(shared(AwaitingFiber)*)pruned);
    }

    // try to enqueue a writer fiber given the old head
    bool enqueueWriter()(shared(AwaitingFiber)* fiber) {
        auto head = writeWaiters;
        if (head == fiber) {
            return true; // TODO: HACK
        }
        fiber.next = head;
        return cas(&_writerWaits, head, fiber);
    }

    void removeWriter()(shared(AwaitingFiber)* fiber) {
        auto head = steal(_writerWaits);
        // empty, or a single node assumed to be `fiber` itself - nothing to put back
        if (head is null || head.next is null) return;
        auto pruned = removeFromList(head.unshared, fiber);
        // put the pruned list back if no new waiter appeared in the meantime
        cas(&_writerWaits, cast(shared(AwaitingFiber)*)null, cast(shared(AwaitingFiber)*)pruned);
    }

    // try to schedule readers; if this fails someone added a reader,
    // and it's now their job to check the state
    void scheduleReaders()(int wakeFd) {
        auto w = steal(_readerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }

    // try to schedule writers, ditto
    void scheduleWriters()(int wakeFd) {
        auto w = steal(_writerWaits);
        if (w) w.unshared.scheduleAll(wakeFd);
    }
}

extern(C) void graceful_shutdown_on_signal(int, siginfo_t*, void*)
{
    version(photon_tracing) printStats();
    _exit(9);
}

version(photon_tracing)
void printStats()
{
    // TODO: report on various events in the event loop/scheduler
    string msg = "Tracing report:\n\n";
    write(2, msg.ptr, msg.length);
}

shared int kq;

public void startloop()
{
    import core.cpuid;
    uint threads = threadsPerCPU;
    kq = kqueue();
    enforce(kq != -1);
    // TODO: allocate descriptors[] and scheds[] here (cf. the Linux port);
    // both are used throughout this module but never initialized yet.
    enforce(pthread_create(cast(pthread_t*)&eventLoop, null, &processEventsEntry, null) == 0);
}

package(photon) void stoploop()
{
    void* ret;
    pthread_join(eventLoop, &ret);
}

extern(C) void* processEventsEntry(void*)
{
    kevent_t ke;
    for (;;) {
        if (kevent(kq, null, 0, &ke, 1, null) != -1) {
            logf("Event occurred on ident=%d filter=%d", ke.ident, ke.filter);
        }
    }
}
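/+
   Sketch of what the loop body above might grow into once descriptors[] is
   wired up (assumes the EVFILT_READ/EVFILT_WRITE constants from the syscalls
   module; illustrative, not the final design):

   kevent_t ke;
   while (kevent(kq, null, 0, &ke, 1, null) > 0) {
       auto d = descriptors.ptr + cast(int)ke.ident;
       if (ke.filter == EVFILT_READ) {
           atomicStore(d._readerState, ReaderState.READY);
           d.scheduleReaders(cast(int)ke.ident);   // wake fibers parked on read
       }
       else if (ke.filter == EVFILT_WRITE) {
           atomicStore(d._writerState, WriterState.READY);
           d.scheduleWriters(cast(int)ke.ident);   // wake fibers parked on write
       }
   }
+/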
enum Fcntl: int { explicit = 0, msg = MSG_DONTWAIT, sock = SOCK_NONBLOCK, noop = 0xFFFFF }
enum SyscallKind { accept, read, write, connect }

// intercept - a filter for a file descriptor; changes flags and registers it on first use
void interceptFd(Fcntl needsFcntl)(int fd) nothrow {
    logf("Hit interceptFD");
    if (fd < 0 || fd >= descriptors.length) return;
    if (cas(&descriptors[fd].state, DescriptorState.NOT_INITED, DescriptorState.INITIALIZING)) {
        logf("First use, registering fd = %s", fd);
        static if (needsFcntl == Fcntl.explicit) {
            int flags = fcntl(fd, F_GETFL, 0);
            fcntl(fd, F_SETFL, flags | O_NONBLOCK).checked;
            logf("Setting FCNTL. %x", cast(void*)currentFiber);
        }
        // EVFILT_READ and EVFILT_WRITE are distinct (negative) filter values and
        // cannot be OR-ed into one kevent; register one change per filter, and
        // pass them in the changelist, not the eventlist
        kevent_t[2] ke;
        ke[0].ident = fd;
        ke[0].filter = EVFILT_READ;
        ke[0].flags = EV_ADD | EV_ENABLE;
        ke[1].ident = fd;
        ke[1].filter = EVFILT_WRITE;
        ke[1].flags = EV_ADD | EV_ENABLE;
        timespec timeout;
        timeout.tv_nsec = 1000;
        kevent(kq, ke.ptr, 2, null, 0, &timeout).checked("kevent: register fd");
    }
}

void deregisterFd(int fd) nothrow {
    if (fd >= 0 && fd < descriptors.length) {
        auto descriptor = descriptors.ptr + fd;
        atomicStore(descriptor._writerState, WriterState.READY);
        atomicStore(descriptor._readerState, ReaderState.EMPTY);
        descriptor.scheduleReaders(fd);
        descriptor.scheduleWriters(fd);
        atomicStore(descriptor.state, DescriptorState.NOT_INITED);
    }
}
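/+
   Reader-side state machine driven by universalSyscall below (the writer side
   is symmetric, with FULL playing the role of EMPTY):

     EMPTY ----- enqueue fiber, yield ---> woken by event loop --> retry
     READY ----- changeReader -----------> READING
     UNCERTAIN - changeReader -----------> READING
     READING --- got EWOULDBLOCK --------> EMPTY     (park again)
     READING --- short read -------------> EMPTY     (kernel buffer drained)
     READING --- full-length read -------> UNCERTAIN (buffer may hold more)
+/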
ssize_t universalSyscall(size_t ident, string name, SyscallKind kind, Fcntl fcntlStyle, ssize_t ERR, T...)
                        (int fd, T args) nothrow {
    if (currentFiber is null) {
        logf("%s PASSTHROUGH FD=%s", name, fd);
        return syscall(ident, fd, args).withErrorno;
    }
    else {
        logf("HOOKED %s FD=%d", name, fd);
        interceptFd!(fcntlStyle)(fd);
        shared(Descriptor)* descriptor = descriptors.ptr + fd;
        if (atomicLoad(descriptor.state) == DescriptorState.THREADPOOL) {
            logf("%s syscall THREADPOOL FD=%d", name, fd);
            // TODO: offload syscall to the thread pool
            return syscall(ident, fd, args).withErrorno;
        }
    L_start:
        shared AwaitingFiber await = AwaitingFiber(cast(shared)currentFiber, null);
        // set the flags argument where that avoids direct fcntl calls
        static if (fcntlStyle != Fcntl.explicit)
        {
            args[2] |= fcntlStyle;
        }
        logf("kind:%s args:%s", kind, args);
        static if (kind == SyscallKind.accept || kind == SyscallKind.read) {
            auto state = descriptor.readerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (ReaderState) {
            case EMPTY:
                logf("EMPTY - enqueue reader");
                if (!descriptor.enqueueReader(&await)) goto L_start;
                // state changed to e.g. READY or UNCERTAIN in the meantime, may need to reschedule
                if (descriptor.readerState != EMPTY) descriptor.scheduleReaders(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                descriptor.changeReader(UNCERTAIN, READING); // may become READY or READING
                goto case READING;
            case READY:
                descriptor.changeReader(READY, READING); // always succeeds if 1 fiber reads
                goto case READING;
            case READING:
                ssize_t resp = syscall(ident, fd, args);
                static if (kind == SyscallKind.accept) {
                    if (resp >= 0) // for accept we never know if we emptied the queue
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in the meantime
                    }
                }
                else static if (kind == SyscallKind.read) {
                    if (resp == args[1]) // length is 2nd in (buf, length, ...)
                        descriptor.changeReader(READING, UNCERTAIN);
                    else if (resp >= 0)
                        descriptor.changeReader(READING, EMPTY);
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeReader(READING, EMPTY))
                            goto case EMPTY;
                        goto L_start; // became UNCERTAIN or READY in the meantime
                    }
                }
                else
                    static assert(0);
                return withErrorno(resp);
            }
        }
        else static if (kind == SyscallKind.write || kind == SyscallKind.connect) {
            // TODO: handle short writes caused by EWOULDBLOCK so they appear fully blocking?
            auto state = descriptor.writerState;
            logf("%s syscall state is %d. Fiber %x", name, state, cast(void*)currentFiber);
            final switch (state) with (WriterState) {
            case FULL:
                logf("FULL FD=%d Fiber %x", fd, cast(void*)currentFiber);
                if (!descriptor.enqueueWriter(&await)) goto L_start;
                // state changed to e.g. READY or UNCERTAIN in the meantime, may need to reschedule
                if (descriptor.writerState != FULL) descriptor.scheduleWriters(fd);
                FiberExt.yield();
                goto L_start;
            case UNCERTAIN:
                logf("UNCERTAIN on FD=%d Fiber %x", fd, cast(void*)currentFiber);
                descriptor.changeWriter(UNCERTAIN, WRITING); // may become READY or WRITING
                goto case WRITING;
            case READY:
                descriptor.changeWriter(READY, WRITING); // always succeeds if 1 fiber writes
                goto case WRITING;
            case WRITING:
                ssize_t resp = syscall(ident, fd, args);
                static if (kind == SyscallKind.connect) {
                    if (resp >= 0) {
                        descriptor.changeWriter(WRITING, READY);
                    }
                    else if (resp == -ERR || resp == -EALREADY) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in the meantime
                    }
                    return withErrorno(resp);
                }
                else {
                    if (resp == args[1]) // (buf, len) args to syscall
                        descriptor.changeWriter(WRITING, UNCERTAIN);
                    else if (resp >= 0) {
                        logf("Short-write on FD=%d, become FULL", fd);
                        descriptor.changeWriter(WRITING, FULL);
                    }
                    else if (resp == -ERR || resp == -EAGAIN) {
                        if (descriptor.changeWriter(WRITING, FULL)) {
                            logf("Sudden block on FD=%d, become FULL", fd);
                            goto case FULL;
                        }
                        goto L_start; // became UNCERTAIN or READY in the meantime
                    }
                    return withErrorno(resp);
                }
            }
        }
        assert(0);
    }
}

// ======================================================================================
// Syscall wrapper intercepts
// ======================================================================================

extern(C) ssize_t read(int fd, void* buf, size_t count) nothrow
{
    return universalSyscall!(SYS_READ, "READ", SyscallKind.read, Fcntl.explicit, EWOULDBLOCK)
        (fd, cast(size_t)buf, count);
}

extern(C) ssize_t write(int fd, const void* buf, size_t count)
{
    return universalSyscall!(SYS_WRITE, "WRITE", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
        (fd, cast(size_t)buf, count);
}

extern(C) ssize_t accept(int sockfd, sockaddr* addr, socklen_t* addrlen)
{
    return universalSyscall!(SYS_ACCEPT, "accept", SyscallKind.accept, Fcntl.explicit, EWOULDBLOCK)
        (sockfd, cast(size_t)addr, cast(size_t)addrlen);
}

extern(C) ssize_t accept4(int sockfd, sockaddr* addr, socklen_t* addrlen, int flags)
{
    return universalSyscall!(SYS_ACCEPT4, "accept4", SyscallKind.accept, Fcntl.sock, EWOULDBLOCK)
        (sockfd, cast(size_t)addr, cast(size_t)addrlen, flags);
}
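/+
   Because the wrappers above are extern(C) symbols with libc names, ordinary
   blocking code picks them up without modification. Sketch (fd is a
   hypothetical connected socket):

   go({
       ubyte[1024] buf;
       // resolves to the hooked read() above: instead of blocking the thread,
       // the fiber is parked until the event loop reports the fd readable
       auto n = read(fd, buf.ptr, buf.length);
   });
+/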
extern(C) ssize_t connect(int sockfd, const sockaddr* addr, socklen_t addrlen)
{
    return universalSyscall!(SYS_CONNECT, "connect", SyscallKind.connect, Fcntl.explicit, EINPROGRESS)
        (sockfd, cast(size_t)addr, cast(size_t)addrlen);
}

extern(C) ssize_t sendto(int sockfd, const void* buf, size_t len, int flags,
                         const sockaddr* dest_addr, socklen_t addrlen)
{
    return universalSyscall!(SYS_SENDTO, "sendto", SyscallKind.write, Fcntl.explicit, EWOULDBLOCK)
        (sockfd, cast(size_t)buf, len, flags, cast(size_t)dest_addr, cast(size_t)addrlen);
}

extern(C) ssize_t recv(int sockfd, void* buf, size_t len, int flags) nothrow {
    sockaddr_in src_addr;
    src_addr.sin_family = AF_INET;
    src_addr.sin_port = 0;
    src_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    socklen_t addrlen = sockaddr_in.sizeof;
    return recvfrom(sockfd, buf, len, flags, cast(sockaddr*)&src_addr, &addrlen);
}

extern(C) private ssize_t recvfrom(int sockfd, void* buf, size_t len, int flags,
                                   sockaddr* src_addr, socklen_t* addrlen) nothrow
{
    return universalSyscall!(SYS_RECVFROM, "RECVFROM", SyscallKind.read, Fcntl.msg, EWOULDBLOCK)
        (sockfd, cast(size_t)buf, len, flags, cast(size_t)src_addr, cast(size_t)addrlen);
}
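/+
   The poll() hook below works in two phases: nonBlockingCheck() first answers
   from the cached per-descriptor reader/writer states and only falls back to
   the real poll syscall when some descriptor's state is uncertain; if nothing
   is ready yet, the fiber is parked on every fd of interest plus a timer that
   bounds the wait by `timeout`.
+/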
extern(C) private ssize_t poll(pollfd* fds, nfds_t nfds, int timeout)
{
    bool nonBlockingCheck(ref ssize_t result) {
        bool uncertain;
    L_cacheloop:
        foreach (ref fd; fds[0..nfds]) {
            interceptFd!(Fcntl.explicit)(fd.fd);
            fd.revents = 0;
            auto descriptor = descriptors.ptr + fd.fd;
            if (fd.events & POLLIN) {
                auto state = descriptor.readerState;
                logf("Found event %d for reader in select", state);
                switch (state) with (ReaderState) {
                case READY:
                    fd.revents |= POLLIN;
                    break;
                case EMPTY:
                    break;
                default:
                    uncertain = true;
                    break L_cacheloop;
                }
            }
            if (fd.events & POLLOUT) {
                auto state = descriptor.writerState;
                logf("Found event %d for writer in select", state);
                switch (state) with (WriterState) {
                case READY:
                    fd.revents |= POLLOUT;
                    break;
                case FULL:
                    break;
                default:
                    uncertain = true;
                    break L_cacheloop;
                }
            }
        }
        // fall back to the system poll call if descriptor state is uncertain
        if (uncertain) {
            logf("Fallback to system poll, descriptors have uncertain state");
            ssize_t p = raw_poll(fds, nfds, 0);
            if (p != 0) {
                result = p;
                logf("Raw poll returns %d", result);
                return true;
            }
        }
        else {
            ssize_t j = 0;
            foreach (i; 0..nfds) {
                if (fds[i].revents) {
                    fds[j++] = fds[i];
                }
            }
            logf("Using our own event cache: %d events", j);
            if (j > 0) {
                result = cast(ssize_t)j;
                return true;
            }
        }
        return false;
    }

    if (currentFiber is null) {
        logf("POLL PASSTHROUGH!");
        return raw_poll(fds, nfds, timeout);
    }
    else {
        logf("HOOKED POLL %d fds timeout %d", nfds, timeout);
        if (nfds < 0) return -EINVAL.withErrorno;
        if (nfds == 0) {
            if (timeout == 0) return 0;
            // pure sleep: park the fiber on a timer fd alone
            shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
            Timer tm = timer();
            descriptors[tm.fd].enqueueReader(&aw);
            scope(exit) tm.dispose();
            tm.arm(timeout);
            logf("Timer fd=%d", tm.fd);
            Fiber.yield();
            logf("Woke up after select %x. WakeFd=%d", cast(void*)currentFiber, currentFiber.wakeFd);
            return 0;
        }
        foreach (ref fd; fds[0..nfds]) {
            if (fd.fd < 0 || fd.fd >= descriptors.length) return -EBADF.withErrorno;
            fd.revents = 0;
        }
        ssize_t result = 0;
        if (timeout <= 0) return raw_poll(fds, nfds, timeout);
        if (nonBlockingCheck(result)) return result;
        shared AwaitingFiber aw = shared(AwaitingFiber)(cast(shared)currentFiber);
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].enqueueReader(&aw);
            else if (fds[i].events & POLLOUT)
                descriptors[fds[i].fd].enqueueWriter(&aw);
        }
        Timer tm = timer();
        scope(exit) tm.dispose();
        tm.arm(timeout);
        descriptors[tm.fd].enqueueReader(&aw);
        Fiber.yield();
        tm.disarm();
        atomicStore(descriptors[tm.fd]._readerWaits, cast(shared(AwaitingFiber)*)null);
        foreach (i; 0..nfds) {
            if (fds[i].events & POLLIN)
                descriptors[fds[i].fd].removeReader(&aw);
            else if (fds[i].events & POLLOUT)
                descriptors[fds[i].fd].removeWriter(&aw);
        }
        logf("Woke up after select %x. WakeFD=%d", cast(void*)currentFiber, currentFiber.wakeFd);
        if (currentFiber.wakeFd == tm.fd) return 0;
        else {
            nonBlockingCheck(result);
            return result;
        }
    }
}

extern(C) private ssize_t close(int fd) nothrow
{
    logf("HOOKED CLOSE FD=%d", fd);
    deregisterFd(fd);
    return cast(int)withErrorno(syscall(SYS_CLOSE, fd));
}