Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
37/// Wrapper that displays only the tokens of a parsed expr.
38///
39/// Boxes `syn::Type` which is ~240 bytes.
40#[derive(Clone, Hash)]
41pub struct DebugExpr(pub Box<syn::Expr>);
42
43impl serde::Serialize for DebugExpr {
44    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
45        serializer.serialize_str(&self.to_string())
46    }
47}
48
49impl From<syn::Expr> for DebugExpr {
50    fn from(expr: syn::Expr) -> Self {
51        Self(Box::new(expr))
52    }
53}
54
55impl Deref for DebugExpr {
56    type Target = syn::Expr;
57
58    fn deref(&self) -> &Self::Target {
59        &self.0
60    }
61}
62
63impl ToTokens for DebugExpr {
64    fn to_tokens(&self, tokens: &mut TokenStream) {
65        self.0.to_tokens(tokens);
66    }
67}
68
69impl Debug for DebugExpr {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        write!(f, "{}", self.0.to_token_stream())
72    }
73}
74
75impl Display for DebugExpr {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        let original = self.0.as_ref().clone();
78        let simplified = simplify_q_macro(original);
79
80        // For now, just use quote formatting without trying to parse as a statement
81        // This avoids the syn::parse_quote! issues entirely
82        write!(f, "q!({})", quote::quote!(#simplified))
83    }
84}
85
86/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
87fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
88    // Try to parse the token string as a syn::Expr
89    // Use a visitor to simplify q! macro expansions
90    let mut simplifier = QMacroSimplifier::new();
91    simplifier.visit_expr_mut(&mut expr);
92
93    // If we found and simplified a q! macro, return the simplified version
94    if let Some(simplified) = simplifier.simplified_result {
95        simplified
96    } else {
97        expr
98    }
99}
100
101/// AST visitor that simplifies q! macro expansions
102#[derive(Default)]
103pub struct QMacroSimplifier {
104    pub simplified_result: Option<syn::Expr>,
105}
106
107impl QMacroSimplifier {
108    pub fn new() -> Self {
109        Self::default()
110    }
111}
112
113impl VisitMut for QMacroSimplifier {
114    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
115        // Check if we already found a result to avoid further processing
116        if self.simplified_result.is_some() {
117            return;
118        }
119
120        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
121            // Look for calls to stageleft::runtime_support::fn*
122            && self.is_stageleft_runtime_support_call(&path_expr.path)
123            // Try to extract the closure from the arguments
124            && let Some(closure) = self.extract_closure_from_args(&call.args)
125        {
126            self.simplified_result = Some(closure);
127            return;
128        }
129
130        // Continue visiting child expressions using the default implementation
131        // Use the default visitor to avoid infinite recursion
132        syn::visit_mut::visit_expr_mut(self, expr);
133    }
134}
135
136impl QMacroSimplifier {
137    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
138        // Check if this is a call to stageleft::runtime_support::fn*
139        if let Some(last_segment) = path.segments.last() {
140            let fn_name = last_segment.ident.to_string();
141            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
142            fn_name.contains("_type_hint")
143                && path.segments.len() > 2
144                && path.segments[0].ident == "stageleft"
145                && path.segments[1].ident == "runtime_support"
146        } else {
147            false
148        }
149    }
150
151    fn extract_closure_from_args(
152        &self,
153        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
154    ) -> Option<syn::Expr> {
155        // Look through the arguments for a closure expression
156        for arg in args {
157            if let syn::Expr::Closure(_) = arg {
158                return Some(arg.clone());
159            }
160            // Also check for closures nested in other expressions (like blocks)
161            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
162                return Some(closure_expr);
163            }
164        }
165        None
166    }
167
168    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
169        let mut visitor = ClosureFinder {
170            found_closure: None,
171            prefer_inner_blocks: true,
172        };
173        visitor.visit_expr(expr);
174        visitor.found_closure
175    }
176}
177
178/// Visitor that finds closures in expressions with special block handling
179struct ClosureFinder {
180    found_closure: Option<syn::Expr>,
181    prefer_inner_blocks: bool,
182}
183
184impl<'ast> Visit<'ast> for ClosureFinder {
185    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
186        // If we already found a closure, don't continue searching
187        if self.found_closure.is_some() {
188            return;
189        }
190
191        match expr {
192            syn::Expr::Closure(_) => {
193                self.found_closure = Some(expr.clone());
194            }
195            syn::Expr::Block(block) if self.prefer_inner_blocks => {
196                // Special handling for blocks - look for inner blocks that contain closures
197                for stmt in &block.block.stmts {
198                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
199                        && let syn::Expr::Block(_) = stmt_expr
200                    {
201                        // Check if this nested block contains a closure
202                        let mut inner_visitor = ClosureFinder {
203                            found_closure: None,
204                            prefer_inner_blocks: false, // Avoid infinite recursion
205                        };
206                        inner_visitor.visit_expr(stmt_expr);
207                        if inner_visitor.found_closure.is_some() {
208                            // Found a closure in an inner block, return that block
209                            self.found_closure = Some(stmt_expr.clone());
210                            return;
211                        }
212                    }
213                }
214
215                // If no inner block with closure found, continue with normal visitation
216                visit::visit_expr(self, expr);
217
218                // If we found a closure, just return the closure itself, not the whole block
219                // unless we're in the special case where we want the containing block
220                if self.found_closure.is_some() {
221                    // The closure was found during visitation, no need to wrap in block
222                }
223            }
224            _ => {
225                // Use default visitor behavior for all other expressions
226                visit::visit_expr(self, expr);
227            }
228        }
229    }
230}
231
232/// Debug displays the type's tokens.
233///
234/// Boxes `syn::Type` which is ~320 bytes.
235#[derive(Clone, PartialEq, Eq, Hash)]
236pub struct DebugType(pub Box<syn::Type>);
237
238impl From<syn::Type> for DebugType {
239    fn from(t: syn::Type) -> Self {
240        Self(Box::new(t))
241    }
242}
243
244impl Deref for DebugType {
245    type Target = syn::Type;
246
247    fn deref(&self) -> &Self::Target {
248        &self.0
249    }
250}
251
252impl ToTokens for DebugType {
253    fn to_tokens(&self, tokens: &mut TokenStream) {
254        self.0.to_tokens(tokens);
255    }
256}
257
258impl Debug for DebugType {
259    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260        write!(f, "{}", self.0.to_token_stream())
261    }
262}
263
264impl serde::Serialize for DebugType {
265    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
266        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
267    }
268}
269
270fn serialize_backtrace_as_span<S: serde::Serializer>(
271    backtrace: &Backtrace,
272    serializer: S,
273) -> Result<S::Ok, S::Error> {
274    match backtrace.format_span() {
275        Some(span) => serializer.serialize_some(&span),
276        None => serializer.serialize_none(),
277    }
278}
279
280fn serialize_ident<S: serde::Serializer>(
281    ident: &syn::Ident,
282    serializer: S,
283) -> Result<S::Ok, S::Error> {
284    serializer.serialize_str(&ident.to_string())
285}
286
287fn serialize_singleton_refs<S: serde::Serializer>(
288    refs: &[(syn::Ident, HydroNode)],
289    serializer: S,
290) -> Result<S::Ok, S::Error> {
291    use serde::ser::SerializeSeq;
292    let mut seq = serializer.serialize_seq(Some(refs.len()))?;
293    for (ident, node) in refs {
294        seq.serialize_element(&(ident.to_string(), node))?;
295    }
296    seq.end()
297}
298
299pub enum DebugInstantiate {
300    Building,
301    Finalized(Box<DebugInstantiateFinalized>),
302}
303
304impl serde::Serialize for DebugInstantiate {
305    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
306        match self {
307            DebugInstantiate::Building => {
308                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
309            }
310            DebugInstantiate::Finalized(_) => {
311                panic!(
312                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
313                )
314            }
315        }
316    }
317}
318
319#[cfg_attr(
320    not(feature = "build"),
321    expect(
322        dead_code,
323        reason = "sink, source unused without `feature = \"build\"`."
324    )
325)]
326pub struct DebugInstantiateFinalized {
327    sink: syn::Expr,
328    source: syn::Expr,
329    connect_fn: Option<Box<dyn FnOnce()>>,
330}
331
332impl From<DebugInstantiateFinalized> for DebugInstantiate {
333    fn from(f: DebugInstantiateFinalized) -> Self {
334        Self::Finalized(Box::new(f))
335    }
336}
337
338impl Debug for DebugInstantiate {
339    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
340        write!(f, "<network instantiate>")
341    }
342}
343
344impl Hash for DebugInstantiate {
345    fn hash<H: Hasher>(&self, _state: &mut H) {
346        // Do nothing
347    }
348}
349
350impl Clone for DebugInstantiate {
351    fn clone(&self) -> Self {
352        match self {
353            DebugInstantiate::Building => DebugInstantiate::Building,
354            DebugInstantiate::Finalized(_) => {
355                panic!("DebugInstantiate::Finalized should not be cloned")
356            }
357        }
358    }
359}
360
361/// Tracks the instantiation state of a `ClusterMembers` source.
362///
363/// During `compile_network`, the first `ClusterMembers` node for a given
364/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
365/// receives the expression returned by `Deploy::cluster_membership_stream`.
366/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
367/// during code-gen they simply reference the tee output of the first node
368/// instead of creating a redundant `source_stream`.
369#[derive(Debug, Hash, Clone, serde::Serialize)]
370pub enum ClusterMembersState {
371    /// Not yet instantiated.
372    Uninit,
373    /// The primary instance: holds the stream expression and will emit
374    /// `source_stream(expr) -> tee()` during code-gen.
375    Stream(DebugExpr),
376    /// A secondary instance that references the tee output of the primary.
377    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
378    /// can derive the deterministic tee ident without extra state.
379    Tee(LocationId, LocationId),
380}
381
382/// A source in a Hydro graph, where data enters the graph.
383#[derive(Debug, Hash, Clone, serde::Serialize)]
384pub enum HydroSource {
385    Stream(DebugExpr),
386    ExternalNetwork(),
387    Iter(DebugExpr),
388    Spin(),
389    ClusterMembers(LocationId, ClusterMembersState),
390    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
391    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
392}
393
#[cfg(feature = "build")]
/// A trait that abstracts over the parts of DFIR code-gen that differ between production
/// deployment and simulation.
///
/// In particular, this lets the simulator fuse all locations into one DFIR graph, emit separate
/// graphs for each tick, and emit hooks for controlling non-deterministic operators.
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits DFIR that batches `in_ident` (at `in_location`) into `out_ident` (at
    /// `out_location`).
    ///
    /// `fold_hooked_idents` is ignored by the `SecondaryMap` (production) implementation.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );

    /// Emits DFIR that moves `in_ident` out of a tick into `out_ident`.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits DFIR that marks the start of an atomic section from `in_ident` to `out_ident`.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits DFIR that marks the end of an atomic section from `in_ident` to `out_ident`.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits DFIR at a point where non-determinism is observed, routing `in_ident` to
    /// `out_ident`.
    ///
    /// The production implementation emits a plain alias; a simulator may hook it.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits DFIR that merges `first_ident` and `second_ident` into `out_ident`.
    ///
    /// `operator_tag` is forwarded to the graph builder.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network channel.
    ///
    /// On `from`, `input_ident` is optionally serialized and sent to `sink`. On `to`, `source`
    /// is optionally deserialized into `out_ident`. `tag_id` distinguishes the send and receive
    /// operators.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits the receiving side of an external input on `on`: `source_expr`, optionally
    /// deserialized, bound to `out_ident`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits the sending side of an external output on `on`: `input_ident`, optionally
    /// serialized, sent to `sink_expr`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Optionally emits a fold hook that buffers and permutes inputs before the fold.
    ///
    /// Returns the new input ident for the fold if a hook was emitted, or `None` otherwise.
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;
}
511
/// Production [`DfirBuilder`]: one `FlatGraphBuilder` per root location, with no simulation
/// hooks.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        // All locations nested under a root share that root's graph.
        self.entry(location.root().key())
            .expect("location was removed")
            .or_default()
    }

    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        let bounded_singleton_like = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );
        if bounded_singleton_like {
            // Bounded singleton-like inputs are wrapped in `persist::<'static>()`; this is only
            // supported for top-level locations.
            assert!(in_location.is_top_level());
            builder.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident -> persist::<'static>();
                },
                None,
                None,
            );
        } else {
            add_identity_dfir(builder, &in_ident, out_ident);
        }
    }

    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        add_identity_dfir(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_identity_dfir(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        add_identity_dfir(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_identity_dfir(self.get_dfir_mut(location), &in_ident, out_ident);
    }

    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        let builder = self.get_dfir_mut(location);
        builder.add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        // A network edge is exactly an external output on `from` plus an external source on
        // `to`, using the same send/recv operator tags. Emit the sender first, as before,
        // since `from` and `to` may share a builder.
        self.create_external_output(from, sink, &input_ident, serialize, tag_id);
        self.create_external_source(to, source, out_ident, deserialize, tag_id);
    }

    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let receiver_builder = self.get_dfir_mut(on);
        if let Some(deserialize_pipeline) = deserialize {
            receiver_builder.add_dfir(
                parse_quote! {
                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                },
                None,
                Some(&format!("recv{}", tag_id)),
            );
        } else {
            receiver_builder.add_dfir(
                parse_quote! {
                    #out_ident = source_stream(#source_expr);
                },
                None,
                Some(&format!("recv{}", tag_id)),
            );
        }
    }

    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let sender_builder = self.get_dfir_mut(on);
        if let Some(serialize_fn) = serialize {
            sender_builder.add_dfir(
                parse_quote! {
                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                },
                None,
                // The operator tag separates send from receive, which would otherwise share
                // the same next_stmt_id.
                Some(&format!("send{}", tag_id)),
            );
        } else {
            sender_builder.add_dfir(
                parse_quote! {
                    #input_ident -> dest_sink(#sink_expr);
                },
                None,
                Some(&format!("send{}", tag_id)),
            );
        }
    }

    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        // Production builds never hook folds.
        None
    }
}

/// Adds the untagged alias statement `out_ident = in_ident;` to `builder`.
///
/// Used by the production [`DfirBuilder`] for the boundaries it does not need to instrument.
#[cfg(feature = "build")]
fn add_identity_dfir(
    builder: &mut FlatGraphBuilder,
    in_ident: &syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}
