Skip to main content

hydro_lang/compile/
builder.rs

1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6use slotmap::{SecondaryMap, SlotMap};
7
8#[cfg(feature = "build")]
9use super::compiled::CompiledFlow;
10#[cfg(feature = "build")]
11use super::deploy::{DeployFlow, DeployResult};
12#[cfg(feature = "build")]
13use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
14use super::ir::HydroRoot;
15use crate::location::{Cluster, External, LocationKey, LocationType, Process};
16
/// A compile-time directive to spawn a future on a location's `LocalSet`
/// alongside the DFIR scheduler.
///
/// Sidecars are recorded in the flow state rather than in the dataflow IR, and
/// are moved into the `BuiltFlow` when the builder is finalized.
pub enum Sidecar {
    /// A ready-to-go future expression (e.g. telemetry metrics collection).
    Simple {
        /// The location on whose `LocalSet` the future is spawned.
        location_key: LocationKey,
        /// Expression that evaluates to the future to spawn.
        future_expr: Box<syn::Expr>,
    },
    /// A user-owned sidecar that returns a `(Stream, Sink)` pair to the framework.
    /// The closure is called at startup; the returned stream feeds items into the
    /// dataflow and the returned sink receives items from the dataflow.
    Bidi {
        /// The location this sidecar is attached to.
        location_key: LocationKey,
        /// Unique ID of this sidecar; [`SidecarId::idents`] derives the names of
        /// its stream and sink from it.
        sidecar_id: SidecarId,
        /// Closure expression, called at startup, that returns the
        /// `(Stream, Sink)` pair.
        sidecar_closure: Box<syn::Expr>,
    },
}
34#[cfg(feature = "sim")]
35#[cfg(stageleft_runtime)]
36use crate::sim::flow::SimFlow;
37use crate::staging_util::Invariant;
38
// Counter-style ID newtypes, each wrapping a `usize`. `FlowStateInner` starts
// every counter at `Default::default()` and hands out fresh IDs with
// `get_and_increment`. `stageleft::export` presumably makes these types
// nameable from staged code — confirm against stageleft's docs.
#[stageleft::export(ExternalPortId, CycleId, ClockId, SidecarId)]
crate::newtype_counter! {
    /// ID for an external output.
    pub struct ExternalPortId(usize);

    /// ID for a [`crate::location::Location::forward_ref`] cycle.
    pub struct CycleId(usize);

    /// ID for clocks (ticks).
    pub struct ClockId(usize);

    /// ID for user-owned sidecars.
    pub struct SidecarId(usize);
}
53
54impl CycleId {
55    #[cfg(feature = "build")]
56    pub(crate) fn as_ident(&self) -> syn::Ident {
57        syn::Ident::new(&format!("cycle_{}", self), proc_macro2::Span::call_site())
58    }
59}
60
61impl SidecarId {
62    /// Derives the two idents for a bidi sidecar: `(stream, sink)`.
63    pub fn idents(&self) -> (syn::Ident, syn::Ident) {
64        let span = proc_macro2::Span::call_site();
65        (
66            syn::Ident::new(&format!("__hydro_sidecar_{}_stream", self), span),
67            syn::Ident::new(&format!("__hydro_sidecar_{}_sink", self), span),
68        )
69    }
70}
71
/// Shared, single-threaded (`Rc<RefCell<_>>`) handle to a flow's mutable build state.
///
/// `FlowBuilder` clones this handle into every location it creates, so the
/// builder and all of its locations share the same state.
pub(crate) type FlowState = Rc<RefCell<FlowStateInner>>;

/// Mutable state accumulated while a flow is being built: the IR roots, the
/// ID counters, and the sidecar directives.
pub(crate) struct FlowStateInner {
    /// Tracks the roots of the dataflow IR. This is referenced by
    /// `Stream` and `HfCycle` to build the IR. The inner option will
    /// be set to `None` when this builder is finalized. It is also `None`
    /// for a builder created via `FlowBuilder::from_built` until
    /// `FlowBuilder::replace_ir` supplies roots.
    roots: Option<Vec<HydroRoot>>,

