Skip to content

Commit 1caee45

Browse files
Merge pull request #348 from swiftwasm/yt/fix-worker-scheduling
Ensure a job enqueued on a worker must be run within the same macro task
2 parents 3b92b8c + 138b439 commit 1caee45

File tree

2 files changed

+243
-28
lines changed

2 files changed

+243
-28
lines changed

Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift

+65-28
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ import WASILibc
8787
/// }
8888
/// ```
8989
///
90+
/// ## Scheduling invariants
91+
///
92+
/// * Jobs enqueued on a worker are guaranteed to run within the same macrotask in which they were scheduled.
93+
///
9094
/// ## Known limitations
9195
///
9296
/// Currently, the Cooperative Global Executor of Swift runtime has a bug around
@@ -135,22 +139,26 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
135139
/// +---------+ +------------+
136140
/// +----->| Idle |--[terminate]-->| Terminated |
137141
/// | +---+-----+ +------------+
138-
/// | |
139-
/// | [enqueue]
140-
/// | |
141-
/// [no more job] |
142-
/// | v
143-
/// | +---------+
144-
/// +------| Running |
145-
/// +---------+
142+
/// | | \
143+
/// | | \------------------+
144+
/// | | |
145+
/// | [enqueue] [enqueue] (on other thread)
146+
/// | | |
147+
/// [no more job] | |
148+
/// | v v
149+
/// | +---------+ +---------+
150+
/// +------| Running |<--[wake]--| Ready |
151+
/// +---------+ +---------+
146152
///
147153
enum State: UInt32, AtomicRepresentable {
148154
/// The worker is idle and waiting for a new job.
149155
case idle = 0
156+
/// A wake message is sent to the worker, but it has not been received it yet
157+
case ready = 1
150158
/// The worker is processing a job.
151-
case running = 1
159+
case running = 2
152160
/// The worker is terminated.
153-
case terminated = 2
161+
case terminated = 3
154162
}
155163
let state: Atomic<State> = Atomic(.idle)
156164
/// TODO: Rewrite it to use real queue :-)
@@ -197,32 +205,46 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
197205
func enqueue(_ job: UnownedJob) {
198206
statsIncrement(\.enqueuedJobs)
199207
var locked: Bool
208+
let onTargetThread = Self.currentThread === self
209+
// If it's on the thread and it's idle, we can directly schedule a `Worker/run` microtask.
210+
let desiredState: State = onTargetThread ? .running : .ready
200211
repeat {
201212
let result: Void? = jobQueue.withLockIfAvailable { queue in
202213
queue.append(job)
214+
trace("Worker.enqueue idle -> running")
203215
// Wake up the worker to process a job.
204-
switch state.exchange(.running, ordering: .sequentiallyConsistent) {
205-
case .idle:
206-
if Self.currentThread === self {
216+
trace("Worker.enqueue idle -> \(desiredState)")
217+
switch state.compareExchange(
218+
expected: .idle,
219+
desired: desiredState,
220+
ordering: .sequentiallyConsistent
221+
) {
222+
case (true, _):
223+
if onTargetThread {
207224
// Enqueueing a new job to the current worker thread, but it's idle now.
208225
// This is usually the case when a continuation is resumed by JS events
209226
// like `setTimeout` or `addEventListener`.
210227
// We can run the job and subsequently spawned jobs immediately.
211-
// JSPromise.resolve(JSValue.undefined).then { _ in
212-
_ = JSObject.global.queueMicrotask!(
213-
JSOneshotClosure { _ in
214-
self.run()
215-
return JSValue.undefined
216-
}
217-
)
228+
scheduleRunWithinMacroTask()
218229
} else {
219230
let tid = self.tid.load(ordering: .sequentiallyConsistent)
220231
swjs_wake_up_worker_thread(tid)
221232
}
222-
case .running:
233+
case (false, .idle):
234+
preconditionFailure("unreachable: idle -> \(desiredState) should return exchanged=true")
235+
case (false, .ready):
236+
// A wake message is sent to the worker, but it has not been received it yet
237+
if onTargetThread {
238+
// This means the job is enqueued outside of `Worker/run` (typically triggered
239+
// JS microtasks not awaited by Swift), then schedule a `Worker/run` within
240+
// the same macrotask.
241+
state.store(.running, ordering: .sequentiallyConsistent)
242+
scheduleRunWithinMacroTask()
243+
}
244+
case (false, .running):
223245
// The worker is already running, no need to wake up.
224246
break
225-
case .terminated:
247+
case (false, .terminated):
226248
// Will not wake up the worker because it's already terminated.
227249
break
228250
}
@@ -231,7 +253,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
231253
} while !locked
232254
}
233255

234-
func scheduleNextRun() {
256+
func scheduleRunWithinMacroTask() {
235257
_ = JSObject.global.queueMicrotask!(
236258
JSOneshotClosure { _ in
237259
self.run()
@@ -265,12 +287,27 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
265287
trace("Worker.start tid=\(tid)")
266288
}
267289

290+
/// On receiving a wake-up message from other thread
291+
func wakeUpFromOtherThread() {
292+
let (exchanged, _) = state.compareExchange(
293+
expected: .ready,
294+
desired: .running,
295+
ordering: .sequentiallyConsistent
296+
)
297+
guard exchanged else {
298+
// `Worker/run` was scheduled on the thread before JS event loop starts
299+
// a macrotask handling wake-up message.
300+
return
301+
}
302+
run()
303+
}
304+
268305
/// Process jobs in the queue.
269306
///
270307
/// Return when the worker has no more jobs to run or terminated.
271308
/// This method must be called from the worker thread after the worker
272309
/// is started by `start(executor:)`.
273-
func run() {
310+
private func run() {
274311
trace("Worker.run")
275312
guard let executor = parentTaskExecutor else {
276313
preconditionFailure("The worker must be started with a parent executor.")
@@ -290,7 +327,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
290327
queue.removeFirst()
291328
return job
292329
}
293-
// No more jobs to run now. Wait for a new job to be enqueued.
330+
// No more jobs to run now.
294331
let (exchanged, original) = state.compareExchange(
295332
expected: .running,
296333
desired: .idle,
@@ -301,7 +338,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
301338
case (true, _):
302339
trace("Worker.run exited \(original) -> idle")
303340
return nil // Regular case
304-
case (false, .idle):
341+
case (false, .idle), (false, .ready):
305342
preconditionFailure("unreachable: Worker/run running in multiple threads!?")
306343
case (false, .running):
307344
preconditionFailure("unreachable: running -> idle should return exchanged=true")
@@ -657,12 +694,12 @@ func _swjs_enqueue_main_job_from_worker(_ job: UnownedJob) {
657694
@_expose(wasm, "swjs_wake_worker_thread")
658695
#endif
659696
func _swjs_wake_worker_thread() {
660-
WebWorkerTaskExecutor.Worker.currentThread!.run()
697+
WebWorkerTaskExecutor.Worker.currentThread!.wakeUpFromOtherThread()
661698
}
662699

663700
private func trace(_ message: String) {
664701
#if JAVASCRIPTKIT_TRACE
665-
JSObject.global.process.stdout.write("[trace tid=\(swjs_get_worker_thread_id())] \(message)\n")
702+
_ = JSObject.global.console.warn("[trace tid=\(swjs_get_worker_thread_id())] \(message)\n")
666703
#endif
667704
}
668705

Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift

+178
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#if compiler(>=6.1) && _runtime(_multithreaded)
2+
import Synchronization
23
import XCTest
34
import _CJavaScriptKit // For swjs_get_worker_thread_id
45
@testable import JavaScriptKit
@@ -22,6 +23,7 @@ func pthread_mutex_lock(_ mutex: UnsafeMutablePointer<pthread_mutex_t>) -> Int32
2223
}
2324
#endif
2425

26+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
2527
final class WebWorkerTaskExecutorTests: XCTestCase {
2628
func testTaskRunOnMainThread() async throws {
2729
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
@@ -97,6 +99,182 @@ final class WebWorkerTaskExecutorTests: XCTestCase {
9799
executor.terminate()
98100
}
99101

102+
func testScheduleJobWithinMacroTask1() async throws {
103+
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
104+
defer { executor.terminate() }
105+
106+
final class Context: @unchecked Sendable {
107+
let hasEndedFirstWorkerWakeLoop = Atomic<Bool>(false)
108+
let hasEnqueuedFromMain = Atomic<Bool>(false)
109+
let hasReachedNextMacroTask = Atomic<Bool>(false)
110+
let hasJobBEnded = Atomic<Bool>(false)
111+
let hasJobCEnded = Atomic<Bool>(false)
112+
}
113+
114+
// Scenario 1.
115+
// | Main | Worker |
116+
// | +---------------------+--------------------------+
117+
// | | | Start JS macrotask |
118+
// | | | Start 1st wake-loop |
119+
// | | | Enq JS microtask A |
120+
// | | | End 1st wake-loop |
121+
// | | | Start a JS microtask A |
122+
// time | Enq job B to Worker | [PAUSE] |
123+
// | | | Enq Swift job C |
124+
// | | | End JS microtask A |
125+
// | | | Start 2nd wake-loop |
126+
// | | | Run Swift job B |
127+
// | | | Run Swift job C |
128+
// | | | End 2nd wake-loop |
129+
// v | | End JS macrotask |
130+
// +---------------------+--------------------------+
131+
132+
let context = Context()
133+
Task {
134+
while !context.hasEndedFirstWorkerWakeLoop.load(ordering: .sequentiallyConsistent) {
135+
try! await Task.sleep(nanoseconds: 1_000)
136+
}
137+
// Enqueue job B to Worker
138+
Task(executorPreference: executor) {
139+
XCTAssertFalse(isMainThread())
140+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
141+
context.hasJobBEnded.store(true, ordering: .sequentiallyConsistent)
142+
}
143+
XCTAssertTrue(isMainThread())
144+
// Resume worker thread to let it enqueue job C
145+
context.hasEnqueuedFromMain.store(true, ordering: .sequentiallyConsistent)
146+
}
147+
148+
// Start worker
149+
await Task(executorPreference: executor) {
150+
// Schedule a new macrotask to detect if the current macrotask has completed
151+
JSObject.global.setTimeout.function!(
152+
JSOneshotClosure { _ in
153+
context.hasReachedNextMacroTask.store(true, ordering: .sequentiallyConsistent)
154+
return .undefined
155+
},
156+
0
157+
)
158+
159+
// Enqueue a microtask, not managed by WebWorkerTaskExecutor
160+
JSObject.global.queueMicrotask.function!(
161+
JSOneshotClosure { _ in
162+
// Resume the main thread and let it enqueue job B
163+
context.hasEndedFirstWorkerWakeLoop.store(true, ordering: .sequentiallyConsistent)
164+
// Wait until the enqueue has completed
165+
while !context.hasEnqueuedFromMain.load(ordering: .sequentiallyConsistent) {}
166+
// Should be still in the same macrotask
167+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
168+
// Enqueue job C
169+
Task(executorPreference: executor) {
170+
// Should be still in the same macrotask
171+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
172+
// Notify that job C has completed
173+
context.hasJobCEnded.store(true, ordering: .sequentiallyConsistent)
174+
}
175+
return .undefined
176+
},
177+
0
178+
)
179+
// Wait until job B, C and the next macrotask have completed
180+
while !context.hasJobBEnded.load(ordering: .sequentiallyConsistent)
181+
|| !context.hasJobCEnded.load(ordering: .sequentiallyConsistent)
182+
|| !context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent)
183+
{
184+
try! await Task.sleep(nanoseconds: 1_000)
185+
}
186+
}.value
187+
}
188+
189+
func testScheduleJobWithinMacroTask2() async throws {
190+
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
191+
defer { executor.terminate() }
192+
193+
final class Context: @unchecked Sendable {
194+
let hasEndedFirstWorkerWakeLoop = Atomic<Bool>(false)
195+
let hasEnqueuedFromMain = Atomic<Bool>(false)
196+
let hasReachedNextMacroTask = Atomic<Bool>(false)
197+
let hasJobBEnded = Atomic<Bool>(false)
198+
let hasJobCEnded = Atomic<Bool>(false)
199+
}
200+
201+
// Scenario 2.
202+
// (The order of enqueue of job B and C are reversed from Scenario 1)
203+
//
204+
// | Main | Worker |
205+
// | +---------------------+--------------------------+
206+
// | | | Start JS macrotask |
207+
// | | | Start 1st wake-loop |
208+
// | | | Enq JS microtask A |
209+
// | | | End 1st wake-loop |
210+
// | | | Start a JS microtask A |
211+
// | | | Enq Swift job C |
212+
// time | Enq job B to Worker | [PAUSE] |
213+
// | | | End JS microtask A |
214+
// | | | Start 2nd wake-loop |
215+
// | | | Run Swift job B |
216+
// | | | Run Swift job C |
217+
// | | | End 2nd wake-loop |
218+
// v | | End JS macrotask |
219+
// +---------------------+--------------------------+
220+
221+
let context = Context()
222+
Task {
223+
while !context.hasEndedFirstWorkerWakeLoop.load(ordering: .sequentiallyConsistent) {
224+
try! await Task.sleep(nanoseconds: 1_000)
225+
}
226+
// Enqueue job B to Worker
227+
Task(executorPreference: executor) {
228+
XCTAssertFalse(isMainThread())
229+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
230+
context.hasJobBEnded.store(true, ordering: .sequentiallyConsistent)
231+
}
232+
XCTAssertTrue(isMainThread())
233+
// Resume worker thread to let it enqueue job C
234+
context.hasEnqueuedFromMain.store(true, ordering: .sequentiallyConsistent)
235+
}
236+
237+
// Start worker
238+
await Task(executorPreference: executor) {
239+
// Schedule a new macrotask to detect if the current macrotask has completed
240+
JSObject.global.setTimeout.function!(
241+
JSOneshotClosure { _ in
242+
context.hasReachedNextMacroTask.store(true, ordering: .sequentiallyConsistent)
243+
return .undefined
244+
},
245+
0
246+
)
247+
248+
// Enqueue a microtask, not managed by WebWorkerTaskExecutor
249+
JSObject.global.queueMicrotask.function!(
250+
JSOneshotClosure { _ in
251+
// Enqueue job C
252+
Task(executorPreference: executor) {
253+
// Should be still in the same macrotask
254+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
255+
// Notify that job C has completed
256+
context.hasJobCEnded.store(true, ordering: .sequentiallyConsistent)
257+
}
258+
// Resume the main thread and let it enqueue job B
259+
context.hasEndedFirstWorkerWakeLoop.store(true, ordering: .sequentiallyConsistent)
260+
// Wait until the enqueue has completed
261+
while !context.hasEnqueuedFromMain.load(ordering: .sequentiallyConsistent) {}
262+
// Should be still in the same macrotask
263+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
264+
return .undefined
265+
},
266+
0
267+
)
268+
// Wait until job B, C and the next macrotask have completed
269+
while !context.hasJobBEnded.load(ordering: .sequentiallyConsistent)
270+
|| !context.hasJobCEnded.load(ordering: .sequentiallyConsistent)
271+
|| !context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent)
272+
{
273+
try! await Task.sleep(nanoseconds: 1_000)
274+
}
275+
}.value
276+
}
277+
100278
func testTaskGroupRunOnSameThread() async throws {
101279
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 3)
102280

0 commit comments

Comments
 (0)