778
#[cfg(feature = "build")]
/// Either a [`DfirBuilder`] to emit code into, or a pair of callbacks invoked on roots (`L`)
/// and nodes (`N`).
///
/// Each callback also receives a `&mut usize`; presumably a running counter or ID supplied by
/// the traversal (TODO confirm at the call site).
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR through the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke the root callback and the node callback instead of emitting code.
    Callback(L, N),
}
788
789/// An root in a Hydro graph, which is an pipeline that doesn't emit
790/// any downstream values. Traversals over the dataflow graph and
791/// generating DFIR IR start from roots.
792#[derive(Debug, Hash, serde::Serialize)]
793pub enum HydroRoot {
794    ForEach {
795        f: DebugExpr,
796        input: Box<HydroNode>,
797        op_metadata: HydroIrOpMetadata,
798    },
799    SendExternal {
800        to_external_key: LocationKey,
801        to_port_id: ExternalPortId,
802        to_many: bool,
803        unpaired: bool,
804        serialize_fn: Option<DebugExpr>,
805        instantiate_fn: DebugInstantiate,
806        input: Box<HydroNode>,
807        op_metadata: HydroIrOpMetadata,
808    },
809    DestSink {
810        sink: DebugExpr,
811        input: Box<HydroNode>,
812        op_metadata: HydroIrOpMetadata,
813    },
814    CycleSink {
815        cycle_id: CycleId,
816        input: Box<HydroNode>,
817        op_metadata: HydroIrOpMetadata,
818    },
819    EmbeddedOutput {
820        #[serde(serialize_with = "serialize_ident")]
821        ident: syn::Ident,
822        input: Box<HydroNode>,
823        op_metadata: HydroIrOpMetadata,
824    },
825    Null {
826        input: Box<HydroNode>,
827        op_metadata: HydroIrOpMetadata,
828    },
829}
830
831impl HydroRoot {
832    #[cfg(feature = "build")]
833    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
834    pub fn compile_network<'a, D>(
835        &mut self,
836        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
837        seen_tees: &mut SeenSharedNodes,
838        seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
839        processes: &SparseSecondaryMap<LocationKey, D::Process>,
840        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
841        externals: &SparseSecondaryMap<LocationKey, D::External>,
842        env: &mut D::InstantiateEnv,
843    ) where
844        D: Deploy<'a>,
845    {
846        let refcell_extra_stmts = RefCell::new(extra_stmts);
847        let refcell_env = RefCell::new(env);
848        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
849        self.transform_bottom_up(
850            &mut |l| {
851                if let HydroRoot::SendExternal {
852                    input,
853                    to_external_key,
854                    to_port_id,
855                    to_many,
856                    unpaired,
857                    instantiate_fn,
858                    ..
859                } = l
860                {
861                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
862                        DebugInstantiate::Building => {
863                            let to_node = externals
864                                .get(*to_external_key)
865                                .unwrap_or_else(|| {
866                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
867                                })
868                                .clone();
869
870                            match input.metadata().location_id.root() {
871                                &LocationId::Process(process_key) => {
872                                    if *to_many {
873                                        (
874                                            (
875                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
876                                                parse_quote!(DUMMY),
877                                            ),
878                                            Box::new(|| {}) as Box<dyn FnOnce()>,
879                                        )
880                                    } else {
881                                        let from_node = processes
882                                            .get(process_key)
883                                            .unwrap_or_else(|| {
884                                                panic!("A process used in the graph was not instantiated: {}", process_key)
885                                            })
886                                            .clone();
887
888                                        let sink_port = from_node.next_port();
889                                        let source_port = to_node.next_port();
890
891                                        if *unpaired {
892                                            use stageleft::quote_type;
893                                            use tokio_util::codec::LengthDelimitedCodec;
894
895                                            to_node.register(*to_port_id, source_port.clone());
896
897                                            let _ = D::e2o_source(
898                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
899                                                &to_node, &source_port,
900                                                &from_node, &sink_port,
901                                                &quote_type::<LengthDelimitedCodec>(),
902                                                format!("{}_{}", *to_external_key, *to_port_id)
903                                            );
904                                        }
905
906                                        (
907                                            (
908                                                D::o2e_sink(
909                                                    &from_node,
910                                                    &sink_port,
911                                                    &to_node,
912                                                    &source_port,
913                                                    format!("{}_{}", *to_external_key, *to_port_id)
914                                                ),
915                                                parse_quote!(DUMMY),
916                                            ),
917                                            if *unpaired {
918                                                D::e2o_connect(
919                                                    &to_node,
920                                                    &source_port,
921                                                    &from_node,
922                                                    &sink_port,
923                                                    *to_many,
924                                                    NetworkHint::Auto,
925                                                )
926                                            } else {
927                                                Box::new(|| {}) as Box<dyn FnOnce()>
928                                            },
929                                        )
930                                    }
931                                }
932                                LocationId::Cluster(cluster_key) => {
933                                    let from_node = clusters
934                                        .get(*cluster_key)
935                                        .unwrap_or_else(|| {
936                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
937                                        })
938                                        .clone();
939
940                                    let sink_port = from_node.next_port();
941                                    let source_port = to_node.next_port();
942
943                                    if *unpaired {
944                                        to_node.register(*to_port_id, source_port.clone());
945                                    }
946
947                                    (
948                                        (
949                                            D::m2e_sink(
950                                                &from_node,
951                                                &sink_port,
952                                                &to_node,
953                                                &source_port,
954                                                format!("{}_{}", *to_external_key, *to_port_id)
955                                            ),
956                                            parse_quote!(DUMMY),
957                                        ),
958                                        Box::new(|| {}) as Box<dyn FnOnce()>,
959                                    )
960                                }
961                                _ => panic!()
962                            }
963                        },
964
965                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
966                    };
967
968                    *instantiate_fn = DebugInstantiateFinalized {
969                        sink: sink_expr,
970                        source: source_expr,
971                        connect_fn: Some(connect_fn),
972                    }
973                    .into();
974                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
975                    let element_type = match &input.metadata().collection_kind {
976                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
977                        _ => panic!("Embedded output must have Stream collection kind"),
978                    };
979                    let location_key = match input.metadata().location_id.root() {
980                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
981                        _ => panic!("Embedded output must be on a process or cluster"),
982                    };
983                    D::register_embedded_output(
984                        &mut refcell_env.borrow_mut(),
985                        location_key,
986                        ident,
987                        &element_type,
988                    );
989                }
990            },
991            &mut |n| {
992                if let HydroNode::Network {
993                    name,
994                    networking_info,
995                    input,
996                    instantiate_fn,
997                    metadata,
998                    ..
999                } = n
1000                {
1001                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1002                        DebugInstantiate::Building => instantiate_network::<D>(
1003                            &mut refcell_env.borrow_mut(),
1004                            input.metadata().location_id.root(),
1005                            metadata.location_id.root(),
1006                            processes,
1007                            clusters,
1008                            name.as_deref(),
1009                            networking_info,
1010                        ),
1011
1012                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1013                    };
1014
1015                    *instantiate_fn = DebugInstantiateFinalized {
1016                        sink: sink_expr,
1017                        source: source_expr,
1018                        connect_fn: Some(connect_fn),
1019                    }
1020                    .into();
1021                } else if let HydroNode::ExternalInput {
1022                    from_external_key,
1023                    from_port_id,
1024                    from_many,
1025                    codec_type,
1026                    port_hint,
1027                    instantiate_fn,
1028                    metadata,
1029                    ..
1030                } = n
1031                {
1032                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1033                        DebugInstantiate::Building => {
1034                            let from_node = externals
1035                                .get(*from_external_key)
1036                                .unwrap_or_else(|| {
1037                                    panic!(
1038                                        "A external used in the graph was not instantiated: {}",
1039                                        from_external_key,
1040                                    )
1041                                })
1042                                .clone();
1043
1044                            match metadata.location_id.root() {
1045                                &LocationId::Process(process_key) => {
1046                                    let to_node = processes
1047                                        .get(process_key)
1048                                        .unwrap_or_else(|| {
1049                                            panic!("A process used in the graph was not instantiated: {}", process_key)
1050                                        })
1051                                        .clone();
1052
1053                                    let sink_port = from_node.next_port();
1054                                    let source_port = to_node.next_port();
1055
1056                                    from_node.register(*from_port_id, sink_port.clone());
1057
1058                                    (
1059                                        (
1060                                            parse_quote!(DUMMY),
1061                                            if *from_many {
1062                                                D::e2o_many_source(
1063                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1064                                                    &to_node, &source_port,
1065                                                    codec_type.0.as_ref(),
1066                                                    format!("{}_{}", *from_external_key, *from_port_id)
1067                                                )
1068                                            } else {
1069                                                D::e2o_source(
1070                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1071                                                    &from_node, &sink_port,
1072                                                    &to_node, &source_port,
1073                                                    codec_type.0.as_ref(),
1074                                                    format!("{}_{}", *from_external_key, *from_port_id)
1075                                                )
1076                                            },
1077                                        ),
1078                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1079                                    )
1080                                }
1081                                LocationId::Cluster(cluster_key) => {
1082                                    let to_node = clusters
1083                                        .get(*cluster_key)
1084                                        .unwrap_or_else(|| {
1085                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1086                                        })
1087                                        .clone();
1088
1089                                    let sink_port = from_node.next_port();
1090                                    let source_port = to_node.next_port();
1091
1092                                    from_node.register(*from_port_id, sink_port.clone());
1093
1094                                    (
1095                                        (
1096                                            parse_quote!(DUMMY),
1097                                            D::e2m_source(
1098                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1099                                                &from_node, &sink_port,
1100                                                &to_node, &source_port,
1101                                                codec_type.0.as_ref(),
1102                                                format!("{}_{}", *from_external_key, *from_port_id)
1103                                            ),
1104                                        ),
1105                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1106                                    )
1107                                }
1108                                _ => panic!()
1109                            }
1110                        },
1111
1112                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1113                    };
1114
1115                    *instantiate_fn = DebugInstantiateFinalized {
1116                        sink: sink_expr,
1117                        source: source_expr,
1118                        connect_fn: Some(connect_fn),
1119                    }
1120                    .into();
1121                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1122                    let element_type = match &metadata.collection_kind {
1123                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1124                        _ => panic!("Embedded source must have Stream collection kind"),
1125                    };
1126                    let location_key = match metadata.location_id.root() {
1127                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1128                        _ => panic!("Embedded source must be on a process or cluster"),
1129                    };
1130                    D::register_embedded_stream_input(
1131                        &mut refcell_env.borrow_mut(),
1132                        location_key,
1133                        ident,
1134                        &element_type,
1135                    );
1136                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1137                    let element_type = match &metadata.collection_kind {
1138                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1139                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1140                    };
1141                    let location_key = match metadata.location_id.root() {
1142                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1143                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1144                    };
1145                    D::register_embedded_singleton_input(
1146                        &mut refcell_env.borrow_mut(),
1147                        location_key,
1148                        ident,
1149                        &element_type,
1150                    );
1151                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1152                    match state {
1153                        ClusterMembersState::Uninit => {
1154                            let at_location = metadata.location_id.root().clone();
1155                            let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1156                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
1157                                // First occurrence: call cluster_membership_stream and mark as Stream.
1158                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1159                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1160                                    &(),
1161                                );
1162                                *state = ClusterMembersState::Stream(expr.into());
1163                            } else {
1164                                // Already instantiated for this (at, target) pair: just tee.
1165                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
1166                            }
1167                        }
1168                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1169                            panic!("cluster members already finalized");
1170                        }
1171                    }
1172                }
1173            },
1174            seen_tees,
1175            false,
1176        );
1177    }
1178
1179    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1180        self.transform_bottom_up(
1181            &mut |l| {
1182                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1183                    match instantiate_fn {
1184                        DebugInstantiate::Building => panic!("network not built"),
1185
1186                        DebugInstantiate::Finalized(finalized) => {
1187                            (finalized.connect_fn.take().unwrap())();
1188                        }
1189                    }
1190                }
1191            },
1192            &mut |n| {
1193                if let HydroNode::Network { instantiate_fn, .. }
1194                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1195                {
1196                    match instantiate_fn {
1197                        DebugInstantiate::Building => panic!("network not built"),
1198
1199                        DebugInstantiate::Finalized(finalized) => {
1200                            (finalized.connect_fn.take().unwrap())();
1201                        }
1202                    }
1203                }
1204            },
1205            seen_tees,
1206            false,
1207        );
1208    }
1209
1210    pub fn transform_bottom_up(
1211        &mut self,
1212        transform_root: &mut impl FnMut(&mut HydroRoot),
1213        transform_node: &mut impl FnMut(&mut HydroNode),
1214        seen_tees: &mut SeenSharedNodes,
1215        check_well_formed: bool,
1216    ) {
1217        self.transform_children(
1218            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1219            seen_tees,
1220        );
1221
1222        transform_root(self);
1223    }
1224
1225    pub fn transform_children(
1226        &mut self,
1227        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1228        seen_tees: &mut SeenSharedNodes,
1229    ) {
1230        match self {
1231            HydroRoot::ForEach { input, .. }
1232            | HydroRoot::SendExternal { input, .. }
1233            | HydroRoot::DestSink { input, .. }
1234            | HydroRoot::CycleSink { input, .. }
1235            | HydroRoot::EmbeddedOutput { input, .. }
1236            | HydroRoot::Null { input, .. } => {
1237                transform(input, seen_tees);
1238            }
1239        }
1240    }
1241
1242    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1243        match self {
1244            HydroRoot::ForEach {
1245                f,
1246                input,
1247                op_metadata,
1248            } => HydroRoot::ForEach {
1249                f: f.clone(),
1250                input: Box::new(input.deep_clone(seen_tees)),
1251                op_metadata: op_metadata.clone(),
1252            },
1253            HydroRoot::SendExternal {
1254                to_external_key,
1255                to_port_id,
1256                to_many,
1257                unpaired,
1258                serialize_fn,
1259                instantiate_fn,
1260                input,
1261                op_metadata,
1262            } => HydroRoot::SendExternal {
1263                to_external_key: *to_external_key,
1264                to_port_id: *to_port_id,
1265                to_many: *to_many,
1266                unpaired: *unpaired,
1267                serialize_fn: serialize_fn.clone(),
1268                instantiate_fn: instantiate_fn.clone(),
1269                input: Box::new(input.deep_clone(seen_tees)),
1270                op_metadata: op_metadata.clone(),
1271            },
1272            HydroRoot::DestSink {
1273                sink,
1274                input,
1275                op_metadata,
1276            } => HydroRoot::DestSink {
1277                sink: sink.clone(),
1278                input: Box::new(input.deep_clone(seen_tees)),
1279                op_metadata: op_metadata.clone(),
1280            },
1281            HydroRoot::CycleSink {
1282                cycle_id,
1283                input,
1284                op_metadata,
1285            } => HydroRoot::CycleSink {
1286                cycle_id: *cycle_id,
1287                input: Box::new(input.deep_clone(seen_tees)),
1288                op_metadata: op_metadata.clone(),
1289            },
1290            HydroRoot::EmbeddedOutput {
1291                ident,
1292                input,
1293                op_metadata,
1294            } => HydroRoot::EmbeddedOutput {
1295                ident: ident.clone(),
1296                input: Box::new(input.deep_clone(seen_tees)),
1297                op_metadata: op_metadata.clone(),
1298            },
1299            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1300                input: Box::new(input.deep_clone(seen_tees)),
1301                op_metadata: op_metadata.clone(),
1302            },
1303        }
1304    }
1305
1306    #[cfg(feature = "build")]
1307    pub fn emit(
1308        &mut self,
1309        graph_builders: &mut dyn DfirBuilder,
1310        seen_tees: &mut SeenSharedNodes,
1311        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1312        next_stmt_id: &mut usize,
1313        fold_hooked_idents: &mut HashSet<String>,
1314    ) {
1315        self.emit_core(
1316            &mut BuildersOrCallback::<
1317                fn(&mut HydroRoot, &mut usize),
1318                fn(&mut HydroNode, &mut usize),
1319            >::Builders(graph_builders),
1320            seen_tees,
1321            built_tees,
1322            next_stmt_id,
1323            fold_hooked_idents,
1324        );
1325    }
1326
1327    #[cfg(feature = "build")]
1328    pub fn emit_core(
1329        &mut self,
1330        builders_or_callback: &mut BuildersOrCallback<
1331            impl FnMut(&mut HydroRoot, &mut usize),
1332            impl FnMut(&mut HydroNode, &mut usize),
1333        >,
1334        seen_tees: &mut SeenSharedNodes,
1335        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1336        next_stmt_id: &mut usize,
1337        fold_hooked_idents: &mut HashSet<String>,
1338    ) {
1339        match self {
1340            HydroRoot::ForEach { f, input, .. } => {
1341                let input_ident = input.emit_core(
1342                    builders_or_callback,
1343                    seen_tees,
1344                    built_tees,
1345                    next_stmt_id,
1346                    fold_hooked_idents,
1347                );
1348
1349                match builders_or_callback {
1350                    BuildersOrCallback::Builders(graph_builders) => {
1351                        graph_builders
1352                            .get_dfir_mut(&input.metadata().location_id)
1353                            .add_dfir(
1354                                parse_quote! {
1355                                    #input_ident -> for_each(#f);
1356                                },
1357                                None,
1358                                Some(&next_stmt_id.to_string()),
1359                            );
1360                    }
1361                    BuildersOrCallback::Callback(leaf_callback, _) => {
1362                        leaf_callback(self, next_stmt_id);
1363                    }
1364                }
1365
1366                *next_stmt_id += 1;
1367            }
1368
1369            HydroRoot::SendExternal {
1370                serialize_fn,
1371                instantiate_fn,
1372                input,
1373                ..
1374            } => {
1375                let input_ident = input.emit_core(
1376                    builders_or_callback,
1377                    seen_tees,
1378                    built_tees,
1379                    next_stmt_id,
1380                    fold_hooked_idents,
1381                );
1382
1383                match builders_or_callback {
1384                    BuildersOrCallback::Builders(graph_builders) => {
1385                        let (sink_expr, _) = match instantiate_fn {
1386                            DebugInstantiate::Building => (
1387                                syn::parse_quote!(DUMMY_SINK),
1388                                syn::parse_quote!(DUMMY_SOURCE),
1389                            ),
1390
1391                            DebugInstantiate::Finalized(finalized) => {
1392                                (finalized.sink.clone(), finalized.source.clone())
1393                            }
1394                        };
1395
1396                        graph_builders.create_external_output(
1397                            &input.metadata().location_id,
1398                            sink_expr,
1399                            &input_ident,
1400                            serialize_fn.as_ref(),
1401                            *next_stmt_id,
1402                        );
1403                    }
1404                    BuildersOrCallback::Callback(leaf_callback, _) => {
1405                        leaf_callback(self, next_stmt_id);
1406                    }
1407                }
1408
1409                *next_stmt_id += 1;
1410            }
1411
1412            HydroRoot::DestSink { sink, input, .. } => {
1413                let input_ident = input.emit_core(
1414                    builders_or_callback,
1415                    seen_tees,
1416                    built_tees,
1417                    next_stmt_id,
1418                    fold_hooked_idents,
1419                );
1420
1421                match builders_or_callback {
1422                    BuildersOrCallback::Builders(graph_builders) => {
1423                        graph_builders
1424                            .get_dfir_mut(&input.metadata().location_id)
1425                            .add_dfir(
1426                                parse_quote! {
1427                                    #input_ident -> dest_sink(#sink);
1428                                },
1429                                None,
1430                                Some(&next_stmt_id.to_string()),
1431                            );
1432                    }
1433                    BuildersOrCallback::Callback(leaf_callback, _) => {
1434                        leaf_callback(self, next_stmt_id);
1435                    }
1436                }
1437
1438                *next_stmt_id += 1;
1439            }
1440
1441            HydroRoot::CycleSink {
1442                cycle_id, input, ..
1443            } => {
1444                let input_ident = input.emit_core(
1445                    builders_or_callback,
1446                    seen_tees,
1447                    built_tees,
1448                    next_stmt_id,
1449                    fold_hooked_idents,
1450                );
1451
1452                match builders_or_callback {
1453                    BuildersOrCallback::Builders(graph_builders) => {
1454                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1455                            CollectionKind::KeyedSingleton {
1456                                key_type,
1457                                value_type,
1458                                ..
1459                            }
1460                            | CollectionKind::KeyedStream {
1461                                key_type,
1462                                value_type,
1463                                ..
1464                            } => {
1465                                parse_quote!((#key_type, #value_type))
1466                            }
1467                            CollectionKind::Stream { element_type, .. }
1468                            | CollectionKind::Singleton { element_type, .. }
1469                            | CollectionKind::Optional { element_type, .. } => {
1470                                parse_quote!(#element_type)
1471                            }
1472                        };
1473
1474                        let cycle_id_ident = cycle_id.as_ident();
1475                        graph_builders
1476                            .get_dfir_mut(&input.metadata().location_id)
1477                            .add_dfir(
1478                                parse_quote! {
1479                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1480                                },
1481                                None,
1482                                None,
1483                            );
1484                    }
1485                    // No ID, no callback
1486                    BuildersOrCallback::Callback(_, _) => {}
1487                }
1488            }
1489
1490            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1491                let input_ident = input.emit_core(
1492                    builders_or_callback,
1493                    seen_tees,
1494                    built_tees,
1495                    next_stmt_id,
1496                    fold_hooked_idents,
1497                );
1498
1499                match builders_or_callback {
1500                    BuildersOrCallback::Builders(graph_builders) => {
1501                        graph_builders
1502                            .get_dfir_mut(&input.metadata().location_id)
1503                            .add_dfir(
1504                                parse_quote! {
1505                                    #input_ident -> for_each(&mut #ident);
1506                                },
1507                                None,
1508                                Some(&next_stmt_id.to_string()),
1509                            );
1510                    }
1511                    BuildersOrCallback::Callback(leaf_callback, _) => {
1512                        leaf_callback(self, next_stmt_id);
1513                    }
1514                }
1515
1516                *next_stmt_id += 1;
1517            }
1518
1519            HydroRoot::Null { input, .. } => {
1520                let input_ident = input.emit_core(
1521                    builders_or_callback,
1522                    seen_tees,
1523                    built_tees,
1524                    next_stmt_id,
1525                    fold_hooked_idents,
1526                );
1527
1528                match builders_or_callback {
1529                    BuildersOrCallback::Builders(graph_builders) => {
1530                        graph_builders
1531                            .get_dfir_mut(&input.metadata().location_id)
1532                            .add_dfir(
1533                                parse_quote! {
1534                                    #input_ident -> for_each(|_| {});
1535                                },
1536                                None,
1537                                Some(&next_stmt_id.to_string()),
1538                            );
1539                    }
1540                    BuildersOrCallback::Callback(leaf_callback, _) => {
1541                        leaf_callback(self, next_stmt_id);
1542                    }
1543                }
1544
1545                *next_stmt_id += 1;
1546            }
1547        }
1548    }
1549
1550    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1551        match self {
1552            HydroRoot::ForEach { op_metadata, .. }
1553            | HydroRoot::SendExternal { op_metadata, .. }
1554            | HydroRoot::DestSink { op_metadata, .. }
1555            | HydroRoot::CycleSink { op_metadata, .. }
1556            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1557            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1558        }
1559    }
1560
1561    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1562        match self {
1563            HydroRoot::ForEach { op_metadata, .. }
1564            | HydroRoot::SendExternal { op_metadata, .. }
1565            | HydroRoot::DestSink { op_metadata, .. }
1566            | HydroRoot::CycleSink { op_metadata, .. }
1567            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1568            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1569        }
1570    }
1571
1572    pub fn input(&self) -> &HydroNode {
1573        match self {
1574            HydroRoot::ForEach { input, .. }
1575            | HydroRoot::SendExternal { input, .. }
1576            | HydroRoot::DestSink { input, .. }
1577            | HydroRoot::CycleSink { input, .. }
1578            | HydroRoot::EmbeddedOutput { input, .. }
1579            | HydroRoot::Null { input, .. } => input,
1580        }
1581    }
1582
1583    pub fn input_metadata(&self) -> &HydroIrMetadata {
1584        self.input().metadata()
1585    }
1586
1587    pub fn print_root(&self) -> String {
1588        match self {
1589            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1590            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1591            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1592            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1593            HydroRoot::EmbeddedOutput { ident, .. } => {
1594                format!("EmbeddedOutput({})", ident)
1595            }
1596            HydroRoot::Null { .. } => "Null".to_owned(),
1597        }
1598    }
1599
1600    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1601        match self {
1602            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1603                transform(f);
1604            }
1605            HydroRoot::SendExternal { .. }
1606            | HydroRoot::CycleSink { .. }
1607            | HydroRoot::EmbeddedOutput { .. }
1608            | HydroRoot::Null { .. } => {}
1609        }
1610    }
1611}
1612
#[cfg(feature = "build")]
/// Returns the clock id of the tick that `loc` lives in, looking through any
/// `Atomic` wrappers; `None` if `loc` is not (inside) a tick.
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    let mut current = loc;
    loop {
        match current {
            LocationId::Tick(id, _) => return Some(*id),
            LocationId::Atomic(inner) => current = &**inner,
            _ => return None,
        }
    }
}
1621
#[cfg(feature = "build")]
/// Rewrites every tick clock id nested in `loc` to its union-find representative.
///
/// `uf` is mutated because lookups perform path compression.
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    let nested = match loc {
        LocationId::Tick(id, inner) => {
            *id = uf_find(uf, *id);
            inner
        }
        LocationId::Atomic(inner) => inner,
        // Leaf locations carry no clock id.
        LocationId::Process(_) | LocationId::Cluster(_) => return,
    };
    remap_location(nested, uf);
}
1635
#[cfg(feature = "build")]
/// Union-find lookup: returns the representative of `x`'s set.
///
/// An id with no entry in `parent` (or mapped to itself) is its own representative.
/// Every id visited on the way to the root is re-pointed directly at the root
/// (path compression).
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // First pass: walk parent links up to the representative.
    let mut root = x;
    while let Some(&next) = parent.get(&root) {
        if next == root {
            break;
        }
        root = next;
    }

    // Second pass: point every id on the path straight at the representative.
    let mut node = x;
    while node != root {
        let next = parent[&node];
        parent.insert(node, root);
        node = next;
    }

    root
}
1646
#[cfg(feature = "build")]
/// Union-find merge: joins the sets containing `a` and `b`, making `b`'s
/// representative the representative of the merged set.
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let root_a = uf_find(parent, a);
    let root_b = uf_find(parent, b);
    if root_a == root_b {
        return;
    }
    parent.insert(root_a, root_b);
}
1655
/// Traverse the IR to build a union-find that unifies tick IDs connected
/// through `Batch` and `YieldConcat` nodes at atomic boundaries, then
/// rewrite all `LocationId`s to use the representative tick ID.
///
/// Only node metadata is rewritten; roots are visited with a no-op callback.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    let mut parent: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: every `Batch`/`YieldConcat` whose input and output both live in a tick
    // joins those two ticks.
    transform_bottom_up(
        ir,
        &mut |_root| {},
        &mut |node: &mut HydroNode| {
            let (HydroNode::Batch { inner, metadata }
            | HydroNode::YieldConcat { inner, metadata }) = node
            else {
                return;
            };
            let inner_tick = tick_of(&inner.metadata().location_id);
            let outer_tick = tick_of(&metadata.location_id);
            if let (Some(a), Some(b)) = (inner_tick, outer_tick) {
                uf_union(&mut parent, a, b);
            }
        },
        false,
    );

    // Pass 2: replace each node's tick ids with their representatives.
    transform_bottom_up(
        ir,
        &mut |_root| {},
        &mut |node: &mut HydroNode| {
            remap_location(&mut node.metadata_mut().location_id, &mut parent);
        },
        false,
    );
}
1691
/// Lowers every root in `ir`, in order, into per-location DFIR graph builders keyed by
/// `LocationKey`.
///
/// All roots share the same tee bookkeeping (`seen_tees` / `built_tees`), the same set of
/// fold-hooked identifiers, and a single statement counter. Statement ids are therefore
/// unique across the whole program, and shared subgraphs are presumably emitted only once.
///
/// Takes a mutable slice, consistent with `traverse_dfir` and `transform_bottom_up`.
/// A `&mut Vec<HydroRoot>` argument coerces to it automatically.
#[cfg(feature = "build")]
pub fn emit(ir: &mut [HydroRoot]) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;
    let mut fold_hooked_idents = HashSet::new();
    for leaf in ir.iter_mut() {
        leaf.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
    builders
}
1710
/// Walks the IR exactly as `emit` would, but instead of building DFIR graphs it calls
/// `transform_root` / `transform_node` through the `BuildersOrCallback::Callback` path.
///
/// The `&mut usize` handed to each callback comes from the same statement counter that
/// `emit` uses.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let (mut seen_tees, mut built_tees) = (HashMap::new(), HashMap::new());
    let mut fold_hooked_idents = HashSet::new();
    let mut next_stmt_id = 0;

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1732
1733pub fn transform_bottom_up(
1734    ir: &mut [HydroRoot],
1735    transform_root: &mut impl FnMut(&mut HydroRoot),
1736    transform_node: &mut impl FnMut(&mut HydroNode),
1737    check_well_formed: bool,
1738) {
1739    let mut seen_tees = HashMap::new();
1740    ir.iter_mut().for_each(|leaf| {
1741        leaf.transform_bottom_up(
1742            transform_root,
1743            transform_node,
1744            &mut seen_tees,
1745            check_well_formed,
1746        );
1747    });
1748}
1749
1750pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1751    let mut seen_tees = HashMap::new();
1752    ir.iter()
1753        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1754        .collect()
1755}
1756
1757type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1758thread_local! {
1759    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1760    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
1761    /// emits the full subtree only once and uses a `"<shared N>"` back-reference
1762    /// on subsequent encounters, preventing infinite loops.
1763    static SERIALIZED_SHARED: PrintedTees
1764        = const { RefCell::new(None) };
1765}
1766
1767pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1768    PRINTED_TEES.with(|printed_tees| {
1769        let mut printed_tees_mut = printed_tees.borrow_mut();
1770        *printed_tees_mut = Some((0, HashMap::new()));
1771        drop(printed_tees_mut);
1772
1773        let ret = f();
1774
1775        let mut printed_tees_mut = printed_tees.borrow_mut();
1776        *printed_tees_mut = None;
1777
1778        ret
1779    })
1780}
1781
1782/// Runs `f` with a fresh shared-node deduplication scope for serialization.
1783/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
1784/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
1785/// back-reference.  The tracking state is restored when `f` returns or panics.
1786pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1787    let _guard = SerializedSharedGuard::enter();
1788    f()
1789}
1790
/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
/// making `serialize_dedup_shared` re-entrant and panic-safe.
struct SerializedSharedGuard {
    /// The `SERIALIZED_SHARED` state that was active before `enter` installed a fresh
    /// one; written back into the thread-local when the guard is dropped.
    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
}
1796
1797impl SerializedSharedGuard {
1798    fn enter() -> Self {
1799        let previous = SERIALIZED_SHARED.with(|cell| {
1800            let mut guard = cell.borrow_mut();
1801            guard.replace((0, HashMap::new()))
1802        });
1803        Self { previous }
1804    }
1805}
1806
1807impl Drop for SerializedSharedGuard {
1808    fn drop(&mut self) {
1809        SERIALIZED_SHARED.with(|cell| {
1810            *cell.borrow_mut() = self.previous.take();
1811        });
1812    }
1813}
1814
/// A reference-counted, interior-mutable handle to a `HydroNode` that can be held by
/// several parents at once (e.g. `Tee`, `Singleton`, `Partition`).
///
/// - `Serialize` requires an active `serialize_dedup_shared` scope and deduplicates
///   repeated occurrences by pointer identity.
/// - `Debug` deduplicates the same way, but only inside `dbg_dedup_tee`.
/// - `Hash` hashes the node's contents.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1816
1817impl serde::Serialize for SharedNode {
1818    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
1819    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
1820    /// same subtree every time and, if the graph ever contains a cycle, loop
1821    /// forever.
1822    ///
1823    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
1824    /// integer id.  The first time we see a pointer we assign it the next id and
1825    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
1826    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
1827    /// recursion.  Requires an active `serialize_dedup_shared` scope.
1828    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1829        SERIALIZED_SHARED.with(|cell| {
1830            let mut guard = cell.borrow_mut();
1831            // (next_id, pointer → assigned_id)
1832            let state = guard.as_mut().ok_or_else(|| {
1833                serde::ser::Error::custom(
1834                    "SharedNode serialization requires an active serialize_dedup_shared scope",
1835                )
1836            })?;
1837            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
1838
1839            if let Some(&id) = state.1.get(&ptr) {
1840                drop(guard);
1841                use serde::ser::SerializeMap;
1842                let mut map = serializer.serialize_map(Some(1))?;
1843                map.serialize_entry("$shared_ref", &id)?;
1844                map.end()
1845            } else {
1846                let id = state.0;
1847                state.0 += 1;
1848                state.1.insert(ptr, id);
1849                drop(guard);
1850
1851                use serde::ser::SerializeMap;
1852                let mut map = serializer.serialize_map(Some(2))?;
1853                map.serialize_entry("$shared", &id)?;
1854                map.serialize_entry("node", &*self.0.borrow())?;
1855                map.end()
1856            }
1857        })
1858    }
1859}
1860
1861impl SharedNode {
1862    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1863        Rc::as_ptr(&self.0)
1864    }
1865}
1866
1867impl Debug for SharedNode {
1868    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1869        PRINTED_TEES.with(|printed_tees| {
1870            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1871            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1872
1873            if let Some(printed_tees_mut) = printed_tees_mut {
1874                if let Some(existing) = printed_tees_mut
1875                    .1
1876                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1877                {
1878                    write!(f, "<shared {}>", existing)
1879                } else {
1880                    let next_id = printed_tees_mut.0;
1881                    printed_tees_mut.0 += 1;
1882                    printed_tees_mut
1883                        .1
1884                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1885                    drop(printed_tees_mut_borrow);
1886                    write!(f, "<shared {}>: ", next_id)?;
1887                    Debug::fmt(&self.0.borrow(), f)
1888                }
1889            } else {
1890                drop(printed_tees_mut_borrow);
1891                write!(f, "<shared>: ")?;
1892                Debug::fmt(&self.0.borrow(), f)
1893            }
1894        })
1895    }
1896}
1897
1898impl Hash for SharedNode {
1899    fn hash<H: Hasher>(&self, state: &mut H) {
1900        self.0.borrow_mut().hash(state);
1901    }
1902}
1903
/// Boundedness of a `Stream`, `Optional`, or `KeyedStream` collection
/// (see `CollectionKind`).
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}