    /// Counter for generating unique external output identifiers.
    next_external_port: ExternalPortId,

    /// Counter for generating identifiers for cycles.
    next_cycle_id: CycleId,

    /// Counter for clock IDs.
    next_clock_id: ClockId,

    /// Counter for generating unique sidecar identifiers, not used for anything else.
    next_sidecar_id: SidecarId,

    /// Compile-time sidecar directives. Processed during compilation,
    /// not part of the dataflow IR.
    pub sidecars: Vec<Sidecar>,
}
96
97impl FlowStateInner {
98    pub fn next_external_port(&mut self) -> ExternalPortId {
99        self.next_external_port.get_and_increment()
100    }
101
102    pub fn next_cycle_id(&mut self) -> CycleId {
103        self.next_cycle_id.get_and_increment()
104    }
105
106    pub fn next_clock_id(&mut self) -> ClockId {
107        self.next_clock_id.get_and_increment()
108    }
109
110    pub fn next_sidecar_id(&mut self) -> SidecarId {
111        self.next_sidecar_id.get_and_increment()
112    }
113
114    pub fn push_root(&mut self, root: HydroRoot) {
115        self.roots
116            .as_mut()
117            .expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled.")
118            .push(root);
119    }
120
121    pub fn try_push_root(&mut self, root: HydroRoot) {
122        if let Some(roots) = self.roots.as_mut() {
123            roots.push(root);
124        }
125    }
126}
127
/// Builder used to declare the locations and dataflow of a Hydro program.
///
/// Locations are added with `process`, `cluster`, and `external`; each returned
/// handle shares this builder's flow state. A `FlowBuilder` must be finalized
/// (e.g. via `finalize`, `with_default_optimize`, or `optimize_with`) before it
/// is dropped: dropping an unfinalized builder panics, unless the thread is
/// already panicking.
pub struct FlowBuilder<'a> {
    /// Hydro IR and associated counters
    flow_state: FlowState,

    /// Locations and their type.
    locations: SlotMap<LocationKey, LocationType>,
    /// Map from raw location ID to name (including externals).
    location_names: SecondaryMap<LocationKey, String>,

    /// Application name used in telemetry.
    #[cfg_attr(
        not(feature = "build"),
        expect(dead_code, reason = "unused without build")
    )]
    flow_name: String,

    /// Tracks whether this flow has been finalized; it is an error to
    /// drop without finalizing.
    finalized: bool,

    /// 'a on a FlowBuilder is used to ensure that staged code does not
    /// capture more data than it is allowed to; 'a is generated at the
    /// entrypoint of the staged code and we keep it invariant here
    /// to enforce the appropriate constraints
    _phantom: Invariant<'a>,
}
154
155impl Drop for FlowBuilder<'_> {
156    fn drop(&mut self) {
157        if !self.finalized && !std::thread::panicking() {
158            panic!(
159                "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
160            );
161        }
162    }
163}
164
165#[expect(missing_docs, reason = "TODO")]
166impl<'a> FlowBuilder<'a> {
167    /// Creates a new `FlowBuilder` to construct a Hydro program, using the Cargo package name as the program name.
168    #[expect(
169        clippy::new_without_default,
170        reason = "call `new` explicitly, not `default`"
171    )]
172    pub fn new() -> Self {
173        let mut name = std::env::var("CARGO_PKG_NAME").unwrap_or_else(|_| "unknown".to_owned());
174        if let Ok(bin_path) = std::env::current_exe()
175            && let Some(bin_name) = bin_path.file_stem()
176        {
177            name = format!("{}/{}", name, bin_name.display());
178        }
179        Self::with_name(name)
180    }
181
182    /// Creates a new `FlowBuilder` to construct a Hydro program, with the given program name.
183    pub fn with_name(name: impl Into<String>) -> Self {
184        Self {
185            flow_state: Rc::new(RefCell::new(FlowStateInner {
186                roots: Some(vec![]),
187                next_external_port: ExternalPortId::default(),
188                next_cycle_id: CycleId::default(),
189                next_clock_id: ClockId::default(),
190                next_sidecar_id: SidecarId::default(),
191                sidecars: Vec::new(),
192            })),
193            locations: SlotMap::with_key(),
194            location_names: SecondaryMap::new(),
195            flow_name: name.into(),
196            finalized: false,
197            _phantom: PhantomData,
198        }
199    }
200
201    pub(crate) fn flow_state(&self) -> &FlowState {
202        &self.flow_state
203    }
204
205    pub fn process<P>(&mut self) -> Process<'a, P> {
206        let key = self.locations.insert(LocationType::Process);
207        self.location_names.insert(key, type_name::<P>().to_owned());
208        Process {
209            key,
210            flow_state: self.flow_state().clone(),
211            _phantom: PhantomData,
212        }
213    }
214
215    pub fn cluster<C>(&mut self) -> Cluster<'a, C> {
216        let key = self.locations.insert(LocationType::Cluster);
217        self.location_names.insert(key, type_name::<C>().to_owned());
218        Cluster {
219            key,
220            flow_state: self.flow_state().clone(),
221            _phantom: PhantomData,
222        }
223    }
224
225    pub fn external<E>(&mut self) -> External<'a, E> {
226        let key = self.locations.insert(LocationType::External);
227        self.location_names.insert(key, type_name::<E>().to_owned());
228        External {
229            key,
230            flow_state: self.flow_state().clone(),
231            _phantom: PhantomData,
232        }
233    }
234}
235
#[cfg(feature = "build")]
#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
impl<'a> FlowBuilder<'a> {
    /// Finalizes the builder, moving its IR roots, locations, location names,
    /// sidecars, and flow name into a `BuiltFlow`.
    ///
    /// `unify_atomic_ticks` is run over the IR roots before they are returned.
    /// Afterwards the shared flow state holds no roots, so locations created by
    /// this builder can no longer add roots to it.
    ///
    /// # Panics
    ///
    /// Panics if the builder has no IR roots, which is the case for a builder
    /// created with `from_built` that was never given roots via `replace_ir`.
    pub fn finalize(mut self) -> super::built::BuiltFlow<'a> {
        // Set first so `Drop` treats this builder as consumed; its (now emptied)
        // fields are still dropped when this method returns.
        self.finalized = true;

