From 7cdb3fb9f2060bd3fa1663395dc3fa96c9a1497f Mon Sep 17 00:00:00 2001 From: James M Snell Date: Sun, 29 Mar 2026 08:17:34 -0700 Subject: [PATCH] src: implement a prototype for uv coroutines Signed-off-by: James M Snell Assisted-by: Opencode/Open 4.6 --- node.gyp | 4 + src/api/callback.cc | 23 +- src/async_wrap.h | 24 +- src/coro/README.md | 251 ++++++++++++ src/coro/uv_awaitable.h | 375 +++++++++++++++++ src/coro/uv_promise.h | 87 ++++ src/coro/uv_task.h | 210 ++++++++++ src/coro/uv_tracked_task.h | 475 ++++++++++++++++++++++ src/env.cc | 155 ++++--- src/env.h | 47 ++- src/node_file.cc | 145 +++++++ test/cctest/test_coro.cc | 408 +++++++++++++++++++ test/parallel/test-fs-coro-async-hooks.js | 237 +++++++++++ test/parallel/test-fs-coro.js | 102 +++++ 14 files changed, 2422 insertions(+), 121 deletions(-) create mode 100644 src/coro/README.md create mode 100644 src/coro/uv_awaitable.h create mode 100644 src/coro/uv_promise.h create mode 100644 src/coro/uv_task.h create mode 100644 src/coro/uv_tracked_task.h create mode 100644 test/cctest/test_coro.cc create mode 100644 test/parallel/test-fs-coro-async-hooks.js create mode 100644 test/parallel/test-fs-coro.js diff --git a/node.gyp b/node.gyp index bd77943b105173..5c661f3c8015ff 100644 --- a/node.gyp +++ b/node.gyp @@ -219,6 +219,10 @@ 'src/compile_cache.h', 'src/connect_wrap.h', 'src/connection_wrap.h', + 'src/coro/uv_task.h', + 'src/coro/uv_tracked_task.h', + 'src/coro/uv_awaitable.h', + 'src/coro/uv_promise.h', 'src/cppgc_helpers.h', 'src/cppgc_helpers.cc', 'src/dataqueue/queue.h', diff --git a/src/api/callback.cc b/src/api/callback.cc index 0f458f470501c5..f1eb1c7d5be981 100644 --- a/src/api/callback.cc +++ b/src/api/callback.cc @@ -22,7 +22,7 @@ using v8::Value; CallbackScope::CallbackScope(Isolate* isolate, Local object, async_context async_context) - : CallbackScope(Environment::GetCurrent(isolate), object, async_context) {} + : CallbackScope(Environment::GetCurrent(isolate), object, async_context) {} CallbackScope::CallbackScope(Environment* env, Local object, @@ -52,8 +52,7 @@ CallbackScope::CallbackScope(Environment* env, } CallbackScope::~CallbackScope() { - if (try_catch_.HasCaught()) - private_->MarkAsFailed(); + if (try_catch_.HasCaught()) private_->MarkAsFailed(); delete private_; } @@ -86,7 +85,15 @@ InternalCallbackScope::InternalCallbackScope( } else { object = std::get*>(object_arg); } - std::visit([](auto* ptr) { CHECK_NOT_NULL(ptr); }, object); + // Global* may be null when no resource object was created + // (e.g., coroutine tasks when async_hooks are not active). + // push_async_context already handles the null case by skipping the + // native_execution_async_resources_ store. + if (auto* gptr = std::get_if*>(&object)) { + CHECK_IMPLIES(*gptr != nullptr, !(*gptr)->IsEmpty()); + } else { + std::visit([](auto* ptr) { CHECK_NOT_NULL(ptr); }, object); + } env->PushAsyncCallbackScope(); @@ -217,8 +224,7 @@ MaybeLocal InternalMakeCallback(Environment* env, Local context_frame) { CHECK(!recv.IsEmpty()); #ifdef DEBUG - for (int i = 0; i < argc; i++) - CHECK(!argv[i].IsEmpty()); + for (int i = 0; i < argc; i++) CHECK(!argv[i].IsEmpty()); #endif Local hook_cb = env->async_hooks_callback_trampoline(); @@ -231,8 +237,9 @@ MaybeLocal InternalMakeCallback(Environment* env, flags = InternalCallbackScope::kSkipAsyncHooks; use_async_hooks_trampoline = async_hooks->fields()[AsyncHooks::kBefore] + - async_hooks->fields()[AsyncHooks::kAfter] + - async_hooks->fields()[AsyncHooks::kUsesExecutionAsyncResource] > 0; + async_hooks->fields()[AsyncHooks::kAfter] + + async_hooks->fields()[AsyncHooks::kUsesExecutionAsyncResource] > + 0; } InternalCallbackScope scope( diff --git a/src/async_wrap.h b/src/async_wrap.h index e4884cb88301d4..6a09878157abc6 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -39,6 +39,7 @@ namespace node { V(FILEHANDLE) \ V(FILEHANDLECLOSEREQ) \ V(BLOBREADER) \ + V(COROREADFILE) \ V(FSEVENTWRAP) \ V(FSREQCALLBACK) \ V(FSREQPROMISE) \ @@ -124,11 +125,10 @@ class AsyncWrap : public BaseObject { }; enum ProviderType { -#define V(PROVIDER) \ - PROVIDER_ ## PROVIDER, +#define V(PROVIDER) PROVIDER_##PROVIDER, NODE_ASYNC_PROVIDER_TYPES(V) #undef V - PROVIDERS_LENGTH, + PROVIDERS_LENGTH, }; AsyncWrap(Environment* env, @@ -171,9 +171,9 @@ class AsyncWrap : public BaseObject { const v8::FunctionCallbackInfo& args); static void GetProviderType(const v8::FunctionCallbackInfo& args); static void QueueDestroyAsyncId( - const v8::FunctionCallbackInfo& args); + const v8::FunctionCallbackInfo& args); static void SetCallbackTrampoline( - const v8::FunctionCallbackInfo& args); + const v8::FunctionCallbackInfo& args); static void EmitAsyncInit(Environment* env, v8::Local object, @@ -212,22 +212,16 @@ class AsyncWrap : public BaseObject { int argc, v8::Local* argv); inline v8::MaybeLocal MakeCallback( - const v8::Local symbol, - int argc, - v8::Local* argv); + const v8::Local symbol, int argc, v8::Local* argv); inline v8::MaybeLocal MakeCallback( - const v8::Local symbol, - int argc, - v8::Local* argv); + const v8::Local symbol, int argc, v8::Local* argv); inline v8::MaybeLocal MakeCallback( - const v8::Local symbol, - int argc, - v8::Local* argv); + const v8::Local symbol, int argc, v8::Local* argv); virtual std::string diagnostic_name() const; const char* MemoryInfoName() const override; - static void WeakCallback(const v8::WeakCallbackInfo &info); + static void WeakCallback(const v8::WeakCallbackInfo& info); // Returns the object that 'owns' an async wrap. For example, for a // TCP connection handle, this is the corresponding net.Socket. diff --git a/src/coro/README.md b/src/coro/README.md new file mode 100644 index 00000000000000..bd4d4271c49a35 --- /dev/null +++ b/src/coro/README.md @@ -0,0 +1,251 @@ +# C++20 Coroutine support for libuv + +This directory contains an experimental C++20 coroutine layer for writing +asynchronous libuv operations as sequential C++ code using `co_await`. + +The primary goal is to allow multi-step async operations (such as +open + stat + read + close) to be written as straight-line C++ instead of +callback chains, while maintaining full integration with Node.js async\_hooks, +AsyncLocalStorage, microtask draining, and environment lifecycle management. + +## File overview + +* `uv_task.h` -- `UvTask`: The lightweight, untracked coroutine return type. + No V8 or Node.js dependencies. Suitable for internal C++ coroutines that do + not need async\_hooks visibility or task queue draining. + +* `uv_tracked_task.h` -- `UvTrackedTask`: The fully-integrated + coroutine return type. Each resume-to-suspend segment is wrapped in an + `InternalCallbackScope`, giving it the same semantics as any other callback + entry into Node.js. The `Provider` template parameter is an + `AsyncWrap::ProviderType` enum value that identifies the async resource type + visible to `async_hooks.createHook()` and trace events, using the same + type registry as the rest of Node.js. + +* `uv_awaitable.h` -- Awaitable wrappers for libuv async operations: + `UvFsAwaitable` (fs operations), `UvFsStatAwaitable` (stat-family), + `UvWorkAwaitable` (thread pool work), and `UvGetAddrInfoAwaitable` + (DNS resolution). Each embeds the libuv request struct directly in the + coroutine frame, avoiding separate heap allocations. Each also exposes a + `cancelable_req()` method returning the underlying `uv_req_t*` for + cancellation support during environment teardown. + +* `uv_promise.h` -- Helpers for bridging coroutines to JavaScript Promises: + `MakePromise()`, `ResolvePromise()`, `RejectPromiseWithUVError()`. The + resolve and reject helpers guard against calling V8 APIs when the + environment is shutting down (`can_call_into_js()` check). + +## Usage + +### Basic pattern (binding function) + +```cpp +// The coroutine. The return type carries the provider type as +// a compile-time template argument. +static coro::UvTrackedTask +DoAccessImpl( + Environment* env, + v8::Global resolver, + std::string path, + int mode) { + ssize_t result = co_await coro::UvFs( + env->event_loop(), uv_fs_access, path.c_str(), mode); + if (result < 0) + coro::RejectPromiseWithUVError(env, resolver, result, "access", + path.c_str()); + else + coro::ResolvePromiseUndefined(env, resolver); +} + +// The binding entry point (called from JavaScript). +static void Access(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + // ... parse args, check permissions ... + + auto resolver = coro::MakePromise(env, args); + auto task = DoAccessImpl(env, std::move(resolver), path, mode); + task.InitTracking(env); // assigns async_id, captures context, emits init + task.Start(); // begins execution (fire-and-forget) +} +``` + +### Multi-step operations + +Multiple libuv calls within a single coroutine are sequential co\_await +expressions. The intermediate steps (between two co\_await points) are pure C++ +with no V8 overhead: + +```cpp +static coro::UvTrackedTask +ReadFileImpl( + Environment* env, + v8::Global resolver, + std::string path) { + ssize_t fd = co_await coro::UvFs( + env->event_loop(), uv_fs_open, path.c_str(), O_RDONLY, 0); + if (fd < 0) { /* reject and co_return */ } + + auto [err, stat] = co_await coro::UvFsStat( + env->event_loop(), uv_fs_fstat, static_cast(fd)); + // ... read, close, resolve ... +} +``` + +### Coroutine composition + +`UvTask` and `UvTrackedTask` can be co\_awaited from other +coroutines. This allows factoring common operations into reusable helpers: + +```cpp +UvTask OpenFile(uv_loop_t* loop, const char* path, int flags) { + co_return co_await UvFs(loop, uv_fs_open, path, flags, 0); +} + +UvTrackedTask +OuterCoroutine(Environment* env, ...) { + ssize_t fd = co_await OpenFile(env->event_loop(), path, O_RDONLY); + // ... +} +``` + +## Lifecycle + +### UvTask (untracked) + +`UvTask` uses lazy initialization. The coroutine does not run until it is +either co\_awaited from another coroutine (symmetric transfer) or explicitly +started with `Start()`. When `Start()` is called, the coroutine runs until its +first `co_await`, then control returns to the caller. The coroutine frame +self-destructs when the coroutine completes. + +### UvTrackedTask (tracked) + +`UvTrackedTask` follows the same lazy/fire-and-forget pattern +but adds three phases around `Start()`: + +1. **Creation**: The coroutine frame is allocated from the thread-local + free-list (see "Frame allocator" below). The coroutine is suspended at + `initial_suspend` (lazy). + +2. **`InitTracking(env)`**: Assigns an `async_id`, captures the current + `async_context_frame` (for AsyncLocalStorage propagation), emits a trace + event using the provider name from the `ProviderType` enum, and registers + in the Environment's coroutine task list for cancellation during teardown. + If async\_hooks listeners are active (`kInit > 0` or + `kUsesExecutionAsyncResource > 0`), a resource object is created for + `executionAsyncResource()` and the `init` hook is emitted. The type name + V8 string comes from `IsolateData::async_wrap_providers_`, which is + pre-cached at Isolate startup with zero per-coroutine allocation cost. + +3. **`Start()`**: Marks the task as detached (fire-and-forget) and resumes + the coroutine. Each resume-to-suspend segment is wrapped in an + `InternalCallbackScope` that provides: + * async\_hooks `before`/`after` events + * `async_context_frame` save/restore (AsyncLocalStorage) + * Microtask and `process.nextTick` draining on close + * `request_waiting_` counter management for event loop liveness + +4. **Completion**: At `final_suspend`, the last `InternalCallbackScope` is + closed (draining task queues), the async\_hooks `destroy` event is emitted, + the task is unregistered from the Environment, and the coroutine frame is + returned to the thread-local free-list. If a detached coroutine has a + captured C++ exception that was never observed, `std::terminate()` is + called rather than silently discarding it. + +## How the awaitable dispatch works + +The `UvFs()` factory function returns a `UvFsAwaitable` that embeds a `uv_fs_t` +directly in the coroutine frame. When the coroutine hits `co_await`: + +1. `await_transform()` on the promise wraps it in a `TrackedAwaitable`. +2. `TrackedAwaitable::await_suspend()`: + * Closes the current `InternalCallbackScope` (drains microtasks/nextTick). + * Records the `uv_req_t*` for cancellation support (via `cancelable_req()`). + * Increments `request_waiting_` (event loop liveness). + * Calls the inner `await_suspend()`, which dispatches the libuv call with + `req_.data = this` pointing back to the awaitable. +3. The coroutine is suspended. Control returns to the event loop. +4. When the libuv operation completes, `OnComplete()` calls + `handle_.resume()` to resume the coroutine. +5. `TrackedAwaitable::await_resume()`: + * Decrements `request_waiting_`. + * Clears the cancellation pointer. + * Opens a new `InternalCallbackScope` for the next segment. + * Returns the result (e.g., `req_.result` for fs operations). + +The liveness counter and cancellation tracking are conditional on the inner +awaitable having a `cancelable_req()` method (checked at compile time via a +`requires` expression). When co\_awaiting another `UvTask` or `UvTrackedTask` +(coroutine composition), these steps are skipped. + +## Environment teardown + +During `Environment::CleanupHandles()`, the coroutine task list is iterated and +`Cancel()` is called on each active task. This calls `uv_cancel()` on the +in-flight libuv request (if any), which causes the libuv callback to fire with +`UV_ECANCELED`. The coroutine resumes, sees the error, and completes normally. +The `request_waiting_` counter ensures the teardown loop waits for all +coroutine I/O to finish before destroying the Environment. + +## Frame allocator + +Coroutine frames are allocated from a thread-local free-list rather than going +through `malloc`/`free` on every creation and destruction. This is implemented +via `promise_type::operator new` and `operator delete` in `TrackedPromiseBase`, +which route through `CoroFrameAlloc()` and `CoroFrameFree()`. + +The free-list uses size-class buckets with 256-byte granularity, covering +frames up to 4096 bytes (which covers typical coroutine frames). Frames larger +than 4096 bytes fall through to the global `operator new`. Since all coroutines +run on the event loop thread, the free-list requires no locking. + +Each bucket has a high-water mark of 32 cached frames. When a frame is freed +and its bucket is already at capacity, the frame is returned directly to the +system allocator instead of being cached. This bounds the retained memory +per bucket to at most 32 \* bucket\_size bytes (e.g., 32 \* 1024 = 32KB for the +1024-byte size class), preventing unbounded growth after a burst of concurrent +coroutines. + +After the first coroutine of a given size class completes, subsequent +coroutines of the same size class are allocated from the free-list with zero +`malloc` overhead. + +## Allocation comparison with ReqWrap + +For a single async operation (e.g., `fsPromises.access`): + +| | ReqWrap pattern | Coroutine (no hooks) | Coroutine (hooks active) | +| -------------------- | --------------- | -------------------- | ------------------------ | +| C++ heap allocations | 3 | 0 (free-list hit) | 0 (free-list hit) | +| V8 heap objects | 7 | 2 (resolver+promise) | 3 (+ resource object) | +| Total allocations | 10 | 2 | 3 | + +For a multi-step operation (open + stat + read + close): + +| | 4x ReqWrap | Single coroutine (no hooks) | Single coroutine (hooks active) | +| ----------------------------- | ---------- | --------------------------- | ------------------------------- | +| C++ heap allocations | 12 | 0 (free-list hit) | 0 (free-list hit) | +| V8 heap objects | 28 | 2 | 3 | +| Total allocations | 40 | 2 | 3 | +| InternalCallbackScope entries | 4 | 5 (one per segment) | 5 | + +The coroutine frame embeds the `uv_fs_t` (\~440 bytes) directly. The compiler +may overlay non-simultaneously-live awaitables in the frame, so a multi-step +coroutine does not necessarily pay N times the `uv_fs_t` cost. + +## Known limitations + +* **Heap snapshot visibility**: The coroutine frame is not visible to V8 heap + snapshots or `MemoryRetainer`. The thread-local free-list allocator reduces + malloc pressure but does not provide V8 with per-frame memory accounting. + The exact frame contents are not inspectable from heap snapshot tooling. + +* **Snapshot serialization**: `UvTrackedTask` holds `v8::Global` handles that + cannot be serialized into a startup snapshot. There is currently no safety + check to prevent snapshotting while coroutines are active. In practice this + is not a problem because snapshots are taken at startup before I/O begins. + +* **Free-list retention**: The thread-local free-list retains up to 32 frames + per size class bucket after a burst of concurrent coroutines. These frames + are held until reused or the thread exits. The bound is configurable via + `kMaxCachedPerBucket`. diff --git a/src/coro/uv_awaitable.h b/src/coro/uv_awaitable.h new file mode 100644 index 00000000000000..ddb673bcf04c6e --- /dev/null +++ b/src/coro/uv_awaitable.h @@ -0,0 +1,375 @@ +#ifndef SRC_CORO_UV_AWAITABLE_H_ +#define SRC_CORO_UV_AWAITABLE_H_ + +#include +#include +#include +#include +#include +#include + +#include "uv.h" + +namespace node { +namespace coro { + +// --------------------------------------------------------------------------- +// UvFsAwaitable — Awaitable that wraps a single uv_fs_* async operation. +// +// Created via the UvFs() factory function. Usage: +// +// ssize_t fd = co_await UvFs(loop, uv_fs_open, path, O_RDONLY, 0644); +// if (fd < 0) { /* handle error: fd is the negative uv_errno_t */ } +// +// The uv_fs_t request struct is embedded directly in this object, which +// lives in the coroutine frame across the suspension point. This means +// no separate heap allocation is needed for the request. +// +// On await_resume(), uv_fs_req_cleanup() is called automatically. +// --------------------------------------------------------------------------- + +template +class UvFsAwaitable { + public: + UvFsAwaitable(uv_loop_t* loop, Fn fn, Args... args) + : loop_(loop), fn_(fn), args_(std::move(args)...) { + memset(&req_, 0, sizeof(req_)); + } + + ~UvFsAwaitable() { + // Safety net: if the coroutine is destroyed without resuming + // (e.g. cancellation), still clean up any libuv-internal state. + if (needs_cleanup_) uv_fs_req_cleanup(&req_); + } + + // Move is safe before dispatch (req_.data isn't set until await_suspend). + UvFsAwaitable(UvFsAwaitable&& o) noexcept + : loop_(o.loop_), + fn_(o.fn_), + args_(std::move(o.args_)), + req_(o.req_), + handle_(o.handle_), + needs_cleanup_(std::exchange(o.needs_cleanup_, false)) { + memset(&o.req_, 0, sizeof(o.req_)); + } + UvFsAwaitable(const UvFsAwaitable&) = delete; + UvFsAwaitable& operator=(const UvFsAwaitable&) = delete; + UvFsAwaitable& operator=(UvFsAwaitable&&) = delete; + + bool await_ready() noexcept { return false; } + + void await_suspend(std::coroutine_handle<> h) noexcept { + handle_ = h; + req_.data = this; + + // Dispatch: call uv_fs_*(loop, &req_, ...args..., OnComplete) + int err = std::apply( + [this](auto&&... a) { + return fn_(loop_, &req_, std::forward(a)..., OnComplete); + }, + args_); + + if (err < 0) { + // Synchronous dispatch failure — store the error in req_.result + // so await_resume sees it, and resume immediately. + req_.result = err; + needs_cleanup_ = false; // nothing was dispatched + h.resume(); + } else { + needs_cleanup_ = true; + } + } + + // Returns req->result: the fd for open, byte count for read/write, + // 0 for success, or a negative uv_errno_t on error. + ssize_t await_resume() noexcept { + ssize_t result = req_.result; + uv_fs_req_cleanup(&req_); + needs_cleanup_ = false; + return result; + } + + // Access the raw request (useful before cleanup for stat, readdir, etc.) + const uv_fs_t& req() const { return req_; } + + // Returns a pointer to the underlying uv_req_t for cancellation support. + // Used by TrackedAwaitable to register with the request counter and + // cancellation infrastructure. + uv_req_t* cancelable_req() { return reinterpret_cast(&req_); } + + private: + static void OnComplete(uv_fs_t* req) { + auto* self = static_cast(req->data); + self->handle_.resume(); + } + + uv_loop_t* loop_; + Fn fn_; + std::tuple args_; + uv_fs_t req_; + std::coroutine_handle<> handle_; + bool needs_cleanup_ = false; +}; + +// --------------------------------------------------------------------------- +// UvFs() — Factory function for creating a UvFsAwaitable. +// +// Deduces the libuv function signature and argument types automatically. +// The callback parameter is NOT passed — it's injected by the awaitable. +// +// Examples: +// co_await UvFs(loop, uv_fs_open, "/tmp/test", O_RDONLY, 0644); +// co_await UvFs(loop, uv_fs_close, fd); +// co_await UvFs(loop, uv_fs_stat, "/tmp/test"); +// co_await UvFs(loop, uv_fs_read, fd, &iov, 1, offset); +// --------------------------------------------------------------------------- + +template +auto UvFs(uv_loop_t* loop, Fn fn, Args... args) { + return UvFsAwaitable(loop, fn, std::move(args)...); +} + +// --------------------------------------------------------------------------- +// UvFsStatAwaitable — Like UvFsAwaitable but await_resume returns the +// uv_stat_t instead of the raw result code. Used for stat/fstat/lstat. +// +// Usage: +// auto [err, stat] = co_await UvFsStat(loop, uv_fs_stat, path); +// --------------------------------------------------------------------------- + +template +class UvFsStatAwaitable { + public: + UvFsStatAwaitable(uv_loop_t* loop, Fn fn, Args... args) + : loop_(loop), fn_(fn), args_(std::move(args)...) { + memset(&req_, 0, sizeof(req_)); + } + + ~UvFsStatAwaitable() { + if (needs_cleanup_) uv_fs_req_cleanup(&req_); + } + + UvFsStatAwaitable(UvFsStatAwaitable&& o) noexcept + : loop_(o.loop_), + fn_(o.fn_), + args_(std::move(o.args_)), + req_(o.req_), + handle_(o.handle_), + needs_cleanup_(std::exchange(o.needs_cleanup_, false)) { + memset(&o.req_, 0, sizeof(o.req_)); + } + UvFsStatAwaitable(const UvFsStatAwaitable&) = delete; + UvFsStatAwaitable& operator=(const UvFsStatAwaitable&) = delete; + UvFsStatAwaitable& operator=(UvFsStatAwaitable&&) = delete; + + bool await_ready() noexcept { return false; } + + void await_suspend(std::coroutine_handle<> h) noexcept { + handle_ = h; + req_.data = this; + + int err = std::apply( + [this](auto&&... a) { + return fn_(loop_, &req_, std::forward(a)..., OnComplete); + }, + args_); + + if (err < 0) { + req_.result = err; + needs_cleanup_ = false; + h.resume(); + } else { + needs_cleanup_ = true; + } + } + + struct StatResult { + int error; // 0 on success, negative uv_errno_t on failure + uv_stat_t stat; // valid only when error == 0 + }; + + StatResult await_resume() noexcept { + StatResult r{}; + if (req_.result < 0) { + r.error = static_cast(req_.result); + } else { + r.error = 0; + r.stat = req_.statbuf; + } + uv_fs_req_cleanup(&req_); + needs_cleanup_ = false; + return r; + } + + uv_req_t* cancelable_req() { return reinterpret_cast(&req_); } + + private: + static void OnComplete(uv_fs_t* req) { + auto* self = static_cast(req->data); + self->handle_.resume(); + } + + uv_loop_t* loop_; + Fn fn_; + std::tuple args_; + uv_fs_t req_; + std::coroutine_handle<> handle_; + bool needs_cleanup_ = false; +}; + +template +auto UvFsStat(uv_loop_t* loop, Fn fn, Args... args) { + return UvFsStatAwaitable(loop, fn, std::move(args)...); +} + +// --------------------------------------------------------------------------- +// UvWork — Awaitable that wraps uv_queue_work. +// +// Runs a callable on the libuv thread pool. The coroutine suspends +// until the work is complete, then resumes on the event loop thread. +// +// Usage: +// int status = co_await UvWork(loop, [&]() { +// // expensive computation on thread pool +// }); +// --------------------------------------------------------------------------- + +template +class UvWorkAwaitable { + public: + UvWorkAwaitable(uv_loop_t* loop, WorkFn work_fn) + : loop_(loop), work_fn_(std::move(work_fn)) { + memset(&req_, 0, sizeof(req_)); + } + + ~UvWorkAwaitable() = default; + + UvWorkAwaitable(const UvWorkAwaitable&) = delete; + UvWorkAwaitable& operator=(const UvWorkAwaitable&) = delete; + UvWorkAwaitable(UvWorkAwaitable&&) = delete; + UvWorkAwaitable& operator=(UvWorkAwaitable&&) = delete; + + bool await_ready() noexcept { return false; } + + void await_suspend(std::coroutine_handle<> h) noexcept { + handle_ = h; + req_.data = this; + + int err = uv_queue_work(loop_, &req_, OnWork, OnAfterWork); + if (err < 0) { + status_ = err; + h.resume(); + } + } + + // Returns 0 on success, UV_ECANCELED if cancelled, or negative error. + int await_resume() noexcept { return status_; } + + uv_req_t* cancelable_req() { return reinterpret_cast(&req_); } + + private: + static void OnWork(uv_work_t* req) { + auto* self = static_cast(req->data); + self->work_fn_(); + } + + static void OnAfterWork(uv_work_t* req, int status) { + auto* self = static_cast(req->data); + self->status_ = status; + self->handle_.resume(); + } + + uv_loop_t* loop_; + WorkFn work_fn_; + uv_work_t req_; + std::coroutine_handle<> handle_; + int status_ = 0; +}; + +template +auto UvWork(uv_loop_t* loop, WorkFn&& work_fn) { + return UvWorkAwaitable>(loop, + std::forward(work_fn)); +} + +// --------------------------------------------------------------------------- +// UvGetAddrInfo — Awaitable wrapper for uv_getaddrinfo. +// +// Usage: +// auto [status, info] = co_await UvGetAddrInfo(loop, "example.com", "80", +// &hints); +// if (status == 0) { +// // use info... +// uv_freeaddrinfo(info); +// } +// --------------------------------------------------------------------------- + +class UvGetAddrInfoAwaitable { + public: + UvGetAddrInfoAwaitable(uv_loop_t* loop, + const char* node, + const char* service, + const struct addrinfo* hints) + : loop_(loop), node_(node), service_(service), hints_(hints) { + memset(&req_, 0, sizeof(req_)); + } + + ~UvGetAddrInfoAwaitable() = default; + + UvGetAddrInfoAwaitable(const UvGetAddrInfoAwaitable&) = delete; + UvGetAddrInfoAwaitable& operator=(const UvGetAddrInfoAwaitable&) = delete; + + bool await_ready() noexcept { return false; } + + void await_suspend(std::coroutine_handle<> h) noexcept { + handle_ = h; + req_.data = this; + + int err = uv_getaddrinfo(loop_, &req_, OnComplete, node_, service_, hints_); + if (err < 0) { + status_ = err; + addrinfo_ = nullptr; + h.resume(); + } + } + + struct AddrInfoResult { + int status; // 0 on success, negative uv_errno_t on error + struct addrinfo* addrinfo; // caller must uv_freeaddrinfo() on success + }; + + AddrInfoResult await_resume() noexcept { return {status_, addrinfo_}; } + + uv_req_t* cancelable_req() { return reinterpret_cast(&req_); } + + private: + static void OnComplete(uv_getaddrinfo_t* req, + int status, + struct addrinfo* res) { + auto* self = static_cast(req->data); + self->status_ = status; + self->addrinfo_ = res; + self->handle_.resume(); + } + + uv_loop_t* loop_; + const char* node_; + const char* service_; + const struct addrinfo* hints_; + uv_getaddrinfo_t req_; + std::coroutine_handle<> handle_; + int status_ = 0; + struct addrinfo* addrinfo_ = nullptr; +}; + +inline auto UvGetAddrInfo(uv_loop_t* loop, + const char* node, + const char* service, + const struct addrinfo* hints = nullptr) { + return UvGetAddrInfoAwaitable(loop, node, service, hints); +} + +} // namespace coro +} // namespace node + +#endif // SRC_CORO_UV_AWAITABLE_H_ diff --git a/src/coro/uv_promise.h b/src/coro/uv_promise.h new file mode 100644 index 00000000000000..159afc5be9374d --- /dev/null +++ b/src/coro/uv_promise.h @@ -0,0 +1,87 @@ +#ifndef SRC_CORO_UV_PROMISE_H_ +#define SRC_CORO_UV_PROMISE_H_ + +#include "env-inl.h" +#include "node_errors.h" +#include "v8.h" + +namespace node { +namespace coro { + +// --------------------------------------------------------------------------- +// ResolvePromise / RejectPromise — helpers for resolving/rejecting a +// v8::Global from inside a coroutine. +// +// When used inside a UvTrackedTask, the InternalCallbackScope from +// on_resume() already provides a HandleScope and will drain microtasks +// and nextTick on close (in on_suspend / final_suspend). +// +// These guard against calling V8 APIs when the environment is shutting +// down (can_call_into_js() is false). +// --------------------------------------------------------------------------- + +inline void ResolvePromise(Environment* env, + v8::Global& resolver, + v8::Local value) { + if (!env->can_call_into_js()) return; + v8::HandleScope scope(env->isolate()); + auto local = resolver.Get(env->isolate()); + USE(local->Resolve(env->context(), value)); +} + +inline void ResolvePromiseUndefined( + Environment* env, v8::Global& resolver) { + ResolvePromise(env, resolver, v8::Undefined(env->isolate())); +} + +inline void RejectPromiseWithUVError( + Environment* env, + v8::Global& resolver, + int uv_err, + const char* syscall, + const char* path = nullptr) { + if (!env->can_call_into_js()) return; + v8::HandleScope scope(env->isolate()); + auto local = resolver.Get(env->isolate()); + v8::Local exception = + UVException(env->isolate(), uv_err, syscall, nullptr, path); + USE(local->Reject(env->context(), exception)); +} + +// --------------------------------------------------------------------------- +// MakePromise — Creates a v8::Promise::Resolver, sets the return value +// on `args`, and returns a Global handle for use in the coroutine. +// --------------------------------------------------------------------------- + +inline v8::Global MakePromise( + Environment* env, const v8::FunctionCallbackInfo& args) { + auto resolver = v8::Promise::Resolver::New(env->context()).ToLocalChecked(); + args.GetReturnValue().Set(resolver->GetPromise()); + return v8::Global(env->isolate(), resolver); +} + +// --------------------------------------------------------------------------- +// CoroDispatch — Creates a Promise, constructs the coroutine task by +// calling `impl(env, resolver, extra_args...)`, initializes tracking, +// and starts the coroutine in fire-and-forget mode. +// +// Usage: +// CoroDispatch(env, args, CoroAccessImpl, +// std::string(*path, path.length()), mode); +// --------------------------------------------------------------------------- + +template +void CoroDispatch(Environment* env, + const v8::FunctionCallbackInfo& args, + Impl&& impl, + Args&&... extra_args) { + auto resolver = MakePromise(env, args); + auto task = impl(env, std::move(resolver), std::forward(extra_args)...); + task.InitTracking(env); + task.Start(); +} + +} // namespace coro +} // namespace node + +#endif // SRC_CORO_UV_PROMISE_H_ diff --git a/src/coro/uv_task.h b/src/coro/uv_task.h new file mode 100644 index 00000000000000..a7a381d915858d --- /dev/null +++ b/src/coro/uv_task.h @@ -0,0 +1,210 @@ +#ifndef SRC_CORO_UV_TASK_H_ +#define SRC_CORO_UV_TASK_H_ + +#include +#include +#include +#include +#include + +namespace node { +namespace coro { + +// --------------------------------------------------------------------------- +// UvTask — A coroutine return type for libuv-based async operations. +// +// Supports two usage modes: +// +// 1. Composition: co_await a UvTask from another coroutine. +// UvTask inner() { co_return 42; } +// UvTask outer() { int v = co_await inner(); } +// +// 2. Fire-and-forget: call Start() from non-coroutine code. +// The coroutine self-destructs when it completes. +// auto task = my_coroutine(args...); +// task.Start(); // runs until first co_await, then returns +// +// The task uses lazy initialization (suspends at initial_suspend). +// Execution begins when the task is co_awaited or Start() is called. +// --------------------------------------------------------------------------- + +template +class UvTask; + +namespace detail { + +// FinalAwaiter handles coroutine completion. If the coroutine was +// co_awaited by a parent, symmetric-transfer to the parent. If it +// was Start()ed (fire-and-forget / detached), allow the frame to +// self-destruct. +template +struct FinalAwaiter { + bool await_ready() noexcept { + // If detached, "ready" means don't suspend — the frame is destroyed + // automatically when final_suspend's co_await doesn't suspend. + return promise_.detached_; + } + + std::coroutine_handle<> await_suspend( + std::coroutine_handle) noexcept { + // Not detached — symmetric transfer to the continuation (parent + // coroutine), or noop if there is none. + if (auto cont = promise_.continuation_) return cont; + return std::noop_coroutine(); + } + + void await_resume() noexcept {} + + Promise& promise_; +}; + +// Common promise base — holds the continuation, detached flag, and +// exception state shared by both T and void specializations. +struct PromiseBase { + std::coroutine_handle<> continuation_; + std::exception_ptr exception_; + bool detached_ = false; + + std::suspend_always initial_suspend() noexcept { return {}; } + + void unhandled_exception() noexcept { exception_ = std::current_exception(); } + + void rethrow_if_exception() { + if (exception_) std::rethrow_exception(exception_); + } +}; + +} // namespace detail + +// =========================================================================== +// UvTask — coroutine that produces a T value +// =========================================================================== +template +class UvTask { + public: + struct promise_type : detail::PromiseBase { + std::optional result_; + + UvTask get_return_object() { + return UvTask{std::coroutine_handle::from_promise(*this)}; + } + + auto final_suspend() noexcept { + return detail::FinalAwaiter{*this}; + } + + void return_value(T value) { result_.emplace(std::move(value)); } + }; + + // -- Awaitable interface (for co_await from another coroutine) -- + + bool await_ready() const noexcept { return false; } + + std::coroutine_handle<> await_suspend( + std::coroutine_handle<> caller) noexcept { + handle_.promise().continuation_ = caller; + return handle_; // symmetric transfer — start this task + } + + T await_resume() { + handle_.promise().rethrow_if_exception(); + return std::move(*handle_.promise().result_); + } + + // -- Fire-and-forget interface (for non-coroutine callers) -- + + // Start the coroutine. It will run until its first co_await + // suspension point, then control returns here. When the coroutine + // eventually completes, the frame self-destructs. + void Start() { + handle_.promise().detached_ = true; + auto h = std::exchange(handle_, nullptr); + h.resume(); + } + + bool Done() const { return handle_ && handle_.done(); } + + // -- Lifecycle -- + + ~UvTask() { + if (handle_) handle_.destroy(); + } + + UvTask(UvTask&& o) noexcept : handle_(std::exchange(o.handle_, nullptr)) {} + UvTask& operator=(UvTask&& o) noexcept { + if (this != &o) { + if (handle_) handle_.destroy(); + handle_ = std::exchange(o.handle_, nullptr); + } + return *this; + } + + UvTask(const UvTask&) = delete; + UvTask& operator=(const UvTask&) = delete; + + private: + explicit UvTask(std::coroutine_handle h) : handle_(h) {} + std::coroutine_handle handle_; +}; + +// =========================================================================== +// UvTask — coroutine that produces no value +// =========================================================================== +template <> +class UvTask { + public: + struct promise_type : detail::PromiseBase { + UvTask get_return_object() { + return UvTask{std::coroutine_handle::from_promise(*this)}; + } + + auto final_suspend() noexcept { + return detail::FinalAwaiter{*this}; + } + + void return_void() {} + }; + + bool await_ready() const noexcept { return false; } + + std::coroutine_handle<> await_suspend( + std::coroutine_handle<> caller) noexcept { + handle_.promise().continuation_ = caller; + return handle_; + } + + void await_resume() { handle_.promise().rethrow_if_exception(); } + + void Start() { + handle_.promise().detached_ = true; + auto h = std::exchange(handle_, nullptr); + h.resume(); + } + + bool Done() const { return handle_ && handle_.done(); } + + ~UvTask() { + if (handle_) handle_.destroy(); + } + + UvTask(UvTask&& o) noexcept : handle_(std::exchange(o.handle_, nullptr)) {} + UvTask& operator=(UvTask&& o) noexcept { + if (this != &o) { + if (handle_) handle_.destroy(); + handle_ = std::exchange(o.handle_, nullptr); + } + return *this; + } + + UvTask(const UvTask&) = delete; + UvTask& operator=(const UvTask&) = delete; + + private: + explicit UvTask(std::coroutine_handle h) : handle_(h) {} + std::coroutine_handle handle_; +}; + +} // namespace coro +} // namespace node + +#endif // SRC_CORO_UV_TASK_H_ diff --git a/src/coro/uv_tracked_task.h b/src/coro/uv_tracked_task.h new file mode 100644 index 00000000000000..fe5b5493360f47 --- /dev/null +++ b/src/coro/uv_tracked_task.h @@ -0,0 +1,475 @@ +#ifndef SRC_CORO_UV_TRACKED_TASK_H_ +#define SRC_CORO_UV_TRACKED_TASK_H_ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "async_context_frame.h" +#include "async_wrap.h" +#include "env-inl.h" +#include "node_internals.h" +#include "tracing/traced_value.h" +#include "v8.h" + +#include "coro/uv_task.h" // for detail::PromiseBase + +namespace node { +namespace coro { + +// --------------------------------------------------------------------------- +// CoroFrameAllocator -- thread-local free-list allocator for coroutine +// frames. Since all coroutines run on the event loop thread, this is +// single-threaded and lock-free. +// +// Freed frames are kept on a per-size-class free list instead of being +// returned to malloc. Size classes are rounded up to the next 256-byte +// boundary (coroutine frames are typically 500-2000 bytes). +// --------------------------------------------------------------------------- + +namespace detail { + +struct FreeBlock { + FreeBlock* next; + size_t size; +}; + +inline constexpr size_t kMaxCachedFrameSize = 4096; +inline constexpr size_t kSizeClassGranularity = 256; +inline constexpr size_t kNumSizeClasses = + kMaxCachedFrameSize / kSizeClassGranularity; +inline constexpr size_t kMaxCachedPerBucket = 32; + +inline size_t SizeClassIndex(size_t n) { + return (n + kSizeClassGranularity - 1) / kSizeClassGranularity - 1; +} + +inline size_t RoundUpToSizeClass(size_t n) { + return (SizeClassIndex(n) + 1) * kSizeClassGranularity; +} + +struct FreeBucket { + FreeBlock* head = nullptr; + size_t count = 0; +}; + +inline FreeBucket& GetBucket(size_t index) { + static thread_local FreeBucket buckets[kNumSizeClasses] = {}; + return buckets[index]; +} + +inline void* CoroFrameAlloc(size_t n) { + if (n <= kMaxCachedFrameSize) { + size_t idx = SizeClassIndex(n); + FreeBucket& bucket = GetBucket(idx); + if (bucket.head != nullptr) { + FreeBlock* block = bucket.head; + bucket.head = block->next; + bucket.count--; + return block; + } + return ::operator new(RoundUpToSizeClass(n)); + } + return ::operator new(n); +} + +inline void CoroFrameFree(void* p, size_t n) { + if (n <= kMaxCachedFrameSize) { + size_t idx = SizeClassIndex(n); + FreeBucket& bucket = GetBucket(idx); + if (bucket.count >= kMaxCachedPerBucket) { + ::operator delete(p, RoundUpToSizeClass(n)); + return; + } + auto* block = static_cast(p); + block->next = bucket.head; + bucket.head = block; + bucket.count++; + return; + } + ::operator delete(p, n); +} + +// Provider name lookup for trace events. Matches the static array in +// async_wrap.cc but accessible from this header. +inline const char* ProviderName(AsyncWrap::ProviderType provider) { + static const char* const names[] = { +#define V(PROVIDER) #PROVIDER, + NODE_ASYNC_PROVIDER_TYPES(V) +#undef V + }; + return names[provider]; +} + +} // namespace detail + +// --------------------------------------------------------------------------- +// UvTrackedTask -- A coroutine return type with full +// integration: +// +// - InternalCallbackScope per resume segment (async_hooks, context frame, +// microtask + nextTick draining) +// - Event loop liveness via request_waiting_ counter +// - Cancellation support via cancelable_req() +// - Trace event emission +// - Fatal error on silently swallowed exceptions +// - Thread-local frame allocator for reduced malloc overhead +// - Lazy resource object creation (only when hooks need it) +// - Pre-cached per-Isolate type name string via IsolateData +// +// The Provider template parameter is an AsyncWrap::ProviderType enum value +// that identifies the async resource type visible to async_hooks and trace +// events. +// --------------------------------------------------------------------------- + +template +class UvTrackedTask; + +namespace detail { + +// TrackedAwaitable -- wraps any awaitable with InternalCallbackScope +// management, request counter, and cancellation tracking. +template +struct TrackedAwaitable { + Inner inner_; + Promise* promise_; + + bool await_ready() noexcept { return inner_.await_ready(); } + + template + auto await_suspend(H h) noexcept { + promise_->on_suspend(); + + if constexpr (requires { inner_.cancelable_req(); }) { + promise_->on_dispatch(inner_.cancelable_req()); + } + + return inner_.await_suspend(h); + } + + auto await_resume() noexcept(noexcept(inner_.await_resume())) { + if constexpr (requires { inner_.cancelable_req(); }) { + promise_->on_io_complete(); + } + + promise_->on_resume(); + return inner_.await_resume(); + } +}; + +// TrackedPromiseBase -- extends PromiseBase with full Node.js integration. +struct TrackedPromiseBase : PromiseBase { + Environment* env_ = nullptr; + AsyncWrap::ProviderType provider_ = AsyncWrap::PROVIDER_NONE; + double async_id_ = -1; + double trigger_async_id_ = -1; + + v8::Global context_frame_; + v8::Global resource_; + + std::optional handle_scope_; + std::optional scope_; + + uv_req_t* active_req_ = nullptr; + + std::list::iterator coro_list_iter_; + + static void* operator new(size_t n) { return CoroFrameAlloc(n); } + static void operator delete(void* p, size_t n) { CoroFrameFree(p, n); } + + void init_tracking(Environment* env, AsyncWrap::ProviderType provider) { + env_ = env; + provider_ = provider; + v8::Isolate* isolate = env->isolate(); + v8::HandleScope handle_scope(isolate); + + async_id_ = env->new_async_id(); + trigger_async_id_ = env->get_default_trigger_async_id(); + + context_frame_.Reset(isolate, async_context_frame::current(isolate)); + + env->coro_task_list()->push_back(this); + coro_list_iter_ = std::prev(env->coro_task_list()->end()); + + // Trace event: use the provider name from the enum. + TRACE_EVENT_NESTABLE_ASYNC_BEGIN1(TRACING_CATEGORY_NODE1(async_hooks), + ProviderName(provider), + static_cast(async_id_), + "triggerAsyncId", + static_cast(trigger_async_id_)); + + AsyncHooks* hooks = env->async_hooks(); + bool needs_resource = + hooks->fields()[AsyncHooks::kInit] > 0 || + hooks->fields()[AsyncHooks::kUsesExecutionAsyncResource] > 0; + + if (needs_resource) { + resource_.Reset(isolate, v8::Object::New(isolate)); + } + + if (hooks->fields()[AsyncHooks::kInit] > 0) { + // The type name V8 string is pre-cached per Isolate in + // IsolateData::async_wrap_providers_, created at Isolate startup. + // Zero per-coroutine allocation cost. + v8::Local type = + env->isolate_data()->async_wrap_provider(provider); + AsyncWrap::EmitAsyncInit( + env, resource_.Get(isolate), type, async_id_, trigger_async_id_); + } + } + + void on_resume() noexcept { + if (env_ == nullptr) return; + + handle_scope_.emplace(env_->isolate()); + + v8::Global* res_ptr = + resource_.IsEmpty() ? nullptr : &resource_; + scope_.emplace(env_, + res_ptr, + async_context{async_id_, trigger_async_id_}, + InternalCallbackScope::kNoFlags, + context_frame_.Get(env_->isolate())); + } + + void on_suspend() noexcept { + if (!scope_.has_value()) return; + scope_->Close(); + scope_.reset(); + handle_scope_.reset(); + } + + void on_dispatch(uv_req_t* req) noexcept { + active_req_ = req; + if (env_ != nullptr) { + env_->IncreaseWaitingRequestCounter(); + } + } + + void on_io_complete() noexcept { + active_req_ = nullptr; + if (env_ != nullptr) { + env_->DecreaseWaitingRequestCounter(); + } + } + + void on_destroy() noexcept { + if (env_ == nullptr) return; + + env_->coro_task_list()->erase(coro_list_iter_); + + AsyncWrap::EmitDestroy(env_, async_id_); + + TRACE_EVENT_NESTABLE_ASYNC_END0(TRACING_CATEGORY_NODE1(async_hooks), + ProviderName(provider_), + static_cast(async_id_)); + + context_frame_.Reset(); + resource_.Reset(); + } + + void Cancel() { + if (active_req_ != nullptr) { + uv_cancel(active_req_); + } + } +}; + +template +struct TrackedInitialAwaiter { + bool await_ready() noexcept { return false; } + void await_suspend(std::coroutine_handle<>) noexcept {} + void await_resume() noexcept { promise_.on_resume(); } + + Promise& promise_; +}; + +template +struct TrackedFinalAwaiter { + bool await_ready() noexcept { + promise_.on_suspend(); + + if (promise_.detached_ && promise_.exception_) { + std::fprintf(stderr, + "FATAL: Unhandled C++ exception in detached coroutine\n"); + std::fflush(stderr); + std::terminate(); + } + + promise_.on_destroy(); + return promise_.detached_; + } + + std::coroutine_handle<> await_suspend( + std::coroutine_handle) noexcept { + if (auto cont = promise_.continuation_) return cont; + return std::noop_coroutine(); + } + + void await_resume() noexcept {} + + Promise& promise_; +}; + +} // namespace detail + +// =========================================================================== +// UvTrackedTask -- tracked coroutine that produces a T value +// =========================================================================== +template +class UvTrackedTask { + public: + struct promise_type : detail::TrackedPromiseBase { + std::optional result_; + + UvTrackedTask get_return_object() { + return UvTrackedTask{ + std::coroutine_handle::from_promise(*this)}; + } + + auto initial_suspend() noexcept { + return detail::TrackedInitialAwaiter{*this}; + } + + auto final_suspend() noexcept { + return detail::TrackedFinalAwaiter{*this}; + } + + void return_value(T value) { result_.emplace(std::move(value)); } + + template + auto await_transform(Awaitable&& aw) { + return detail::TrackedAwaitable, promise_type>{ + std::forward(aw), this}; + } + }; + + bool await_ready() const noexcept { return false; } + + std::coroutine_handle<> await_suspend( + std::coroutine_handle<> caller) noexcept { + handle_.promise().continuation_ = caller; + return handle_; + } + + T await_resume() { + handle_.promise().rethrow_if_exception(); + return std::move(*handle_.promise().result_); + } + + void InitTracking(Environment* env) { + handle_.promise().init_tracking(env, Provider); + } + + void Start() { + handle_.promise().detached_ = true; + auto h = std::exchange(handle_, nullptr); + h.resume(); + } + + bool Done() const { return handle_ && handle_.done(); } + + ~UvTrackedTask() { + if (handle_) handle_.destroy(); + } + + UvTrackedTask(UvTrackedTask&& o) noexcept + : handle_(std::exchange(o.handle_, nullptr)) {} + UvTrackedTask& operator=(UvTrackedTask&& o) noexcept { + if (this != &o) { + if (handle_) handle_.destroy(); + handle_ = std::exchange(o.handle_, nullptr); + } + return *this; + } + + UvTrackedTask(const UvTrackedTask&) = delete; + UvTrackedTask& operator=(const UvTrackedTask&) = delete; + + private: + explicit UvTrackedTask(std::coroutine_handle h) : handle_(h) {} + std::coroutine_handle handle_; +}; + +// =========================================================================== +// Partial specialization for T = void +// =========================================================================== +template +class UvTrackedTask { + public: + struct promise_type : detail::TrackedPromiseBase { + UvTrackedTask get_return_object() { + return UvTrackedTask{ + std::coroutine_handle::from_promise(*this)}; + } + + auto initial_suspend() noexcept { + return detail::TrackedInitialAwaiter{*this}; + } + + auto final_suspend() noexcept { + return detail::TrackedFinalAwaiter{*this}; + } + + void return_void() {} + + template + auto await_transform(Awaitable&& aw) { + return detail::TrackedAwaitable, promise_type>{ + std::forward(aw), this}; + } + }; + + bool await_ready() const noexcept { return false; } + + std::coroutine_handle<> await_suspend( + std::coroutine_handle<> caller) noexcept { + handle_.promise().continuation_ = caller; + return handle_; + } + + void await_resume() { handle_.promise().rethrow_if_exception(); } + + void InitTracking(Environment* env) { + handle_.promise().init_tracking(env, Provider); + } + + void Start() { + handle_.promise().detached_ = true; + auto h = std::exchange(handle_, nullptr); + h.resume(); + } + + bool Done() const { return handle_ && handle_.done(); } + + ~UvTrackedTask() { + if (handle_) handle_.destroy(); + } + + UvTrackedTask(UvTrackedTask&& o) noexcept + : handle_(std::exchange(o.handle_, nullptr)) {} + UvTrackedTask& operator=(UvTrackedTask&& o) noexcept { + if (this != &o) { + if (handle_) handle_.destroy(); + handle_ = std::exchange(o.handle_, nullptr); + } + return *this; + } + + UvTrackedTask(const UvTrackedTask&) = delete; + UvTrackedTask& operator=(const UvTrackedTask&) = delete; + + private: + explicit UvTrackedTask(std::coroutine_handle h) : handle_(h) {} + std::coroutine_handle handle_; +}; + +} // namespace coro +} // namespace node + +#endif // SRC_CORO_UV_TRACKED_TASK_H_ diff --git a/src/env.cc b/src/env.cc index 04807ffae13437..06aec6886bda1a 100644 --- a/src/env.cc +++ b/src/env.cc @@ -1,6 +1,7 @@ #include "env.h" #include "async_wrap.h" #include "base_object-inl.h" +#include "coro/uv_tracked_task.h" #include "debug_utils-inl.h" #include "diagnosticfilename-inl.h" #include "memory_tracker-inl.h" @@ -524,14 +525,14 @@ void IsolateData::CreateProperties() { // Create all the provider strings that will be passed to JS. Place them in // an array so the array index matches the PROVIDER id offset. This way the // strings can be retrieved quickly. -#define V(Provider) \ - async_wrap_providers_[AsyncWrap::PROVIDER_ ## Provider].Set( \ - isolate_, \ - String::NewFromOneByte( \ - isolate_, \ - reinterpret_cast(#Provider), \ - NewStringType::kInternalized, \ - sizeof(#Provider) - 1).ToLocalChecked()); +#define V(Provider) \ + async_wrap_providers_[AsyncWrap::PROVIDER_##Provider].Set( \ + isolate_, \ + String::NewFromOneByte(isolate_, \ + reinterpret_cast(#Provider), \ + NewStringType::kInternalized, \ + sizeof(#Provider) - 1) \ + .ToLocalChecked()); NODE_ASYNC_PROVIDER_TYPES(V) #undef V @@ -663,8 +664,7 @@ void TrackingTraceStateObserver::UpdateTraceCategoryState() { Isolate* isolate = env_->isolate(); HandleScope handle_scope(isolate); Local cb = env_->trace_category_state_function(); - if (cb.IsEmpty()) - return; + if (cb.IsEmpty()) return; TryCatchScope try_catch(env_); try_catch.SetVerbose(true); Local args[] = {Boolean::New(isolate, async_hooks_enabled)}; @@ -783,8 +783,7 @@ std::string Environment::GetExecPath(const std::vector& argv) { #if defined(__OpenBSD__) uv_fs_t req; req.ptr = nullptr; - if (0 == - uv_fs_realpath(nullptr, &req, exec_path.c_str(), nullptr)) { + if (0 == uv_fs_realpath(nullptr, &req, exec_path.c_str(), nullptr)) { CHECK_NOT_NULL(req.ptr); exec_path = std::string(static_cast(req.ptr)); } @@ -861,17 +860,16 @@ Environment::Environment(IsolateData* isolate_data, // Set some flags if only kDefaultFlags was passed. This can make API version // transitions easier for embedders. if (flags_ & EnvironmentFlags::kDefaultFlags) { - flags_ = flags_ | - EnvironmentFlags::kOwnsProcessState | - EnvironmentFlags::kOwnsInspector; + flags_ = flags_ | EnvironmentFlags::kOwnsProcessState | + EnvironmentFlags::kOwnsInspector; } // We create new copies of the per-Environment option sets, so that it is // easier to modify them after Environment creation. The defaults are // part of the per-Isolate option set, for which in turn the defaults are // part of the per-process option set. - options_ = std::make_shared( - *isolate_data->options()->per_env); + options_ = + std::make_shared(*isolate_data->options()->per_env); inspector_host_port_ = std::make_shared>( options_->debug_options().host_port); @@ -1008,7 +1006,7 @@ void Environment::InitializeMainContext(Local context, if (per_process::v8_initialized) { performance_state_->Mark(performance::NODE_PERFORMANCE_MILESTONE_V8_START, - performance::performance_v8_start); + performance::performance_v8_start); } } @@ -1028,9 +1026,11 @@ Environment::~Environment() { #ifdef DEBUG bool consistency_check = false; - isolate()->RequestInterrupt([](Isolate*, void* data) { - *static_cast(data) = true; - }, &consistency_check); + isolate()->RequestInterrupt( + [](Isolate*, void* data) { // NOLINT(readability/null_usage) + *static_cast(data) = true; + }, + &consistency_check); #endif Local