/// Ordering guarantee of a stream's elements, or of a keyed stream's values
/// (`CollectionKind::Stream::order` / `CollectionKind::KeyedStream::value_order`).
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}

/// Retry/delivery guarantee of a stream's elements, or of a keyed stream's values.
/// NOTE(review): presumably `AtLeastOnce` permits duplicates and `ExactlyOnce` does
/// not — confirm against the live-collections docs.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}

/// Boundedness of a `KeyedSingleton` collection. Only `Bounded` makes
/// `CollectionKind::is_bounded` return `true`. The `MonotonicValue` / `BoundedValue`
/// distinctions are interpreted elsewhere.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicValue,
    BoundedValue,
    Bounded,
}

/// Boundedness of a `Singleton` collection. Only `Bounded` makes
/// `CollectionKind::is_bounded` return `true`; `Monotonic` does not.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
1936
/// The kind of collection an IR node produces, together with its boundedness,
/// ordering/retry guarantees, and element (or key/value) types.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    /// A stream of `element_type` values.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton holding an `element_type` value.
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    /// An optional `element_type` value (presumably zero or one element — confirm).
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A stream of `value_type` values grouped by `key_type`; the order/retry
    /// guarantees apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A `value_type` singleton per `key_type` key.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1966
1967impl CollectionKind {
1968    pub fn is_bounded(&self) -> bool {
1969        matches!(
1970            self,
1971            CollectionKind::Stream {
1972                bound: BoundKind::Bounded,
1973                ..
1974            } | CollectionKind::Singleton {
1975                bound: SingletonBoundKind::Bounded,
1976                ..
1977            } | CollectionKind::Optional {
1978                bound: BoundKind::Bounded,
1979                ..
1980            } | CollectionKind::KeyedStream {
1981                bound: BoundKind::Bounded,
1982                ..
1983            } | CollectionKind::KeyedSingleton {
1984                bound: KeyedSingletonBoundKind::Bounded,
1985                ..
1986            }
1987        )
1988    }
1989}
1990
/// Metadata attached to every non-placeholder IR node: where the node lives, what kind
/// of collection it produces, and operator-level details.
///
/// Its `Hash` impl contributes nothing and its `PartialEq` impl always returns `true`,
/// so it never affects the hashing or comparison of the types that contain it. `Debug`
/// shows only `location_id` and `collection_kind`.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Location of the node's output (rewritten by `unify_atomic_ticks`, for example).
    pub location_id: LocationId,
    /// Kind of collection the node produces.
    pub collection_kind: CollectionKind,
    /// NOTE(review): presumably a known/estimated output cardinality — not interpreted here.
    pub cardinality: Option<usize>,
    /// NOTE(review): optional user-facing tag — semantics not visible in this section.
    pub tag: Option<String>,
    /// Operator-level metadata (backtrace, profiling data, id).
    pub op: HydroIrOpMetadata,
}
1999
2000// HydroIrMetadata shouldn't be used to hash or compare
2001impl Hash for HydroIrMetadata {
2002    fn hash<H: Hasher>(&self, _: &mut H) {}
2003}
2004
2005impl PartialEq for HydroIrMetadata {
2006    fn eq(&self, _: &Self) -> bool {
2007        true
2008    }
2009}
2010
2011impl Eq for HydroIrMetadata {}
2012
2013impl Debug for HydroIrMetadata {
2014    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2015        f.debug_struct("HydroIrMetadata")
2016            .field("location_id", &self.location_id)
2017            .field("collection_kind", &self.collection_kind)
2018            .finish()
2019    }
2020}
2021
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created (see `new`); serialized under
    /// the key `span` via `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    /// `None` when created via `new`. NOTE(review): presumably filled in by profiling.
    pub cpu_usage: Option<f64>,
    /// `None` when created via `new`. NOTE(review): presumably the CPU usage attributed
    /// to receiving network traffic for this operator — confirm.
    pub network_recv_cpu_usage: Option<f64>,
    /// Optional operator id; `None` when created via `new`.
    pub id: Option<usize>,
}
2032
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and no profiling
    /// data or id.
    ///
    /// The backtrace skip depth depends on the exact call chain `new` ->
    /// `new_with_skip` -> `Backtrace::get_backtrace`. Do not add wrappers or restructure
    /// these calls without adjusting the skip counts.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        // Skip one extra frame to account for `new` itself.
        Self::new_with_skip(1)
    }

    /// Like `new`, but skips `skip_count` additional stack frames when capturing the
    /// backtrace.
    /// NOTE(review): the constant `2` presumably accounts for this function and
    /// `Backtrace::get_backtrace` — confirm against `backtrace.rs`.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
