1 module photon.windows.core; 2 version(Windows): 3 private: 4 5 import core.sys.windows.core; 6 import core.atomic; 7 import core.internal.spinlock; 8 import core.stdc.stdlib; 9 import core.thread; 10 import std.exception; 11 import std.windows.syserror; 12 import std.random; 13 import std.format; 14 15 16 struct SchedulerBlock 17 { 18 AlignedSpinLock lock; // lock around the queue 19 UMS_COMPLETION_LIST* completionList; 20 RingQueue!(UMS_CONTEXT*) queue; // queue has the number of outstanding threads 21 shared uint assigned; // total assigned UMS threads 22 23 this(int size) 24 { 25 lock = AlignedSpinLock(SpinLock.Contention.brief); 26 queue = RingQueue!(UMS_CONTEXT*)(size); 27 wenforce(CreateUmsCompletionList(&completionList), "failed to create UMS completion"); 28 } 29 } 30 31 package(photon) __gshared SchedulerBlock[] scheds; 32 shared uint activeThreads; 33 size_t schedNum; // (TLS) number of scheduler 34 35 struct Functor 36 { 37 void delegate() func; 38 } 39 40 public void startloop() 41 { 42 import core.cpuid; 43 uint threads = threadsPerCPU; 44 scheds = new SchedulerBlock[threads]; 45 foreach (ref sched; scheds) 46 sched = SchedulerBlock(100_000); 47 } 48 49 extern(Windows) uint worker(void* func) 50 { 51 auto functor = *cast(Functor*)func; 52 functor.func(); 53 return 0; 54 } 55 56 public void spawnLight(void delegate() func) { 57 return spawn(func); 58 } 59 60 public void spawnHeavy(void delegate() func) { 61 new Thread(func).start(); 62 } 63 64 public void spawn(void delegate() func) 65 { 66 ubyte[128] buf = void; 67 size_t size = buf.length; 68 PROC_THREAD_ATTRIBUTE_LIST* attrList = cast(PROC_THREAD_ATTRIBUTE_LIST*)buf.ptr; 69 wenforce(InitializeProcThreadAttributeList(attrList, 1, 0, &size), "failed to initialize proc thread"); 70 scope(exit) DeleteProcThreadAttributeList(attrList); 71 72 UMS_CONTEXT* ctx; 73 wenforce(CreateUmsThreadContext(&ctx), "failed to create UMS context"); 74 75 // power of 2 random choices: 76 size_t a = uniform!"[)"(0, scheds.length); 77 size_t b = uniform!"[)"(0, scheds.length); 78 uint loadA = scheds[a].assigned; // take into account active queue.size? 79 uint loadB = scheds[b].assigned; // ditto 80 if (loadA < loadB) atomicOp!"+="(scheds[a].assigned, 1); 81 else atomicOp!"+="(scheds[b].assigned, 1); 82 UMS_CREATE_THREAD_ATTRIBUTES umsAttrs; 83 umsAttrs.UmsCompletionList = loadA < loadB ? scheds[a].completionList : scheds[b].completionList; 84 umsAttrs.UmsContext = ctx; 85 umsAttrs.UmsVersion = UMS_VERSION; 86 87 wenforce(UpdateProcThreadAttribute(attrList, 0, PROC_THREAD_ATTRIBUTE_UMS_THREAD, &umsAttrs, umsAttrs.sizeof, null, null), "failed to update proc thread"); 88 HANDLE handle = wenforce(CreateRemoteThreadEx(GetCurrentProcess(), null, 0, &worker, new Functor(func), 0, attrList, null), "failed to create thread"); 89 atomicOp!"+="(activeThreads, 1); 90 } 91 92 package(photon) void schedulerEntry(size_t n) 93 { 94 schedNum = n; 95 UMS_SCHEDULER_STARTUP_INFO info; 96 info.UmsVersion = UMS_VERSION; 97 info.CompletionList = scheds[n].completionList; 98 info.SchedulerProc = &umsScheduler; 99 info.SchedulerParam = null; 100 wenforce(SetThreadAffinityMask(GetCurrentThread(), 1<<n), "failed to set affinity"); 101 wenforce(EnterUmsSchedulingMode(&info), "failed to enter UMS mode\n"); 102 } 103 104 extern(Windows) VOID umsScheduler(UMS_SCHEDULER_REASON Reason, ULONG_PTR ActivationPayload, PVOID SchedulerParam) 105 { 106 UMS_CONTEXT* ready; 107 auto completionList = scheds[schedNum].completionList; 108 logf("-----\nGot scheduled, reason: %d, schedNum: %x\n"w, Reason, schedNum); 109 if(!DequeueUmsCompletionListItems(completionList, 0, &ready)){ 110 logf("Failed to dequeue ums workers!\n"w); 111 return; 112 } 113 for (;;) 114 { 115 scheds[schedNum].lock.lock(); 116 auto queue = &scheds[schedNum].queue; // struct, so take a ref 117 while (ready != null) 118 { 119 logf("Dequeued UMS thread context: %x\n"w, ready); 120 queue.push(ready); 121 ready = GetNextUmsListItem(ready); 122 } 123 scheds[schedNum].lock.unlock(); 124 while(!queue.empty) 125 { 126 UMS_CONTEXT* ctx = queue.pop; 127 logf("Fetched thread context from our queue: %x\n", ctx); 128 BOOLEAN terminated; 129 uint size; 130 if(!QueryUmsThreadInformation(ctx, UMS_THREAD_INFO_CLASS.UmsThreadIsTerminated, &terminated, BOOLEAN.sizeof, &size)) 131 { 132 logf("Query UMS failed: %d\n"w, GetLastError()); 133 return; 134 } 135 if (!terminated) 136 { 137 auto ret = ExecuteUmsThread(ctx); 138 if (ret == ERROR_RETRY) // this UMS thread is locked, try it later 139 { 140 logf("Need retry!\n"); 141 queue.push(ctx); 142 } 143 else 144 { 145 logf("Failed to execute thread: %d\n"w, GetLastError()); 146 return; 147 } 148 } 149 else 150 { 151 logf("Terminated: %x\n"w, ctx); 152 //TODO: delete context or maybe cache them somewhere? 153 DeleteUmsThreadContext(ctx); 154 atomicOp!"-="(scheds[schedNum].assigned, 1); 155 atomicOp!"-="(activeThreads, 1); 156 } 157 } 158 if (activeThreads == 0) 159 { 160 logf("Shutting down\n"w); 161 return; 162 } 163 if(!DequeueUmsCompletionListItems(completionList, INFINITE, &ready)) 164 { 165 logf("Failed to dequeue UMS workers!\n"w); 166 return; 167 } 168 } 169 }