Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Include/internal/pycore_dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ _PyDict_NotifyEvent(PyDict_WatchEvent event,
PyObject *value)
{
assert(Py_REFCNT((PyObject*)mp) > 0);
int watcher_bits = mp->_ma_watcher_tag & DICT_WATCHER_MASK;
uint64_t tag = FT_ATOMIC_LOAD_UINT64_RELAXED(mp->_ma_watcher_tag);
int watcher_bits = tag & DICT_WATCHER_MASK;
if (watcher_bits) {
RARE_EVENT_STAT_INC(watched_dict_modification);
_PyDict_SendEvent(watcher_bits, event, mp, key, value);
Expand Down Expand Up @@ -364,7 +365,8 @@ PyDictObject *_PyObject_MaterializeManagedDict_LockHeld(PyObject *);
static inline Py_ssize_t
_PyDict_UniqueId(PyDictObject *mp)
{
return (Py_ssize_t)(mp->_ma_watcher_tag >> DICT_UNIQUE_ID_SHIFT);
uint64_t tag = FT_ATOMIC_LOAD_UINT64_RELAXED(mp->_ma_watcher_tag);
return (Py_ssize_t)(tag >> DICT_UNIQUE_ID_SHIFT);
}

static inline void
Expand Down
2 changes: 2 additions & 0 deletions Include/internal/pycore_dict_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ extern "C" {

struct _Py_dict_state {
uint32_t next_keys_version;
PyMutex watcher_mutex; // Protects the watchers array (free-threaded builds)
_PyOnceFlag watcher_setup_once; // One-time optimizer watcher setup
PyDict_WatchCallback watchers[DICT_MAX_WATCHERS];
};

Expand Down
14 changes: 14 additions & 0 deletions Include/internal/pycore_pyatomic_ft_wrappers.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ extern "C" {
_Py_atomic_load_uint16_relaxed(&value)
#define FT_ATOMIC_LOAD_UINT32_RELAXED(value) \
_Py_atomic_load_uint32_relaxed(&value)
#define FT_ATOMIC_LOAD_UINT64_RELAXED(value) \
_Py_atomic_load_uint64_relaxed(&value)
#define FT_ATOMIC_LOAD_ULONG_RELAXED(value) \
_Py_atomic_load_ulong_relaxed(&value)
#define FT_ATOMIC_STORE_PTR_RELAXED(value, new_value) \
Expand Down Expand Up @@ -125,7 +127,14 @@ extern "C" {
_Py_atomic_load_ullong_relaxed(&value)
#define FT_ATOMIC_ADD_SSIZE(value, new_value) \
(void)_Py_atomic_add_ssize(&value, new_value)
#define FT_ATOMIC_ADD_UINT64(value, new_value) \
(void)_Py_atomic_add_uint64(&value, new_value)
#define FT_ATOMIC_OR_UINT64(value, new_value) \
(void)_Py_atomic_or_uint64(&value, new_value)
#define FT_ATOMIC_AND_UINT64(value, new_value) \
(void)_Py_atomic_and_uint64(&value, new_value)
#define FT_MUTEX_LOCK(lock) PyMutex_Lock(lock)
#define FT_MUTEX_LOCK_FLAGS(lock, flags) PyMutex_LockFlags(lock, flags)
#define FT_MUTEX_UNLOCK(lock) PyMutex_Unlock(lock)

#else
Expand All @@ -144,6 +153,7 @@ extern "C" {
#define FT_ATOMIC_LOAD_UINT8_RELAXED(value) value
#define FT_ATOMIC_LOAD_UINT16_RELAXED(value) value
#define FT_ATOMIC_LOAD_UINT32_RELAXED(value) value
#define FT_ATOMIC_LOAD_UINT64_RELAXED(value) value
#define FT_ATOMIC_LOAD_ULONG_RELAXED(value) value
#define FT_ATOMIC_STORE_PTR_RELAXED(value, new_value) value = new_value
#define FT_ATOMIC_STORE_PTR_RELEASE(value, new_value) value = new_value
Expand Down Expand Up @@ -182,7 +192,11 @@ extern "C" {
#define FT_ATOMIC_LOAD_ULLONG_RELAXED(value) value
#define FT_ATOMIC_STORE_ULLONG_RELAXED(value, new_value) value = new_value
#define FT_ATOMIC_ADD_SSIZE(value, new_value) (void)(value += new_value)
#define FT_ATOMIC_ADD_UINT64(value, new_value) (void)(value += new_value)
#define FT_ATOMIC_OR_UINT64(value, new_value) (void)(value |= new_value)
#define FT_ATOMIC_AND_UINT64(value, new_value) (void)(value &= new_value)
#define FT_MUTEX_LOCK(lock) do {} while (0)
#define FT_MUTEX_LOCK_FLAGS(lock, flags) do {} while (0)
#define FT_MUTEX_UNLOCK(lock) do {} while (0)

#endif
Expand Down
89 changes: 89 additions & 0 deletions Lib/test/test_free_threading/test_dict_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import unittest

from test.support import import_helper, threading_helper

_testcapi = import_helper.import_module("_testcapi")

ITERS = 100
NTHREADS = 20


@threading_helper.requires_working_threading()
class TestDictWatcherThreadSafety(unittest.TestCase):
# Watcher kinds from _testcapi
EVENTS = 0 # appends dict events as strings to global event list

def test_concurrent_add_clear_watchers(self):
"""Race AddWatcher and ClearWatcher from multiple threads.

Uses more threads than available watcher slots (5 user slots out
of DICT_MAX_WATCHERS=8).
"""
results = []

def worker():
for _ in range(ITERS):
try:
wid = _testcapi.add_dict_watcher(self.EVENTS)
except RuntimeError:
continue # All slots taken
self.assertGreaterEqual(wid, 0)
results.append(wid)
_testcapi.clear_dict_watcher(wid)

threading_helper.run_concurrently(worker, NTHREADS)

# Verify at least some watchers were successfully added
self.assertGreater(len(results), 0)

def test_concurrent_watch_unwatch(self):
"""Race Watch and Unwatch on the same dict from multiple threads."""
wid = _testcapi.add_dict_watcher(self.EVENTS)
dicts = [{} for _ in range(10)]

def worker():
for _ in range(ITERS):
for d in dicts:
_testcapi.watch_dict(wid, d)
for d in dicts:
_testcapi.unwatch_dict(wid, d)

try:
threading_helper.run_concurrently(worker, NTHREADS)

# Verify watching still works after concurrent watch/unwatch
_testcapi.watch_dict(wid, dicts[0])
dicts[0]["key"] = "value"
events = _testcapi.get_dict_watcher_events()
self.assertIn("new:key:value", events)
finally:
_testcapi.clear_dict_watcher(wid)

def test_concurrent_modify_watched_dict(self):
"""Race dict mutations (triggering callbacks) with watch/unwatch."""
wid = _testcapi.add_dict_watcher(self.EVENTS)
d = {}
_testcapi.watch_dict(wid, d)

def mutator():
for i in range(ITERS):
d[f"key_{i}"] = i
d.pop(f"key_{i}", None)

def toggler():
for i in range(ITERS):
_testcapi.watch_dict(wid, d)
d[f"toggler_{i}"] = i
_testcapi.unwatch_dict(wid, d)

workers = [mutator, toggler] * (NTHREADS // 2)
try:
threading_helper.run_concurrently(workers)
events = _testcapi.get_dict_watcher_events()
self.assertGreater(len(events), 0)
finally:
_testcapi.clear_dict_watcher(wid)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Made :c:func:`PyDict_AddWatcher`, :c:func:`PyDict_ClearWatcher`,
:c:func:`PyDict_Watch`, and :c:func:`PyDict_Unwatch` thread-safe on the
:term:`free threaded <free threading>` build.
26 changes: 15 additions & 11 deletions Modules/_testcapi/watchers.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "pycore_function.h" // FUNC_MAX_WATCHERS
#include "pycore_interp_structs.h" // CODE_MAX_WATCHERS
#include "pycore_context.h" // CONTEXT_MAX_WATCHERS
#include "pycore_lock.h" // _PyOnceFlag

/*[clinic input]
module _testcapi
Expand All @@ -18,6 +19,14 @@ module _testcapi
// Test dict watching
static PyObject *g_dict_watch_events = NULL;
static int g_dict_watchers_installed = 0;
static _PyOnceFlag g_dict_watch_once = {0};

static int
_init_dict_watch_events(void *arg)
{
g_dict_watch_events = PyList_New(0);
return g_dict_watch_events ? 0 : -1;
}

static int
dict_watch_callback(PyDict_WatchEvent event,
Expand Down Expand Up @@ -106,13 +115,10 @@ add_dict_watcher(PyObject *self, PyObject *kind)
if (watcher_id < 0) {
return NULL;
}
if (!g_dict_watchers_installed) {
assert(!g_dict_watch_events);
if (!(g_dict_watch_events = PyList_New(0))) {
return NULL;
}
if (_PyOnceFlag_CallOnce(&g_dict_watch_once, _init_dict_watch_events, NULL) < 0) {
return NULL;
}
g_dict_watchers_installed++;
_Py_atomic_add_int(&g_dict_watchers_installed, 1);
return PyLong_FromLong(watcher_id);
}

Expand All @@ -122,10 +128,8 @@ clear_dict_watcher(PyObject *self, PyObject *watcher_id)
if (PyDict_ClearWatcher(PyLong_AsLong(watcher_id))) {
return NULL;
}
g_dict_watchers_installed--;
if (!g_dict_watchers_installed) {
assert(g_dict_watch_events);
Py_CLEAR(g_dict_watch_events);
if (_Py_atomic_add_int(&g_dict_watchers_installed, -1) == 1) {
PyList_Clear(g_dict_watch_events);
}
Py_RETURN_NONE;
}
Expand Down Expand Up @@ -164,7 +168,7 @@ _testcapi_unwatch_dict_impl(PyObject *module, int watcher_id, PyObject *dict)
static PyObject *
get_dict_watcher_events(PyObject *self, PyObject *Py_UNUSED(args))
{
if (!g_dict_watch_events) {
if (_Py_atomic_load_int(&g_dict_watchers_installed) <= 0) {
PyErr_SetString(PyExc_RuntimeError, "no watchers active");
return NULL;
}
Expand Down
42 changes: 31 additions & 11 deletions Objects/dictobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -7842,13 +7842,19 @@ validate_watcher_id(PyInterpreterState *interp, int watcher_id)
PyErr_Format(PyExc_ValueError, "Invalid dict watcher ID %d", watcher_id);
return -1;
}
if (!interp->dict_state.watchers[watcher_id]) {
PyDict_WatchCallback cb = FT_ATOMIC_LOAD_PTR_RELAXED(
interp->dict_state.watchers[watcher_id]);
if (cb == NULL) {
PyErr_Format(PyExc_ValueError, "No dict watcher set for ID %d", watcher_id);
return -1;
}
return 0;
}

// In free-threaded builds, Add/Clear serialize on watcher_mutex and publish
// callbacks with release stores. SendEvent reads them lock-free using
// acquire loads.

int
PyDict_Watch(int watcher_id, PyObject* dict)
{
Expand All @@ -7860,7 +7866,8 @@ PyDict_Watch(int watcher_id, PyObject* dict)
if (validate_watcher_id(interp, watcher_id)) {
return -1;
}
((PyDictObject*)dict)->_ma_watcher_tag |= (1LL << watcher_id);
FT_ATOMIC_OR_UINT64(((PyDictObject*)dict)->_ma_watcher_tag,
1ULL << watcher_id);
return 0;
}

Expand All @@ -7875,36 +7882,48 @@ PyDict_Unwatch(int watcher_id, PyObject* dict)
if (validate_watcher_id(interp, watcher_id)) {
return -1;
}
((PyDictObject*)dict)->_ma_watcher_tag &= ~(1LL << watcher_id);
FT_ATOMIC_AND_UINT64(((PyDictObject*)dict)->_ma_watcher_tag,
~(1ULL << watcher_id));
return 0;
}

int
PyDict_AddWatcher(PyDict_WatchCallback callback)
{
int watcher_id = -1;
PyInterpreterState *interp = _PyInterpreterState_GET();

FT_MUTEX_LOCK_FLAGS(&interp->dict_state.watcher_mutex,
_Py_LOCK_DONT_DETACH);
/* Some watchers are reserved for CPython, start at the first available one */
for (int i = FIRST_AVAILABLE_WATCHER; i < DICT_MAX_WATCHERS; i++) {
if (!interp->dict_state.watchers[i]) {
interp->dict_state.watchers[i] = callback;
return i;
FT_ATOMIC_STORE_PTR_RELEASE(interp->dict_state.watchers[i], callback);
watcher_id = i;
goto done;
}
}

PyErr_SetString(PyExc_RuntimeError, "no more dict watcher IDs available");
return -1;
done:
FT_MUTEX_UNLOCK(&interp->dict_state.watcher_mutex);
return watcher_id;
}

int
PyDict_ClearWatcher(int watcher_id)
{
int res = 0;
PyInterpreterState *interp = _PyInterpreterState_GET();
FT_MUTEX_LOCK_FLAGS(&interp->dict_state.watcher_mutex,
_Py_LOCK_DONT_DETACH);
if (validate_watcher_id(interp, watcher_id)) {
return -1;
res = -1;
goto done;
}
interp->dict_state.watchers[watcher_id] = NULL;
return 0;
FT_ATOMIC_STORE_PTR_RELEASE(interp->dict_state.watchers[watcher_id], NULL);
done:
FT_MUTEX_UNLOCK(&interp->dict_state.watcher_mutex);
return res;
}

static const char *
Expand All @@ -7929,7 +7948,8 @@ _PyDict_SendEvent(int watcher_bits,
PyInterpreterState *interp = _PyInterpreterState_GET();
for (int i = 0; i < DICT_MAX_WATCHERS; i++) {
if (watcher_bits & 1) {
PyDict_WatchCallback cb = interp->dict_state.watchers[i];
PyDict_WatchCallback cb = FT_ATOMIC_LOAD_PTR_ACQUIRE(
interp->dict_state.watchers[i]);
if (cb && (cb(event, (PyObject*)mp, key, value) < 0)) {
// We don't want to resurrect the dict by potentially having an
// unraisablehook keep a reference to it, so we don't pass the
Expand Down
23 changes: 17 additions & 6 deletions Python/optimizer_analysis.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "pycore_opcode_metadata.h"
#include "pycore_opcode_utils.h"
#include "pycore_pystate.h" // _PyInterpreterState_GET()
#include "pycore_pyatomic_ft_wrappers.h" // FT_MUTEX_LOCK/UNLOCK
#include "pycore_tstate.h" // _PyThreadStateImpl
#include "pycore_uop_metadata.h"
#include "pycore_long.h"
Expand Down Expand Up @@ -117,14 +118,15 @@ static int
get_mutations(PyObject* dict) {
assert(PyDict_CheckExact(dict));
PyDictObject *d = (PyDictObject *)dict;
return (d->_ma_watcher_tag >> DICT_MAX_WATCHERS) & ((1 << DICT_WATCHED_MUTATION_BITS)-1);
uint64_t tag = FT_ATOMIC_LOAD_UINT64_RELAXED(d->_ma_watcher_tag);
return (tag >> DICT_MAX_WATCHERS) & ((1 << DICT_WATCHED_MUTATION_BITS) - 1);
}

static void
increment_mutations(PyObject* dict) {
assert(PyDict_CheckExact(dict));
PyDictObject *d = (PyDictObject *)dict;
d->_ma_watcher_tag += (1 << DICT_MAX_WATCHERS);
FT_ATOMIC_ADD_UINT64(d->_ma_watcher_tag, 1ULL << DICT_MAX_WATCHERS);
}

/* The first two dict watcher IDs are reserved for CPython,
Expand Down Expand Up @@ -153,6 +155,17 @@ type_watcher_callback(PyTypeObject* type)
return 0;
}

static int
_setup_optimizer_watchers(void *Py_UNUSED(arg))
{
PyInterpreterState *interp = _PyInterpreterState_GET();
FT_ATOMIC_STORE_PTR_RELEASE(
interp->dict_state.watchers[GLOBALS_WATCHER_ID],
globals_watcher_callback);
interp->type_watchers[TYPE_WATCHER_ID] = type_watcher_callback;
return 0;
}

static PyObject *
convert_global_to_const(_PyUOpInstruction *inst, PyObject *obj, bool pop, bool insert)
{
Expand Down Expand Up @@ -467,10 +480,8 @@ optimize_uops(

// Make sure that watchers are set up
PyInterpreterState *interp = _PyInterpreterState_GET();
if (interp->dict_state.watchers[GLOBALS_WATCHER_ID] == NULL) {
interp->dict_state.watchers[GLOBALS_WATCHER_ID] = globals_watcher_callback;
interp->type_watchers[TYPE_WATCHER_ID] = type_watcher_callback;
}
_PyOnceFlag_CallOnce(&interp->dict_state.watcher_setup_once,
_setup_optimizer_watchers, NULL);

_Py_uop_abstractcontext_init(ctx, dependencies);
_Py_UOpsAbstractFrame *frame = _Py_uop_frame_new(ctx, (PyCodeObject *)func->func_code, NULL, 0);
Expand Down
1 change: 1 addition & 0 deletions Python/pystate.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ _Py_COMP_DIAG_POP
&(runtime)->allocators.mutex, \
&(runtime)->_main_interpreter.types.mutex, \
&(runtime)->_main_interpreter.code_state.mutex, \
&(runtime)->_main_interpreter.dict_state.watcher_mutex, \
}

static void
Expand Down
1 change: 1 addition & 0 deletions Tools/c-analyzer/cpython/ignored.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ Modules/_testcapi/object.c - MyObject_dealloc_called -
Modules/_testcapi/object.c - MyType -
Modules/_testcapi/structmember.c - test_structmembersType_OldAPI -
Modules/_testcapi/watchers.c - g_dict_watch_events -
Modules/_testcapi/watchers.c - g_dict_watch_once -
Modules/_testcapi/watchers.c - g_dict_watchers_installed -
Modules/_testcapi/watchers.c - g_type_modified_events -
Modules/_testcapi/watchers.c - g_type_watchers_installed -
Expand Down
Loading