2051
2052impl Debug for HydroIrOpMetadata {
2053    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2054        f.debug_struct("HydroIrOpMetadata").finish()
2055    }
2056}
2057
2058impl Hash for HydroIrOpMetadata {
2059    fn hash<H: Hasher>(&self, _: &mut H) {}
2060}
2061
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
///
/// Every variant except `Placeholder` carries `metadata: HydroIrMetadata`, which
/// describes the node's location, collection kind and operator metadata.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    /// Temporary stand-in used while a node is moved out of its slot and rebuilt
    /// (see `transform_children` and `deep_clone`). It must not remain in a finished
    /// graph: `transform_children` panics when it encounters one.
    Placeholder,

    /// Manually "casts" between two different collection kinds.
    ///
    /// Using this IR node requires special care, since it bypasses many of Hydro's core
    /// correctness checks. In particular, the user must ensure that every possible
    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
    /// collection. This ensures that the simulator does not miss any possible outputs.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
    /// interpretation of the input stream.
    ///
    /// In production, this simply passes through the input, but in simulation, this operator
    /// explicitly selects a randomized interpretation.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, // if true, we do not need to simulate non-determinism
        metadata: HydroIrMetadata,
    },

    /// A leaf node whose data comes from the source described by `source`.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// A leaf node producing the value of the expression `value`.
    /// NOTE(review): `first_tick_only` presumably limits emission to the first tick —
    /// confirm against the emitter.
    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    /// A leaf node reading the back-edge of the cycle `cycle_id`.
    /// NOTE(review): presumably paired with a `HydroRoot::CycleSink` carrying the same id.
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// Shares `inner` with other consumers. Several nodes may hold `SharedNode`s pointing
    /// at the same underlying node; traversals visit it once (tracked via
    /// `SeenSharedNodes`).
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// A singleton materialization point. Wraps a SharedNode so that:
    /// - The pipe output delivers the single item to one consumer
    /// - `#var` references can borrow the value from the singleton slot
    ///
    /// In DFIR codegen, emits `ident = inner_ident -> singleton()`.
    ///
    /// Uses the same `built_tees` dedup pattern as `Tee`.
    Singleton {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// One side of a split of the shared `inner` by the predicate `f`.
    /// NOTE(review): `is_true` presumably selects the side whose elements satisfy `f`.
    Partition {
        inner: SharedNode,
        f: DebugExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    /// Marks entry of `inner` into an atomic region (semantics defined by the emitter).
    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Marks exit of `inner` from an atomic region (semantics defined by the emitter).
    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Presumably batches `inner` into a tick. `unify_atomic_ticks` unifies the tick
    /// ids of this node's input and output when both are ticks.
    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Presumably yields `inner` out of a tick. `unify_atomic_ticks` unifies the tick
    /// ids of this node's input and output when both are ticks.
    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Combines the elements of `first` and `second` (children are visited first, then second).
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Merges `first` and `second`. NOTE(review): presumably order-preserving — confirm.
    MergeOrdered {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Combines `first` and `second`. NOTE(review): presumably prefers `first` — confirm.
    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Cross product of `left` and `right`.
    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Pairs each element of `left` with the singleton `right`
    /// (presumably; confirm against the emitter).
    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Key-based join of `left` and `right`.
    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Asymmetric join where the right (build) side is bounded.
    /// The build side is accumulated (stratum-delayed) into a hash table,
    /// then the left (probe) side streams through preserving its ordering.
    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Elements of `pos` not present in `neg`.
    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Elements of `pos` whose key does not appear in `neg`
    /// (presumably; confirm against the emitter).
    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Resolves the futures produced by `input`.
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Resolves the futures produced by `input` (blocking variant).
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Resolves the futures produced by `input` (ordered variant).
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Applies the closure `f` to each element of `input`.
    Map {
        f: DebugExpr,
        /// Singleton references captured by the closure `f` via `SingletonRef`.
        /// Each entry maps a local ident (used in the closure body) to the IR node
        /// of the referenced singleton.
        ///
        /// TODO: Currently only `Map` has this field. Other closure-bearing variants
        /// (Filter, FlatMap, Fold, etc.) should be extended similarly. For nodes with
        /// multiple closures (e.g. Fold has `init` and `acc`), we may need per-closure
        /// tracking rather than per-node.
        #[serde(serialize_with = "serialize_singleton_refs")]
        singleton_refs: Vec<(syn::Ident, HydroNode)>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Applies `f` to each element of `input` and flattens the results.
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `FlatMap`, but `f` presumably yields a stream that is drained blockingly.
    FlatMapStreamBlocking {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Keeps the elements of `input` for which `f` holds.
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Applies `f` to each element of `input`, keeping only the `Some` results
    /// (presumably; confirm against the emitter).
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Delays `input` by one tick (presumably; confirm against the emitter).
    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Pairs each element of `input` with its index.
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Runs `f` on each element of `input` for its side effect, passing elements through.
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Removes duplicate elements from `input`.
    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Sorts the elements of `input`.
    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Folds `input` using the initial-value expression `init` and accumulator `acc`.
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Like `Fold`, but presumably emits intermediate states.
    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Async/blocking variant of `Scan` (semantics defined by the emitter).
    ScanAsyncBlocking {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Per-key `Fold` over a keyed `input`.
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Reduces `input` with the combining closure `f`.
    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Per-key `Reduce` over a keyed `input`.
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Per-key `Reduce` over `input`, additionally driven by `watermark`
    /// (semantics defined by the emitter).
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Sends `input` over the network. `serialize_fn`/`deserialize_fn` are optional
    /// codec expressions; `instantiate_fn` presumably creates the concrete channel.
    /// This is the only variant exempt from the location check in
    /// `HydroNode::transform_bottom_up`.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// A leaf node receiving data from the external location `from_external_key` on
    /// port `from_port_id`. `port_hint` is not serialized.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    /// Presumably counts the elements of `input` and reports them under `tag`/`prefix`
    /// every `duration` — confirm against the emitter.
    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2353
/// Maps the address of an original shared node's cell to the cell that replaces it
/// during a traversal (see `HydroNode::transform_children` / `HydroNode::deep_clone`),
/// so each shared node is processed once.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a shared node's cell to a `LocationId`.
/// NOTE(review): not used in this part of the file — presumably records the location
/// assigned to each shared node; confirm at its use sites.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2356
2357impl HydroNode {
2358    pub fn transform_bottom_up(
2359        &mut self,
2360        transform: &mut impl FnMut(&mut HydroNode),
2361        seen_tees: &mut SeenSharedNodes,
2362        check_well_formed: bool,
2363    ) {
2364        self.transform_children(
2365            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2366            seen_tees,
2367        );
2368
2369        transform(self);
2370
2371        let self_location = self.metadata().location_id.root();
2372
2373        if check_well_formed {
2374            match &*self {
2375                HydroNode::Network { .. } => {}
2376                _ => {
2377                    self.input_metadata().iter().for_each(|i| {
2378                        if i.location_id.root() != self_location {
2379                            panic!(
2380                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2381                                i,
2382                                i.location_id.root(),
2383                                self,
2384                                self_location
2385                            )
2386                        }
2387                    });
2388                }
2389            }
2390        }
2391    }
2392
2393    #[inline(always)]
2394    pub fn transform_children(
2395        &mut self,
2396        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2397        seen_tees: &mut SeenSharedNodes,
2398    ) {
2399        match self {
2400            HydroNode::Placeholder => {
2401                panic!();
2402            }
2403
2404            HydroNode::Source { .. }
2405            | HydroNode::SingletonSource { .. }
2406            | HydroNode::CycleSource { .. }
2407            | HydroNode::ExternalInput { .. } => {}
2408
2409            HydroNode::Tee { inner, .. }
2410            | HydroNode::Singleton { inner, .. }
2411            | HydroNode::Partition { inner, .. } => {
2412                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2413                    *inner = SharedNode(transformed.clone());
2414                } else {
2415                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2416                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2417                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2418                    transform(&mut orig, seen_tees);
2419                    *transformed_cell.borrow_mut() = orig;
2420                    *inner = SharedNode(transformed_cell);
2421                }
2422            }
2423
2424            HydroNode::Cast { inner, .. }
2425            | HydroNode::ObserveNonDet { inner, .. }
2426            | HydroNode::BeginAtomic { inner, .. }
2427            | HydroNode::EndAtomic { inner, .. }
2428            | HydroNode::Batch { inner, .. }
2429            | HydroNode::YieldConcat { inner, .. } => {
2430                transform(inner.as_mut(), seen_tees);
2431            }
2432
2433            HydroNode::Chain { first, second, .. } => {
2434                transform(first.as_mut(), seen_tees);
2435                transform(second.as_mut(), seen_tees);
2436            }
2437
2438            HydroNode::MergeOrdered { first, second, .. } => {
2439                transform(first.as_mut(), seen_tees);
2440                transform(second.as_mut(), seen_tees);
2441            }
2442
2443            HydroNode::ChainFirst { first, second, .. } => {
2444                transform(first.as_mut(), seen_tees);
2445                transform(second.as_mut(), seen_tees);
2446            }
2447
2448            HydroNode::CrossSingleton { left, right, .. }
2449            | HydroNode::CrossProduct { left, right, .. }
2450            | HydroNode::Join { left, right, .. }
2451            | HydroNode::JoinHalf { left, right, .. } => {
2452                transform(left.as_mut(), seen_tees);
2453                transform(right.as_mut(), seen_tees);
2454            }
2455
2456            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2457                transform(pos.as_mut(), seen_tees);
2458                transform(neg.as_mut(), seen_tees);
2459            }
2460
2461            HydroNode::ReduceKeyedWatermark {
2462                input, watermark, ..
2463            } => {
2464                transform(input.as_mut(), seen_tees);
2465                transform(watermark.as_mut(), seen_tees);
2466            }
2467
2468            HydroNode::Map {
2469                input,
2470                singleton_refs,
2471                ..
2472            } => {
2473                // Transform each singleton ref node (these are HydroNode::Singleton wrappers).
2474                for (_ident, ref_node) in singleton_refs.iter_mut() {
2475                    transform(ref_node, seen_tees);
2476                }
2477                transform(input.as_mut(), seen_tees);
2478            }
2479            HydroNode::ResolveFutures { input, .. }
2480            | HydroNode::ResolveFuturesBlocking { input, .. }
2481            | HydroNode::ResolveFuturesOrdered { input, .. }
2482            | HydroNode::FlatMap { input, .. }
2483            | HydroNode::FlatMapStreamBlocking { input, .. }
2484            | HydroNode::Filter { input, .. }
2485            | HydroNode::FilterMap { input, .. }
2486            | HydroNode::Sort { input, .. }
2487            | HydroNode::DeferTick { input, .. }
2488            | HydroNode::Enumerate { input, .. }
2489            | HydroNode::Inspect { input, .. }
2490            | HydroNode::Unique { input, .. }
2491            | HydroNode::Network { input, .. }
2492            | HydroNode::Fold { input, .. }
2493            | HydroNode::Scan { input, .. }
2494            | HydroNode::ScanAsyncBlocking { input, .. }
2495            | HydroNode::FoldKeyed { input, .. }
2496            | HydroNode::Reduce { input, .. }
2497            | HydroNode::ReduceKeyed { input, .. }
2498            | HydroNode::Counter { input, .. } => {
2499                transform(input.as_mut(), seen_tees);
2500            }
2501        }
2502    }
2503
2504    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2505        match self {
2506            HydroNode::Placeholder => HydroNode::Placeholder,
2507            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2508                inner: Box::new(inner.deep_clone(seen_tees)),
2509                metadata: metadata.clone(),
2510            },
2511            HydroNode::ObserveNonDet {
2512                inner,
2513                trusted,
2514                metadata,
2515            } => HydroNode::ObserveNonDet {
2516                inner: Box::new(inner.deep_clone(seen_tees)),
2517                trusted: *trusted,
2518                metadata: metadata.clone(),
2519            },
2520            HydroNode::Source { source, metadata } => HydroNode::Source {
2521                source: source.clone(),
2522                metadata: metadata.clone(),
2523            },
2524            HydroNode::SingletonSource {
2525                value,
2526                first_tick_only,
2527                metadata,
2528            } => HydroNode::SingletonSource {
2529                value: value.clone(),
2530                first_tick_only: *first_tick_only,
2531                metadata: metadata.clone(),
2532            },
2533            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2534                cycle_id: *cycle_id,
2535                metadata: metadata.clone(),
2536            },
2537            HydroNode::Tee { inner, metadata } | HydroNode::Singleton { inner, metadata } => {
2538                let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2539                    SharedNode(transformed.clone())
2540                } else {
2541                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2542                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2543                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2544                    *new_rc.borrow_mut() = cloned;
2545                    SharedNode(new_rc)
2546                };
2547                if matches!(self, HydroNode::Singleton { .. }) {
2548                    HydroNode::Singleton {
2549                        inner: cloned_inner,
2550                        metadata: metadata.clone(),
2551                    }
2552                } else {
2553                    HydroNode::Tee {
2554                        inner: cloned_inner,
2555                        metadata: metadata.clone(),
2556                    }
2557                }
2558            }
2559            HydroNode::Partition {
2560                inner,
2561                f,
2562                is_true,
2563                metadata,
2564            } => {
2565                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2566                    HydroNode::Partition {
2567                        inner: SharedNode(transformed.clone()),
2568                        f: f.clone(),
2569                        is_true: *is_true,
2570                        metadata: metadata.clone(),
2571                    }
2572                } else {
2573                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2574                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2575                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2576                    *new_rc.borrow_mut() = cloned;
2577                    HydroNode::Partition {
2578                        inner: SharedNode(new_rc),
2579                        f: f.clone(),
2580                        is_true: *is_true,
2581                        metadata: metadata.clone(),
2582                    }
2583                }
2584            }
2585            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2586                inner: Box::new(inner.deep_clone(seen_tees)),
2587                metadata: metadata.clone(),
2588            },
2589            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2590                inner: Box::new(inner.deep_clone(seen_tees)),
2591                metadata: metadata.clone(),
2592            },
2593            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2594                inner: Box::new(inner.deep_clone(seen_tees)),
2595                metadata: metadata.clone(),
2596            },
2597            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2598                inner: Box::new(inner.deep_clone(seen_tees)),
2599                metadata: metadata.clone(),
2600            },
2601            HydroNode::Chain {
2602                first,
2603                second,
2604                metadata,
2605            } => HydroNode::Chain {
2606                first: Box::new(first.deep_clone(seen_tees)),
2607                second: Box::new(second.deep_clone(seen_tees)),
2608                metadata: metadata.clone(),
2609            },
2610            HydroNode::MergeOrdered {
2611                first,
2612                second,
2613                metadata,
2614            } => HydroNode::MergeOrdered {
2615                first: Box::new(first.deep_clone(seen_tees)),
2616                second: Box::new(second.deep_clone(seen_tees)),
2617                metadata: metadata.clone(),
2618            },
2619            HydroNode::ChainFirst {
2620                first,
2621                second,
2622                metadata,
2623            } => HydroNode::ChainFirst {
2624                first: Box::new(first.deep_clone(seen_tees)),
2625                second: Box::new(second.deep_clone(seen_tees)),
2626                metadata: metadata.clone(),
2627            },
2628            HydroNode::CrossProduct {
2629                left,
2630                right,
2631                metadata,
2632            } => HydroNode::CrossProduct {
2633                left: Box::new(left.deep_clone(seen_tees)),
2634                right: Box::new(right.deep_clone(seen_tees)),
2635                metadata: metadata.clone(),
2636            },
2637            HydroNode::CrossSingleton {
2638                left,
2639                right,
2640                metadata,
2641            } => HydroNode::CrossSingleton {
2642                left: Box::new(left.deep_clone(seen_tees)),
2643                right: Box::new(right.deep_clone(seen_tees)),
2644                metadata: metadata.clone(),
2645            },
2646            HydroNode::Join {
2647                left,
2648                right,
2649                metadata,
2650            } => HydroNode::Join {
2651                left: Box::new(left.deep_clone(seen_tees)),
2652                right: Box::new(right.deep_clone(seen_tees)),
2653                metadata: metadata.clone(),
2654            },
2655            HydroNode::JoinHalf {
2656                left,
2657                right,
2658                metadata,
2659            } => HydroNode::JoinHalf {
2660                left: Box::new(left.deep_clone(seen_tees)),
2661                right: Box::new(right.deep_clone(seen_tees)),
2662                metadata: metadata.clone(),
2663            },
2664            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2665                pos: Box::new(pos.deep_clone(seen_tees)),
2666                neg: Box::new(neg.deep_clone(seen_tees)),
2667                metadata: metadata.clone(),
2668            },
2669            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2670                pos: Box::new(pos.deep_clone(seen_tees)),
2671                neg: Box::new(neg.deep_clone(seen_tees)),
2672                metadata: metadata.clone(),
2673            },
2674            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2675                input: Box::new(input.deep_clone(seen_tees)),
2676                metadata: metadata.clone(),
2677            },
2678            HydroNode::ResolveFuturesBlocking { input, metadata } => {
2679                HydroNode::ResolveFuturesBlocking {
2680                    input: Box::new(input.deep_clone(seen_tees)),
2681                    metadata: metadata.clone(),
2682                }
2683            }
2684            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2685                HydroNode::ResolveFuturesOrdered {
2686                    input: Box::new(input.deep_clone(seen_tees)),
2687                    metadata: metadata.clone(),
2688                }
2689            }
2690            HydroNode::Map {
2691                f,
2692                singleton_refs,
2693                input,
2694                metadata,
2695            } => HydroNode::Map {
2696                f: f.clone(),
2697                singleton_refs: singleton_refs
2698                    .iter()
2699                    .map(|(ident, node)| (ident.clone(), node.deep_clone(seen_tees)))
2700                    .collect(),
2701                input: Box::new(input.deep_clone(seen_tees)),
2702                metadata: metadata.clone(),
2703            },
2704            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2705                f: f.clone(),
2706                input: Box::new(input.deep_clone(seen_tees)),
2707                metadata: metadata.clone(),
2708            },
2709            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2710                HydroNode::FlatMapStreamBlocking {
2711                    f: f.clone(),
2712                    input: Box::new(input.deep_clone(seen_tees)),
2713                    metadata: metadata.clone(),
2714                }
2715            }
2716            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2717                f: f.clone(),
2718                input: Box::new(input.deep_clone(seen_tees)),
2719                metadata: metadata.clone(),
2720            },
2721            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2722                f: f.clone(),
2723                input: Box::new(input.deep_clone(seen_tees)),
2724                metadata: metadata.clone(),
2725            },
2726            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2727                input: Box::new(input.deep_clone(seen_tees)),
2728                metadata: metadata.clone(),
2729            },
2730            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2731                input: Box::new(input.deep_clone(seen_tees)),
2732                metadata: metadata.clone(),
2733            },
2734            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2735                f: f.clone(),
2736                input: Box::new(input.deep_clone(seen_tees)),
2737                metadata: metadata.clone(),
2738            },
2739            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2740                input: Box::new(input.deep_clone(seen_tees)),
2741                metadata: metadata.clone(),
2742            },
2743            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2744                input: Box::new(input.deep_clone(seen_tees)),
2745                metadata: metadata.clone(),
2746            },
2747            HydroNode::Fold {
2748                init,
2749                acc,
2750                input,
2751                metadata,
2752            } => HydroNode::Fold {
2753                init: init.clone(),
2754                acc: acc.clone(),
2755                input: Box::new(input.deep_clone(seen_tees)),
2756                metadata: metadata.clone(),
2757            },
2758            HydroNode::Scan {
2759                init,
2760                acc,
2761                input,
2762                metadata,
2763            } => HydroNode::Scan {
2764                init: init.clone(),
2765                acc: acc.clone(),
2766                input: Box::new(input.deep_clone(seen_tees)),
2767                metadata: metadata.clone(),
2768            },
2769            HydroNode::ScanAsyncBlocking {
2770                init,
2771                acc,
2772                input,
2773                metadata,
2774            } => HydroNode::ScanAsyncBlocking {
2775                init: init.clone(),
2776                acc: acc.clone(),
2777                input: Box::new(input.deep_clone(seen_tees)),
2778                metadata: metadata.clone(),
2779            },
2780            HydroNode::FoldKeyed {
2781                init,
2782                acc,
2783                input,
2784                metadata,
2785            } => HydroNode::FoldKeyed {
2786                init: init.clone(),
2787                acc: acc.clone(),
2788                input: Box::new(input.deep_clone(seen_tees)),
2789                metadata: metadata.clone(),
2790            },
2791            HydroNode::ReduceKeyedWatermark {
2792                f,
2793                input,
2794                watermark,
2795                metadata,
2796            } => HydroNode::ReduceKeyedWatermark {
2797                f: f.clone(),
2798                input: Box::new(input.deep_clone(seen_tees)),
2799                watermark: Box::new(watermark.deep_clone(seen_tees)),
2800                metadata: metadata.clone(),
2801            },
2802            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2803                f: f.clone(),
2804                input: Box::new(input.deep_clone(seen_tees)),
2805                metadata: metadata.clone(),
2806            },
2807            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2808                f: f.clone(),
2809                input: Box::new(input.deep_clone(seen_tees)),
2810                metadata: metadata.clone(),
2811            },
2812            HydroNode::Network {
2813                name,
2814                networking_info,
2815                serialize_fn,
2816                instantiate_fn,
2817                deserialize_fn,
2818                input,
2819                metadata,
2820            } => HydroNode::Network {
2821                name: name.clone(),
2822                networking_info: networking_info.clone(),
2823                serialize_fn: serialize_fn.clone(),
2824                instantiate_fn: instantiate_fn.clone(),
2825                deserialize_fn: deserialize_fn.clone(),
2826                input: Box::new(input.deep_clone(seen_tees)),
2827                metadata: metadata.clone(),
2828            },
2829            HydroNode::ExternalInput {
2830                from_external_key,
2831                from_port_id,
2832                from_many,
2833                codec_type,
2834                port_hint,
2835                instantiate_fn,
2836                deserialize_fn,
2837                metadata,
2838            } => HydroNode::ExternalInput {
2839                from_external_key: *from_external_key,
2840                from_port_id: *from_port_id,
2841                from_many: *from_many,
2842                codec_type: codec_type.clone(),
2843                port_hint: *port_hint,
2844                instantiate_fn: instantiate_fn.clone(),
2845                deserialize_fn: deserialize_fn.clone(),
2846                metadata: metadata.clone(),
2847            },
2848            HydroNode::Counter {
2849                tag,
2850                duration,
2851                prefix,
2852                input,
2853                metadata,
2854            } => HydroNode::Counter {
2855                tag: tag.clone(),
2856                duration: duration.clone(),
2857                prefix: prefix.clone(),
2858                input: Box::new(input.deep_clone(seen_tees)),
2859                metadata: metadata.clone(),
2860            },
2861        }
2862    }
2863
2864    #[cfg(feature = "build")]
2865    pub fn emit_core(
2866        &mut self,
2867        builders_or_callback: &mut BuildersOrCallback<
2868            impl FnMut(&mut HydroRoot, &mut usize),
2869            impl FnMut(&mut HydroNode, &mut usize),
2870        >,
2871        seen_tees: &mut SeenSharedNodes,
2872        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2873        next_stmt_id: &mut usize,
2874        fold_hooked_idents: &mut HashSet<String>,
2875    ) -> syn::Ident {
2876        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2877
2878        self.transform_bottom_up(
2879            &mut |node: &mut HydroNode| {
2880                let out_location = node.metadata().location_id.clone();
2881                match node {
2882                    HydroNode::Placeholder => {
2883                        panic!()
2884                    }
2885
2886                    HydroNode::Cast { .. } => {
2887                        // Cast passes through the input ident unchanged
2888                        // The input ident is already on the stack from processing the child
2889                        match builders_or_callback {
2890                            BuildersOrCallback::Builders(_) => {}
2891                            BuildersOrCallback::Callback(_, node_callback) => {
2892                                node_callback(node, next_stmt_id);
2893                            }
2894                        }
2895
2896                        *next_stmt_id += 1;
2897                        // input_ident stays on stack as output
2898                    }
2899
2900                    HydroNode::ObserveNonDet {
2901                        inner,
2902                        trusted,
2903                        metadata,
2904                        ..
2905                    } => {
2906                        let inner_ident = ident_stack.pop().unwrap();
2907
2908                        let observe_ident =
2909                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2910
2911                        match builders_or_callback {
2912                            BuildersOrCallback::Builders(graph_builders) => {
2913                                graph_builders.observe_nondet(
2914                                    *trusted,
2915                                    &inner.metadata().location_id,
2916                                    inner_ident,
2917                                    &inner.metadata().collection_kind,
2918                                    &observe_ident,
2919                                    &metadata.collection_kind,
2920                                    &metadata.op,
2921                                );
2922                            }
2923                            BuildersOrCallback::Callback(_, node_callback) => {
2924                                node_callback(node, next_stmt_id);
2925                            }
2926                        }
2927
2928                        *next_stmt_id += 1;
2929
2930                        ident_stack.push(observe_ident);
2931                    }
2932
2933                    HydroNode::Batch {
2934                        inner, metadata, ..
2935                    } => {
2936                        let inner_ident = ident_stack.pop().unwrap();
2937
2938                        let batch_ident =
2939                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2940
2941                        match builders_or_callback {
2942                            BuildersOrCallback::Builders(graph_builders) => {
2943                                graph_builders.batch(
2944                                    inner_ident,
2945                                    &inner.metadata().location_id,
2946                                    &inner.metadata().collection_kind,
2947                                    &batch_ident,
2948                                    &out_location,
2949                                    &metadata.op,
2950                                    fold_hooked_idents,
2951                                );
2952                            }
2953                            BuildersOrCallback::Callback(_, node_callback) => {
2954                                node_callback(node, next_stmt_id);
2955                            }
2956                        }
2957
2958                        *next_stmt_id += 1;
2959
2960                        ident_stack.push(batch_ident);
2961                    }
2962
2963                    HydroNode::YieldConcat { inner, .. } => {
2964                        let inner_ident = ident_stack.pop().unwrap();
2965
2966                        let yield_ident =
2967                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2968
2969                        match builders_or_callback {
2970                            BuildersOrCallback::Builders(graph_builders) => {
2971                                graph_builders.yield_from_tick(
2972                                    inner_ident,
2973                                    &inner.metadata().location_id,
2974                                    &inner.metadata().collection_kind,
2975                                    &yield_ident,
2976                                    &out_location,
2977                                );
2978                            }
2979                            BuildersOrCallback::Callback(_, node_callback) => {
2980                                node_callback(node, next_stmt_id);
2981                            }
2982                        }
2983
2984                        *next_stmt_id += 1;
2985
2986                        ident_stack.push(yield_ident);
2987                    }
2988
2989                    HydroNode::BeginAtomic { inner, metadata } => {
2990                        let inner_ident = ident_stack.pop().unwrap();
2991
2992                        let begin_ident =
2993                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2994
2995                        match builders_or_callback {
2996                            BuildersOrCallback::Builders(graph_builders) => {
2997                                graph_builders.begin_atomic(
2998                                    inner_ident,
2999                                    &inner.metadata().location_id,
3000                                    &inner.metadata().collection_kind,
3001                                    &begin_ident,
3002                                    &out_location,
3003                                    &metadata.op,
3004                                );
3005                            }
3006                            BuildersOrCallback::Callback(_, node_callback) => {
3007                                node_callback(node, next_stmt_id);
3008                            }
3009                        }
3010
3011                        *next_stmt_id += 1;
3012
3013                        ident_stack.push(begin_ident);
3014                    }
3015
3016                    HydroNode::EndAtomic { inner, .. } => {
3017                        let inner_ident = ident_stack.pop().unwrap();
3018
3019                        let end_ident =
3020                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3021
3022                        match builders_or_callback {
3023                            BuildersOrCallback::Builders(graph_builders) => {
3024                                graph_builders.end_atomic(
3025                                    inner_ident,
3026                                    &inner.metadata().location_id,
3027                                    &inner.metadata().collection_kind,
3028                                    &end_ident,
3029                                );
3030                            }
3031                            BuildersOrCallback::Callback(_, node_callback) => {
3032                                node_callback(node, next_stmt_id);
3033                            }
3034                        }
3035
3036                        *next_stmt_id += 1;
3037
3038                        ident_stack.push(end_ident);
3039                    }
3040
3041                    HydroNode::Source {
3042                        source, metadata, ..
3043                    } => {
3044                        if let HydroSource::ExternalNetwork() = source {
3045                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3046                        } else {
3047                            let source_ident =
3048                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3049
3050                            let source_stmt = match source {
3051                                HydroSource::Stream(expr) => {
3052                                    debug_assert!(metadata.location_id.is_top_level());
3053                                    parse_quote! {
3054                                        #source_ident = source_stream(#expr);
3055                                    }
3056                                }
3057
3058                                HydroSource::ExternalNetwork() => {
3059                                    unreachable!()
3060                                }
3061
3062                                HydroSource::Iter(expr) => {
3063                                    if metadata.location_id.is_top_level() {
3064                                        parse_quote! {
3065                                            #source_ident = source_iter(#expr);
3066                                        }
3067                                    } else {
3068                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
3069                                        parse_quote! {
3070                                            #source_ident = source_iter(#expr) -> persist::<'static>();
3071                                        }
3072                                    }
3073                                }
3074
3075                                HydroSource::Spin() => {
3076                                    debug_assert!(metadata.location_id.is_top_level());
3077                                    parse_quote! {
3078                                        #source_ident = spin();
3079                                    }
3080                                }
3081
3082                                HydroSource::ClusterMembers(target_loc, state) => {
3083                                    debug_assert!(metadata.location_id.is_top_level());
3084
3085                                    let members_tee_ident = syn::Ident::new(
3086                                        &format!(
3087                                            "__cluster_members_tee_{}_{}",
3088                                            metadata.location_id.root().key(),
3089                                            target_loc.key(),
3090                                        ),
3091                                        Span::call_site(),
3092                                    );
3093
3094                                    match state {
3095                                        ClusterMembersState::Stream(d) => {
3096                                            parse_quote! {
3097                                                #members_tee_ident = source_stream(#d) -> tee();
3098                                                #source_ident = #members_tee_ident;
3099                                            }
3100                                        },
3101                                        ClusterMembersState::Uninit => syn::parse_quote! {
3102                                            #source_ident = source_stream(DUMMY);
3103                                        },
3104                                        ClusterMembersState::Tee(..) => parse_quote! {
3105                                            #source_ident = #members_tee_ident;
3106                                        },
3107                                    }
3108                                }
3109
3110                                HydroSource::Embedded(ident) => {
3111                                    parse_quote! {
3112                                        #source_ident = source_stream(#ident);
3113                                    }
3114                                }
3115
3116                                HydroSource::EmbeddedSingleton(ident) => {
3117                                    parse_quote! {
3118                                        #source_ident = source_iter([#ident]);
3119                                    }
3120                                }
3121                            };
3122
3123                            match builders_or_callback {
3124                                BuildersOrCallback::Builders(graph_builders) => {
3125                                    let builder = graph_builders.get_dfir_mut(&out_location);
3126                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3127                                }
3128                                BuildersOrCallback::Callback(_, node_callback) => {
3129                                    node_callback(node, next_stmt_id);
3130                                }
3131                            }
3132
3133                            *next_stmt_id += 1;
3134
3135                            ident_stack.push(source_ident);
3136                        }
3137                    }
3138
3139                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3140                        let source_ident =
3141                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3142
3143                        match builders_or_callback {
3144                            BuildersOrCallback::Builders(graph_builders) => {
3145                                let builder = graph_builders.get_dfir_mut(&out_location);
3146
3147                                if *first_tick_only {
3148                                    assert!(
3149                                        !metadata.location_id.is_top_level(),
3150                                        "first_tick_only SingletonSource must be inside a tick"
3151                                    );
3152                                }
3153
3154                                if *first_tick_only
3155                                    || (metadata.location_id.is_top_level()
3156                                        && metadata.collection_kind.is_bounded())
3157                                {
3158                                    builder.add_dfir(
3159                                        parse_quote! {
3160                                            #source_ident = source_iter([#value]);
3161                                        },
3162                                        None,
3163                                        Some(&next_stmt_id.to_string()),
3164                                    );
3165                                } else {
3166                                    builder.add_dfir(
3167                                        parse_quote! {
3168                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3169                                        },
3170                                        None,
3171                                        Some(&next_stmt_id.to_string()),
3172                                    );
3173                                }
3174                            }
3175                            BuildersOrCallback::Callback(_, node_callback) => {
3176                                node_callback(node, next_stmt_id);
3177                            }
3178                        }
3179
3180                        *next_stmt_id += 1;
3181
3182                        ident_stack.push(source_ident);
3183                    }
3184
3185                    HydroNode::CycleSource { cycle_id, .. } => {
3186                        let ident = cycle_id.as_ident();
3187
3188                        match builders_or_callback {
3189                            BuildersOrCallback::Builders(_) => {}
3190                            BuildersOrCallback::Callback(_, node_callback) => {
3191                                node_callback(node, next_stmt_id);
3192                            }
3193                        }
3194
3195                        // consume a stmt id even though we did not emit anything so that we can instrument this
3196                        *next_stmt_id += 1;
3197
3198                        ident_stack.push(ident);
3199                    }
3200
3201                    HydroNode::Tee { inner, .. } => {
3202                        let ret_ident = if let Some(built_idents) =
3203                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3204                        {
3205                            match builders_or_callback {
3206                                BuildersOrCallback::Builders(_) => {}
3207                                BuildersOrCallback::Callback(_, node_callback) => {
3208                                    node_callback(node, next_stmt_id);
3209                                }
3210                            }
3211
3212                            built_idents[0].clone()
3213                        } else {
3214                            // The inner node was already processed by transform_bottom_up,
3215                            // so its ident is on the stack
3216                            let inner_ident = ident_stack.pop().unwrap();
3217
3218                            let tee_ident =
3219                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3220
3221                            built_tees.insert(
3222                                inner.0.as_ref() as *const RefCell<HydroNode>,
3223                                vec![tee_ident.clone()],
3224                            );
3225
3226                            match builders_or_callback {
3227                                BuildersOrCallback::Builders(graph_builders) => {
3228                                    // NOTE: With `forward_ref`, the fold codegen may not have
3229                                    // run yet when we reach this tee, so `fold_hooked_idents`
3230                                    // might not contain the inner ident. In that case we won't
3231                                    // propagate the "hooked" status to the tee and the
3232                                    // downstream singleton batch will use the normal
3233                                    // `SingletonHook` instead of `PassthroughSingletonHook`.
3234                                    // This is not a soundness issue: the fallback hook still
3235                                    // produces correct behavior, just with a redundant decision
3236                                    // point. TODO(https://github.com/hydro-project/hydro/issues/2856):
3237                                    // fix ordering so forward_ref folds are always processed
3238                                    // before their downstream tees.
3239                                    if fold_hooked_idents.contains(&inner_ident.to_string()) {
3240                                        fold_hooked_idents.insert(tee_ident.to_string());
3241                                    }
3242                                    let builder = graph_builders.get_dfir_mut(&out_location);
3243                                    builder.add_dfir(
3244                                        parse_quote! {
3245                                            #tee_ident = #inner_ident -> tee();
3246                                        },
3247                                        None,
3248                                        Some(&next_stmt_id.to_string()),
3249                                    );
3250                                }
3251                                BuildersOrCallback::Callback(_, node_callback) => {
3252                                    node_callback(node, next_stmt_id);
3253                                }
3254                            }
3255
3256                            tee_ident
3257                        };
3258
3259                        // we consume a stmt id regardless of if we emit the tee() operator,
3260                        // so that during rewrites we touch all recipients of the tee()
3261
3262                        *next_stmt_id += 1;
3263                        ident_stack.push(ret_ident);
3264                    }
3265
3266                    HydroNode::Singleton { inner, .. } => {
3267                        let ret_ident = if let Some(built_idents) =
3268                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3269                        {
3270                            built_idents[0].clone()
3271                        } else {
3272                            let inner_ident = ident_stack.pop().unwrap();
3273
3274                            let singleton_ident =
3275                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3276
3277                            built_tees.insert(
3278                                inner.0.as_ref() as *const RefCell<HydroNode>,
3279                                vec![singleton_ident.clone()],
3280                            );
3281
3282                            match builders_or_callback {
3283                                BuildersOrCallback::Builders(graph_builders) => {
3284                                    let builder = graph_builders.get_dfir_mut(&out_location);
3285                                    builder.add_dfir(
3286                                        parse_quote! {
3287                                            #singleton_ident = #inner_ident -> singleton();
3288                                        },
3289                                        None,
3290                                        Some(&next_stmt_id.to_string()),
3291                                    );
3292                                }
3293                                BuildersOrCallback::Callback(_, node_callback) => {
3294                                    node_callback(node, next_stmt_id);
3295                                }
3296                            }
3297
3298                            singleton_ident
3299                        };
3300
3301                        // we consume a stmt id regardless of if we emit the singleton() operator,
3302                        // so that during rewrites we touch all recipients of the singleton()
3303                        *next_stmt_id += 1;
3304                        ident_stack.push(ret_ident);
3305                    }
3306
3307                    HydroNode::Partition {
3308                        inner, f, is_true, ..
3309                    } => {
3310                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3311                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3312                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3313                            match builders_or_callback {
3314                                BuildersOrCallback::Builders(_) => {}
3315                                BuildersOrCallback::Callback(_, node_callback) => {
3316                                    node_callback(node, next_stmt_id);
3317                                }
3318                            }
3319
3320                            let idx = if is_true { 0 } else { 1 };
3321                            built_idents[idx].clone()
3322                        } else {
3323                            // The inner node was already processed by transform_bottom_up,
3324                            // so its ident is on the stack
3325                            let inner_ident = ident_stack.pop().unwrap();
3326
3327                            let partition_ident = syn::Ident::new(
3328                                &format!("stream_{}_partition", *next_stmt_id),
3329                                Span::call_site(),
3330                            );
3331                            let true_ident = syn::Ident::new(
3332                                &format!("stream_{}_true", *next_stmt_id),
3333                                Span::call_site(),
3334                            );
3335                            let false_ident = syn::Ident::new(
3336                                &format!("stream_{}_false", *next_stmt_id),
3337                                Span::call_site(),
3338                            );
3339
3340                            built_tees.insert(
3341                                ptr,
3342                                vec![true_ident.clone(), false_ident.clone()],
3343                            );
3344
3345                            match builders_or_callback {
3346                                BuildersOrCallback::Builders(graph_builders) => {
3347                                    let builder = graph_builders.get_dfir_mut(&out_location);
3348                                    builder.add_dfir(
3349                                        parse_quote! {
3350                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
3351                                            #true_ident = #partition_ident[0];
3352                                            #false_ident = #partition_ident[1];
3353                                        },
3354                                        None,
3355                                        Some(&next_stmt_id.to_string()),
3356                                    );
3357                                }
3358                                BuildersOrCallback::Callback(_, node_callback) => {
3359                                    node_callback(node, next_stmt_id);
3360                                }
3361                            }
3362
3363                            if is_true { true_ident } else { false_ident }
3364                        };
3365
3366                        *next_stmt_id += 1;
3367                        ident_stack.push(ret_ident);
3368                    }
3369
3370                    HydroNode::Chain { .. } => {
3371                        // Children are processed left-to-right, so second is on top
3372                        let second_ident = ident_stack.pop().unwrap();
3373                        let first_ident = ident_stack.pop().unwrap();
3374
3375                        let chain_ident =
3376                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3377
3378                        match builders_or_callback {
3379                            BuildersOrCallback::Builders(graph_builders) => {
3380                                let builder = graph_builders.get_dfir_mut(&out_location);
3381                                builder.add_dfir(
3382                                    parse_quote! {
3383                                        #chain_ident = chain();
3384                                        #first_ident -> [0]#chain_ident;
3385                                        #second_ident -> [1]#chain_ident;
3386                                    },
3387                                    None,
3388                                    Some(&next_stmt_id.to_string()),
3389                                );
3390                            }
3391                            BuildersOrCallback::Callback(_, node_callback) => {
3392                                node_callback(node, next_stmt_id);
3393                            }
3394                        }
3395
3396                        *next_stmt_id += 1;
3397
3398                        ident_stack.push(chain_ident);
3399                    }
3400
3401                    HydroNode::MergeOrdered { first, metadata, .. } => {
3402                        let second_ident = ident_stack.pop().unwrap();
3403                        let first_ident = ident_stack.pop().unwrap();
3404
3405                        let merge_ident =
3406                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3407
3408                        match builders_or_callback {
3409                            BuildersOrCallback::Builders(graph_builders) => {
3410                                graph_builders.merge_ordered(
3411                                    &first.metadata().location_id,
3412                                    first_ident,
3413                                    second_ident,
3414                                    &merge_ident,
3415                                    &first.metadata().collection_kind,
3416                                    &metadata.op,
3417                                    Some(&next_stmt_id.to_string()),
3418                                );
3419                            }
3420                            BuildersOrCallback::Callback(_, node_callback) => {
3421                                node_callback(node, next_stmt_id);
3422                            }
3423                        }
3424
3425                        *next_stmt_id += 1;
3426
3427                        ident_stack.push(merge_ident);
3428                    }
3429
3430                    HydroNode::ChainFirst { .. } => {
3431                        let second_ident = ident_stack.pop().unwrap();
3432                        let first_ident = ident_stack.pop().unwrap();
3433
3434                        let chain_ident =
3435                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3436
3437                        match builders_or_callback {
3438                            BuildersOrCallback::Builders(graph_builders) => {
3439                                let builder = graph_builders.get_dfir_mut(&out_location);
3440                                builder.add_dfir(
3441                                    parse_quote! {
3442                                        #chain_ident = chain_first_n(1);
3443                                        #first_ident -> [0]#chain_ident;
3444                                        #second_ident -> [1]#chain_ident;
3445                                    },
3446                                    None,
3447                                    Some(&next_stmt_id.to_string()),
3448                                );
3449                            }
3450                            BuildersOrCallback::Callback(_, node_callback) => {
3451                                node_callback(node, next_stmt_id);
3452                            }
3453                        }
3454
3455                        *next_stmt_id += 1;
3456
3457                        ident_stack.push(chain_ident);
3458                    }
3459
3460                    HydroNode::CrossSingleton { right, .. } => {
3461                        let right_ident = ident_stack.pop().unwrap();
3462                        let left_ident = ident_stack.pop().unwrap();
3463
3464                        let cross_ident =
3465                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3466
3467                        match builders_or_callback {
3468                            BuildersOrCallback::Builders(graph_builders) => {
3469                                let builder = graph_builders.get_dfir_mut(&out_location);
3470
3471                                if right.metadata().location_id.is_top_level()
3472                                    && right.metadata().collection_kind.is_bounded()
3473                                {
3474                                    builder.add_dfir(
3475                                        parse_quote! {
3476                                            #cross_ident = cross_singleton();
3477                                            #left_ident -> [input]#cross_ident;
3478                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
3479                                        },
3480                                        None,
3481                                        Some(&next_stmt_id.to_string()),
3482                                    );
3483                                } else {
3484                                    builder.add_dfir(
3485                                        parse_quote! {
3486                                            #cross_ident = cross_singleton();
3487                                            #left_ident -> [input]#cross_ident;
3488                                            #right_ident -> [single]#cross_ident;
3489                                        },
3490                                        None,
3491                                        Some(&next_stmt_id.to_string()),
3492                                    );
3493                                }
3494                            }
3495                            BuildersOrCallback::Callback(_, node_callback) => {
3496                                node_callback(node, next_stmt_id);
3497                            }
3498                        }
3499
3500                        *next_stmt_id += 1;
3501
3502                        ident_stack.push(cross_ident);
3503                    }
3504
3505                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3506                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3507                            parse_quote!(cross_join_multiset)
3508                        } else {
3509                            parse_quote!(join_multiset)
3510                        };
3511
3512                        let (HydroNode::CrossProduct { left, right, .. }
3513                        | HydroNode::Join { left, right, .. }) = node
3514                        else {
3515                            unreachable!()
3516                        };
3517
3518                        let is_top_level = left.metadata().location_id.is_top_level()
3519                            && right.metadata().location_id.is_top_level();
3520                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3521                            quote!('static)
3522                        } else {
3523                            quote!('tick)
3524                        };
3525
3526                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3527                            quote!('static)
3528                        } else {
3529                            quote!('tick)
3530                        };
3531
3532                        let right_ident = ident_stack.pop().unwrap();
3533                        let left_ident = ident_stack.pop().unwrap();
3534
3535                        let stream_ident =
3536                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3537
3538                        match builders_or_callback {
3539                            BuildersOrCallback::Builders(graph_builders) => {
3540                                let builder = graph_builders.get_dfir_mut(&out_location);
3541                                builder.add_dfir(
3542                                    if is_top_level {
3543                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
3544                                        // a multiset_delta() to negate the replay behavior
3545                                        parse_quote! {
3546                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3547                                            #left_ident -> [0]#stream_ident;
3548                                            #right_ident -> [1]#stream_ident;
3549                                        }
3550                                    } else {
3551                                        parse_quote! {
3552                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3553                                            #left_ident -> [0]#stream_ident;
3554                                            #right_ident -> [1]#stream_ident;
3555                                        }
3556                                    }
3557                                    ,
3558                                    None,
3559                                    Some(&next_stmt_id.to_string()),
3560                                );
3561                            }
3562                            BuildersOrCallback::Callback(_, node_callback) => {
3563                                node_callback(node, next_stmt_id);
3564                            }
3565                        }
3566
3567                        *next_stmt_id += 1;
3568
3569                        ident_stack.push(stream_ident);
3570                    }
3571
3572                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3573                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3574                            parse_quote!(difference)
3575                        } else {
3576                            parse_quote!(anti_join)
3577                        };
3578
3579                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3580                            node
3581                        else {
3582                            unreachable!()
3583                        };
3584
3585                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3586                            quote!('static)
3587                        } else {
3588                            quote!('tick)
3589                        };
3590
3591                        let neg_ident = ident_stack.pop().unwrap();
3592                        let pos_ident = ident_stack.pop().unwrap();
3593
3594                        let stream_ident =
3595                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3596
3597                        match builders_or_callback {
3598                            BuildersOrCallback::Builders(graph_builders) => {
3599                                let builder = graph_builders.get_dfir_mut(&out_location);
3600                                builder.add_dfir(
3601                                    parse_quote! {
3602                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
3603                                        #pos_ident -> [pos]#stream_ident;
3604                                        #neg_ident -> [neg]#stream_ident;
3605                                    },
3606                                    None,
3607                                    Some(&next_stmt_id.to_string()),
3608                                );
3609                            }
3610                            BuildersOrCallback::Callback(_, node_callback) => {
3611                                node_callback(node, next_stmt_id);
3612                            }
3613                        }
3614
3615                        *next_stmt_id += 1;
3616
3617                        ident_stack.push(stream_ident);
3618                    }
3619
3620                    HydroNode::JoinHalf { .. } => {
3621                        let HydroNode::JoinHalf { right, .. } = node else {
3622                            unreachable!()
3623                        };
3624
3625                        assert!(
3626                            right.metadata().collection_kind.is_bounded(),
3627                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3628                            right.metadata().collection_kind
3629                        );
3630
3631                        let build_lifetime = if right.metadata().location_id.is_top_level() {
3632                            quote!('static)
3633                        } else {
3634                            quote!('tick)
3635                        };
3636
3637                        let build_ident = ident_stack.pop().unwrap();
3638                        let probe_ident = ident_stack.pop().unwrap();
3639
3640                        let stream_ident =
3641                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3642
3643                        match builders_or_callback {
3644                            BuildersOrCallback::Builders(graph_builders) => {
3645                                let builder = graph_builders.get_dfir_mut(&out_location);
3646                                builder.add_dfir(
3647                                    parse_quote! {
3648                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3649                                        #probe_ident -> [probe]#stream_ident;
3650                                        #build_ident -> [build]#stream_ident;
3651                                    },
3652                                    None,
3653                                    Some(&next_stmt_id.to_string()),
3654                                );
3655                            }
3656                            BuildersOrCallback::Callback(_, node_callback) => {
3657                                node_callback(node, next_stmt_id);
3658                            }
3659                        }
3660
3661                        *next_stmt_id += 1;
3662
3663                        ident_stack.push(stream_ident);
3664                    }
3665
3666                    HydroNode::ResolveFutures { .. } => {
3667                        let input_ident = ident_stack.pop().unwrap();
3668
3669                        let futures_ident =
3670                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3671
3672                        match builders_or_callback {
3673                            BuildersOrCallback::Builders(graph_builders) => {
3674                                let builder = graph_builders.get_dfir_mut(&out_location);
3675                                builder.add_dfir(
3676                                    parse_quote! {
3677                                        #futures_ident = #input_ident -> resolve_futures();
3678                                    },
3679                                    None,
3680                                    Some(&next_stmt_id.to_string()),
3681                                );
3682                            }
3683                            BuildersOrCallback::Callback(_, node_callback) => {
3684                                node_callback(node, next_stmt_id);
3685                            }
3686                        }
3687
3688                        *next_stmt_id += 1;
3689
3690                        ident_stack.push(futures_ident);
3691                    }
3692
3693                    HydroNode::ResolveFuturesBlocking { .. } => {
3694                        let input_ident = ident_stack.pop().unwrap();
3695
3696                        let futures_ident =
3697                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3698
3699                        match builders_or_callback {
3700                            BuildersOrCallback::Builders(graph_builders) => {
3701                                let builder = graph_builders.get_dfir_mut(&out_location);
3702                                builder.add_dfir(
3703                                    parse_quote! {
3704                                        #futures_ident = #input_ident -> resolve_futures_blocking();
3705                                    },
3706                                    None,
3707                                    Some(&next_stmt_id.to_string()),
3708                                );
3709                            }
3710                            BuildersOrCallback::Callback(_, node_callback) => {
3711                                node_callback(node, next_stmt_id);
3712                            }
3713                        }
3714
3715                        *next_stmt_id += 1;
3716
3717                        ident_stack.push(futures_ident);
3718                    }
3719
3720                    HydroNode::ResolveFuturesOrdered { .. } => {
3721                        let input_ident = ident_stack.pop().unwrap();
3722
3723                        let futures_ident =
3724                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3725
3726                        match builders_or_callback {
3727                            BuildersOrCallback::Builders(graph_builders) => {
3728                                let builder = graph_builders.get_dfir_mut(&out_location);
3729                                builder.add_dfir(
3730                                    parse_quote! {
3731                                        #futures_ident = #input_ident -> resolve_futures_ordered();
3732                                    },
3733                                    None,
3734                                    Some(&next_stmt_id.to_string()),
3735                                );
3736                            }
3737                            BuildersOrCallback::Callback(_, node_callback) => {
3738                                node_callback(node, next_stmt_id);
3739                            }
3740                        }
3741
3742                        *next_stmt_id += 1;
3743
3744                        ident_stack.push(futures_ident);
3745                    }
3746
3747                    HydroNode::Map { f, singleton_refs, .. } => {
3748                        // Pop input ident (pushed last by transform_children).
3749                        let input_ident = ident_stack.pop().unwrap();
3750                        // Pop singleton ref idents in reverse order (pushed before input).
3751                        let ref_idents = (0..singleton_refs.len())
3752                            .map(|_| ident_stack.pop().unwrap())
3753                            .collect::<Vec<_>>()
3754                            .into_iter()
3755                            .rev()
3756                            .collect::<Vec<_>>();
3757
3758                        let map_ident =
3759                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3760
3761                        match builders_or_callback {
3762                            BuildersOrCallback::Builders(graph_builders) => {
3763                                // Wrap the closure: assign local singleton ref idents to
3764                                // `#ref_ident` (the `singleton()` node's DFIR variable name).
3765                                let f_tokens = if singleton_refs.is_empty() {
3766                                    f.0.to_token_stream()
3767                                } else {
3768                                    let local_idents = singleton_refs
3769                                        .iter()
3770                                        .map(|(local_ident, _)| local_ident);
3771                                    let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
3772                                    let expr = &f.0;
3773                                    quote! {
3774                                        {
3775                                            #(
3776                                                let #local_idents = #hash #ref_idents;
3777                                            )*
3778                                            #expr
3779                                        }
3780                                    }
3781                                };
3782
3783                                let builder = graph_builders.get_dfir_mut(&out_location);
3784                                builder.add_dfir(
3785                                    parse_quote! {
3786                                        #map_ident = #input_ident -> map(#f_tokens);
3787                                    },
3788                                    None,
3789                                    Some(&next_stmt_id.to_string()),
3790                                );
3791                            }
3792                            BuildersOrCallback::Callback(_, node_callback) => {
3793                                node_callback(node, next_stmt_id);
3794                            }
3795                        }
3796
3797                        *next_stmt_id += 1;
3798
3799                        ident_stack.push(map_ident);
3800                    }
3801
3802                    HydroNode::FlatMap { f, .. } => {
3803                        let input_ident = ident_stack.pop().unwrap();
3804
3805                        let flat_map_ident =
3806                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3807
3808                        match builders_or_callback {
3809                            BuildersOrCallback::Builders(graph_builders) => {
3810                                let builder = graph_builders.get_dfir_mut(&out_location);
3811                                builder.add_dfir(
3812                                    parse_quote! {
3813                                        #flat_map_ident = #input_ident -> flat_map(#f);
3814                                    },
3815                                    None,
3816                                    Some(&next_stmt_id.to_string()),
3817                                );
3818                            }
3819                            BuildersOrCallback::Callback(_, node_callback) => {
3820                                node_callback(node, next_stmt_id);
3821                            }
3822                        }
3823
3824                        *next_stmt_id += 1;
3825
3826                        ident_stack.push(flat_map_ident);
3827                    }
3828
3829                    HydroNode::FlatMapStreamBlocking { f, .. } => {
3830                        let input_ident = ident_stack.pop().unwrap();
3831
3832                        let flat_map_stream_blocking_ident =
3833                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3834
3835                        match builders_or_callback {
3836                            BuildersOrCallback::Builders(graph_builders) => {
3837                                let builder = graph_builders.get_dfir_mut(&out_location);
3838                                builder.add_dfir(
3839                                    parse_quote! {
3840                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f);
3841                                    },
3842                                    None,
3843                                    Some(&next_stmt_id.to_string()),
3844                                );
3845                            }
3846                            BuildersOrCallback::Callback(_, node_callback) => {
3847                                node_callback(node, next_stmt_id);
3848                            }
3849                        }
3850
3851                        *next_stmt_id += 1;
3852
3853                        ident_stack.push(flat_map_stream_blocking_ident);
3854                    }
3855
3856                    HydroNode::Filter { f, .. } => {
3857                        let input_ident = ident_stack.pop().unwrap();
3858
3859                        let filter_ident =
3860                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3861
3862                        match builders_or_callback {
3863                            BuildersOrCallback::Builders(graph_builders) => {
3864                                let builder = graph_builders.get_dfir_mut(&out_location);
3865                                builder.add_dfir(
3866                                    parse_quote! {
3867                                        #filter_ident = #input_ident -> filter(#f);
3868                                    },
3869                                    None,
3870                                    Some(&next_stmt_id.to_string()),
3871                                );
3872                            }
3873                            BuildersOrCallback::Callback(_, node_callback) => {
3874                                node_callback(node, next_stmt_id);
3875                            }
3876                        }
3877
3878                        *next_stmt_id += 1;
3879
3880                        ident_stack.push(filter_ident);
3881                    }
3882
3883                    HydroNode::FilterMap { f, .. } => {
3884                        let input_ident = ident_stack.pop().unwrap();
3885
3886                        let filter_map_ident =
3887                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3888
3889                        match builders_or_callback {
3890                            BuildersOrCallback::Builders(graph_builders) => {
3891                                let builder = graph_builders.get_dfir_mut(&out_location);
3892                                builder.add_dfir(
3893                                    parse_quote! {
3894                                        #filter_map_ident = #input_ident -> filter_map(#f);
3895                                    },
3896                                    None,
3897                                    Some(&next_stmt_id.to_string()),
3898                                );
3899                            }
3900                            BuildersOrCallback::Callback(_, node_callback) => {
3901                                node_callback(node, next_stmt_id);
3902                            }
3903                        }
3904
3905                        *next_stmt_id += 1;
3906
3907                        ident_stack.push(filter_map_ident);
3908                    }
3909
3910                    HydroNode::Sort { .. } => {
3911                        let input_ident = ident_stack.pop().unwrap();
3912
3913                        let sort_ident =
3914                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3915
3916                        match builders_or_callback {
3917                            BuildersOrCallback::Builders(graph_builders) => {
3918                                let builder = graph_builders.get_dfir_mut(&out_location);
3919                                builder.add_dfir(
3920                                    parse_quote! {
3921                                        #sort_ident = #input_ident -> sort();
3922                                    },
3923                                    None,
3924                                    Some(&next_stmt_id.to_string()),
3925                                );
3926                            }
3927                            BuildersOrCallback::Callback(_, node_callback) => {
3928                                node_callback(node, next_stmt_id);
3929                            }
3930                        }
3931
3932                        *next_stmt_id += 1;
3933
3934                        ident_stack.push(sort_ident);
3935                    }
3936
3937                    HydroNode::DeferTick { .. } => {
3938                        let input_ident = ident_stack.pop().unwrap();
3939
3940                        let defer_tick_ident =
3941                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3942
3943                        match builders_or_callback {
3944                            BuildersOrCallback::Builders(graph_builders) => {
3945                                let builder = graph_builders.get_dfir_mut(&out_location);
3946                                builder.add_dfir(
3947                                    parse_quote! {
3948                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
3949                                    },
3950                                    None,
3951                                    Some(&next_stmt_id.to_string()),
3952                                );
3953                            }
3954                            BuildersOrCallback::Callback(_, node_callback) => {
3955                                node_callback(node, next_stmt_id);
3956                            }
3957                        }
3958
3959                        *next_stmt_id += 1;
3960
3961                        ident_stack.push(defer_tick_ident);
3962                    }
3963
3964                    HydroNode::Enumerate { input, .. } => {
3965                        let input_ident = ident_stack.pop().unwrap();
3966
3967                        let enumerate_ident =
3968                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3969
3970                        match builders_or_callback {
3971                            BuildersOrCallback::Builders(graph_builders) => {
3972                                let builder = graph_builders.get_dfir_mut(&out_location);
3973                                let lifetime = if input.metadata().location_id.is_top_level() {
3974                                    quote!('static)
3975                                } else {
3976                                    quote!('tick)
3977                                };
3978                                builder.add_dfir(
3979                                    parse_quote! {
3980                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3981                                    },
3982                                    None,
3983                                    Some(&next_stmt_id.to_string()),
3984                                );
3985                            }
3986                            BuildersOrCallback::Callback(_, node_callback) => {
3987                                node_callback(node, next_stmt_id);
3988                            }
3989                        }
3990
3991                        *next_stmt_id += 1;
3992
3993                        ident_stack.push(enumerate_ident);
3994                    }
3995
3996                    HydroNode::Inspect { f, .. } => {
3997                        let input_ident = ident_stack.pop().unwrap();
3998
3999                        let inspect_ident =
4000                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4001
4002                        match builders_or_callback {
4003                            BuildersOrCallback::Builders(graph_builders) => {
4004                                let builder = graph_builders.get_dfir_mut(&out_location);
4005                                builder.add_dfir(
4006                                    parse_quote! {
4007                                        #inspect_ident = #input_ident -> inspect(#f);
4008                                    },
4009                                    None,
4010                                    Some(&next_stmt_id.to_string()),
4011                                );
4012                            }
4013                            BuildersOrCallback::Callback(_, node_callback) => {
4014                                node_callback(node, next_stmt_id);
4015                            }
4016                        }
4017
4018                        *next_stmt_id += 1;
4019
4020                        ident_stack.push(inspect_ident);
4021                    }
4022
4023                    HydroNode::Unique { input, .. } => {
4024                        let input_ident = ident_stack.pop().unwrap();
4025
4026                        let unique_ident =
4027                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4028
4029                        match builders_or_callback {
4030                            BuildersOrCallback::Builders(graph_builders) => {
4031                                let builder = graph_builders.get_dfir_mut(&out_location);
4032                                let lifetime = if input.metadata().location_id.is_top_level() {
4033                                    quote!('static)
4034                                } else {
4035                                    quote!('tick)
4036                                };
4037
4038                                builder.add_dfir(
4039                                    parse_quote! {
4040                                        #unique_ident = #input_ident -> unique::<#lifetime>();
4041                                    },
4042                                    None,
4043                                    Some(&next_stmt_id.to_string()),
4044                                );
4045                            }
4046                            BuildersOrCallback::Callback(_, node_callback) => {
4047                                node_callback(node, next_stmt_id);
4048                            }
4049                        }
4050
4051                        *next_stmt_id += 1;
4052
4053                        ident_stack.push(unique_ident);
4054                    }
4055
                    // Fold / FoldKeyed / Scan / ScanAsyncBlocking: each carries an `init` and an
                    // `acc` expression and lowers to DFIR statement(s) whose output variable is
                    // `stream_<next_stmt_id>`. Depending on the builder configuration this is a
                    // single operator call or a small `source_iter` + `scan` + `chain` subgraph.
                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
                        // Choose the DFIR operator name. A `Fold` over a top-level bounded input
                        // uses `fold_no_replay`; a `FoldKeyed` in that situation is not
                        // implemented and panics via `todo!` (this runs before the
                        // builders/callback dispatch, so it panics in both modes).
                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
                            if input.metadata().location_id.is_top_level()
                                && input.metadata().collection_kind.is_bounded()
                            {
                                parse_quote!(fold_no_replay)
                            } else {
                                parse_quote!(fold)
                            }
                        } else if matches!(node, HydroNode::Scan { .. }) {
                            parse_quote!(scan)
                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
                            parse_quote!(scan_async_blocking)
                        } else if let HydroNode::FoldKeyed { input, .. } = node {
                            if input.metadata().location_id.is_top_level()
                                && input.metadata().collection_kind.is_bounded()
                            {
                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
                            } else {
                                parse_quote!(fold_keyed)
                            }
                        } else {
                            unreachable!()
                        };

                        let (HydroNode::Fold { input, .. }
                        | HydroNode::FoldKeyed { input, .. }
                        | HydroNode::Scan { input, .. }
                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
                        else {
                            unreachable!()
                        };

                        // Lifetime argument for the emitted operator: `'static` when the input
                        // lives at a top-level location, `'tick` otherwise.
                        let lifetime = if input.metadata().location_id.is_top_level() {
                            quote!('static)
                        } else {
                            quote!('tick)
                        };

                        // Variable name for this node's input, taken from the top of
                        // `ident_stack` (presumably pushed when the input node was lowered).
                        let input_ident = ident_stack.pop().unwrap();

                        let (HydroNode::Fold { init, acc, .. }
                        | HydroNode::FoldKeyed { init, acc, .. }
                        | HydroNode::Scan { init, acc, .. }
                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
                        else {
                            unreachable!()
                        };

                        let fold_ident =
                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());

                        match builders_or_callback {
                            BuildersOrCallback::Builders(graph_builders) => {
                                // Case 1: `Fold` at a top-level, non-atomic location with an
                                // unbounded output, when `singleton_intermediates()` is enabled.
                                // Instead of a plain `fold`, emit `init()` once via `source_iter`
                                // and a `scan` that outputs a clone of the accumulator after each
                                // update; both are chained into `#fold_ident`.
                                if matches!(node, HydroNode::Fold { .. })
                                    && node.metadata().location_id.is_top_level()
                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
                                    && graph_builders.singleton_intermediates()
                                    && !node.metadata().collection_kind.is_bounded()
                                {
                                    let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
                                    let hooked_input_ident = graph_builders.emit_fold_hook(
                                        &input.metadata().location_id,
                                        &input_ident,
                                        &input.metadata().collection_kind,
                                        &node.metadata().op,
                                    );

                                    // If `emit_fold_hook` returns a replacement input, that stream
                                    // carries `Vec` batches (see `__batch: Vec<_>` below). The user
                                    // accumulator is then applied element-wise, with one snapshot
                                    // emitted per non-empty batch. Without a hook, one snapshot is
                                    // emitted per input element.
                                    // NOTE(review): an empty batch makes the closure return `None`;
                                    // confirm DFIR `scan` treats `None` as "emit nothing" rather than
                                    // ending the stream.
                                    let (effective_input, acc) = if let Some(ref hooked) = hooked_input_ident {
                                        let acc: syn::Expr = parse_quote!({
                                            let mut __inner = #acc;
                                            move |__state, __batch: Vec<_>| {
                                                if __batch.is_empty() {
                                                    return None;
                                                }
                                                for __value in __batch {
                                                    __inner(__state, __value);
                                                }
                                                Some(__state.clone())
                                            }
                                        });
                                        (hooked, acc)
                                    } else {
                                        let acc: syn::Expr = parse_quote!({
                                            let mut __inner = #acc;
                                            move |__state, __value| {
                                                __inner(__state, __value);
                                                Some(__state.clone())
                                            }
                                        });
                                        (&input_ident, acc)
                                    };

                                    let builder = graph_builders.get_dfir_mut(&out_location);
                                    builder.add_dfir(
                                        parse_quote! {
                                            source_iter([(#init)()]) -> [0]#fold_ident;
                                            #effective_input -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
                                            #fold_ident = chain();
                                        },
                                        None,
                                        Some(&next_stmt_id.to_string()),
                                    );

                                    // Record that this fold's output was built from a hooked input.
                                    if hooked_input_ident.is_some() {
                                        fold_hooked_idents.insert(fold_ident.to_string());
                                    }
                                // Case 2: same conditions as case 1, but for `FoldKeyed`. The
                                // per-key state lives in a `HashMap` inside a `scan`. Each input
                                // `(k, v)` updates the entry for `k` (created with `init()` if
                                // missing) and emits `(k, clone of that entry)`. A hooked input is
                                // `flatten()`ed first.
                                } else if matches!(node, HydroNode::FoldKeyed { .. })
                                    && node.metadata().location_id.is_top_level()
                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
                                    && graph_builders.singleton_intermediates()
                                    && !node.metadata().collection_kind.is_bounded()
                                {
                                    let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
                                    let hooked_input_ident = graph_builders.emit_fold_hook(
                                        &input.metadata().location_id,
                                        &input_ident,
                                        &input.metadata().collection_kind,
                                        &node.metadata().op,
                                    );
                                    let builder = graph_builders.get_dfir_mut(&out_location);

                                    let acc: syn::Expr = parse_quote!({
                                        let mut __init = #init;
                                        let mut __inner = #acc;
                                        move |__state, __kv: (_, _)| {
                                            // TODO(shadaj): we can avoid the clone when the entry exists
                                            let __state = __state
                                                .entry(::std::clone::Clone::clone(&__kv.0))
                                                .or_insert_with(|| (__init)());
                                            __inner(__state, __kv.1);
                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
                                        }
                                    });

                                    if let Some(hooked_input_ident) = hooked_input_ident {
                                        builder.add_dfir(
                                            parse_quote! {
                                                #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
                                            },
                                            None,
                                            Some(&next_stmt_id.to_string()),
                                        );

                                        fold_hooked_idents.insert(fold_ident.to_string());
                                    } else {
                                        builder.add_dfir(
                                            parse_quote! {
                                                #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
                                            },
                                            None,
                                            Some(&next_stmt_id.to_string()),
                                        );
                                    }
                                // Case 3: `Fold`/`FoldKeyed` at a non-top-level location with
                                // `singleton_intermediates()` enabled. Route the input through the
                                // fold hook when one is emitted, then apply the regular operator.
                                } else if (matches!(node, HydroNode::Fold { .. })
                                    || matches!(node, HydroNode::FoldKeyed { .. }))
                                    && !node.metadata().location_id.is_top_level()
                                    && graph_builders.singleton_intermediates()
                                {
                                    let input_ref = match &*node {
                                        HydroNode::Fold { input, .. } => input,
                                        HydroNode::FoldKeyed { input, .. } => input,
                                        _ => unreachable!(),
                                    };
                                    let hooked_input_ident = graph_builders.emit_fold_hook(
                                        &input_ref.metadata().location_id,
                                        &input_ident,
                                        &input_ref.metadata().collection_kind,
                                        &node.metadata().op,
                                    );

                                    let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
                                    let builder = graph_builders.get_dfir_mut(&out_location);
                                    builder.add_dfir(
                                        parse_quote! {
                                            #fold_ident = #actual_input -> #operator::<#lifetime>(#init, #acc);
                                        },
                                        None,
                                        Some(&next_stmt_id.to_string()),
                                    );
                                // Default: apply the operator chosen above directly to the input.
                                // `Scan` and `ScanAsyncBlocking` always land here.
                                } else {
                                    let builder = graph_builders.get_dfir_mut(&out_location);
                                    builder.add_dfir(
                                        parse_quote! {
                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
                                        },
                                        None,
                                        Some(&next_stmt_id.to_string()),
                                    );
                                }
                            }
                            // Callback mode: no DFIR is emitted here; the node is handed to the
                            // callback together with the current statement id.
                            BuildersOrCallback::Callback(_, node_callback) => {
                                node_callback(node, next_stmt_id);
                            }
                        }

                        // Consume one statement id and publish the output variable for the next
                        // node that pops `ident_stack`.
                        *next_stmt_id += 1;

                        ident_stack.push(fold_ident);
                    }
4256
                    // Reduce / ReduceKeyed: lowers to `stream_<id> = <input> -> <op>::<L>(f);`,
                    // where `<op>` is `reduce`, `reduce_no_replay` or `reduce_keyed`, and `L` is
                    // `'static` for top-level inputs and `'tick` otherwise.
                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
                        // Choose the operator. A `Reduce` over a top-level bounded input uses
                        // `reduce_no_replay`; a `ReduceKeyed` in that situation panics via
                        // `todo!` (before the builders/callback dispatch, so in both modes).
                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
                            if input.metadata().location_id.is_top_level()
                                && input.metadata().collection_kind.is_bounded()
                            {
                                parse_quote!(reduce_no_replay)
                            } else {
                                parse_quote!(reduce)
                            }
                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
                            if input.metadata().location_id.is_top_level()
                                && input.metadata().collection_kind.is_bounded()
                            {
                                todo!(
                                    "Calling keyed reduce on a top-level bounded collection is not supported"
                                )
                            } else {
                                parse_quote!(reduce_keyed)
                            }
                        } else {
                            unreachable!()
                        };

                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
                        else {
                            unreachable!()
                        };

                        // Lifetime argument for the emitted operator.
                        let lifetime = if input.metadata().location_id.is_top_level() {
                            quote!('static)
                        } else {
                            quote!('tick)
                        };

                        // Variable name for this node's input, taken from the top of
                        // `ident_stack` (presumably pushed when the input node was lowered).
                        let input_ident = ident_stack.pop().unwrap();

                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
                        else {
                            unreachable!()
                        };

                        let reduce_ident =
                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());

                        match builders_or_callback {
                            BuildersOrCallback::Builders(graph_builders) => {
                                // With `singleton_intermediates()` enabled, a top-level, non-atomic,
                                // unbounded Reduce/ReduceKeyed is not implemented yet and panics.
                                // Every other configuration emits the plain operator.
                                if matches!(node, HydroNode::Reduce { .. })
                                    && node.metadata().location_id.is_top_level()
                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
                                    && graph_builders.singleton_intermediates()
                                    && !node.metadata().collection_kind.is_bounded()
                                {
                                    todo!(
                                        "Reduce with optional intermediates is not yet supported in simulator"
                                    );
                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
                                    && node.metadata().location_id.is_top_level()
                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
                                    && graph_builders.singleton_intermediates()
                                    && !node.metadata().collection_kind.is_bounded()
                                {
                                    todo!(
                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
                                    );
                                } else {
                                    let builder = graph_builders.get_dfir_mut(&out_location);
                                    builder.add_dfir(
                                        parse_quote! {
                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
                                        },
                                        None,
                                        Some(&next_stmt_id.to_string()),
                                    );
                                }
                            }
                            // Callback mode: no DFIR is emitted here; the node is handed to the
                            // callback together with the current statement id.
                            BuildersOrCallback::Callback(_, node_callback) => {
                                node_callback(node, next_stmt_id);
                            }
                        }

                        // Consume one statement id and publish the output variable for the next
                        // node that pops `ident_stack`.
                        *next_stmt_id += 1;

                        ident_stack.push(reduce_ident);
                    }
4341
4342                    HydroNode::ReduceKeyedWatermark {
4343                        f,
4344                        input,
4345                        metadata,
4346                        ..
4347                    } => {
4348                        let lifetime = if input.metadata().location_id.is_top_level() {
4349                            quote!('static)
4350                        } else {
4351                            quote!('tick)
4352                        };
4353
4354                        // watermark is processed second, so it's on top
4355                        let watermark_ident = ident_stack.pop().unwrap();
4356                        let input_ident = ident_stack.pop().unwrap();
4357
4358                        let chain_ident = syn::Ident::new(
4359                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4360                            Span::call_site(),
4361                        );
4362
4363                        let fold_ident =
4364                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4365
4366                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4367                            && input.metadata().collection_kind.is_bounded()
4368                        {
4369                            parse_quote!(fold_no_replay)
4370                        } else {
4371                            parse_quote!(fold)
4372                        };
4373
4374                        match builders_or_callback {
4375                            BuildersOrCallback::Builders(graph_builders) => {
4376                                if metadata.location_id.is_top_level()
4377                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4378                                    && graph_builders.singleton_intermediates()
4379                                    && !metadata.collection_kind.is_bounded()
4380                                {
4381                                    todo!(
4382                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4383                                    )
4384                                } else {
4385                                    let builder = graph_builders.get_dfir_mut(&out_location);
4386                                    builder.add_dfir(
4387                                        parse_quote! {
4388                                            #chain_ident = chain();
4389                                            #input_ident
4390                                                -> map(|x| (Some(x), None))
4391                                                -> [0]#chain_ident;
4392                                            #watermark_ident
4393                                                -> map(|watermark| (None, Some(watermark)))
4394                                                -> [1]#chain_ident;
4395
4396                                            #fold_ident = #chain_ident
4397                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4398                                                    let __reduce_keyed_fn = #f;
4399                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4400                                                        if let Some((k, v)) = opt_payload {
4401                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4402                                                                if k < curr_watermark {
4403                                                                    return;
4404                                                                }
4405                                                            }
4406                                                            match map.entry(k) {
4407                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4408                                                                    e.insert(v);
4409                                                                }
4410                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4411                                                                    __reduce_keyed_fn(e.get_mut(), v);
4412                                                                }
4413                                                            }
4414                                                        } else {
4415                                                            let watermark = opt_watermark.unwrap();
4416                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4417                                                                if watermark <= curr_watermark {
4418                                                                    return;
4419                                                                }
4420                                                            }
4421                                                            map.retain(|k, _| *k >= watermark);
4422                                                            *opt_curr_watermark = Some(watermark);
4423                                                        }
4424                                                    }
4425                                                })
4426                                                -> flat_map(|(map, _curr_watermark)| map);
4427                                        },
4428                                        None,
4429                                        Some(&next_stmt_id.to_string()),
4430                                    );
4431                                }
4432                            }
4433                            BuildersOrCallback::Callback(_, node_callback) => {
4434                                node_callback(node, next_stmt_id);
4435                            }
4436                        }
4437
4438                        *next_stmt_id += 1;
4439
4440                        ident_stack.push(fold_ident);
4441                    }
4442
4443                    HydroNode::Network {
4444                        networking_info,
4445                        serialize_fn: serialize_pipeline,
4446                        instantiate_fn,
4447                        deserialize_fn: deserialize_pipeline,
4448                        input,
4449                        ..
4450                    } => {
4451                        let input_ident = ident_stack.pop().unwrap();
4452
4453                        let receiver_stream_ident =
4454                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4455
4456                        match builders_or_callback {
4457                            BuildersOrCallback::Builders(graph_builders) => {
4458                                let (sink_expr, source_expr) = match instantiate_fn {
4459                                    DebugInstantiate::Building => (
4460                                        syn::parse_quote!(DUMMY_SINK),
4461                                        syn::parse_quote!(DUMMY_SOURCE),
4462                                    ),
4463
4464                                    DebugInstantiate::Finalized(finalized) => {
4465                                        (finalized.sink.clone(), finalized.source.clone())
4466                                    }
4467                                };
4468
4469                                graph_builders.create_network(
4470                                    &input.metadata().location_id,
4471                                    &out_location,
4472                                    input_ident,
4473                                    &receiver_stream_ident,
4474                                    serialize_pipeline.as_ref(),
4475                                    sink_expr,
4476                                    source_expr,
4477                                    deserialize_pipeline.as_ref(),
4478                                    *next_stmt_id,
4479                                    networking_info,
4480                                );
4481                            }
4482                            BuildersOrCallback::Callback(_, node_callback) => {
4483                                node_callback(node, next_stmt_id);
4484                            }
4485                        }
4486
4487                        *next_stmt_id += 1;
4488
4489                        ident_stack.push(receiver_stream_ident);
4490                    }
4491
4492                    HydroNode::ExternalInput {
4493                        instantiate_fn,
4494                        deserialize_fn: deserialize_pipeline,
4495                        ..
4496                    } => {
4497                        let receiver_stream_ident =
4498                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4499
4500                        match builders_or_callback {
4501                            BuildersOrCallback::Builders(graph_builders) => {
4502                                let (_, source_expr) = match instantiate_fn {
4503                                    DebugInstantiate::Building => (
4504                                        syn::parse_quote!(DUMMY_SINK),
4505                                        syn::parse_quote!(DUMMY_SOURCE),
4506                                    ),
4507
4508                                    DebugInstantiate::Finalized(finalized) => {
4509                                        (finalized.sink.clone(), finalized.source.clone())
4510                                    }
4511                                };
4512
4513                                graph_builders.create_external_source(
4514                                    &out_location,
4515                                    source_expr,
4516                                    &receiver_stream_ident,
4517                                    deserialize_pipeline.as_ref(),
4518                                    *next_stmt_id,
4519                                );
4520                            }
4521                            BuildersOrCallback::Callback(_, node_callback) => {
4522                                node_callback(node, next_stmt_id);
4523                            }
4524                        }
4525
4526                        *next_stmt_id += 1;
4527
4528                        ident_stack.push(receiver_stream_ident);
4529                    }
4530
4531                    HydroNode::Counter {
4532                        tag,
4533                        duration,
4534                        prefix,
4535                        ..
4536                    } => {
4537                        let input_ident = ident_stack.pop().unwrap();
4538
4539                        let counter_ident =
4540                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4541
4542                        match builders_or_callback {
4543                            BuildersOrCallback::Builders(graph_builders) => {
4544                                let arg = format!("{}({})", prefix, tag);
4545                                let builder = graph_builders.get_dfir_mut(&out_location);
4546                                builder.add_dfir(
4547                                    parse_quote! {
4548                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
4549                                    },
4550                                    None,
4551                                    Some(&next_stmt_id.to_string()),
4552                                );
4553                            }
4554                            BuildersOrCallback::Callback(_, node_callback) => {
4555                                node_callback(node, next_stmt_id);
4556                            }
4557                        }
4558
4559                        *next_stmt_id += 1;
4560
4561                        ident_stack.push(counter_ident);
4562                    }
4563                }
4564            },
4565            seen_tees,
4566            false,
4567        );
4568
4569        ident_stack
4570            .pop()
4571            .expect("ident_stack should have exactly one element after traversal")
4572    }
4573
4574    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4575        match self {
4576            HydroNode::Placeholder => {
4577                panic!()
4578            }
4579            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
4580            HydroNode::Source { source, .. } => match source {
4581                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4582                HydroSource::ExternalNetwork()
4583                | HydroSource::Spin()
4584                | HydroSource::ClusterMembers(_, _)
4585                | HydroSource::Embedded(_)
4586                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
4587            },
4588            HydroNode::SingletonSource { value, .. } => {
4589                transform(value);
4590            }
4591            HydroNode::CycleSource { .. }
4592            | HydroNode::Tee { .. }
4593            | HydroNode::Singleton { .. }
4594            | HydroNode::YieldConcat { .. }
4595            | HydroNode::BeginAtomic { .. }
4596            | HydroNode::EndAtomic { .. }
4597            | HydroNode::Batch { .. }
4598            | HydroNode::Chain { .. }
4599            | HydroNode::MergeOrdered { .. }
4600            | HydroNode::ChainFirst { .. }
4601            | HydroNode::CrossProduct { .. }
4602            | HydroNode::CrossSingleton { .. }
4603            | HydroNode::ResolveFutures { .. }
4604            | HydroNode::ResolveFuturesBlocking { .. }
4605            | HydroNode::ResolveFuturesOrdered { .. }
4606            | HydroNode::Join { .. }
4607            | HydroNode::JoinHalf { .. }
4608            | HydroNode::Difference { .. }
4609            | HydroNode::AntiJoin { .. }
4610            | HydroNode::DeferTick { .. }
4611            | HydroNode::Enumerate { .. }
4612            | HydroNode::Unique { .. }
4613            | HydroNode::Sort { .. } => {}
4614            HydroNode::Map { f, .. }
4615            | HydroNode::FlatMap { f, .. }
4616            | HydroNode::FlatMapStreamBlocking { f, .. }
4617            | HydroNode::Filter { f, .. }
4618            | HydroNode::FilterMap { f, .. }
4619            | HydroNode::Inspect { f, .. }
4620            | HydroNode::Partition { f, .. }
4621            | HydroNode::Reduce { f, .. }
4622            | HydroNode::ReduceKeyed { f, .. }
4623            | HydroNode::ReduceKeyedWatermark { f, .. } => {
4624                transform(f);
4625            }
4626            HydroNode::Fold { init, acc, .. }
4627            | HydroNode::Scan { init, acc, .. }
4628            | HydroNode::ScanAsyncBlocking { init, acc, .. }
4629            | HydroNode::FoldKeyed { init, acc, .. } => {
4630                transform(init);
4631                transform(acc);
4632            }
4633            HydroNode::Network {
4634                serialize_fn,
4635                deserialize_fn,
4636                ..
4637            } => {
4638                if let Some(serialize_fn) = serialize_fn {
4639                    transform(serialize_fn);
4640                }
4641                if let Some(deserialize_fn) = deserialize_fn {
4642                    transform(deserialize_fn);
4643                }
4644            }
4645            HydroNode::ExternalInput { deserialize_fn, .. } => {
4646                if let Some(deserialize_fn) = deserialize_fn {
4647                    transform(deserialize_fn);
4648                }
4649            }
4650            HydroNode::Counter { duration, .. } => {
4651                transform(duration);
4652            }
4653        }
4654    }
4655
4656    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4657        &self.metadata().op
4658    }
4659
4660    pub fn metadata(&self) -> &HydroIrMetadata {
4661        match self {
4662            HydroNode::Placeholder => {
4663                panic!()
4664            }
4665            HydroNode::Cast { metadata, .. }
4666            | HydroNode::ObserveNonDet { metadata, .. }
4667            | HydroNode::Source { metadata, .. }
4668            | HydroNode::SingletonSource { metadata, .. }
4669            | HydroNode::CycleSource { metadata, .. }
4670            | HydroNode::Tee { metadata, .. }
4671            | HydroNode::Singleton { metadata, .. }
4672            | HydroNode::Partition { metadata, .. }
4673            | HydroNode::YieldConcat { metadata, .. }
4674            | HydroNode::BeginAtomic { metadata, .. }
4675            | HydroNode::EndAtomic { metadata, .. }
4676            | HydroNode::Batch { metadata, .. }
4677            | HydroNode::Chain { metadata, .. }
4678            | HydroNode::MergeOrdered { metadata, .. }
4679            | HydroNode::ChainFirst { metadata, .. }
4680            | HydroNode::CrossProduct { metadata, .. }
4681            | HydroNode::CrossSingleton { metadata, .. }
4682            | HydroNode::Join { metadata, .. }
4683            | HydroNode::JoinHalf { metadata, .. }
4684            | HydroNode::Difference { metadata, .. }
4685            | HydroNode::AntiJoin { metadata, .. }
4686            | HydroNode::ResolveFutures { metadata, .. }
4687            | HydroNode::ResolveFuturesBlocking { metadata, .. }
4688            | HydroNode::ResolveFuturesOrdered { metadata, .. }
4689            | HydroNode::Map { metadata, .. }
4690            | HydroNode::FlatMap { metadata, .. }
4691            | HydroNode::FlatMapStreamBlocking { metadata, .. }
4692            | HydroNode::Filter { metadata, .. }
4693            | HydroNode::FilterMap { metadata, .. }
4694            | HydroNode::DeferTick { metadata, .. }
4695            | HydroNode::Enumerate { metadata, .. }
4696            | HydroNode::Inspect { metadata, .. }
4697            | HydroNode::Unique { metadata, .. }
4698            | HydroNode::Sort { metadata, .. }
4699            | HydroNode::Scan { metadata, .. }
4700            | HydroNode::ScanAsyncBlocking { metadata, .. }
4701            | HydroNode::Fold { metadata, .. }
4702            | HydroNode::FoldKeyed { metadata, .. }
4703            | HydroNode::Reduce { metadata, .. }
4704            | HydroNode::ReduceKeyed { metadata, .. }
4705            | HydroNode::ReduceKeyedWatermark { metadata, .. }
4706            | HydroNode::ExternalInput { metadata, .. }
4707            | HydroNode::Network { metadata, .. }
4708            | HydroNode::Counter { metadata, .. } => metadata,
4709        }
4710    }
4711
4712    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4713        &mut self.metadata_mut().op
4714    }
4715
4716    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4717        match self {
4718            HydroNode::Placeholder => {
4719                panic!()
4720            }
4721            HydroNode::Cast { metadata, .. }
4722            | HydroNode::ObserveNonDet { metadata, .. }
4723            | HydroNode::Source { metadata, .. }
4724            | HydroNode::SingletonSource { metadata, .. }
4725            | HydroNode::CycleSource { metadata, .. }
4726            | HydroNode::Tee { metadata, .. }
4727            | HydroNode::Singleton { metadata, .. }
4728            | HydroNode::Partition { metadata, .. }
4729            | HydroNode::YieldConcat { metadata, .. }
4730            | HydroNode::BeginAtomic { metadata, .. }
4731            | HydroNode::EndAtomic { metadata, .. }
4732            | HydroNode::Batch { metadata, .. }
4733            | HydroNode::Chain { metadata, .. }
4734            | HydroNode::MergeOrdered { metadata, .. }
4735            | HydroNode::ChainFirst { metadata, .. }
4736            | HydroNode::CrossProduct { metadata, .. }
4737            | HydroNode::CrossSingleton { metadata, .. }
4738            | HydroNode::Join { metadata, .. }
4739            | HydroNode::JoinHalf { metadata, .. }
4740            | HydroNode::Difference { metadata, .. }
4741            | HydroNode::AntiJoin { metadata, .. }
4742            | HydroNode::ResolveFutures { metadata, .. }
4743            | HydroNode::ResolveFuturesBlocking { metadata, .. }
4744            | HydroNode::ResolveFuturesOrdered { metadata, .. }
4745            | HydroNode::Map { metadata, .. }
4746            | HydroNode::FlatMap { metadata, .. }
4747            | HydroNode::FlatMapStreamBlocking { metadata, .. }
4748            | HydroNode::Filter { metadata, .. }
4749            | HydroNode::FilterMap { metadata, .. }
4750            | HydroNode::DeferTick { metadata, .. }
4751            | HydroNode::Enumerate { metadata, .. }
4752            | HydroNode::Inspect { metadata, .. }
4753            | HydroNode::Unique { metadata, .. }
4754            | HydroNode::Sort { metadata, .. }
4755            | HydroNode::Scan { metadata, .. }
4756            | HydroNode::ScanAsyncBlocking { metadata, .. }
4757            | HydroNode::Fold { metadata, .. }
4758            | HydroNode::FoldKeyed { metadata, .. }
4759            | HydroNode::Reduce { metadata, .. }
4760            | HydroNode::ReduceKeyed { metadata, .. }
4761            | HydroNode::ReduceKeyedWatermark { metadata, .. }
4762            | HydroNode::ExternalInput { metadata, .. }
4763            | HydroNode::Network { metadata, .. }
4764            | HydroNode::Counter { metadata, .. } => metadata,
4765        }
4766    }
4767
4768    pub fn input(&self) -> Vec<&HydroNode> {
4769        match self {
4770            HydroNode::Placeholder => {
4771                panic!()
4772            }
4773            HydroNode::Source { .. }
4774            | HydroNode::SingletonSource { .. }
4775            | HydroNode::ExternalInput { .. }
4776            | HydroNode::CycleSource { .. }
4777            | HydroNode::Tee { .. }
4778            | HydroNode::Singleton { .. }
4779            | HydroNode::Partition { .. } => {
4780                // Tee/Partition should find their input in separate special ways
4781                vec![]
4782            }
4783            HydroNode::Cast { inner, .. }
4784            | HydroNode::ObserveNonDet { inner, .. }
4785            | HydroNode::YieldConcat { inner, .. }
4786            | HydroNode::BeginAtomic { inner, .. }
4787            | HydroNode::EndAtomic { inner, .. }
4788            | HydroNode::Batch { inner, .. } => {
4789                vec![inner]
4790            }
4791            HydroNode::Chain { first, second, .. } => {
4792                vec![first, second]
4793            }
4794            HydroNode::MergeOrdered { first, second, .. } => {
4795                vec![first, second]
4796            }
4797            HydroNode::ChainFirst { first, second, .. } => {
4798                vec![first, second]
4799            }
4800            HydroNode::CrossProduct { left, right, .. }
4801            | HydroNode::CrossSingleton { left, right, .. }
4802            | HydroNode::Join { left, right, .. }
4803            | HydroNode::JoinHalf { left, right, .. } => {
4804                vec![left, right]
4805            }
4806            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4807                vec![pos, neg]
4808            }
4809            HydroNode::Map { input, .. }
4810            | HydroNode::FlatMap { input, .. }
4811            | HydroNode::FlatMapStreamBlocking { input, .. }
4812            | HydroNode::Filter { input, .. }
4813            | HydroNode::FilterMap { input, .. }
4814            | HydroNode::Sort { input, .. }
4815            | HydroNode::DeferTick { input, .. }
4816            | HydroNode::Enumerate { input, .. }
4817            | HydroNode::Inspect { input, .. }
4818            | HydroNode::Unique { input, .. }
4819            | HydroNode::Network { input, .. }
4820            | HydroNode::Counter { input, .. }
4821            | HydroNode::ResolveFutures { input, .. }
4822            | HydroNode::ResolveFuturesBlocking { input, .. }
4823            | HydroNode::ResolveFuturesOrdered { input, .. }
4824            | HydroNode::Fold { input, .. }
4825            | HydroNode::FoldKeyed { input, .. }
4826            | HydroNode::Reduce { input, .. }
4827            | HydroNode::ReduceKeyed { input, .. }
4828            | HydroNode::Scan { input, .. }
4829            | HydroNode::ScanAsyncBlocking { input, .. } => {
4830                vec![input]
4831            }
4832            HydroNode::ReduceKeyedWatermark {
4833                input, watermark, ..
4834            } => {
4835                vec![input, watermark]
4836            }
4837        }
4838    }
4839
4840    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4841        self.input()
4842            .iter()
4843            .map(|input_node| input_node.metadata())
4844            .collect()
4845    }
4846
4847    /// Returns `true` if this node is a Tee or Partition whose inner Rc
4848    /// has other live references, meaning the upstream is already driven
4849    /// by another consumer and does not need a Null sink.
4850    pub fn is_shared_with_others(&self) -> bool {
4851        match self {
4852            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4853                Rc::strong_count(&inner.0) > 1
4854            }
4855            // A zero-output singleton() is valid in DFIR (it drains itself at
4856            // end of tick), so it doesn't need to be driven by another consumer.
4857            HydroNode::Singleton { .. } => false,
4858            _ => false,
4859        }
4860    }
4861
4862    pub fn print_root(&self) -> String {
4863        match self {
4864            HydroNode::Placeholder => {
4865                panic!()
4866            }
4867            HydroNode::Cast { .. } => "Cast()".to_owned(),
4868            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4869            HydroNode::Source { source, .. } => format!("Source({:?})", source),
4870            HydroNode::SingletonSource {
4871                value,
4872                first_tick_only,
4873                ..
4874            } => format!(
4875                "SingletonSource({:?}, first_tick_only={})",
4876                value, first_tick_only
4877            ),
4878            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4879            HydroNode::Tee { inner, .. } => {
4880                format!("Tee({})", inner.0.borrow().print_root())
4881            }
4882            HydroNode::Singleton { inner, .. } => {
4883                format!("Singleton({})", inner.0.borrow().print_root())
4884            }
4885            HydroNode::Partition { f, is_true, .. } => {
4886                format!("Partition({:?}, is_true={})", f, is_true)
4887            }
4888            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4889            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4890            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4891            HydroNode::Batch { .. } => "Batch()".to_owned(),
4892            HydroNode::Chain { first, second, .. } => {
4893                format!("Chain({}, {})", first.print_root(), second.print_root())
4894            }
4895            HydroNode::MergeOrdered { first, second, .. } => {
4896                format!(
4897                    "MergeOrdered({}, {})",
4898                    first.print_root(),
4899                    second.print_root()
4900                )
4901            }
4902            HydroNode::ChainFirst { first, second, .. } => {
4903                format!(
4904                    "ChainFirst({}, {})",
4905                    first.print_root(),
4906                    second.print_root()
4907                )
4908            }
4909            HydroNode::CrossProduct { left, right, .. } => {
4910                format!(
4911                    "CrossProduct({}, {})",
4912                    left.print_root(),
4913                    right.print_root()
4914                )
4915            }
4916            HydroNode::CrossSingleton { left, right, .. } => {
4917                format!(
4918                    "CrossSingleton({}, {})",
4919                    left.print_root(),
4920                    right.print_root()
4921                )
4922            }
4923            HydroNode::Join { left, right, .. } => {
4924                format!("Join({}, {})", left.print_root(), right.print_root())
4925            }
4926            HydroNode::JoinHalf { left, right, .. } => {
4927                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
4928            }
4929            HydroNode::Difference { pos, neg, .. } => {
4930                format!("Difference({}, {})", pos.print_root(), neg.print_root())
4931            }
4932            HydroNode::AntiJoin { pos, neg, .. } => {
4933                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4934            }
4935            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4936            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
4937            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4938            HydroNode::Map { f, .. } => format!("Map({:?})", f),
4939            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4940            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
4941            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4942            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4943            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4944            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4945            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4946            HydroNode::Unique { .. } => "Unique()".to_owned(),
4947            HydroNode::Sort { .. } => "Sort()".to_owned(),
4948            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4949            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4950            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
4951                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
4952            }
4953            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4954            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4955            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4956            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4957            HydroNode::Network { .. } => "Network()".to_owned(),
4958            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4959            HydroNode::Counter { tag, duration, .. } => {
4960                format!("Counter({:?}, {:?})", tag, duration)
4961            }
4962        }
4963    }
4964}
4965
/// Looks up the deployed node that was instantiated for the location `key`.
///
/// `kind` is only used to build the panic message (e.g. `"process"` or `"cluster"`).
///
/// # Panics
/// Panics with `"A {kind} used in the graph was not instantiated: {key}"` if `nodes` has no
/// entry for `key`.
#[cfg(feature = "build")]
fn instantiated_node<'m, N>(
    nodes: &'m SparseSecondaryMap<LocationKey, N>,
    key: LocationKey,
    kind: &str,
) -> &'m N {
    nodes
        .get(key)
        .unwrap_or_else(|| panic!("A {} used in the graph was not instantiated: {}", kind, key))
}

