module photon.windows.core;
version(Windows):
private:

import core.sys.windows.core;
import core.sys.windows.winsock2;
import core.atomic;
import core.thread;
import core.internal.spinlock;
import std.exception;
import std.windows.syserror;
import core.stdc.stdlib;
import std.random;
import std.stdio;
import std.traits;
import std.meta;

import rewind.map;

import photon.ds.common;
import photon.ds.intrusive_queue;
import photon.windows.support;

shared struct RawEvent {
nothrow:
    this(bool signaled) {
        ev = cast(shared(HANDLE))CreateEventA(null, FALSE, signaled, null);
        assert(ev != null, "Failed to create RawEvent");
    }

    void waitAndReset() {
        auto ret = WaitForSingleObject(cast(HANDLE)ev, INFINITE);
        assert(ret == WAIT_OBJECT_0, "Failed while waiting on event");
    }

    void trigger() {
        auto ret = SetEvent(cast(HANDLE)ev);
        assert(ret != 0);
    }

    HANDLE ev;
}

struct MultiAwait
{
    int n;
    void delegate() trigger;
    MultiAwaitBox* box;
}

struct MultiAwaitBox {
    shared size_t refCount;
    shared FiberExt fiber;
}

extern(Windows) VOID waitCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WAIT Wait, TP_WAIT_RESULT WaitResult) {
    auto fiber = cast(FiberExt)Context;
    fiber.schedule();
}

extern(Windows) VOID waitAnyCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WAIT Wait, TP_WAIT_RESULT WaitResult) {
    auto await = cast(MultiAwait*)Context;
    auto fiber = cast()steal(await.box.fiber);
    if (fiber) { // first awaitable to fire claims the fiber and wakes it up
        logf("AwaitAny callback waking up on object %d", await.n);
        fiber.wakeUpObject = await.n;
        fiber.schedule();
    }
    else { // lost the race - re-trigger so the state change is not consumed
        logf("AwaitAny callback - triggering awaitable again");
        await.trigger();
    }
    auto cnt = atomicFetchSub(await.box.refCount, 1);
    if (cnt == 1) free(await.box); // last reference frees the shared box
    free(await);
    CloseThreadpoolWait(Wait);
}

/// Event object
public struct Event {

    @disable this(this);

    this(bool signaled) {
        ev = cast(HANDLE)CreateEventA(null, FALSE, signaled, null);
        assert(ev != null, "Failed to create Event");
    }

    /// Wait for the event to be triggered, then reset and return atomically
    void waitAndReset() {
        auto wait = CreateThreadpoolWait(&waitCallback, cast(void*)currentFiber, &environ);
        wenforce(wait != null, "Failed to create threadpool wait object");
        SetThreadpoolWait(wait, cast(HANDLE)ev, null);
        FiberExt.yield();
        CloseThreadpoolWait(wait);
    }

    ///
    void waitAndReset() shared {
        this.unshared.waitAndReset();
    }

    private void registerForWaitAny(int n, MultiAwaitBox* box) {
        auto context = cast(MultiAwait*)calloc(1, MultiAwait.sizeof);
        context.box = box;
        context.n = n;
        context.trigger = cast(void delegate())&this.trigger;
        auto wait = CreateThreadpoolWait(&waitAnyCallback, cast(void*)context, &environ);
        wenforce(wait != null, "Failed to create threadpool wait object");
        SetThreadpoolWait(wait, cast(HANDLE)ev, null);
    }

    private void registerForWaitAny(int n, MultiAwaitBox* box) shared {
        this.unshared.registerForWaitAny(n, box);
    }

    /// Trigger the event.
    void trigger() {
        auto ret = SetEvent(cast(HANDLE)ev);
        assert(ret != 0);
    }

    ///
    void trigger() shared {
        this.unshared.trigger();
    }

private:
    HANDLE ev;
}

///
public Event event(bool signaled) {
    return Event(signaled);
}
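// Usage sketch (added for illustration, not part of the original sources):
// a fiber blocks in waitAndReset() without blocking its scheduler thread,
// while another fiber wakes it via trigger(). The event is created auto-reset
// (bManualReset = FALSE), so one trigger releases one waiter. Compiled out
// with version(none).
version(none) unittest {
    auto ev = event(false);
    go({
        ev.waitAndReset(); // suspends this fiber only
        logf("got the event");
    });
    go({
        ev.trigger(); // wakes the waiter through a threadpool wait callback
    });
}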
/// Semaphore object
public struct Semaphore {
    @disable this(this);

    this(int count) {
        // set max count to MacOS pipe limit
        sem = cast(HANDLE)CreateSemaphoreA(null, count, 4096, null);
        assert(sem != null, "Failed to create semaphore");
    }

    this(int count) shared {
        // set max count to MacOS pipe limit
        sem = cast(shared(HANDLE))CreateSemaphoreA(null, count, 4096, null);
        assert(sem != null, "Failed to create semaphore");
    }

    ///
    void wait() {
        auto wait = CreateThreadpoolWait(&waitCallback, cast(void*)currentFiber, &environ);
        wenforce(wait != null, "Failed to create threadpool wait object");
        SetThreadpoolWait(wait, cast(HANDLE)sem, null);
        FiberExt.yield();
        CloseThreadpoolWait(wait);
    }

    void wait() shared {
        this.unshared.wait();
    }

    private void registerForWaitAny(int n, MultiAwaitBox* box) {
        auto context = cast(MultiAwait*)calloc(1, MultiAwait.sizeof);
        context.box = box;
        context.n = n;
        context.trigger = { this.trigger(1); };
        auto wait = CreateThreadpoolWait(&waitAnyCallback, cast(void*)context, &environ);
        wenforce(wait != null, "Failed to create threadpool wait object");
        SetThreadpoolWait(wait, cast(HANDLE)sem, null);
    }

    private void registerForWaitAny(int n, MultiAwaitBox* box) shared {
        this.unshared.registerForWaitAny(n, box);
    }

    ///
    void trigger(int count) {
        auto ret = ReleaseSemaphore(cast(HANDLE)sem, count, null);
        assert(ret);
    }

    ///
    void trigger(int count) shared {
        this.unshared.trigger(count);
    }

    ///
    void dispose() {
        CloseHandle(cast(HANDLE)sem);
    }

    ///
    void dispose() shared {
        this.unshared.dispose();
    }

private:
    HANDLE sem;
}

///
public auto semaphore(int count) {
    return Semaphore(count);
}

extern(Windows) VOID timerCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_TIMER Timer) {
    FiberExt fiber = cast(FiberExt)Context;
    fiber.schedule();
}

///
struct Timer {
    void wait(Duration dur) {
        auto timer = CreateThreadpoolTimer(&timerCallback, cast(void*)currentFiber, &environ);
        wenforce(timer != null, "Failed to create threadpool timer");
        FILETIME time;
        long hnsecs = -dur.total!"hnsecs"; // negative FILETIME means relative time
        time.dwHighDateTime = cast(DWORD)(hnsecs >> 32);
        time.dwLowDateTime = cast(DWORD)(hnsecs & 0xFFFF_FFFF);
        SetThreadpoolTimer(timer, &time, 0, 0);
        FiberExt.yield();
        CloseThreadpoolTimer(timer);
    }
}

///
public auto timer() {
    return Timer();
}

///
public void delay(Duration req) {
    auto tm = Timer(); // Stateless on Windows
    tm.wait(req);
}

///
enum isAwaitable(E) = is(E : Event) || is(E : Semaphore)
    || is(E : Event*) || is(E : Semaphore*);

///
public size_t awaitAny(Awaitable...)(auto ref Awaitable args)
if (allSatisfy!(isAwaitable, Awaitable)) {
    auto box = cast(MultiAwaitBox*)calloc(1, MultiAwaitBox.sizeof);
    box.refCount = args.length;
    box.fiber = cast(shared)currentFiber;
    foreach (int i, ref v; args) {
        v.registerForWaitAny(i, box);
    }
    FiberExt.yield();
    return currentFiber.wakeUpObject;
}

///
public size_t awaitAny(Awaitable)(Awaitable[] args)
if (allSatisfy!(isAwaitable, Awaitable)) {
    auto box = cast(MultiAwaitBox*)calloc(1, MultiAwaitBox.sizeof);
    box.refCount = args.length;
    box.fiber = cast(shared)currentFiber;
    foreach (int i, ref v; args) {
        v.registerForWaitAny(i, box);
    }
    FiberExt.yield();
    return currentFiber.wakeUpObject;
}
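// Usage sketch (added for illustration, not part of the original sources):
// awaitAny() registers the fiber with every awaitable and suspends; the first
// one to fire wins the race in waitAnyCallback and its index comes back via
// wakeUpObject. Compiled out with version(none).
version(none) unittest {
    auto ev = event(false);
    auto sem = semaphore(0);
    go({
        size_t n = awaitAny(ev, sem); // 0 - the event fired, 1 - the semaphore
        logf("woken up by awaitable #%d", cast(int)n);
    });
    go({ sem.trigger(1); });
}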
struct SchedulerBlock {
    shared IntrusiveQueue!(FiberExt, RawEvent) queue;
    shared uint assigned;
    size_t[1] padding; // pad to cache-line size to avoid false sharing
}
static assert(SchedulerBlock.sizeof == 64);

class FiberExt : Fiber {
    FiberExt next;
    uint numScheduler;
    int bytesTransfered;
    int wakeUpObject;

    enum PAGESIZE = 4096;

    this(void function() fn, uint numSched) nothrow {
        super(fn);
        numScheduler = numSched;
    }

    this(void delegate() dg, uint numSched) nothrow {
        super(dg);
        numScheduler = numSched;
    }

    void schedule() nothrow
    {
        scheds[numScheduler].queue.push(this);
    }
}

package(photon) shared SchedulerBlock[] scheds;

enum MAX_THREADPOOL_SIZE = 100;
FiberExt currentFiber;
__gshared Map!(SOCKET, FiberExt) ioWaiters = new Map!(SOCKET, FiberExt); // mapping of sockets to awaiting fibers
__gshared RawEvent termination; // termination event, triggered once the last fiber exits
__gshared HANDLE iocp; // IO Completion port
__gshared PTP_POOL threadPool; // for synchronous syscalls
__gshared TP_CALLBACK_ENVIRON_V3 environ; // callback environment for the pool
shared int alive; // count of non-terminated fibers scheduled

public void startloop() {
    SYSTEM_INFO info;
    GetSystemInfo(&info);
    // TODO: handle NUMA case
    uint threads = info.dwNumberOfProcessors;
    scheds = new SchedulerBlock[threads];
    foreach (ref sched; scheds) {
        sched.queue = IntrusiveQueue!(FiberExt, RawEvent)(RawEvent(0));
    }
    threadPool = CreateThreadpool(null);
    wenforce(threadPool != null, "Failed to create threadpool");
    SetThreadpoolThreadMaximum(threadPool, MAX_THREADPOOL_SIZE);
    wenforce(SetThreadpoolThreadMinimum(threadPool, 1) == TRUE, "Failed to set threadpool minimum size");
    InitializeThreadpoolEnvironment(&environ);
    SetThreadpoolCallbackPool(&environ, threadPool);

    termination = RawEvent(false);
    iocp = CreateIoCompletionPort(cast(HANDLE)INVALID_HANDLE_VALUE, null, 0, 1);
    wenforce(iocp != null, "Failed to create IO Completion Port");
    wenforce(CreateThread(null, 0, &eventLoop, null, 0, null) != null, "Failed to start event loop");
}

/// Convenience overload for functions
public void go(void function() func) {
    go({ func(); });
}

/// Setup a fiber task to run on the Photon scheduler.
public void go(void delegate() func) {
    import std.random;
    uint choice;
    if (scheds.length == 1) choice = 0;
    else {
        // power of two choices: pick two distinct schedulers at random,
        // then assign the fiber to the less loaded one
        uint a = uniform!"[)"(0, cast(uint)scheds.length);
        uint b = uniform!"[)"(0, cast(uint)scheds.length-1);
        if (a == b) b = cast(uint)scheds.length-1;
        uint loadA = scheds[a].assigned;
        uint loadB = scheds[b].assigned;
        if (loadA < loadB) choice = a;
        else choice = b;
    }
    atomicOp!"+="(scheds[choice].assigned, 1);
    atomicOp!"+="(alive, 1);
    auto f = new FiberExt(func, choice);
    logf("Assigned %x -> %d scheduler", cast(void*)f, choice);
    f.schedule();
}
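// Usage sketch (added for illustration, not part of the original sources):
// startloop() prepares the schedulers, the threadpool and the event loop
// thread; scheduler threads themselves enter schedulerEntry elsewhere in the
// photon package. Compiled out with version(none).
version(none) unittest {
    startloop();
    go({
        delay(100.msecs); // suspends the fiber via a threadpool timer
        writeln("tick");
    });
}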
package(photon) void schedulerEntry(size_t n)
{
    // TODO: handle NUMA case
    wenforce(SetThreadAffinityMask(GetCurrentThread(), 1L << n), "failed to set affinity");
    shared SchedulerBlock* sched = scheds.ptr + n;
    while (alive > 0) {
        sched.queue.event.waitAndReset();
        for (;;) {
            FiberExt f = sched.queue.drain();
            if (f is null) break; // drained an empty queue, time to sleep
            do {
                auto next = f.next; // save next, it will be reused on scheduling
                currentFiber = f;
                logf("Fiber %x started", cast(void*)f);
                try {
                    f.call();
                }
                catch (Exception e) {
                    stderr.writeln(e);
                    atomicOp!"-="(alive, 1);
                }
                if (f.state == FiberExt.State.TERM) {
                    logf("Fiber %x terminated", cast(void*)f);
                    atomicOp!"-="(alive, 1);
                }
                f = next;
            } while (f !is null);
        }
    }
    termination.trigger();
    foreach (ref s; scheds) {
        s.queue.event.trigger();
    }
}
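// For reference: the scheduler loop above relies only on this shape of
// IntrusiveQueue (photon.ds.intrusive_queue). This is an interface
// reconstructed from usage in this module, NOT the actual definition.
version(none) shared struct IntrusiveQueueSketch(T, Event) {
    Event event;       // signaled by push(); schedulerEntry waits on it
    void push(T item); // enqueue, linking items through their .next field
    T drain();         // detach the whole chain at once; null when empty
}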
enum int MAX_COMPLETIONS = 500;

extern(Windows) uint eventLoop(void* param) {
    HANDLE[2] events;
    events[0] = iocp;
    events[1] = cast(HANDLE)termination.ev;
    logf("Started event loop! IOCP = %x termination = %x", iocp, termination.ev);
    for (;;) {
        auto ret = WaitForMultipleObjects(2, events.ptr, FALSE, INFINITE);
        logf("Got signalled in event loop %d", ret);
        if (ret == WAIT_OBJECT_0) { // iocp
            OVERLAPPED_ENTRY[MAX_COMPLETIONS] entries = void;
            uint count = 0;
            while (GetQueuedCompletionStatusEx(iocp, entries.ptr, MAX_COMPLETIONS, &count, 0, FALSE)) {
                logf("Dequeued I/O events=%d", count);
                foreach (e; entries[0 .. count]) {
                    SOCKET sock = cast(SOCKET)e.lpCompletionKey;
                    FiberExt fiber = ioWaiters[sock];
                    fiber.bytesTransfered = cast(int)e.dwNumberOfBytesTransferred;
                    fiber.schedule();
                }
                if (count < MAX_COMPLETIONS) break; // the queue is fully drained
            }
        }
        else if (ret == WAIT_OBJECT_0 + 1) { // termination
            break;
        }
        else {
            logf("Failed to wait for multiple objects: %x", ret);
            break;
        }
    }
    ExitThread(0);
    return 0;
}

// ===========================================================================
// INTERCEPTS
// ===========================================================================

extern(Windows) SOCKET socket(int af, int type, int protocol) {
    logf("Intercepted socket!");
    SOCKET s = WSASocketW(af, type, protocol, null, 0, WSA_FLAG_OVERLAPPED);
    registerSocket(s);
    return s;
}

struct AcceptState {
    SOCKET socket;
    sockaddr* addr;
    LPINT addrlen;
    FiberExt fiber;
}

extern(Windows) VOID acceptJob(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
    AcceptState* state = cast(AcceptState*)Context;
    logf("Started threadpool job");
    SOCKET resp = WSAAccept(state.socket, state.addr, state.addrlen, null, 0);
    if (resp != INVALID_SOCKET) {
        registerSocket(resp);
    }
    state.socket = resp;
    state.fiber.schedule();
}

extern(Windows) SOCKET accept(SOCKET s, sockaddr* addr, LPINT addrlen) {
    logf("Intercepted accept!");
    AcceptState state;
    state.socket = s;
    state.addr = addr;
    state.addrlen = addrlen;
    state.fiber = currentFiber;
    PTP_WORK work = CreateThreadpoolWork(&acceptJob, &state, &environ);
    wenforce(work != null, "Failed to create work for threadpool");
    SubmitThreadpoolWork(work);
    FiberExt.yield();
    CloseThreadpoolWork(work);
    return state.socket;
}

void registerSocket(SOCKET s) {
    HANDLE port = iocp;
    wenforce(CreateIoCompletionPort(cast(void*)s, port, cast(size_t)s, 0) == port, "failed to register I/O completion");
}

extern(Windows) int recv(SOCKET s, void* buf, int len, int flags) {
    OVERLAPPED overlapped; // lives on the fiber's stack, which persists across the yield
    WSABUF wsabuf = WSABUF(cast(uint)len, buf);
    ioWaiters[s] = currentFiber;
    uint received = 0;
    int ret = WSARecv(s, &wsabuf, 1, &received, cast(uint*)&flags, cast(LPWSAOVERLAPPED)&overlapped, null);
    logf("Got recv %d", ret);
    if (ret >= 0) { // completed immediately; the completion is still posted to the IOCP
        FiberExt.yield();
        return received;
    }
    else {
        auto lastError = GetLastError();
        logf("Last error = %d", lastError);
        if (lastError == ERROR_IO_PENDING) {
            FiberExt.yield();
            return currentFiber.bytesTransfered;
        }
        else
            return ret;
    }
}

extern(Windows) int send(SOCKET s, void* buf, int len, int flags) {
    OVERLAPPED overlapped; // lives on the fiber's stack, which persists across the yield
    WSABUF wsabuf = WSABUF(cast(uint)len, buf);
    ioWaiters[s] = currentFiber;
    uint sent = 0;
    int ret = WSASend(s, &wsabuf, 1, &sent, flags, cast(LPWSAOVERLAPPED)&overlapped, null);
    logf("Got send %d", ret);
    if (ret >= 0) {
        FiberExt.yield();
        return sent;
    }
    else {
        auto lastError = GetLastError();
        logf("Last error = %d", lastError);
        if (lastError == ERROR_IO_PENDING) {
            FiberExt.yield();
            return currentFiber.bytesTransfered;
        }
        else
            return ret;
    }
}
extern(Windows) void Sleep(DWORD dwMilliseconds) {
    if (currentFiber !is null) {
        auto tm = timer();
        tm.wait(dwMilliseconds.msecs);
    } else {
        SleepEx(dwMilliseconds, FALSE);
    }
}
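// Usage sketch (added for illustration, not part of the original sources):
// since socket/accept/recv/send above shadow the Winsock symbols, ordinary
// blocking-style code becomes fiber-aware when compiled against this module.
// bind()/listen() setup is elided. Compiled out with version(none).
version(none) unittest {
    startloop();
    go({
        SOCKET server = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        // ... bind() and listen() as usual ...
        SOCKET client = accept(server, null, null); // suspends the fiber, not the thread
        char[1024] buf;
        int n = recv(client, buf.ptr, cast(int)buf.length, 0);
        if (n > 0) send(client, buf.ptr, n, 0); // echo back
    });
}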