        // Scope the `RefCell` borrow so it is released before the IR pass runs.
        let (mut ir, sidecars) = {
            let mut flow_state = self.flow_state.borrow_mut();
            let ir = flow_state.roots.take().expect(
                "Attempted to finalize a flow that has no IR roots. A `FlowBuilder` created with `from_built` must be given roots via `replace_ir` before it is finalized.",
            );
            let sidecars = std::mem::take(&mut flow_state.sidecars);
            (ir, sidecars)
        };

        super::ir::unify_atomic_ticks(&mut ir);

        super::built::BuiltFlow {
            ir,
            locations: std::mem::take(&mut self.locations),
            location_names: std::mem::take(&mut self.location_names),
            sidecars,
            flow_name: std::mem::take(&mut self.flow_name),
            _phantom: PhantomData,
        }
    }

    /// Finalizes the builder with the default optimizations, producing a
    /// `DeployFlow` for the deployment backend `D`.
    ///
    /// Shorthand for `self.finalize().with_default_optimize()`.
    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
        self.finalize().with_default_optimize()
    }

    /// Finalizes the builder and applies the custom optimization `f`, which
    /// receives the IR roots as `&mut [HydroRoot]`.
    ///
    /// Shorthand for `self.finalize().optimize_with(f)`.
    pub fn optimize_with(self, f: impl FnOnce(&mut [HydroRoot])) -> super::built::BuiltFlow<'a> {
        self.finalize().optimize_with(f)
    }

    /// Finalizes with the default optimizations and deploys `process` using `spec`.
    ///
    /// Shorthand for `self.with_default_optimize().with_process(process, spec)`.
    pub fn with_process<P, D: Deploy<'a>>(
        self,
        process: &Process<P>,
        spec: impl IntoProcessSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_process(process, spec)
    }

    /// Finalizes with the default optimizations and uses `spec` to produce the
    /// spec for each remaining process.
    ///
    /// Shorthand for `self.with_default_optimize().with_remaining_processes(spec)`.
    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_processes(spec)
    }

    /// Finalizes with the default optimizations and deploys the external
    /// location `external` using `spec`.
    ///
    /// Shorthand for `self.with_default_optimize().with_external(external, spec)`.
    pub fn with_external<P, D: Deploy<'a>>(
        self,
        external: &External<P>,
        spec: impl ExternalSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_external(external, spec)
    }

    /// Finalizes with the default optimizations and uses `spec` to produce the
    /// spec for each remaining external location.
    ///
    /// Shorthand for `self.with_default_optimize().with_remaining_externals(spec)`.
    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_externals(spec)
    }

    /// Finalizes with the default optimizations and deploys `cluster` using `spec`.
    ///
    /// Shorthand for `self.with_default_optimize().with_cluster(cluster, spec)`.
    pub fn with_cluster<C, D: Deploy<'a>>(
        self,
        cluster: &Cluster<C>,
        spec: impl ClusterSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_cluster(cluster, spec)
    }

    /// Finalizes with the default optimizations and uses `spec` to produce the
    /// spec for each remaining cluster.
    ///
    /// Shorthand for `self.with_default_optimize().with_remaining_clusters(spec)`.
    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_clusters(spec)
    }

    /// Compiles the flow with the default optimizations for a deployment
    /// backend `D` whose instantiation environment is `()`.
    pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
        self.with_default_optimize::<D>().compile()
    }

    /// Deploys the flow with the default optimizations into `env`.
    pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
        self.with_default_optimize().deploy(env)
    }

    #[cfg(feature = "sim")]
    /// Creates a simulation for this builder, which can be used to run deterministic simulations
    /// of the Hydro program.
    pub fn sim(self) -> SimFlow<'a> {
        self.finalize().sim()
    }

    /// Creates a new builder that reuses the locations, location names, and
    /// flow name of `built`.
    ///
    /// The returned builder starts with no IR roots, so `replace_ir` must be
    /// called before roots can be added or the builder finalized. Its ID
    /// counters start from their defaults, and `built`'s sidecars are not
    /// carried over. Like any `FlowBuilder`, it panics if dropped unfinalized.
    pub fn from_built<'b>(built: &super::built::BuiltFlow) -> FlowBuilder<'b> {
        FlowBuilder {
            flow_state: Rc::new(RefCell::new(FlowStateInner {
                roots: None,
                next_external_port: ExternalPortId::default(),
                next_cycle_id: CycleId::default(),
                next_clock_id: ClockId::default(),
                next_sidecar_id: SidecarId::default(),
                sidecars: Vec::new(),
            })),
            locations: built.locations.clone(),
            location_names: built.location_names.clone(),
            flow_name: built.flow_name.clone(),
            finalized: false,
            _phantom: PhantomData,
        }
    }

    /// Replaces the builder's IR roots with `roots`.
    #[doc(hidden)] // TODO(mingwei): This is an unstable API for now
    pub fn replace_ir(&mut self, roots: Vec<HydroRoot>) {
        self.flow_state.borrow_mut().roots = Some(roots);
    }

    /// Allocates a fresh clock ID from the builder's shared flow state.
    #[doc(hidden)] // TODO(mingwei): This is an unstable API for now
    pub fn next_clock_id(&mut self) -> ClockId {
        self.flow_state.borrow_mut().next_clock_id()
    }

    /// Allocates a fresh cycle ID from the builder's shared flow state.
    #[doc(hidden)] // TODO(mingwei): This is an unstable API for now
    pub fn next_cycle_id(&mut self) -> CycleId {
        self.flow_state.borrow_mut().next_cycle_id()
    }
}