/// Wires up a network channel between two top-level (process or cluster) locations.
///
/// Looks up the instantiated deploy nodes for both endpoints, allocates a fresh port on each
/// via `next_port`, and dispatches to the `Deploy` hooks matching the endpoint kinds:
/// `o2o_*` (process -> process), `o2m_*` (process -> cluster), `m2o_*` (cluster -> process)
/// and `m2m_*` (cluster -> cluster).
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by the matching
/// `*_sink_source` hook, and the callback produced by the matching `*_connect` hook.
///
/// # Panics
/// - If either endpoint's location key is missing from `processes` / `clusters`.
/// - If either endpoint is a `Tick` or `Atomic` location; only processes and clusters can
///   be network endpoints here.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = instantiated_node(processes, from, "process").clone();
            let to_node = instantiated_node(processes, to, "process").clone();

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = instantiated_node(processes, from, "process").clone();
            let to_node = instantiated_node(clusters, to, "cluster").clone();

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = instantiated_node(clusters, from, "cluster").clone();
            let to_node = instantiated_node(processes, to, "process").clone();

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = instantiated_node(clusters, from, "cluster").clone();
            let to_node = instantiated_node(clusters, to, "cluster").clone();

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Tick/Atomic locations are nested inside a process or cluster; a network edge must be
        // instantiated between top-level locations, so reaching these arms is a caller bug.
        (LocationId::Tick(..) | LocationId::Atomic(..), _) => panic!(
            "Cannot instantiate a network from a tick or atomic location; \
             the source must be a process or cluster"
        ),
        (_, LocationId::Tick(..) | LocationId::Atomic(..)) => panic!(
            "Cannot instantiate a network to a tick or atomic location; \
             the destination must be a process or cluster"
        ),
    };
    (sink, source, connect_fn)
}
5107
// Additional tests live in the `serde_test` submodule file (presumably covering the IR's
// serde `Serialize` impls, judging by the name — see that file for details).
#[cfg(test)]
mod serde_test;

