NEW v0.2.1 — Production-ready race detector

Find data races
before they find you.

Wrap any shared object with protect(). Unsynchronized concurrent access instantly throws a RaceConditionError — with thread names, timing, and fix instructions. Zero overhead in production.

$ pip install raceguard
View API →
thread simulation — live
Error output
python main.py
Traceback (most recent call last): File "main.py", line 14, in worker shared_list.append(item) raceguard.RaceConditionError: ──────────────────────────────────────────────────────────── Race condition detected! ──────────────────────────────────────────────────────────── Object : list → [0, 1, 2, 3, 4, 5, …] Operation : write (previous was write) Current thread : 'Writer-2' Previous thread : 'Writer-1' Time gap : 2.41 ms (window: 10 ms) Lock held : No ──────────────────────────────────────────────────────────── Fix: wrap the access in 'with locked(obj):' or decorate with @with_lock(obj).
Detect. Understand. Fix.
No manual instrumentation. No post-mortem guesswork. One line of code.
Instant detection
Raises the moment two threads access the same object within the race window without holding a lock. Works on list, dict, set, and any custom object.
🔒
Lock-aware heuristic
Tracks whether the previous access was under a lock. If it was, the next access — even lockless — is correctly deemed safe. Zero false positives.
🎯
One line. Any object.
data = protect([]) — that's it. The proxy behaves transparently, automatically protecting nested child objects without manual wrapping.
🛡️
Zero Prod Overhead
Set RACEGUARD_ENABLED=0 (or call configure(enabled=False) before ANY objects are wrapped) in production for a true zero-cost passthrough of your original objects. In dev-mode, proxy overhead is mere microseconds per access.
📖
Rich error reports
Every error includes thread names, timing data, operation type, and a specific fix suggestion — so you know exactly what to do next.
🔗
Semantic Race Protection
Group multiple proxied objects under a single AtomicGroup to enforce strict joint transactional safety, preventing logical blindspots and TOCTOU (Time-Of-Check to Time-Of-Use) races.
⏱️
Strict Mode for CI
Use configure(strict=True) in CI to catch lockless accesses regardless of time delays. Overcomes the heuristic limits of highly-loaded systems.
🔄
Asyncio Tracking
Seamlessly tracks underlying asyncio task identities to catch thread interleaving between pure async tasks on the event loop.
🌍
Cross-Platform Verified
Fully supported and rigorously tested across native Windows, Linux containers, and macOS environments.
Drop in. No refactor.
Wrap → guard → ship. The whole API fits in your head.
✗ without raceguard python
import threading

results = []  # shared, unprotected

def worker(x):
    results.append(x)
    # silent data corruption in prod

threads = [
    threading.Thread(target=worker, args=(i,))
    for i in range(10)
]
for t in threads: t.start()
for t in threads: t.join()
# bug impossible to reproduce reliably
✓ with raceguard python
import threading
from raceguard import protect, with_lock

results = protect([])  # ← one change

@with_lock(results)
def worker(x):
    results.append(x)  # lock held → safe

threads = [
    threading.Thread(target=worker, args=(i,))
    for i in range(10)
]
for t in threads: t.start()
for t in threads: t.join()
context mgr + atomic group python
from raceguard import protect, locked, AtomicGroup

a = protect([])
b = protect({})

# bind objects transactionally
group = AtomicGroup(a, b)

def transfer():
    with locked(group):
        a.append(1)
        b['x'] = 1
    # locks automatically resolve
custom objects + production toggle python
from raceguard import protect, with_lock, configure

class Counter:
    def __init__(self): self.value = 0
    def increment(self): self.value += 1

c = protect(Counter())

@with_lock(c)
def safe_inc():
    c.increment()  # attribute read proxied

# log, warn, or raise (default)
configure(mode="log")

# true zero-overhead passthrough
configure(enabled=False)
How _rg_check() decides.
  • Same thread?
    Always safe. Re-entrant access by one thread can never race with itself.
  • Previous access locked?
    Safe. A lock-protected write commits state atomically — the next reader sees a consistent snapshot.
  • Both lockless + within window?
    RaceConditionError. Two unguarded threads touching the same object in rapid succession.
  • Far apart in time?
    Safe. Separated by more than race_window (default 10 ms), treated as sequential. (Note: ABA-style races may evade this heuristic unless Strict Mode is used).
_rg_check(mode) same thread? yes no safe ✓ last locked? yes no safe ✓ within window? no yes safe ✓ RACE ✗

Stop guessing.
Start detecting.

One import. One function call. Ship threading bugs to the grave.

$ pip install raceguard