1 module photon.windows.core;
2 version(Windows):
3 private:
4 
5 import core.sys.windows.core;
6 import core.atomic;
7 import core.internal.spinlock;
8 import core.stdc.stdlib;
9 import core.thread;
10 import std.exception;
11 import std.windows.syserror;
12 import std.random;
13 import std.format;
14 
15 
/// State owned by one UMS scheduler thread (one entry of `scheds`).
struct SchedulerBlock
{
    /// Spin lock protecting `queue`.
    AlignedSpinLock lock;
    /// UMS completion list: workers spawned for this scheduler are attached
    /// to it and the scheduler dequeues runnable workers from it.
    UMS_COMPLETION_LIST* completionList;
    /// Local run queue of worker contexts waiting to be executed.
    RingQueue!(UMS_CONTEXT*) queue;
    /// Count of UMS worker threads currently assigned to this scheduler.
    shared uint assigned;

    /**
     * Params:
     *   size = size argument forwarded to the `RingQueue` constructor
     *          (presumably its capacity — confirm against RingQueue).
     * Throws: `WindowsException` (via `wenforce`) if the completion list
     *         cannot be created.
     */
    this(int size)
    {
        this.lock = AlignedSpinLock(SpinLock.Contention.brief);
        this.queue = RingQueue!(UMS_CONTEXT*)(size);
        auto created = CreateUmsCompletionList(&this.completionList);
        wenforce(created, "failed to create UMS completion");
    }
}
30 
/// Per-scheduler state; allocated and initialized by `startloop`.
package(photon) __gshared SchedulerBlock[] scheds;
/// Number of spawned UMS workers not yet observed as terminated by a
/// scheduler; schedulers stop once this reaches zero.
shared uint activeThreads;
size_t schedNum; // (TLS) index into `scheds` of the scheduler running on this thread; set by schedulerEntry
34 
/// Boxes a delegate so it can travel to a thread start routine through a
/// single `void*` argument (see `spawn` and `worker`).
struct Functor { void delegate() func; }
39 
/// Allocates `scheds` with `threadsPerCPU()` entries and constructs each one
/// with a queue size of 100_000.
public void startloop()
{
    import core.cpuid : threadsPerCPU;

    enum queueSize = 100_000;
    scheds = new SchedulerBlock[threadsPerCPU];
    for (size_t i = 0; i < scheds.length; ++i)
        scheds[i] = SchedulerBlock(queueSize);
}
48 
/// Thread start routine handed to `CreateRemoteThreadEx` by `spawn`.
/// `func` points at a `Functor`; a copy is taken and its delegate invoked.
/// Returns: 0 as the thread exit code.
extern(Windows) uint worker(void* func)
{
    Functor local = *(cast(Functor*) func);
    local.func();
    return 0;
}
55 
/// Runs `func` as a lightweight task; simply forwards to `spawn`.
public void spawnLight(void delegate() func)
{
    spawn(func);
}
59 
/// Runs `func` on a newly created and started `core.thread.Thread`.
public void spawnHeavy(void delegate() func)
{
    auto t = new Thread(func);
    t.start();
}
63 
/**
 * Spawns `func` on a new UMS worker thread.
 *
 * The worker is attached to one of the schedulers in `scheds`, picked with
 * the "power of two random choices" heuristic: two schedulers are sampled at
 * random and the one with fewer assigned UMS threads is used.
 *
 * Throws: `WindowsException` (via `wenforce`) if any Win32 call fails. In that
 *         case the `assigned`/`activeThreads` counters are rolled back and
 *         the UMS context is deleted.
 */
