Skip to main content

hydro_lang/
singleton_ref.rs

1//! Singleton reference handle for capturing singletons in `q!()` closures.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::rc::Rc;
6
7use proc_macro2::Span;
8use quote::quote;
9use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
10
11use crate::compile::ir::{HydroNode, SharedNode};
12use crate::location::Location;
13
14/// A lightweight handle to a singleton that can be captured inside `q!()` closures.
15///
16/// Created via [`Singleton::by_ref()`](crate::live_collections::Singleton::by_ref). When used
17/// inside a `q!()` closure, resolves to a reference to the singleton's value (`&T`) at runtime.
18///
19/// This type is `Copy` (required by `q!()` macro internals).
20/// TODO(mingwei): <https://github.com/hydro-project/stageleft/issues/73>
21pub struct SingletonRef<'a, T, L> {
22    pub(crate) node: *const RefCell<HydroNode>,
23    _phantom: PhantomData<(&'a (), T, L)>,
24}
25impl<T, L> SingletonRef<'_, T, L> {
26    /// Creates a `SingletonRef` from a shared node.
27    ///
28    /// Note that this will permanently keep the `Rc` alive, intentionally creating a memory leak
29    /// (like [`Box::leak`]).
30    pub(crate) fn new(rc_ptr: Rc<RefCell<HydroNode>>) -> Self {
31        // SAFETY: `rc_ptr` will now never be dropped, and therefore the count cannot reach zero.
32        let node = Rc::into_raw(rc_ptr);
33        Self {
34            node,
35            _phantom: PhantomData,
36        }
37    }
38}
39
40impl<T, L> Copy for SingletonRef<'_, T, L> {}
41impl<T, L> Clone for SingletonRef<'_, T, L> {
42    fn clone(&self) -> Self {
43        *self
44    }
45}
46
47// Thread-local storage for singleton references captured during `q!()` expansion.
48// Maps local ident name -> SharedNode for each singleton captured in the current closure.
49thread_local! {
50    static SINGLETON_REFS: RefCell<Option<Vec<(syn::Ident, HydroNode)>>> = const { RefCell::new(None) };
51}
52
53/// Activate the singleton reference capture context. Must be called before `q!()` expansion
54/// that may capture singletons. Returns the captured references when the scope ends.
55pub fn with_singleton_capture<R>(f: impl FnOnce() -> R) -> (R, Vec<(syn::Ident, HydroNode)>) {
56    SINGLETON_REFS.with(|cell| {
57        let prev = cell.borrow_mut().replace(Vec::new());
58        assert!(
59            prev.is_none(),
60            "nested singleton capture scopes are not supported"
61        );
62    });
63    let result = f();
64    let captured = SINGLETON_REFS.with(|cell| cell.borrow_mut().take().unwrap());
65    (result, captured)
66}
67
68static SINGLETON_REF_COUNTER: std::sync::atomic::AtomicUsize =
69    std::sync::atomic::AtomicUsize::new(0);
70
71impl<'a, T: 'a, L> FreeVariableWithContextWithProps<L, ()> for SingletonRef<'a, T, L>
72where
73    L: Location<'a>,
74{
75    type O = &'a T;
76
77    fn to_tokens(self, _ctx: &L) -> (QuoteTokens, ()) {
78        let id = SINGLETON_REF_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
79        let ident = syn::Ident::new(&format!("__hydro_singleton_ref_{}", id), Span::call_site());
80
81        SINGLETON_REFS.with(|cell| {
82            let mut guard = cell.borrow_mut();
83            let refs = guard.as_mut().expect(
84                "SingletonRef used inside q!() but no singleton capture scope is active. \
85                 This is a bug — singleton capture should be set up by the operator that uses q!().",
86            );
87            // Reconstruct the Rc from the raw pointer.
88            // SAFETY: The `Rc` is leaked by `Rc::into_raw` in `Self::new` and is forever valid.
89            // The created `Rc`s `Drop` must not run, that would remove the original refcount.
90            let rc = unsafe { Rc::from_raw(self.node) };
91            let cloned = rc.clone();
92            std::mem::forget(rc); // Don't decrement the original refcount
93
94            let metadata = cloned.borrow().metadata().clone(); // TODO(mingwei): wrong metadata!
95            refs.push((
96                ident.clone(),
97                HydroNode::Singleton {
98                    inner: SharedNode(cloned),
99                    metadata,
100                },
101            ));
102        });
103
104        (
105            QuoteTokens {
106                prelude: None,
107                expr: Some(quote!(#ident)),
108            },
109            (),
110        )
111    }
112}
113
114#[cfg(test)]
115#[cfg(feature = "build")]
116mod tests {
117    use stageleft::q;
118
119    use crate::compile::builder::FlowBuilder;
120    use crate::location::Location;
121
122    struct P1 {}
123
124    /// Compile-only test: verifies that `by_ref()` + `q!()` produces valid IR
125    /// that can be finalized without panicking.
126    #[test]
127    fn singleton_by_ref_compiles() {
128        let mut flow = FlowBuilder::new();
129        let node = flow.process::<P1>();
130
131        let my_count = node
132            .source_iter(q!(0..5i32))
133            .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
134        let count_ref = my_count.by_ref();
135
136        node.source_iter(q!(1..=3i32))
137            .map(q!(|x| x + *count_ref))
138            .for_each(q!(|_| {}));
139
140        // Also consume the singleton via pipe (tests Tee works correctly).
141        my_count.into_stream().for_each(q!(|_| {}));
142
143        // If this doesn't panic, the IR was built successfully with singleton refs.
144        let _built = flow.finalize();
145    }
146
147    /// Test with a non-Copy type (Vec) to ensure we're borrowing, not copying.
148    #[test]
149    fn singleton_by_ref_non_copy() {
150        let mut flow = FlowBuilder::new();
151        let node = flow.process::<P1>();
152
153        let my_vec = node.source_iter(q!(0..5i32)).fold(
154            q!(|| Vec::<i32>::new()),
155            q!(|acc: &mut Vec<i32>, x| acc.push(x)),
156        );
157        let vec_ref = my_vec.by_ref();
158
159        node.source_iter(q!(1..=3i32))
160            .map(q!(|x| x + vec_ref.len() as i32))
161            .for_each(q!(|_| {}));
162
163        // Also consume the singleton via pipe.
164        my_vec.into_stream().for_each(q!(|_| {}));
165
166        let _built = flow.finalize();
167    }
168}