Surround the GIL with a ReentrantLock on the Julia side. #637
base: main
.gitignore:

```diff
@@ -7,3 +7,7 @@ dist/
 .CondaPkg/
 /jltest.*
 uv.lock
+
+# pixi environments
+.pixi
+*.egg-info
```
A new file (`@@ -0,0 +1,120 @@`):

```julia
struct TaskState
    task::Task
    sticky::Bool # original stickiness of the task
    state::C.PyGILState_STATE
end

struct TaskStack
    stack::Vector{TaskState}
    count::IdDict{Task,Int}
    condvar::Threads.Condition
    function TaskStack()
        return new(TaskState[], IdDict{Task,Int}(), Threads.Condition())
    end
end
function Base.last(task_stack::TaskStack)::Task
    return last(task_stack.stack).task
end
function Base.push!(task_stack::TaskStack, task::Task)
    original_sticky = task.sticky
    # The task should not migrate threads while acquiring or holding the GIL
    task.sticky = true
    gil_state = C.PyGILState_Ensure()

    # Save the stickiness and state for when we release
    state = TaskState(task, original_sticky, gil_state)
    push!(task_stack.stack, state)

    # Increment the count for this task
    count = get(task_stack.count, task, 0)
    task_stack.count[task] = count + 1

    return task_stack
end
function Base.pop!(task_stack::TaskStack)::Task
    state = pop!(task_stack.stack)
    task = state.task
    sticky = state.sticky
    gil_state = state.state

    # Decrement the count for this task
    count = task_stack.count[task] - 1
    if count == 0
        # If 0, remove it from the key set
        pop!(task_stack.count, task)
    else
        task_stack.count[task] = count
    end

    C.PyGILState_Release(gil_state)

    # Restore sticky state after releasing the GIL
    task.sticky = sticky

    Base.lock(task_stack.condvar) do
        notify(task_stack.condvar)
    end

    return task
end
Base.isempty(task_stack::TaskStack) = isempty(task_stack.stack)

if !isdefined(Base, :OncePerThread)

    # OncePerThread is implemented in full in Julia 1.12.
    # This implementation is meant for compatibility with Julia 1.10 and 1.11
    # and only supports a static number of threads. Use Julia 1.12 for dynamic
    # thread usage.
    mutable struct OncePerThread{T,F} <: Function
        @atomic xs::Vector{T} # values
        @atomic ss::Vector{UInt8} # states: 0=initial, 1=hasrun, 2=error, 3=concurrent
        const initializer::F
        function OncePerThread{T,F}(initializer::F) where {T,F}
            nt = Threads.maxthreadid()
            return new{T,F}(Vector{T}(undef, nt), zeros(UInt8, nt), initializer)
        end
    end
    OncePerThread{T}(initializer::Type{U}) where {T,U} = OncePerThread{T,Type{U}}(initializer)
    (once::OncePerThread{T,F})() where {T,F} = once[Threads.threadid()]
    function Base.getindex(once::OncePerThread, tid::Integer)
        ss = @atomic :acquire once.ss
        xs = @atomic :monotonic once.xs
        if checkbounds(Bool, xs, tid)
            if ss[tid] == 0
                xs[tid] = once.initializer()
                ss[tid] = 1
            end
            return xs[tid]
        else
            throw(ErrorException("Thread id $tid is out of bounds as initially allocated. Use Julia 1.12 for dynamic thread usage."))
        end
    end
```
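As a usage sketch (not part of the diff): the fallback is meant to behave like Julia 1.12's `Base.OncePerThread`, lazily constructing one value per thread. The `per_thread_dict` and `record!` names below are illustrative.

```julia
# Hypothetical usage of the OncePerThread fallback defined above:
# one Dict constructed lazily per thread, reused on subsequent calls.
const per_thread_dict = OncePerThread{Dict{String,Int}}(Dict{String,Int})

function record!(key::String)
    d = per_thread_dict()   # value for the calling thread; created on first use
    d[key] = get(d, key, 0) + 1
    return d
end
```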
Review comment: Why not make this per-task rather than per-thread? I use something like this in SymbolicRegression.jl:

```julia
struct PerTaskCache{T,F<:Function}
    constructor::F
    PerTaskCache{T}(constructor::F) where {T,F} = new{T,F}(constructor)
    PerTaskCache{T}() where {T} = PerTaskCache{T}(() -> T())
end
function Base.getindex(cache::PerTaskCache{T}) where {T}
    tls = Base.task_local_storage()
    haskey(tls, cache) && return tls[cache]::T
    value = cache.constructor()::T
    tls[cache] = value
    return value
end
function Base.setindex!(cache::PerTaskCache{T}, value::T) where {T}
    tls = Base.task_local_storage()
    tls[cache] = value
    return value
end
```

Then I can use it on each task, without worrying about locking or dynamic threading:

```julia
const CachedPrep = PerTaskCache{Dict{UInt,Any}}()
function foo()
    cache = CachedPrep[]
    #= do stuff with cache::Dict{UInt,Any} =#
end
```

Reply: The Python GIL knows nothing about Julia's tasks. The goal of the new lock that Jameson proposed is to lock the GIL on a thread and make the tasks holding it sticky to that thread. Several tasks on the same thread can then share the GIL reentrantly. It is essentially a thread- and GIL-aware ReentrantLock.

Reply: Note that holding the GIL together is sort of difficult to arrange entirely correctly, since Python also allows callees to unlock it, even though that is unsound in the presence of any other locks.

Reply: Thanks; makes sense!
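To make the "thread- and GIL-aware ReentrantLock" behavior concrete, here is a minimal sketch assuming the `GlobalInterpreterLock` from this diff (the `_GIL` constant is defined at the end of the file):

```julia
# Sketch: the same task may acquire the GIL lock recursively, like a
# ReentrantLock; the TaskStack's count field tracks the nesting depth.
lock(_GIL)          # ensures the GIL and pins this task to its thread
try
    lock(_GIL)      # reentrant acquire by the same task
    try
        # ... call into the Python C API here ...
    finally
        unlock(_GIL)
    end
finally
    unlock(_GIL)    # GIL released once this thread's stack unwinds
end
```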
The file continues:

```julia
end # !isdefined(Base, :OncePerThread)

struct GlobalInterpreterLock <: Base.AbstractLock
    lock_owners::OncePerThread{TaskStack}
    function GlobalInterpreterLock()
        return new(OncePerThread{TaskStack}(TaskStack))
    end
end
function Base.lock(gil::GlobalInterpreterLock)
    push!(gil.lock_owners(), current_task())
    return nothing
end
function Base.unlock(gil::GlobalInterpreterLock)
    lock_owner::TaskStack = gil.lock_owners()
    # A Threads.Condition must be held while waiting on it
    Base.lock(lock_owner.condvar) do
        while last(lock_owner) != current_task()
            wait(lock_owner.condvar)
        end
    end
    task = pop!(lock_owner)
    @assert task == current_task()
    return nothing
end
function Base.islocked(gil::GlobalInterpreterLock)
    # TODO: handle the Julia 1.10/1.11 case where slots up to maxthreadid()
    # have not been allocated
    return any(!isempty(gil.lock_owners[thread_index]) for thread_index in 1:Threads.maxthreadid())
end

const _GIL = GlobalInterpreterLock()
```
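For context, a hedged sketch of how such a lock might be used around Python calls; `Base.lock(f, l)` works for any `AbstractLock`, and `run_python_work` is a hypothetical caller, not part of the diff:

```julia
function run_python_work()   # hypothetical user function, illustrative only
    Base.lock(_GIL) do
        # Python C API calls are safe here: the GIL is held and the
        # current task cannot migrate off this thread.
    end
end
```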
Review comment: Locking on `__init__` is probably a bad idea even though we are technically holding the GIL, since this will disable finalizers. Rather, we should consider invoking `lock` while executing other calls.

Reply: Disabling this apparently allowed the Python tests to succeed.
Review comment: After sleeping on this, I propose you make the following changes to make this fix work:

- Check `in(lockowners().set, current_task)` for safety (forcing the user to make explicit GIL usage on each Task with a call to `lock` on each Task)?
- Set `current_task().sticky = true` to prevent it migrating to a thread that is not holding the GIL.
- Locking during `__init__` is dangerous since that doesn't run on the main Task in some user applications, even though it always does so during your CI tests. You might just want to add some special exemptions for when `current_task() == root_task(0)`?

Does that all sound logical and reasonable? My proposal already went through a few drafts as I tried to make it seem straightforward enough to explain and use, so it might not be the only way to deal with some of these constraints, and I'm probably forgetting at least some important details.
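A hedged sketch of what the first two bullets above might look like in code; `check_gil_ownership` is an illustrative name, and the ownership check reuses the `count` field of this diff's `TaskStack`:

```julia
# Illustrative only: require that the current task has explicitly taken the
# GIL lock before allowing Python calls, and pin it to its thread first.
function check_gil_ownership(task_stack::TaskStack)
    haskey(task_stack.count, current_task()) ||
        error("this Task must call lock(_GIL) before using Python")
    return nothing
end

current_task().sticky = true   # prevent migration off the GIL-holding thread
```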
Review comment: `OncePerThread` is to be introduced in Julia 1.12, which has yet to be released at the time of this writing. Is there a lesser version of this that could be implemented prior to Julia 1.12, preferably down to Julia 1.10 LTS?
is to be introduced in Julia 1.12 which has yet to be released at the time of this writing. Is there a lesser version of this that could be implemented prior to Julia 1.12, preferably to Julia 1.10 LTS?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vtjnash Should
Condition
here beBase.Condition
(Base.GenericCondition{Base.AlwaysLockedST}
) orThreads.Condition
(Base.GenericCondition{ReentrantLock}
) ?I originally thought you meant
Threads.Condition
, but now I'm thinkingBase.Condition
may be correct since in theory all of theTask
s waiting to unlock should already be on the same thread.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, Base.Condition, since this is locked to thread-local state
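For illustration, a small self-contained sketch of why `Base.Condition` suffices when the waiter and notifier are sticky tasks on the same thread:

```julia
# Sketch: Base.Condition supports wait/notify between tasks on one thread
# without an explicit lock; Threads.Condition would require holding its
# lock around both the wait and the notify.
cond = Base.Condition()   # Base.GenericCondition{Base.AlwaysLockedST}
waiter = @task begin
    wait(cond)            # suspends this task until notified
    println("woken on thread ", Threads.threadid())
end
waiter.sticky = true      # keep the waiter on the current thread
schedule(waiter)
yield()                   # let the waiter run and block on the condition
notify(cond)              # wake it from the same thread
wait(waiter)
```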