neon/sys/
tsfn.rs

1//! Idiomatic Rust wrappers for N-API threadsafe functions
2
3use std::{
4    ffi::c_void,
5    mem::MaybeUninit,
6    ptr,
7    sync::{Arc, Mutex},
8};
9
10use super::{bindings as napi, no_panic::FailureBoundary, raw::Env};
11
12const BOUNDARY: FailureBoundary = FailureBoundary {
13    both: "A panic and exception occurred while executing a `neon::event::Channel::send` callback",
14    exception: "An exception occurred while executing a `neon::event::Channel::send` callback",
15    panic: "A panic occurred while executing a `neon::event::Channel::send` callback",
16};
17
18#[derive(Debug)]
19struct Tsfn(napi::ThreadsafeFunction);
20
21unsafe impl Send for Tsfn {}
22
23unsafe impl Sync for Tsfn {}
24
25#[derive(Debug)]
26/// Threadsafe Function encapsulate a Rust function pointer and N-API threadsafe
27/// function for scheduling tasks to execute on a JavaScript thread.
28pub struct ThreadsafeFunction<T> {
29    tsfn: Tsfn,
30    is_finalized: Arc<Mutex<bool>>,
31    callback: fn(Option<Env>, T),
32}
33
34#[derive(Debug)]
35struct Callback<T> {
36    callback: fn(Option<Env>, T),
37    data: T,
38}
39
40/// Error returned when scheduling a threadsafe function with some data
41pub struct CallError;
42
43impl<T: Send + 'static> ThreadsafeFunction<T> {
44    /// Creates a new unbounded N-API Threadsafe Function
45    /// Safety: `Env` must be valid for the current thread
46    pub unsafe fn new(env: Env, callback: fn(Option<Env>, T)) -> Self {
47        Self::with_capacity(env, 0, callback)
48    }
49
50    /// Creates a bounded N-API Threadsafe Function
51    /// Safety: `Env` must be valid for the current thread
52    pub unsafe fn with_capacity(
53        env: Env,
54        max_queue_size: usize,
55        callback: fn(Option<Env>, T),
56    ) -> Self {
57        let mut result = MaybeUninit::uninit();
58        let is_finalized = Arc::new(Mutex::new(false));
59
60        assert_eq!(
61            napi::create_threadsafe_function(
62                env,
63                std::ptr::null_mut(),
64                std::ptr::null_mut(),
65                super::string(env, "neon threadsafe function"),
66                max_queue_size,
67                // Always set the reference count to 1. Prefer using
68                // Rust `Arc` to maintain the struct.
69                1,
70                Arc::into_raw(is_finalized.clone()) as *mut _,
71                Some(Self::finalize),
72                std::ptr::null_mut(),
73                Some(Self::callback),
74                result.as_mut_ptr(),
75            ),
76            Ok(()),
77        );
78
79        Self {
80            tsfn: Tsfn(result.assume_init()),
81            is_finalized,
82            callback,
83        }
84    }
85
86    /// Schedule a threadsafe function to be executed with some data
87    pub fn call(
88        &self,
89        data: T,
90        is_blocking: Option<napi::ThreadsafeFunctionCallMode>,
91    ) -> Result<(), CallError> {
92        let is_blocking = is_blocking.unwrap_or(napi::ThreadsafeFunctionCallMode::Blocking);
93
94        let callback = Box::into_raw(Box::new(Callback {
95            callback: self.callback,
96            data,
97        }));
98
99        // Hold the lock before entering `call_threadsafe_function` so that
100        // `finalize_cb` would never complete.
101        let mut is_finalized = self.is_finalized.lock().unwrap();
102
103        let status = {
104            if *is_finalized {
105                Err(napi::Status::Closing)
106            } else {
107                unsafe {
108                    napi::call_threadsafe_function(self.tsfn.0, callback as *mut _, is_blocking)
109                }
110            }
111        };
112
113        match status {
114            Ok(()) => Ok(()),
115            Err(status) => {
116                // Prevent further calls to `call_threadsafe_function`
117                if status == napi::Status::Closing {
118                    *is_finalized = true;
119                }
120
121                // If the call failed, the callback won't execute
122                let _ = unsafe { Box::from_raw(callback) };
123
124                Err(CallError)
125            }
126        }
127    }
128
129    /// References a threadsafe function to prevent exiting the event loop until it has been dropped. (Default)
130    /// Safety: `Env` must be valid for the current thread
131    pub unsafe fn reference(&self, env: Env) {
132        napi::ref_threadsafe_function(env, self.tsfn.0).unwrap();
133    }
134
135    /// Unreferences a threadsafe function to allow exiting the event loop before it has been dropped.
136    /// Safety: `Env` must be valid for the current thread
137    pub unsafe fn unref(&self, env: Env) {
138        napi::unref_threadsafe_function(env, self.tsfn.0).unwrap();
139    }
140
141    // Provides a C ABI wrapper for a napi callback notifying us about tsfn
142    // being finalized.
143    unsafe extern "C" fn finalize(_env: Env, data: *mut c_void, _hint: *mut c_void) {
144        let is_finalized = Arc::from_raw(data as *mut Mutex<bool>);
145
146        *is_finalized.lock().unwrap() = true;
147    }
148
149    // Provides a C ABI wrapper for invoking the user supplied function pointer
150    // On panic or exception, creates a fatal exception of the form:
151    // Error(msg: string) {
152    //     // Exception thrown
153    //     cause?: Error,
154    //     // Panic occurred
155    //     panic?: Error(msg: string) {
156    //         // Opaque panic type if it wasn't a string
157    //         cause?: JsBox<Panic>
158    //     }
159    // }
160    unsafe extern "C" fn callback(
161        env: Env,
162        _js_callback: napi::Value,
163        _context: *mut c_void,
164        data: *mut c_void,
165    ) {
166        let Callback { callback, data } = *Box::from_raw(data as *mut Callback<T>);
167
168        BOUNDARY.catch_failure(env, None, move |env| {
169            callback(env, data);
170            ptr::null_mut()
171        });
172    }
173}
174
175impl<T> Drop for ThreadsafeFunction<T> {
176    fn drop(&mut self) {
177        let is_finalized = self.is_finalized.lock().unwrap();
178
179        // tsfn was already finalized by `Environment::CleanupHandles()` in Node.js
180        if *is_finalized {
181            return;
182        }
183
184        unsafe {
185            debug_assert_eq!(
186                napi::release_threadsafe_function(
187                    self.tsfn.0,
188                    napi::ThreadsafeFunctionReleaseMode::Release,
189                ),
190                Ok(())
191            );
192        };
193    }
194}