1 module poll_test; 2 import std.stdio; 3 import core.sys.posix.sys.socket; 4 import core.sys.posix.unistd : read, write, _exit; 5 import core.sys.posix.sys.types; 6 import core.stdc.errno; 7 import core.sys.posix.sys.socket; 8 import core.thread; 9 import core.sys.posix.stdlib: abort; 10 import core.sys.posix.poll; 11 import core.sys.posix.fcntl; 12 import photon; 13 import photon.linux.support; 14 import photon.linux.syscalls; 15 16 shared int idx = 0; 17 18 void check(int code) { 19 if(code < 0) 20 abort(); 21 } 22 23 void writer(int fd) { 24 writefln("<started writer, fd = %d>", fd); 25 auto s = "wait and write\n"; 26 for (int i = 0; i < 30; ++i) { 27 logf("writer idx = %d", i); 28 idx = i; 29 ssize_t rc = write(fd, s.ptr, s.length).checked("write fail"); 30 logf("write rc = %d", rc); 31 Thread.sleep(1.seconds); 32 } 33 logf("<finished writer>"); 34 } 35 36 void reader(int fd) { 37 logf("<started reader, fd = %d>", fd); 38 char[100] buf; 39 ssize_t total = 15; 40 int timeout = 2000; 41 bool finished = false; 42 pollfd fds; 43 fds.fd = fd; 44 fds.events = POLLIN; 45 fds.revents = 0; 46 do { 47 int rc = poll(&fds, 1, timeout).checked("poll"); 48 logf("rc = %d", rc); 49 if (rc == 0) continue; 50 if (idx == 29) finished = true; 51 //logf("preparing to read"); 52 ssize_t resp = read(fds.fd, buf.ptr, total).checked("read fail"); 53 logf("read resp = %s", resp); 54 } while(!finished); 55 logf(" <finished reader>"); 56 } 57 58 void main(){ 59 int[2] socks; 60 61 startloop(); 62 socketpair(AF_UNIX, SOCK_STREAM, 0, socks).checked("here"); 63 logf("socks = %s", socks); 64 auto wr = new Thread(() => writer(socks[0])); 65 wr.start(); 66 67 spawn(() => reader(socks[1])); 68 runFibers(); 69 70 wr.join(); 71 }