public void spawn(void delegate() func)
{
    // Fixed-size storage for a one-entry attribute list; if it is too small,
    // InitializeProcThreadAttributeList fails and wenforce throws.
    ubyte[128] buf = void;
    size_t size = buf.length;
    PROC_THREAD_ATTRIBUTE_LIST* attrList = cast(PROC_THREAD_ATTRIBUTE_LIST*)buf.ptr;
    wenforce(InitializeProcThreadAttributeList(attrList, 1, 0, &size), "failed to initialize proc thread");
    scope(exit) DeleteProcThreadAttributeList(attrList);

    UMS_CONTEXT* ctx;
    wenforce(CreateUmsThreadContext(&ctx), "failed to create UMS context");
    // Don't leak the context if we fail before a thread is created with it.
    scope(failure) DeleteUmsThreadContext(ctx);

    // power of 2 random choices:
    size_t a = uniform!"[)"(0, scheds.length);
    size_t b = uniform!"[)"(0, scheds.length);
    // take into account active queue.size?
    immutable chosen = atomicLoad(scheds[a].assigned) < atomicLoad(scheds[b].assigned) ? a : b;
    atomicOp!"+="(scheds[chosen].assigned, 1);
    scope(failure) atomicOp!"-="(scheds[chosen].assigned, 1);

    UMS_CREATE_THREAD_ATTRIBUTES umsAttrs;
    umsAttrs.UmsCompletionList = scheds[chosen].completionList;
    umsAttrs.UmsContext = ctx;
    umsAttrs.UmsVersion = UMS_VERSION;

    wenforce(UpdateProcThreadAttribute(attrList, 0, PROC_THREAD_ATTRIBUTE_UMS_THREAD, &umsAttrs, umsAttrs.sizeof, null, null), "failed to update proc thread");

    // Count the thread as active before it exists. umsScheduler decrements
    // activeThreads when it sees the thread terminate, and that could happen
    // before an increment placed after thread creation, underflowing it.
    atomicOp!"+="(activeThreads, 1);
    scope(failure) atomicOp!"-="(activeThreads, 1);

    // NOTE(review): this GC allocation is referenced only through the new OS
    // thread's start parameter; confirm it cannot be collected before worker() runs.
    HANDLE handle = wenforce(CreateRemoteThreadEx(GetCurrentProcess(), null, 0, &worker, new Functor(func), 0, attrList, null), "failed to create thread");
    // The thread is never waited on or signalled through this handle, so
    // release it immediately instead of leaking one handle per spawn.
    CloseHandle(handle);
}
91 
/**
 * Turns the calling thread into the UMS scheduler for slot `n`.
 *
 * Stores `n` in the thread-local `schedNum`, pins the thread to logical
 * processor `n`, and enters UMS scheduling mode with `umsScheduler` as the
 * scheduler procedure and `scheds[n].completionList` as its completion list.
 *
 * Params:
 *   n = index into `scheds`; also used as the bit index of the affinity mask.
 * Throws: `Exception` if `n` does not fit in a pointer-sized affinity mask;
 *         `WindowsException` (via `wenforce`) if setting affinity or entering
 *         UMS mode fails.
 */
package(photon) void schedulerEntry(size_t n)
{
    schedNum = n;
    UMS_SCHEDULER_STARTUP_INFO info;
    info.UmsVersion = UMS_VERSION;
    info.CompletionList = scheds[n].completionList;
    info.SchedulerProc = &umsScheduler;
    info.SchedulerParam = null;
    // The affinity mask is pointer-sized. Shift a size_t, not an int, so that
    // n >= 31 does not overflow into a bogus (sign-extended/wrapped) mask.
    enforce(n < size_t.sizeof * 8, format("scheduler index %d exceeds affinity mask width", n));
    wenforce(SetThreadAffinityMask(GetCurrentThread(), cast(size_t)1 << n), "failed to set affinity");
    wenforce(EnterUmsSchedulingMode(&info), "failed to enter UMS mode\n");
}
103 
/**
 * UMS scheduler procedure installed by `schedulerEntry`.
 *
 * Invoked by the OS at startup and again whenever a worker running on this
 * scheduler thread blocks or yields. Each invocation does the following:
 * - Moves runnable workers from this scheduler's completion list into its
 *   local queue.
 * - Runs queued workers with `ExecuteUmsThread`. On success that call does
 *   not return; control comes back through a new invocation of this procedure.
 * - For terminated workers, deletes the context and decrements the
 *   `assigned`/`activeThreads` counters.
 *
 * It returns once `activeThreads` reaches zero, or after logging an
 * unexpected API failure.
 *
 * Params:
 *   Reason            = why the scheduler was invoked.
 *   ActivationPayload = for `UmsSchedulerThreadYield`, the context of the
 *                       worker that yielded.
 *   SchedulerParam    = unused.
 */