// Unit tests for this module: layout-size regression checks for the core IR types, and
// tests for `simplify_q_macro` (one exact-equality check plus two snapshot tests).
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    // Regression guard on the in-memory size of `HydroNode`. The expected value only holds
    // when the `build` feature is enabled (feature-gated fields change the layout), so the
    // test is ignored otherwise. If a deliberate change alters the size, update the constant.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 248);
    }

    // Same size regression guard as above, for `HydroRoot`; likewise only meaningful with the
    // `build` feature enabled.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 136);
    }

    // An ordinary expression that is not a stageleft `q!` splice must come back from
    // `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        // Test basic non-q! expression
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    // Snapshot test: splice a real `q!` closure through stageleft and record what
    // `simplify_q_macro` turns it into.
    // NOTE(review): snapshot files are presumably keyed by this function's name, so renaming
    // the test (or changing the snapshotted expression) likely requires updating snapshots.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // Test a simplified version of what a real stageleft call might look like
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        // This should be processed by our visitor and simplified to q!(...)
        // since we detect the stageleft::runtime_support::fn_* pattern
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    // Snapshot test for a quoted block whose final expression is a `move` closure, i.e. the
    // spliced code does not begin with a closure's leading `|`.
    // NOTE(review): same snapshot-naming caveat as the test above.
    #[test]
    fn test_closure_no_pipe_at_start() {
        // Test a closure that does not start with a pipe
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}