extern(Windows) VOID umsScheduler(UMS_SCHEDULER_REASON Reason, ULONG_PTR ActivationPayload, PVOID SchedulerParam)
{
    UMS_CONTEXT* ready;
    auto completionList = scheds[schedNum].completionList;
    logf("-----\nGot scheduled, reason: %d, schedNum: %x\n"w, Reason, schedNum);
    // A worker that yields is handed to us directly and is NOT placed on the
    // completion list, so it must be re-queued here or it never runs again.
    if (Reason == UMS_SCHEDULER_REASON.UmsSchedulerThreadYield && ActivationPayload != 0)
    {
        scheds[schedNum].lock.lock();
        scheds[schedNum].queue.push(cast(UMS_CONTEXT*)ActivationPayload);
        scheds[schedNum].lock.unlock();
    }
    // Non-blocking poll: a timeout only means nothing has completed yet.
    if (!DequeueUmsCompletionListItems(completionList, 0, &ready))
    {
        if (GetLastError() != ERROR_TIMEOUT)
        {
            logf("Failed to dequeue ums workers!\n"w);
            return;
        }
        ready = null;
    }
    for (;;)
    {
        scheds[schedNum].lock.lock();
        auto queue = &scheds[schedNum].queue; // struct, so take a ref
        while (ready != null)
        {
            logf("Dequeued UMS thread context: %x\n"w, ready);
            queue.push(ready);
            ready = GetNextUmsListItem(ready);
        }
        scheds[schedNum].lock.unlock();
        // NOTE(review): pops and retry-pushes below run without the lock;
        // confirm no other thread touches this scheduler's queue.
        while (!queue.empty)
        {
            UMS_CONTEXT* ctx = queue.pop;
            logf("Fetched thread context from our queue: %x\n", ctx);
            BOOLEAN terminated;
            uint size;
            if (!QueryUmsThreadInformation(ctx, UMS_THREAD_INFO_CLASS.UmsThreadIsTerminated, &terminated, BOOLEAN.sizeof, &size))
            {
                logf("Query UMS failed: %d\n"w, GetLastError());
                return;
            }
            if (!terminated)
            {
                // ExecuteUmsThread does not return on success. If it returns,
                // it failed (FALSE) and the reason is in GetLastError().
                ExecuteUmsThread(ctx);
                immutable err = GetLastError();
                if (err == ERROR_RETRY) // this UMS thread is locked, try it later
                {
                    logf("Need retry!\n");
                    queue.push(ctx);
                }
                else
                {
                    logf("Failed to execute thread: %d\n"w, err);
                    return;
                }
            }
            else
            {
                logf("Terminated: %x\n"w, ctx);
                //TODO: delete context or maybe cache them somewhere?
                DeleteUmsThreadContext(ctx);
                atomicOp!"-="(scheds[schedNum].assigned, 1);
                atomicOp!"-="(activeThreads, 1);
            }
        }
        if (atomicLoad(activeThreads) == 0)
        {
            logf("Shutting down\n"w);
            return;
        }
        if (!DequeueUmsCompletionListItems(completionList, INFINITE, &ready))
        {
            logf("Failed to dequeue UMS workers!\n"w);
            return;
        }
    }
}