1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
/// Newtype around a boxed [`syn::Expr`] that adds formatting and serialization.
///
/// `Debug` prints the raw token stream of the expression, while `Display`
/// prints a simplified form wrapped in `q!(...)`.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
42
43impl serde::Serialize for DebugExpr {
44 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
45 serializer.serialize_str(&self.to_string())
46 }
47}
48
49impl From<syn::Expr> for DebugExpr {
50 fn from(expr: syn::Expr) -> Self {
51 Self(Box::new(expr))
52 }
53}
54
55impl Deref for DebugExpr {
56 type Target = syn::Expr;
57
58 fn deref(&self) -> &Self::Target {
59 &self.0
60 }
61}
62
63impl ToTokens for DebugExpr {
64 fn to_tokens(&self, tokens: &mut TokenStream) {
65 self.0.to_tokens(tokens);
66 }
67}
68
69impl Debug for DebugExpr {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 write!(f, "{}", self.0.to_token_stream())
72 }
73}
74
75impl Display for DebugExpr {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 let original = self.0.as_ref().clone();
78 let simplified = simplify_q_macro(original);
79
80 write!(f, "q!({})", quote::quote!(#simplified))
83 }
84}
85
86fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
88 let mut simplifier = QMacroSimplifier::new();
91 simplifier.visit_expr_mut(&mut expr);
92
93 if let Some(simplified) = simplifier.simplified_result {
95 simplified
96 } else {
97 expr
98 }
99}
100
/// Visitor that looks for a call to a `stageleft::runtime_support::*_type_hint*`
/// function and records the closure found in that call's arguments.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The extracted closure expression; `None` until a matching call is found.
    pub simplified_result: Option<syn::Expr>,
}
106
107impl QMacroSimplifier {
108 pub fn new() -> Self {
109 Self::default()
110 }
111}
112
113impl VisitMut for QMacroSimplifier {
114 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
115 if self.simplified_result.is_some() {
117 return;
118 }
119
120 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
121 && self.is_stageleft_runtime_support_call(&path_expr.path)
123 && let Some(closure) = self.extract_closure_from_args(&call.args)
125 {
126 self.simplified_result = Some(closure);
127 return;
128 }
129
130 syn::visit_mut::visit_expr_mut(self, expr);
133 }
134}
135
136impl QMacroSimplifier {
137 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
138 if let Some(last_segment) = path.segments.last() {
140 let fn_name = last_segment.ident.to_string();
141 fn_name.contains("_type_hint")
143 && path.segments.len() > 2
144 && path.segments[0].ident == "stageleft"
145 && path.segments[1].ident == "runtime_support"
146 } else {
147 false
148 }
149 }
150
151 fn extract_closure_from_args(
152 &self,
153 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
154 ) -> Option<syn::Expr> {
155 for arg in args {
157 if let syn::Expr::Closure(_) = arg {
158 return Some(arg.clone());
159 }
160 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
162 return Some(closure_expr);
163 }
164 }
165 None
166 }
167
168 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
169 let mut visitor = ClosureFinder {
170 found_closure: None,
171 prefer_inner_blocks: true,
172 };
173 visitor.visit_expr(expr);
174 visitor.found_closure
175 }
176}
177
/// Read-only visitor that records the first closure-bearing expression it meets.
struct ClosureFinder {
    /// The expression that was found; the visitor stops once this is `Some`.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block statement nested directly inside a block and
    /// containing a closure is recorded as a whole instead of the bare closure.
    prefer_inner_blocks: bool,
}
183
184impl<'ast> Visit<'ast> for ClosureFinder {
185 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
186 if self.found_closure.is_some() {
188 return;
189 }
190
191 match expr {
192 syn::Expr::Closure(_) => {
193 self.found_closure = Some(expr.clone());
194 }
195 syn::Expr::Block(block) if self.prefer_inner_blocks => {
196 for stmt in &block.block.stmts {
198 if let syn::Stmt::Expr(stmt_expr, _) = stmt
199 && let syn::Expr::Block(_) = stmt_expr
200 {
201 let mut inner_visitor = ClosureFinder {
203 found_closure: None,
204 prefer_inner_blocks: false, };
206 inner_visitor.visit_expr(stmt_expr);
207 if inner_visitor.found_closure.is_some() {
208 self.found_closure = Some(stmt_expr.clone());
210 return;
211 }
212 }
213 }
214
215 visit::visit_expr(self, expr);
217
218 if self.found_closure.is_some() {
221 }
223 }
224 _ => {
225 visit::visit_expr(self, expr);
227 }
228 }
229 }
230}
231
/// Newtype around a boxed [`syn::Type`] whose `Debug` and serialized forms are
/// the type's token stream rendered as text.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
237
238impl From<syn::Type> for DebugType {
239 fn from(t: syn::Type) -> Self {
240 Self(Box::new(t))
241 }
242}
243
244impl Deref for DebugType {
245 type Target = syn::Type;
246
247 fn deref(&self) -> &Self::Target {
248 &self.0
249 }
250}
251
252impl ToTokens for DebugType {
253 fn to_tokens(&self, tokens: &mut TokenStream) {
254 self.0.to_tokens(tokens);
255 }
256}
257
258impl Debug for DebugType {
259 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260 write!(f, "{}", self.0.to_token_stream())
261 }
262}
263
264impl serde::Serialize for DebugType {
265 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
266 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
267 }
268}
269
270fn serialize_backtrace_as_span<S: serde::Serializer>(
271 backtrace: &Backtrace,
272 serializer: S,
273) -> Result<S::Ok, S::Error> {
274 match backtrace.format_span() {
275 Some(span) => serializer.serialize_some(&span),
276 None => serializer.serialize_none(),
277 }
278}
279
280fn serialize_ident<S: serde::Serializer>(
281 ident: &syn::Ident,
282 serializer: S,
283) -> Result<S::Ok, S::Error> {
284 serializer.serialize_str(&ident.to_string())
285}
286
287fn serialize_singleton_refs<S: serde::Serializer>(
288 refs: &[(syn::Ident, HydroNode)],
289 serializer: S,
290) -> Result<S::Ok, S::Error> {
291 use serde::ser::SerializeSeq;
292 let mut seq = serializer.serialize_seq(Some(refs.len()))?;
293 for (ident, node) in refs {
294 seq.serialize_element(&(ident.to_string(), node))?;
295 }
296 seq.end()
297}
298
/// Instantiation state of a network-like IR element.
pub enum DebugInstantiate {
    /// Not yet instantiated; `compile_network` replaces this with `Finalized`.
    Building,
    /// Instantiated: carries the sink/source expressions and the pending
    /// connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
303
304impl serde::Serialize for DebugInstantiate {
305 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
306 match self {
307 DebugInstantiate::Building => {
308 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
309 }
310 DebugInstantiate::Finalized(_) => {
311 panic!(
312 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
313 )
314 }
315 }
316 }
317}
318
/// Result of instantiating a network-like IR element.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side.
    sink: syn::Expr,
    /// Expression for the receiving side.
    source: syn::Expr,
    /// One-shot callback taken and invoked by `connect_network`.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
331
332impl From<DebugInstantiateFinalized> for DebugInstantiate {
333 fn from(f: DebugInstantiateFinalized) -> Self {
334 Self::Finalized(Box::new(f))
335 }
336}
337
338impl Debug for DebugInstantiate {
339 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
340 write!(f, "<network instantiate>")
341 }
342}
343
/// Contributes nothing to the hash: every `DebugInstantiate` value hashes
/// identically, so the instantiation state does not affect an enclosing hash.
impl Hash for DebugInstantiate {
    fn hash<H: Hasher>(&self, _state: &mut H) {
        // Intentionally empty.
    }
}
349
350impl Clone for DebugInstantiate {
351 fn clone(&self) -> Self {
352 match self {
353 DebugInstantiate::Building => DebugInstantiate::Building,
354 DebugInstantiate::Finalized(_) => {
355 panic!("DebugInstantiate::Finalized should not be cloned")
356 }
357 }
358 }
359}
360
/// Resolution state of a `HydroSource::ClusterMembers` source.
///
/// `HydroRoot::compile_network` moves a source out of `Uninit` exactly once and
/// panics if it meets one that is already `Stream` or `Tee`.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not yet processed by `compile_network`.
    Uninit,
    /// Holds the membership-stream expression. Set for the first source seen
    /// for a given (observing location, cluster) pair.
    Stream(DebugExpr),
    /// Set for later sources of a pair that was already seen; stores the
    /// observing root location and the cluster location.
    Tee(LocationId, LocationId),
}
381
/// The kinds of source that can feed a `HydroNode::Source`.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// Source described by an expression (presumably one that yields a stream — confirm at use site).
    Stream(DebugExpr),
    /// NOTE(review): semantics are not visible in this part of the file.
    ExternalNetwork(),
    /// Source described by an expression (presumably one that yields an iterator — confirm at use site).
    Iter(DebugExpr),
    /// NOTE(review): semantics are not visible in this part of the file.
    Spin(),
    /// Membership of the cluster at the given location, with its resolution state.
    ClusterMembers(LocationId, ClusterMembersState),
    /// Embedded stream input; `compile_network` registers it under this
    /// identifier through `Deploy::register_embedded_stream_input`.
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// Embedded singleton input; `compile_network` registers it under this
    /// identifier through `Deploy::register_embedded_singleton_input`.
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
393
/// Backend interface used when emitting the Hydro IR as DFIR statements.
///
/// Each method receives the identifiers of the input and output DFIR variables
/// plus the locations and collection kinds involved, and is expected to add
/// the corresponding statements to the per-location [`FlatGraphBuilder`].
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Reports whether this builder uses singleton intermediates.
    /// NOTE(review): the exact meaning is decided by callers outside this view.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the mutable graph builder associated with `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the statements connecting `in_ident` (at `in_location`) to
    /// `out_ident` (at `out_location`) for a batch operation.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );
    /// Emits the statements connecting `in_ident` to `out_ident` when a
    /// collection is yielded out of a tick.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the statements connecting `in_ident` to `out_ident` at the start
    /// of an atomic section.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the statements connecting `in_ident` to `out_ident` at the end
    /// of an atomic section.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the statements connecting `in_ident` to `out_ident` for an
    /// observe-nondeterminism operation at `location`.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the statements merging `first_ident` and `second_ident` into
    /// `out_ident` at `location`; `operator_tag` optionally tags the operator.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network edge: the sending side on `from`, which
    /// feeds `input_ident` (optionally through `serialize`) into `sink`, and
    /// the receiving side on `to`, which defines `out_ident` from `source`
    /// (optionally through `deserialize`).
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits the receiving side of an external input on `on`, defining
    /// `out_ident` from `source_expr` (optionally through `deserialize`).
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits the sending side of an external output on `on`, feeding
    /// `input_ident` (optionally through `serialize`) into `sink_expr`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Optionally emits a hook for a fold input and returns an identifier for it.
    /// Returns `None` when no hook is emitted.
    /// NOTE(review): what the returned identifier names is decided by
    /// implementations outside this view.
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;
}
511
/// Adds the untagged statement `#out_ident = #in_ident;` to `builder`.
#[cfg(feature = "build")]
fn add_dfir_ident_passthrough(
    builder: &mut FlatGraphBuilder,
    in_ident: &syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}

/// Default backend: one [`FlatGraphBuilder`] per root location key.
///
/// Most operations simply alias the output identifier to the input identifier.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    /// This backend never uses singleton intermediates.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder stored under the root key of `location`, creating
    /// a default one if it is missing.
    ///
    /// # Panics
    ///
    /// Panics with "location was removed" if the entry lookup fails.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    /// Bounded singleton/optional/keyed-singleton inputs are routed through
    /// `persist::<'static>()` (and must be top-level); everything else is a
    /// plain alias.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let is_bounded_single_value = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );

        if !is_bounded_single_value {
            add_dfir_ident_passthrough(builder, &in_ident, out_ident);
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Aliases `out_ident` to `in_ident` on the input's root location.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        add_dfir_ident_passthrough(builder, &in_ident, out_ident);
    }

    /// Aliases `out_ident` to `in_ident` on the input's root location.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        add_dfir_ident_passthrough(builder, &in_ident, out_ident);
    }

    /// Aliases `out_ident` to `in_ident` on the input's root location.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        add_dfir_ident_passthrough(builder, &in_ident, out_ident);
    }

    /// Aliases `out_ident` to `in_ident` at `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(location);
        add_dfir_ident_passthrough(builder, &in_ident, out_ident);
    }

    /// Implemented as a `union()` with the first input on port 0 and the
    /// second on port 1, tagged with `operator_tag`.
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    /// Adds a `dest_sink` pipeline on `from` tagged `send{tag_id}` and a
    /// `source_stream` pipeline on `to` tagged `recv{tag_id}`, each with an
    /// optional `map` stage for (de)serialization.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        let send_tag = format!("send{}", tag_id);
        let recv_tag = format!("recv{}", tag_id);

        // Sending half.
        let sender_builder = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_pipeline) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }

        // Receiving half.
        let receiver_builder = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Adds a `source_stream` pipeline on `on` tagged `recv{tag_id}`, with an
    /// optional `map` stage for deserialization.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{}", tag_id);
        let receiver_builder = self.get_dfir_mut(on);

        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Adds a `dest_sink` pipeline on `on` tagged `send{tag_id}`, with an
    /// optional `map` stage for serialization.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let send_tag = format!("send{}", tag_id);
        let sender_builder = self.get_dfir_mut(on);

        match serialize {
            Some(serialize_fn) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }
    }

    /// This backend never emits fold hooks.
    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        None
    }
}
778
/// Selects what the `emit_core` traversal does at each IR element: either
/// generate DFIR through a [`DfirBuilder`], or invoke user callbacks.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR statements into the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Call `L` for each root and `N` for each node, passing the element and
    /// the statement-id counter.
    Callback(L, N),
}
788
/// A root (terminal) element of the Hydro IR. Every variant consumes exactly
/// one `input` node and carries operator metadata.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroRoot {
    /// Applies `f` to the input; emitted as a DFIR `for_each(#f)`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the input to a port of an external location.
    SendExternal {
        /// Key of the destination external.
        to_external_key: LocationKey,
        /// Port on the destination external.
        to_port_id: ExternalPortId,
        /// Selects the `e2o_many_sink` path for process senders in `compile_network`.
        to_many: bool,
        /// When set, `compile_network` also registers the port on the external.
        unpaired: bool,
        /// Optional serialization expression.
        serialize_fn: Option<DebugExpr>,
        /// Instantiation state, finalized by `compile_network`.
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds the input into the sink described by the `sink` expression.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds the input into the cycle identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Embedded output, registered under `ident` by `compile_network`.
    EmbeddedOutput {
        #[serde(serialize_with = "serialize_ident")]
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root with no payload other than its input.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
830
831impl HydroRoot {
832 #[cfg(feature = "build")]
833 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
834 pub fn compile_network<'a, D>(
835 &mut self,
836 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
837 seen_tees: &mut SeenSharedNodes,
838 seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
839 processes: &SparseSecondaryMap<LocationKey, D::Process>,
840 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
841 externals: &SparseSecondaryMap<LocationKey, D::External>,
842 env: &mut D::InstantiateEnv,
843 ) where
844 D: Deploy<'a>,
845 {
846 let refcell_extra_stmts = RefCell::new(extra_stmts);
847 let refcell_env = RefCell::new(env);
848 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
849 self.transform_bottom_up(
850 &mut |l| {
851 if let HydroRoot::SendExternal {
852 input,
853 to_external_key,
854 to_port_id,
855 to_many,
856 unpaired,
857 instantiate_fn,
858 ..
859 } = l
860 {
861 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
862 DebugInstantiate::Building => {
863 let to_node = externals
864 .get(*to_external_key)
865 .unwrap_or_else(|| {
866 panic!("A external used in the graph was not instantiated: {}", to_external_key)
867 })
868 .clone();
869
870 match input.metadata().location_id.root() {
871 &LocationId::Process(process_key) => {
872 if *to_many {
873 (
874 (
875 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
876 parse_quote!(DUMMY),
877 ),
878 Box::new(|| {}) as Box<dyn FnOnce()>,
879 )
880 } else {
881 let from_node = processes
882 .get(process_key)
883 .unwrap_or_else(|| {
884 panic!("A process used in the graph was not instantiated: {}", process_key)
885 })
886 .clone();
887
888 let sink_port = from_node.next_port();
889 let source_port = to_node.next_port();
890
891 if *unpaired {
892 use stageleft::quote_type;
893 use tokio_util::codec::LengthDelimitedCodec;
894
895 to_node.register(*to_port_id, source_port.clone());
896
897 let _ = D::e2o_source(
898 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
899 &to_node, &source_port,
900 &from_node, &sink_port,
901 "e_type::<LengthDelimitedCodec>(),
902 format!("{}_{}", *to_external_key, *to_port_id)
903 );
904 }
905
906 (
907 (
908 D::o2e_sink(
909 &from_node,
910 &sink_port,
911 &to_node,
912 &source_port,
913 format!("{}_{}", *to_external_key, *to_port_id)
914 ),
915 parse_quote!(DUMMY),
916 ),
917 if *unpaired {
918 D::e2o_connect(
919 &to_node,
920 &source_port,
921 &from_node,
922 &sink_port,
923 *to_many,
924 NetworkHint::Auto,
925 )
926 } else {
927 Box::new(|| {}) as Box<dyn FnOnce()>
928 },
929 )
930 }
931 }
932 LocationId::Cluster(cluster_key) => {
933 let from_node = clusters
934 .get(*cluster_key)
935 .unwrap_or_else(|| {
936 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
937 })
938 .clone();
939
940 let sink_port = from_node.next_port();
941 let source_port = to_node.next_port();
942
943 if *unpaired {
944 to_node.register(*to_port_id, source_port.clone());
945 }
946
947 (
948 (
949 D::m2e_sink(
950 &from_node,
951 &sink_port,
952 &to_node,
953 &source_port,
954 format!("{}_{}", *to_external_key, *to_port_id)
955 ),
956 parse_quote!(DUMMY),
957 ),
958 Box::new(|| {}) as Box<dyn FnOnce()>,
959 )
960 }
961 _ => panic!()
962 }
963 },
964
965 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
966 };
967
968 *instantiate_fn = DebugInstantiateFinalized {
969 sink: sink_expr,
970 source: source_expr,
971 connect_fn: Some(connect_fn),
972 }
973 .into();
974 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
975 let element_type = match &input.metadata().collection_kind {
976 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
977 _ => panic!("Embedded output must have Stream collection kind"),
978 };
979 let location_key = match input.metadata().location_id.root() {
980 LocationId::Process(key) | LocationId::Cluster(key) => *key,
981 _ => panic!("Embedded output must be on a process or cluster"),
982 };
983 D::register_embedded_output(
984 &mut refcell_env.borrow_mut(),
985 location_key,
986 ident,
987 &element_type,
988 );
989 }
990 },
991 &mut |n| {
992 if let HydroNode::Network {
993 name,
994 networking_info,
995 input,
996 instantiate_fn,
997 metadata,
998 ..
999 } = n
1000 {
1001 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1002 DebugInstantiate::Building => instantiate_network::<D>(
1003 &mut refcell_env.borrow_mut(),
1004 input.metadata().location_id.root(),
1005 metadata.location_id.root(),
1006 processes,
1007 clusters,
1008 name.as_deref(),
1009 networking_info,
1010 ),
1011
1012 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1013 };
1014
1015 *instantiate_fn = DebugInstantiateFinalized {
1016 sink: sink_expr,
1017 source: source_expr,
1018 connect_fn: Some(connect_fn),
1019 }
1020 .into();
1021 } else if let HydroNode::ExternalInput {
1022 from_external_key,
1023 from_port_id,
1024 from_many,
1025 codec_type,
1026 port_hint,
1027 instantiate_fn,
1028 metadata,
1029 ..
1030 } = n
1031 {
1032 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1033 DebugInstantiate::Building => {
1034 let from_node = externals
1035 .get(*from_external_key)
1036 .unwrap_or_else(|| {
1037 panic!(
1038 "A external used in the graph was not instantiated: {}",
1039 from_external_key,
1040 )
1041 })
1042 .clone();
1043
1044 match metadata.location_id.root() {
1045 &LocationId::Process(process_key) => {
1046 let to_node = processes
1047 .get(process_key)
1048 .unwrap_or_else(|| {
1049 panic!("A process used in the graph was not instantiated: {}", process_key)
1050 })
1051 .clone();
1052
1053 let sink_port = from_node.next_port();
1054 let source_port = to_node.next_port();
1055
1056 from_node.register(*from_port_id, sink_port.clone());
1057
1058 (
1059 (
1060 parse_quote!(DUMMY),
1061 if *from_many {
1062 D::e2o_many_source(
1063 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1064 &to_node, &source_port,
1065 codec_type.0.as_ref(),
1066 format!("{}_{}", *from_external_key, *from_port_id)
1067 )
1068 } else {
1069 D::e2o_source(
1070 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1071 &from_node, &sink_port,
1072 &to_node, &source_port,
1073 codec_type.0.as_ref(),
1074 format!("{}_{}", *from_external_key, *from_port_id)
1075 )
1076 },
1077 ),
1078 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1079 )
1080 }
1081 LocationId::Cluster(cluster_key) => {
1082 let to_node = clusters
1083 .get(*cluster_key)
1084 .unwrap_or_else(|| {
1085 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1086 })
1087 .clone();
1088
1089 let sink_port = from_node.next_port();
1090 let source_port = to_node.next_port();
1091
1092 from_node.register(*from_port_id, sink_port.clone());
1093
1094 (
1095 (
1096 parse_quote!(DUMMY),
1097 D::e2m_source(
1098 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1099 &from_node, &sink_port,
1100 &to_node, &source_port,
1101 codec_type.0.as_ref(),
1102 format!("{}_{}", *from_external_key, *from_port_id)
1103 ),
1104 ),
1105 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1106 )
1107 }
1108 _ => panic!()
1109 }
1110 },
1111
1112 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1113 };
1114
1115 *instantiate_fn = DebugInstantiateFinalized {
1116 sink: sink_expr,
1117 source: source_expr,
1118 connect_fn: Some(connect_fn),
1119 }
1120 .into();
1121 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1122 let element_type = match &metadata.collection_kind {
1123 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1124 _ => panic!("Embedded source must have Stream collection kind"),
1125 };
1126 let location_key = match metadata.location_id.root() {
1127 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1128 _ => panic!("Embedded source must be on a process or cluster"),
1129 };
1130 D::register_embedded_stream_input(
1131 &mut refcell_env.borrow_mut(),
1132 location_key,
1133 ident,
1134 &element_type,
1135 );
1136 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1137 let element_type = match &metadata.collection_kind {
1138 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1139 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1140 };
1141 let location_key = match metadata.location_id.root() {
1142 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1143 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1144 };
1145 D::register_embedded_singleton_input(
1146 &mut refcell_env.borrow_mut(),
1147 location_key,
1148 ident,
1149 &element_type,
1150 );
1151 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1152 match state {
1153 ClusterMembersState::Uninit => {
1154 let at_location = metadata.location_id.root().clone();
1155 let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1156 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1157 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1159 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1160 &(),
1161 );
1162 *state = ClusterMembersState::Stream(expr.into());
1163 } else {
1164 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1166 }
1167 }
1168 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1169 panic!("cluster members already finalized");
1170 }
1171 }
1172 }
1173 },
1174 seen_tees,
1175 false,
1176 );
1177 }
1178
1179 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1180 self.transform_bottom_up(
1181 &mut |l| {
1182 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1183 match instantiate_fn {
1184 DebugInstantiate::Building => panic!("network not built"),
1185
1186 DebugInstantiate::Finalized(finalized) => {
1187 (finalized.connect_fn.take().unwrap())();
1188 }
1189 }
1190 }
1191 },
1192 &mut |n| {
1193 if let HydroNode::Network { instantiate_fn, .. }
1194 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1195 {
1196 match instantiate_fn {
1197 DebugInstantiate::Building => panic!("network not built"),
1198
1199 DebugInstantiate::Finalized(finalized) => {
1200 (finalized.connect_fn.take().unwrap())();
1201 }
1202 }
1203 }
1204 },
1205 seen_tees,
1206 false,
1207 );
1208 }
1209
1210 pub fn transform_bottom_up(
1211 &mut self,
1212 transform_root: &mut impl FnMut(&mut HydroRoot),
1213 transform_node: &mut impl FnMut(&mut HydroNode),
1214 seen_tees: &mut SeenSharedNodes,
1215 check_well_formed: bool,
1216 ) {
1217 self.transform_children(
1218 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1219 seen_tees,
1220 );
1221
1222 transform_root(self);
1223 }
1224
1225 pub fn transform_children(
1226 &mut self,
1227 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1228 seen_tees: &mut SeenSharedNodes,
1229 ) {
1230 match self {
1231 HydroRoot::ForEach { input, .. }
1232 | HydroRoot::SendExternal { input, .. }
1233 | HydroRoot::DestSink { input, .. }
1234 | HydroRoot::CycleSink { input, .. }
1235 | HydroRoot::EmbeddedOutput { input, .. }
1236 | HydroRoot::Null { input, .. } => {
1237 transform(input, seen_tees);
1238 }
1239 }
1240 }
1241
1242 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1243 match self {
1244 HydroRoot::ForEach {
1245 f,
1246 input,
1247 op_metadata,
1248 } => HydroRoot::ForEach {
1249 f: f.clone(),
1250 input: Box::new(input.deep_clone(seen_tees)),
1251 op_metadata: op_metadata.clone(),
1252 },
1253 HydroRoot::SendExternal {
1254 to_external_key,
1255 to_port_id,
1256 to_many,
1257 unpaired,
1258 serialize_fn,
1259 instantiate_fn,
1260 input,
1261 op_metadata,
1262 } => HydroRoot::SendExternal {
1263 to_external_key: *to_external_key,
1264 to_port_id: *to_port_id,
1265 to_many: *to_many,
1266 unpaired: *unpaired,
1267 serialize_fn: serialize_fn.clone(),
1268 instantiate_fn: instantiate_fn.clone(),
1269 input: Box::new(input.deep_clone(seen_tees)),
1270 op_metadata: op_metadata.clone(),
1271 },
1272 HydroRoot::DestSink {
1273 sink,
1274 input,
1275 op_metadata,
1276 } => HydroRoot::DestSink {
1277 sink: sink.clone(),
1278 input: Box::new(input.deep_clone(seen_tees)),
1279 op_metadata: op_metadata.clone(),
1280 },
1281 HydroRoot::CycleSink {
1282 cycle_id,
1283 input,
1284 op_metadata,
1285 } => HydroRoot::CycleSink {
1286 cycle_id: *cycle_id,
1287 input: Box::new(input.deep_clone(seen_tees)),
1288 op_metadata: op_metadata.clone(),
1289 },
1290 HydroRoot::EmbeddedOutput {
1291 ident,
1292 input,
1293 op_metadata,
1294 } => HydroRoot::EmbeddedOutput {
1295 ident: ident.clone(),
1296 input: Box::new(input.deep_clone(seen_tees)),
1297 op_metadata: op_metadata.clone(),
1298 },
1299 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1300 input: Box::new(input.deep_clone(seen_tees)),
1301 op_metadata: op_metadata.clone(),
1302 },
1303 }
1304 }
1305
/// Emits this root (and its input tree) into `graph_builders`.
///
/// Thin wrapper over [`HydroRoot::emit_core`] that selects the `Builders` mode of
/// `BuildersOrCallback`.
#[cfg(feature = "build")]
pub fn emit(
    &mut self,
    graph_builders: &mut dyn DfirBuilder,
    seen_tees: &mut SeenSharedNodes,
    built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
    next_stmt_id: &mut usize,
    fold_hooked_idents: &mut HashSet<String>,
) {
    // No callbacks are supplied in `Builders` mode; plain `fn` pointer types are named only
    // to give the enum's callback parameters concrete types.
    type RootCallback = fn(&mut HydroRoot, &mut usize);
    type NodeCallback = fn(&mut HydroNode, &mut usize);

    let mut mode = BuildersOrCallback::<RootCallback, NodeCallback>::Builders(graph_builders);
    self.emit_core(
        &mut mode,
        seen_tees,
        built_tees,
        next_stmt_id,
        fold_hooked_idents,
    );
}
1326
/// Emits the input tree of this root via `HydroNode::emit_core`, then handles the root itself.
///
/// * In `BuildersOrCallback::Builders` mode, a DFIR statement consuming the input's identifier
///   is added to the builder selected by the input's `location_id` (or, for `SendExternal`,
///   `create_external_output` is called on the builders).
/// * In `BuildersOrCallback::Callback` mode, the root callback is invoked with `self` and the
///   current statement id instead of emitting anything.
///
/// `next_stmt_id` is incremented once after each root is handled, except for `CycleSink`,
/// which neither invokes the callback nor consumes a statement id.
#[cfg(feature = "build")]
pub fn emit_core(
    &mut self,
    builders_or_callback: &mut BuildersOrCallback<
        impl FnMut(&mut HydroRoot, &mut usize),
        impl FnMut(&mut HydroNode, &mut usize),
    >,
    seen_tees: &mut SeenSharedNodes,
    built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
    next_stmt_id: &mut usize,
    fold_hooked_idents: &mut HashSet<String>,
) {
    match self {
        HydroRoot::ForEach { f, input, .. } => {
            // Emit everything upstream first; the returned identifier names the input stream.
            let input_ident = input.emit_core(
                builders_or_callback,
                seen_tees,
                built_tees,
                next_stmt_id,
                fold_hooked_idents,
            );

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    graph_builders
                        .get_dfir_mut(&input.metadata().location_id)
                        .add_dfir(
                            parse_quote! {
                                #input_ident -> for_each(#f);
                            },
                            None,
                            Some(&next_stmt_id.to_string()),
                        );
                }
                BuildersOrCallback::Callback(leaf_callback, _) => {
                    leaf_callback(self, next_stmt_id);
                }
            }

            *next_stmt_id += 1;
        }

        HydroRoot::SendExternal {
            serialize_fn,
            instantiate_fn,
            input,
            ..
        } => {
            let input_ident = input.emit_core(
                builders_or_callback,
                seen_tees,
                built_tees,
                next_stmt_id,
                fold_hooked_idents,
            );

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    // Only the sink half is used here; while still `Building`, placeholder
                    // `DUMMY_SINK` / `DUMMY_SOURCE` expressions stand in for the real ones.
                    let (sink_expr, _) = match instantiate_fn {
                        DebugInstantiate::Building => (
                            syn::parse_quote!(DUMMY_SINK),
                            syn::parse_quote!(DUMMY_SOURCE),
                        ),

                        DebugInstantiate::Finalized(finalized) => {
                            (finalized.sink.clone(), finalized.source.clone())
                        }
                    };

                    graph_builders.create_external_output(
                        &input.metadata().location_id,
                        sink_expr,
                        &input_ident,
                        serialize_fn.as_ref(),
                        *next_stmt_id,
                    );
                }
                BuildersOrCallback::Callback(leaf_callback, _) => {
                    leaf_callback(self, next_stmt_id);
                }
            }

            *next_stmt_id += 1;
        }

        HydroRoot::DestSink { sink, input, .. } => {
            let input_ident = input.emit_core(
                builders_or_callback,
                seen_tees,
                built_tees,
                next_stmt_id,
                fold_hooked_idents,
            );

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    graph_builders
                        .get_dfir_mut(&input.metadata().location_id)
                        .add_dfir(
                            parse_quote! {
                                #input_ident -> dest_sink(#sink);
                            },
                            None,
                            Some(&next_stmt_id.to_string()),
                        );
                }
                BuildersOrCallback::Callback(leaf_callback, _) => {
                    leaf_callback(self, next_stmt_id);
                }
            }

            *next_stmt_id += 1;
        }

        HydroRoot::CycleSink {
            cycle_id, input, ..
        } => {
            let input_ident = input.emit_core(
                builders_or_callback,
                seen_tees,
                built_tees,
                next_stmt_id,
                fold_hooked_idents,
            );

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    // The element type of the `identity` operator is spelled out from the
                    // input's collection kind: keyed collections use a `(key, value)` tuple.
                    let elem_type: syn::Type = match &input.metadata().collection_kind {
                        CollectionKind::KeyedSingleton {
                            key_type,
                            value_type,
                            ..
                        }
                        | CollectionKind::KeyedStream {
                            key_type,
                            value_type,
                            ..
                        } => {
                            parse_quote!((#key_type, #value_type))
                        }
                        CollectionKind::Stream { element_type, .. }
                        | CollectionKind::Singleton { element_type, .. }
                        | CollectionKind::Optional { element_type, .. } => {
                            parse_quote!(#element_type)
                        }
                    };

                    // Binds the input to the identifier derived from `cycle_id`.
                    let cycle_id_ident = cycle_id.as_ident();
                    graph_builders
                        .get_dfir_mut(&input.metadata().location_id)
                        .add_dfir(
                            parse_quote! {
                                #cycle_id_ident = #input_ident -> identity::<#elem_type>();
                            },
                            None,
                            None,
                        );
                }
                // Cycle sinks get no statement id and trigger no callback.
                BuildersOrCallback::Callback(_, _) => {}
            }
        }

        HydroRoot::EmbeddedOutput { ident, input, .. } => {
            let input_ident = input.emit_core(
                builders_or_callback,
                seen_tees,
                built_tees,
                next_stmt_id,
                fold_hooked_idents,
            );

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    graph_builders
                        .get_dfir_mut(&input.metadata().location_id)
                        .add_dfir(
                            parse_quote! {
                                #input_ident -> for_each(&mut #ident);
                            },
                            None,
                            Some(&next_stmt_id.to_string()),
                        );
                }
                BuildersOrCallback::Callback(leaf_callback, _) => {
                    leaf_callback(self, next_stmt_id);
                }
            }

            *next_stmt_id += 1;
        }

        HydroRoot::Null { input, .. } => {
            let input_ident = input.emit_core(
                builders_or_callback,
                seen_tees,
                built_tees,
                next_stmt_id,
                fold_hooked_idents,
            );

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    // The input is drained by a `for_each` whose closure does nothing.
                    graph_builders
                        .get_dfir_mut(&input.metadata().location_id)
                        .add_dfir(
                            parse_quote! {
                                #input_ident -> for_each(|_| {});
                            },
                            None,
                            Some(&next_stmt_id.to_string()),
                        );
                }
                BuildersOrCallback::Callback(leaf_callback, _) => {
                    leaf_callback(self, next_stmt_id);
                }
            }

            *next_stmt_id += 1;
        }
    }
}
1549
1550 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1551 match self {
1552 HydroRoot::ForEach { op_metadata, .. }
1553 | HydroRoot::SendExternal { op_metadata, .. }
1554 | HydroRoot::DestSink { op_metadata, .. }
1555 | HydroRoot::CycleSink { op_metadata, .. }
1556 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1557 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1558 }
1559 }
1560
1561 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1562 match self {
1563 HydroRoot::ForEach { op_metadata, .. }
1564 | HydroRoot::SendExternal { op_metadata, .. }
1565 | HydroRoot::DestSink { op_metadata, .. }
1566 | HydroRoot::CycleSink { op_metadata, .. }
1567 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1568 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1569 }
1570 }
1571
1572 pub fn input(&self) -> &HydroNode {
1573 match self {
1574 HydroRoot::ForEach { input, .. }
1575 | HydroRoot::SendExternal { input, .. }
1576 | HydroRoot::DestSink { input, .. }
1577 | HydroRoot::CycleSink { input, .. }
1578 | HydroRoot::EmbeddedOutput { input, .. }
1579 | HydroRoot::Null { input, .. } => input,
1580 }
1581 }
1582
1583 pub fn input_metadata(&self) -> &HydroIrMetadata {
1584 self.input().metadata()
1585 }
1586
1587 pub fn print_root(&self) -> String {
1588 match self {
1589 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1590 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1591 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1592 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1593 HydroRoot::EmbeddedOutput { ident, .. } => {
1594 format!("EmbeddedOutput({})", ident)
1595 }
1596 HydroRoot::Null { .. } => "Null".to_owned(),
1597 }
1598 }
1599
1600 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1601 match self {
1602 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1603 transform(f);
1604 }
1605 HydroRoot::SendExternal { .. }
1606 | HydroRoot::CycleSink { .. }
1607 | HydroRoot::EmbeddedOutput { .. }
1608 | HydroRoot::Null { .. } => {}
1609 }
1610 }
1611}
1612
/// Returns the clock id of `loc` if it is a `Tick`, looking through any number of `Atomic`
/// wrappers; every other location yields `None`.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    if let LocationId::Tick(id, _) = loc {
        return Some(*id);
    }
    if let LocationId::Atomic(inner) = loc {
        return tick_of(inner);
    }
    None
}
1621
/// Rewrites every `Tick` clock id found in `loc` (recursing through `Tick` and `Atomic`
/// layers) to the representative that `uf_find` reports for it in `uf`.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    match loc {
        // Leaf locations carry no clock id.
        LocationId::Process(_) | LocationId::Cluster(_) => {}
        LocationId::Atomic(inner) => remap_location(inner, uf),
        LocationId::Tick(id, inner) => {
            let representative = uf_find(uf, *id);
            *id = representative;
            remap_location(inner, uf);
        }
    }
}
1635
/// Union-find lookup: returns the representative of `x` in `parent`.
///
/// An id that has no entry (or maps to itself) is its own representative. Every non-root id
/// visited on the way is re-pointed directly at the representative.
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // Pass 1: follow parent links until reaching an id that is absent or maps to itself.
    let mut root = x;
    while let Some(&up) = parent.get(&root) {
        if up == root {
            break;
        }
        root = up;
    }

    // Pass 2: walk the same chain again, pointing each visited id straight at the root.
    let mut cursor = x;
    while cursor != root {
        match parent.insert(cursor, root) {
            Some(up) => cursor = up,
            // Every id before the root had an entry in pass 1, so this is not expected.
            None => break,
        }
    }

    root
}
1646
/// Union-find merge: makes the representative of `b` the parent of the representative of `a`,
/// unless both already share a representative.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let root_a = uf_find(parent, a);
    let root_b = uf_find(parent, b);
    if root_a == root_b {
        return;
    }
    parent.insert(root_a, root_b);
}
1655
/// Merges clock ids that are connected through `Batch` / `YieldConcat` nodes and rewrites
/// node locations to use one representative id per merged group.
///
/// Pass 1 visits every node; for each `Batch` or `YieldConcat` whose inner location and own
/// location both resolve to a tick (see `tick_of`), the two clock ids are unioned.
/// Pass 2 visits every node again and remaps the clock ids in its `location_id`.
/// Roots themselves are not modified in either pass.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect the equivalences.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| match node {
            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
                let inner_tick = tick_of(&inner.metadata().location_id);
                let outer_tick = tick_of(&metadata.location_id);
                if let (Some(a), Some(b)) = (inner_tick, outer_tick) {
                    uf_union(&mut uf, a, b);
                }
            }
            _ => {}
        },
        false,
    );

    // Pass 2: apply them to every node's location.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let location = &mut node.metadata_mut().location_id;
            remap_location(location, &mut uf);
        },
        false,
    );
}
1691
/// Emits every root in `ir` into a fresh set of per-location graph builders and returns them.
///
/// The shared-node maps, the statement-id counter and the fold-hook set start empty and are
/// shared across all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders: SecondaryMap<LocationKey, FlatGraphBuilder> = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut fold_hooked_idents = HashSet::new();
    let mut next_stmt_id = 0;

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    });

    builders
}
1710
/// Runs the emission traversal over `ir` in callback mode: nothing is emitted, and the two
/// callbacks are handed to [`HydroRoot::emit_core`] as `BuildersOrCallback::Callback`.
///
/// Both callbacks receive the element being visited and the statement-id counter, which
/// starts at zero and is shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callbacks = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut fold_hooked_idents = HashSet::new();
    let mut next_stmt_id = 0;

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callbacks,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1732
1733pub fn transform_bottom_up(
1734 ir: &mut [HydroRoot],
1735 transform_root: &mut impl FnMut(&mut HydroRoot),
1736 transform_node: &mut impl FnMut(&mut HydroNode),
1737 check_well_formed: bool,
1738) {
1739 let mut seen_tees = HashMap::new();
1740 ir.iter_mut().for_each(|leaf| {
1741 leaf.transform_bottom_up(
1742 transform_root,
1743 transform_node,
1744 &mut seen_tees,
1745 check_well_formed,
1746 );
1747 });
1748}
1749
1750pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1751 let mut seen_tees = HashMap::new();
1752 ir.iter()
1753 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1754 .collect()
1755}
1756
/// Per-thread dedup state: `None` while no scope is active; otherwise the next id to hand out
/// and a map from shared-node pointer to the id already assigned to it.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // State used by `Debug for SharedNode`; activated by `dbg_dedup_tee`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    // State used by `Serialize for SharedNode`; activated by `serialize_dedup_shared`.
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
1766
1767pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1768 PRINTED_TEES.with(|printed_tees| {
1769 let mut printed_tees_mut = printed_tees.borrow_mut();
1770 *printed_tees_mut = Some((0, HashMap::new()));
1771 drop(printed_tees_mut);
1772
1773 let ret = f();
1774
1775 let mut printed_tees_mut = printed_tees.borrow_mut();
1776 *printed_tees_mut = None;
1777
1778 ret
1779 })
1780}
1781
1782pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1787 let _guard = SerializedSharedGuard::enter();
1788 f()
1789}
1790
/// Scope guard for `SERIALIZED_SHARED`: created by `enter`, and on drop writes `previous`
/// back into the thread-local.
struct SerializedSharedGuard {
    /// The thread-local's contents at the moment the scope was entered.
    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
}
1796
1797impl SerializedSharedGuard {
1798 fn enter() -> Self {
1799 let previous = SERIALIZED_SHARED.with(|cell| {
1800 let mut guard = cell.borrow_mut();
1801 guard.replace((0, HashMap::new()))
1802 });
1803 Self { previous }
1804 }
1805}
1806
1807impl Drop for SerializedSharedGuard {
1808 fn drop(&mut self) {
1809 SERIALIZED_SHARED.with(|cell| {
1810 *cell.borrow_mut() = self.previous.take();
1811 });
1812 }
1813}
1814
/// A reference-counted, interior-mutable handle to a [`HydroNode`], used by the node variants
/// (`Tee`, `Singleton`, `Partition`) whose `inner` can be referenced from several places.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1816
1817impl serde::Serialize for SharedNode {
1818 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1829 SERIALIZED_SHARED.with(|cell| {
1830 let mut guard = cell.borrow_mut();
1831 let state = guard.as_mut().ok_or_else(|| {
1833 serde::ser::Error::custom(
1834 "SharedNode serialization requires an active serialize_dedup_shared scope",
1835 )
1836 })?;
1837 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
1838
1839 if let Some(&id) = state.1.get(&ptr) {
1840 drop(guard);
1841 use serde::ser::SerializeMap;
1842 let mut map = serializer.serialize_map(Some(1))?;
1843 map.serialize_entry("$shared_ref", &id)?;
1844 map.end()
1845 } else {
1846 let id = state.0;
1847 state.0 += 1;
1848 state.1.insert(ptr, id);
1849 drop(guard);
1850
1851 use serde::ser::SerializeMap;
1852 let mut map = serializer.serialize_map(Some(2))?;
1853 map.serialize_entry("$shared", &id)?;
1854 map.serialize_entry("node", &*self.0.borrow())?;
1855 map.end()
1856 }
1857 })
1858 }
1859}
1860
1861impl SharedNode {
1862 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1863 Rc::as_ptr(&self.0)
1864 }
1865}
1866
1867impl Debug for SharedNode {
1868 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1869 PRINTED_TEES.with(|printed_tees| {
1870 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1871 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1872
1873 if let Some(printed_tees_mut) = printed_tees_mut {
1874 if let Some(existing) = printed_tees_mut
1875 .1
1876 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1877 {
1878 write!(f, "<shared {}>", existing)
1879 } else {
1880 let next_id = printed_tees_mut.0;
1881 printed_tees_mut.0 += 1;
1882 printed_tees_mut
1883 .1
1884 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1885 drop(printed_tees_mut_borrow);
1886 write!(f, "<shared {}>: ", next_id)?;
1887 Debug::fmt(&self.0.borrow(), f)
1888 }
1889 } else {
1890 drop(printed_tees_mut_borrow);
1891 write!(f, "<shared>: ")?;
1892 Debug::fmt(&self.0.borrow(), f)
1893 }
1894 })
1895 }
1896}
1897
1898impl Hash for SharedNode {
1899 fn hash<H: Hasher>(&self, state: &mut H) {
1900 self.0.borrow_mut().hash(state);
1901 }
1902}
1903
/// Boundedness marker stored in the `bound` field of [`CollectionKind::Stream`],
/// [`CollectionKind::Optional`] and [`CollectionKind::KeyedStream`].
///
/// [`CollectionKind::is_bounded`] reports those variants as bounded only for `Bounded`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1909
/// Ordering marker stored in `CollectionKind::Stream::order` and
/// `CollectionKind::KeyedStream::value_order`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    // NOTE(review): presumably "no ordering guarantee" vs. "totally ordered" — confirm.
    NoOrder,
    TotalOrder,
}
1915
/// Retry marker stored in `CollectionKind::Stream::retry` and
/// `CollectionKind::KeyedStream::value_retry`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    // NOTE(review): presumably delivery semantics of stream elements — confirm.
    AtLeastOnce,
    ExactlyOnce,
}
1921
/// Boundedness marker stored in the `bound` field of [`CollectionKind::KeyedSingleton`].
///
/// [`CollectionKind::is_bounded`] treats only `Bounded` as bounded; `MonotonicValue` and
/// `BoundedValue` are not.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
1929
/// Boundedness marker stored in the `bound` field of [`CollectionKind::Singleton`].
///
/// [`CollectionKind::is_bounded`] treats only `Bounded` as bounded; `Monotonic` is not.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
1936
/// Describes what kind of collection a node produces, together with its element types and
/// the boundedness / ordering / retry markers that apply to that kind.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    /// A stream of `element_type` values.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton holding an `element_type` value.
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    /// An optional `element_type` value.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; ordering and retry markers apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton with separate key and value types.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1966
1967impl CollectionKind {
1968 pub fn is_bounded(&self) -> bool {
1969 matches!(
1970 self,
1971 CollectionKind::Stream {
1972 bound: BoundKind::Bounded,
1973 ..
1974 } | CollectionKind::Singleton {
1975 bound: SingletonBoundKind::Bounded,
1976 ..
1977 } | CollectionKind::Optional {
1978 bound: BoundKind::Bounded,
1979 ..
1980 } | CollectionKind::KeyedStream {
1981 bound: BoundKind::Bounded,
1982 ..
1983 } | CollectionKind::KeyedSingleton {
1984 bound: KeyedSingletonBoundKind::Bounded,
1985 ..
1986 }
1987 )
1988 }
1989}
1990
/// Metadata attached to every [`HydroNode`] variant except `Placeholder`.
///
/// Note that the `Hash`, `PartialEq` and `Eq` impls for this type ignore all fields, and the
/// `Debug` impl prints only `location_id` and `collection_kind`.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Location the node belongs to.
    pub location_id: LocationId,
    /// Kind of collection the node produces.
    pub collection_kind: CollectionKind,
    // NOTE(review): meaning and units of `cardinality` are not visible here — confirm.
    pub cardinality: Option<usize>,
    /// Optional free-form tag.
    pub tag: Option<String>,
    /// Operator-level metadata (backtrace, usage figures, id).
    pub op: HydroIrOpMetadata,
}
1999
impl Hash for HydroIrMetadata {
    /// Feeds nothing into the hasher, so metadata never affects the hash of a containing
    /// node. This agrees with `PartialEq`, which treats all metadata values as equal.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
2004
impl PartialEq for HydroIrMetadata {
    /// Always returns `true`: any two metadata values compare equal, so metadata never
    /// affects equality of the values that contain it.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}
2010
// `eq` always returns `true`, which is trivially reflexive, symmetric and transitive.
impl Eq for HydroIrMetadata {}
2012
2013impl Debug for HydroIrMetadata {
2014 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2015 f.debug_struct("HydroIrMetadata")
2016 .field("location_id", &self.location_id)
2017 .field("collection_kind", &self.collection_kind)
2018 .finish()
2019 }
2020}
2021
/// Operator-level metadata, stored on every [`HydroRoot`] and inside [`HydroIrMetadata`].
///
/// The `Debug` impl prints no fields and the `Hash` impl hashes nothing.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created; serialized under the name `span`
    /// through `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    // NOTE(review): units of the two usage figures are not visible here — confirm.
    pub cpu_usage: Option<f64>,
    pub network_recv_cpu_usage: Option<f64>,
    /// Optional operator id; `None` on construction.
    pub id: Option<usize>,
}
2032
impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and every other field set to `None`.
    ///
    /// Delegates to `new_with_skip(1)`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Creates metadata whose backtrace is obtained with `Backtrace::get_backtrace(2 + skip_count)`.
    ///
    /// NOTE(review): the argument is presumably the number of innermost stack frames to omit
    /// (this helper and its caller, plus `skip_count` more) — confirm against
    /// `Backtrace::get_backtrace`.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
2051
2052impl Debug for HydroIrOpMetadata {
2053 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2054 f.debug_struct("HydroIrOpMetadata").finish()
2055 }
2056}
2057
impl Hash for HydroIrOpMetadata {
    /// Feeds nothing into the hasher, so operator metadata never affects the hash of a
    /// containing value.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
2061
/// A node of the Hydro IR. Every variant except `Placeholder` carries a `metadata` field.
///
/// Child nodes are owned through `Box<HydroNode>`, except for `Tee`, `Singleton` and
/// `Partition`, which hold a [`SharedNode`] so the same upstream node can be referenced from
/// several places.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    /// Stand-in value. `transform_children` and `deep_clone` temporarily store it in shared
    /// cells while the real node is being processed; `transform_children` panics when asked
    /// to traverse one.
    Placeholder,

    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ObserveNonDet {
        inner: Box<HydroNode>,
        // NOTE(review): exact effect of `trusted` is not visible in this part of the file.
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    // --- Leaf nodes: `transform_children` visits no children for these. ---
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    // --- Nodes over a shared upstream node. ---
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    Singleton {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    Partition {
        inner: SharedNode,
        f: DebugExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    // --- Unary wrappers around `inner`. ---
    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // --- Binary nodes; `transform_children` visits the first-listed child first. ---
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    MergeOrdered {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // --- Unary nodes over `input`. ---
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        /// `(identifier, node)` pairs; `transform_children` visits these nodes before
        /// `input`. Serialized through `serialize_singleton_refs`.
        #[serde(serialize_with = "serialize_singleton_refs")]
        singleton_refs: Vec<(syn::Ident, HydroNode)>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMapStreamBlocking {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ScanAsyncBlocking {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `ReduceKeyed` but with a second child, `watermark`, which `transform_children`
    /// visits after `input`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// The only variant exempt from the location-mismatch check in `transform_bottom_up`.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node: `transform_children` visits no children for it.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        /// Excluded from serialization.
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2353
/// Map from the address of an original shared node to the transformed / cloned cell that
/// replaces it, so each shared node is processed once per traversal.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Map from the address of a shared node to a [`LocationId`] recorded for it.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2356
2357impl HydroNode {
2358 pub fn transform_bottom_up(
2359 &mut self,
2360 transform: &mut impl FnMut(&mut HydroNode),
2361 seen_tees: &mut SeenSharedNodes,
2362 check_well_formed: bool,
2363 ) {
2364 self.transform_children(
2365 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2366 seen_tees,
2367 );
2368
2369 transform(self);
2370
2371 let self_location = self.metadata().location_id.root();
2372
2373 if check_well_formed {
2374 match &*self {
2375 HydroNode::Network { .. } => {}
2376 _ => {
2377 self.input_metadata().iter().for_each(|i| {
2378 if i.location_id.root() != self_location {
2379 panic!(
2380 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2381 i,
2382 i.location_id.root(),
2383 self,
2384 self_location
2385 )
2386 }
2387 });
2388 }
2389 }
2390 }
2391 }
2392
/// Calls `transform` once on each direct child of this node, passing `seen_tees` along.
///
/// Children held in a [`SharedNode`] (`Tee`, `Singleton`, `Partition`) are transformed at
/// most once per `seen_tees` map: the first visit moves the node out of its cell, transforms
/// it and stores it in a new cell; later visits just re-point `inner` at that new cell.
///
/// # Panics
///
/// Panics when called on `HydroNode::Placeholder`.
#[inline(always)]
pub fn transform_children(
    &mut self,
    mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
    seen_tees: &mut SeenSharedNodes,
) {
    match self {
        HydroNode::Placeholder => {
            panic!();
        }

        // Leaf nodes: nothing to visit.
        HydroNode::Source { .. }
        | HydroNode::SingletonSource { .. }
        | HydroNode::CycleSource { .. }
        | HydroNode::ExternalInput { .. } => {}

        HydroNode::Tee { inner, .. }
        | HydroNode::Singleton { inner, .. }
        | HydroNode::Partition { inner, .. } => {
            if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
                // Already handled through another reference: reuse the transformed cell.
                *inner = SharedNode(transformed.clone());
            } else {
                // Register the new cell before recursing, keyed by the original address.
                let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
                seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
                // Move the node out of the original cell, leaving a placeholder behind.
                let mut orig = inner.0.replace(HydroNode::Placeholder);
                transform(&mut orig, seen_tees);
                *transformed_cell.borrow_mut() = orig;
                *inner = SharedNode(transformed_cell);
            }
        }

        HydroNode::Cast { inner, .. }
        | HydroNode::ObserveNonDet { inner, .. }
        | HydroNode::BeginAtomic { inner, .. }
        | HydroNode::EndAtomic { inner, .. }
        | HydroNode::Batch { inner, .. }
        | HydroNode::YieldConcat { inner, .. } => {
            transform(inner.as_mut(), seen_tees);
        }

        HydroNode::Chain { first, second, .. } => {
            transform(first.as_mut(), seen_tees);
            transform(second.as_mut(), seen_tees);
        }

        HydroNode::MergeOrdered { first, second, .. } => {
            transform(first.as_mut(), seen_tees);
            transform(second.as_mut(), seen_tees);
        }

        HydroNode::ChainFirst { first, second, .. } => {
            transform(first.as_mut(), seen_tees);
            transform(second.as_mut(), seen_tees);
        }

        HydroNode::CrossSingleton { left, right, .. }
        | HydroNode::CrossProduct { left, right, .. }
        | HydroNode::Join { left, right, .. }
        | HydroNode::JoinHalf { left, right, .. } => {
            transform(left.as_mut(), seen_tees);
            transform(right.as_mut(), seen_tees);
        }

        HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
            transform(pos.as_mut(), seen_tees);
            transform(neg.as_mut(), seen_tees);
        }

        HydroNode::ReduceKeyedWatermark {
            input, watermark, ..
        } => {
            transform(input.as_mut(), seen_tees);
            transform(watermark.as_mut(), seen_tees);
        }

        HydroNode::Map {
            input,
            singleton_refs,
            ..
        } => {
            // Referenced singleton nodes are visited before the main input.
            for (_ident, ref_node) in singleton_refs.iter_mut() {
                transform(ref_node, seen_tees);
            }
            transform(input.as_mut(), seen_tees);
        }
        HydroNode::ResolveFutures { input, .. }
        | HydroNode::ResolveFuturesBlocking { input, .. }
        | HydroNode::ResolveFuturesOrdered { input, .. }
        | HydroNode::FlatMap { input, .. }
        | HydroNode::FlatMapStreamBlocking { input, .. }
        | HydroNode::Filter { input, .. }
        | HydroNode::FilterMap { input, .. }
        | HydroNode::Sort { input, .. }
        | HydroNode::DeferTick { input, .. }
        | HydroNode::Enumerate { input, .. }
        | HydroNode::Inspect { input, .. }
        | HydroNode::Unique { input, .. }
        | HydroNode::Network { input, .. }
        | HydroNode::Fold { input, .. }
        | HydroNode::Scan { input, .. }
        | HydroNode::ScanAsyncBlocking { input, .. }
        | HydroNode::FoldKeyed { input, .. }
        | HydroNode::Reduce { input, .. }
        | HydroNode::ReduceKeyed { input, .. }
        | HydroNode::Counter { input, .. } => {
            transform(input.as_mut(), seen_tees);
        }
    }
}
2503
2504 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2505 match self {
2506 HydroNode::Placeholder => HydroNode::Placeholder,
2507 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2508 inner: Box::new(inner.deep_clone(seen_tees)),
2509 metadata: metadata.clone(),
2510 },
2511 HydroNode::ObserveNonDet {
2512 inner,
2513 trusted,
2514 metadata,
2515 } => HydroNode::ObserveNonDet {
2516 inner: Box::new(inner.deep_clone(seen_tees)),
2517 trusted: *trusted,
2518 metadata: metadata.clone(),
2519 },
2520 HydroNode::Source { source, metadata } => HydroNode::Source {
2521 source: source.clone(),
2522 metadata: metadata.clone(),
2523 },
2524 HydroNode::SingletonSource {
2525 value,
2526 first_tick_only,
2527 metadata,
2528 } => HydroNode::SingletonSource {
2529 value: value.clone(),
2530 first_tick_only: *first_tick_only,
2531 metadata: metadata.clone(),
2532 },
2533 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2534 cycle_id: *cycle_id,
2535 metadata: metadata.clone(),
2536 },
2537 HydroNode::Tee { inner, metadata } | HydroNode::Singleton { inner, metadata } => {
2538 let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2539 SharedNode(transformed.clone())
2540 } else {
2541 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2542 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2543 let cloned = inner.0.borrow().deep_clone(seen_tees);
2544 *new_rc.borrow_mut() = cloned;
2545 SharedNode(new_rc)
2546 };
2547 if matches!(self, HydroNode::Singleton { .. }) {
2548 HydroNode::Singleton {
2549 inner: cloned_inner,
2550 metadata: metadata.clone(),
2551 }
2552 } else {
2553 HydroNode::Tee {
2554 inner: cloned_inner,
2555 metadata: metadata.clone(),
2556 }
2557 }
2558 }
2559 HydroNode::Partition {
2560 inner,
2561 f,
2562 is_true,
2563 metadata,
2564 } => {
2565 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2566 HydroNode::Partition {
2567 inner: SharedNode(transformed.clone()),
2568 f: f.clone(),
2569 is_true: *is_true,
2570 metadata: metadata.clone(),
2571 }
2572 } else {
2573 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2574 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2575 let cloned = inner.0.borrow().deep_clone(seen_tees);
2576 *new_rc.borrow_mut() = cloned;
2577 HydroNode::Partition {
2578 inner: SharedNode(new_rc),
2579 f: f.clone(),
2580 is_true: *is_true,
2581 metadata: metadata.clone(),
2582 }
2583 }
2584 }
2585 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2586 inner: Box::new(inner.deep_clone(seen_tees)),
2587 metadata: metadata.clone(),
2588 },
2589 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2590 inner: Box::new(inner.deep_clone(seen_tees)),
2591 metadata: metadata.clone(),
2592 },
2593 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2594 inner: Box::new(inner.deep_clone(seen_tees)),
2595 metadata: metadata.clone(),
2596 },
2597 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2598 inner: Box::new(inner.deep_clone(seen_tees)),
2599 metadata: metadata.clone(),
2600 },
2601 HydroNode::Chain {
2602 first,
2603 second,
2604 metadata,
2605 } => HydroNode::Chain {
2606 first: Box::new(first.deep_clone(seen_tees)),
2607 second: Box::new(second.deep_clone(seen_tees)),
2608 metadata: metadata.clone(),
2609 },
2610 HydroNode::MergeOrdered {
2611 first,
2612 second,
2613 metadata,
2614 } => HydroNode::MergeOrdered {
2615 first: Box::new(first.deep_clone(seen_tees)),
2616 second: Box::new(second.deep_clone(seen_tees)),
2617 metadata: metadata.clone(),
2618 },
2619 HydroNode::ChainFirst {
2620 first,
2621 second,
2622 metadata,
2623 } => HydroNode::ChainFirst {
2624 first: Box::new(first.deep_clone(seen_tees)),
2625 second: Box::new(second.deep_clone(seen_tees)),
2626 metadata: metadata.clone(),
2627 },
2628 HydroNode::CrossProduct {
2629 left,
2630 right,
2631 metadata,
2632 } => HydroNode::CrossProduct {
2633 left: Box::new(left.deep_clone(seen_tees)),
2634 right: Box::new(right.deep_clone(seen_tees)),
2635 metadata: metadata.clone(),
2636 },
2637 HydroNode::CrossSingleton {
2638 left,
2639 right,
2640 metadata,
2641 } => HydroNode::CrossSingleton {
2642 left: Box::new(left.deep_clone(seen_tees)),
2643 right: Box::new(right.deep_clone(seen_tees)),
2644 metadata: metadata.clone(),
2645 },
2646 HydroNode::Join {
2647 left,
2648 right,
2649 metadata,
2650 } => HydroNode::Join {
2651 left: Box::new(left.deep_clone(seen_tees)),
2652 right: Box::new(right.deep_clone(seen_tees)),
2653 metadata: metadata.clone(),
2654 },
2655 HydroNode::JoinHalf {
2656 left,
2657 right,
2658 metadata,
2659 } => HydroNode::JoinHalf {
2660 left: Box::new(left.deep_clone(seen_tees)),
2661 right: Box::new(right.deep_clone(seen_tees)),
2662 metadata: metadata.clone(),
2663 },
2664 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2665 pos: Box::new(pos.deep_clone(seen_tees)),
2666 neg: Box::new(neg.deep_clone(seen_tees)),
2667 metadata: metadata.clone(),
2668 },
2669 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2670 pos: Box::new(pos.deep_clone(seen_tees)),
2671 neg: Box::new(neg.deep_clone(seen_tees)),
2672 metadata: metadata.clone(),
2673 },
2674 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2675 input: Box::new(input.deep_clone(seen_tees)),
2676 metadata: metadata.clone(),
2677 },
2678 HydroNode::ResolveFuturesBlocking { input, metadata } => {
2679 HydroNode::ResolveFuturesBlocking {
2680 input: Box::new(input.deep_clone(seen_tees)),
2681 metadata: metadata.clone(),
2682 }
2683 }
2684 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2685 HydroNode::ResolveFuturesOrdered {
2686 input: Box::new(input.deep_clone(seen_tees)),
2687 metadata: metadata.clone(),
2688 }
2689 }
2690 HydroNode::Map {
2691 f,
2692 singleton_refs,
2693 input,
2694 metadata,
2695 } => HydroNode::Map {
2696 f: f.clone(),
2697 singleton_refs: singleton_refs
2698 .iter()
2699 .map(|(ident, node)| (ident.clone(), node.deep_clone(seen_tees)))
2700 .collect(),
2701 input: Box::new(input.deep_clone(seen_tees)),
2702 metadata: metadata.clone(),
2703 },
2704 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2705 f: f.clone(),
2706 input: Box::new(input.deep_clone(seen_tees)),
2707 metadata: metadata.clone(),
2708 },
2709 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2710 HydroNode::FlatMapStreamBlocking {
2711 f: f.clone(),
2712 input: Box::new(input.deep_clone(seen_tees)),
2713 metadata: metadata.clone(),
2714 }
2715 }
2716 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2717 f: f.clone(),
2718 input: Box::new(input.deep_clone(seen_tees)),
2719 metadata: metadata.clone(),
2720 },
2721 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2722 f: f.clone(),
2723 input: Box::new(input.deep_clone(seen_tees)),
2724 metadata: metadata.clone(),
2725 },
2726 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2727 input: Box::new(input.deep_clone(seen_tees)),
2728 metadata: metadata.clone(),
2729 },
2730 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2731 input: Box::new(input.deep_clone(seen_tees)),
2732 metadata: metadata.clone(),
2733 },
2734 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2735 f: f.clone(),
2736 input: Box::new(input.deep_clone(seen_tees)),
2737 metadata: metadata.clone(),
2738 },
2739 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2740 input: Box::new(input.deep_clone(seen_tees)),
2741 metadata: metadata.clone(),
2742 },
2743 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2744 input: Box::new(input.deep_clone(seen_tees)),
2745 metadata: metadata.clone(),
2746 },
2747 HydroNode::Fold {
2748 init,
2749 acc,
2750 input,
2751 metadata,
2752 } => HydroNode::Fold {
2753 init: init.clone(),
2754 acc: acc.clone(),
2755 input: Box::new(input.deep_clone(seen_tees)),
2756 metadata: metadata.clone(),
2757 },
2758 HydroNode::Scan {
2759 init,
2760 acc,
2761 input,
2762 metadata,
2763 } => HydroNode::Scan {
2764 init: init.clone(),
2765 acc: acc.clone(),
2766 input: Box::new(input.deep_clone(seen_tees)),
2767 metadata: metadata.clone(),
2768 },
2769 HydroNode::ScanAsyncBlocking {
2770 init,
2771 acc,
2772 input,
2773 metadata,
2774 } => HydroNode::ScanAsyncBlocking {
2775 init: init.clone(),
2776 acc: acc.clone(),
2777 input: Box::new(input.deep_clone(seen_tees)),
2778 metadata: metadata.clone(),
2779 },
2780 HydroNode::FoldKeyed {
2781 init,
2782 acc,
2783 input,
2784 metadata,
2785 } => HydroNode::FoldKeyed {
2786 init: init.clone(),
2787 acc: acc.clone(),
2788 input: Box::new(input.deep_clone(seen_tees)),
2789 metadata: metadata.clone(),
2790 },
2791 HydroNode::ReduceKeyedWatermark {
2792 f,
2793 input,
2794 watermark,
2795 metadata,
2796 } => HydroNode::ReduceKeyedWatermark {
2797 f: f.clone(),
2798 input: Box::new(input.deep_clone(seen_tees)),
2799 watermark: Box::new(watermark.deep_clone(seen_tees)),
2800 metadata: metadata.clone(),
2801 },
2802 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2803 f: f.clone(),
2804 input: Box::new(input.deep_clone(seen_tees)),
2805 metadata: metadata.clone(),
2806 },
2807 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2808 f: f.clone(),
2809 input: Box::new(input.deep_clone(seen_tees)),
2810 metadata: metadata.clone(),
2811 },
2812 HydroNode::Network {
2813 name,
2814 networking_info,
2815 serialize_fn,
2816 instantiate_fn,
2817 deserialize_fn,
2818 input,
2819 metadata,
2820 } => HydroNode::Network {
2821 name: name.clone(),
2822 networking_info: networking_info.clone(),
2823 serialize_fn: serialize_fn.clone(),
2824 instantiate_fn: instantiate_fn.clone(),
2825 deserialize_fn: deserialize_fn.clone(),
2826 input: Box::new(input.deep_clone(seen_tees)),
2827 metadata: metadata.clone(),
2828 },
2829 HydroNode::ExternalInput {
2830 from_external_key,
2831 from_port_id,
2832 from_many,
2833 codec_type,
2834 port_hint,
2835 instantiate_fn,
2836 deserialize_fn,
2837 metadata,
2838 } => HydroNode::ExternalInput {
2839 from_external_key: *from_external_key,
2840 from_port_id: *from_port_id,
2841 from_many: *from_many,
2842 codec_type: codec_type.clone(),
2843 port_hint: *port_hint,
2844 instantiate_fn: instantiate_fn.clone(),
2845 deserialize_fn: deserialize_fn.clone(),
2846 metadata: metadata.clone(),
2847 },
2848 HydroNode::Counter {
2849 tag,
2850 duration,
2851 prefix,
2852 input,
2853 metadata,
2854 } => HydroNode::Counter {
2855 tag: tag.clone(),
2856 duration: duration.clone(),
2857 prefix: prefix.clone(),
2858 input: Box::new(input.deep_clone(seen_tees)),
2859 metadata: metadata.clone(),
2860 },
2861 }
2862 }
2863
2864 #[cfg(feature = "build")]
2865 pub fn emit_core(
2866 &mut self,
2867 builders_or_callback: &mut BuildersOrCallback<
2868 impl FnMut(&mut HydroRoot, &mut usize),
2869 impl FnMut(&mut HydroNode, &mut usize),
2870 >,
2871 seen_tees: &mut SeenSharedNodes,
2872 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2873 next_stmt_id: &mut usize,
2874 fold_hooked_idents: &mut HashSet<String>,
2875 ) -> syn::Ident {
2876 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2877
2878 self.transform_bottom_up(
2879 &mut |node: &mut HydroNode| {
2880 let out_location = node.metadata().location_id.clone();
2881 match node {
2882 HydroNode::Placeholder => {
2883 panic!()
2884 }
2885
2886 HydroNode::Cast { .. } => {
2887 match builders_or_callback {
2890 BuildersOrCallback::Builders(_) => {}
2891 BuildersOrCallback::Callback(_, node_callback) => {
2892 node_callback(node, next_stmt_id);
2893 }
2894 }
2895
2896 *next_stmt_id += 1;
2897 }
2899
2900 HydroNode::ObserveNonDet {
2901 inner,
2902 trusted,
2903 metadata,
2904 ..
2905 } => {
2906 let inner_ident = ident_stack.pop().unwrap();
2907
2908 let observe_ident =
2909 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2910
2911 match builders_or_callback {
2912 BuildersOrCallback::Builders(graph_builders) => {
2913 graph_builders.observe_nondet(
2914 *trusted,
2915 &inner.metadata().location_id,
2916 inner_ident,
2917 &inner.metadata().collection_kind,
2918 &observe_ident,
2919 &metadata.collection_kind,
2920 &metadata.op,
2921 );
2922 }
2923 BuildersOrCallback::Callback(_, node_callback) => {
2924 node_callback(node, next_stmt_id);
2925 }
2926 }
2927
2928 *next_stmt_id += 1;
2929
2930 ident_stack.push(observe_ident);
2931 }
2932
2933 HydroNode::Batch {
2934 inner, metadata, ..
2935 } => {
2936 let inner_ident = ident_stack.pop().unwrap();
2937
2938 let batch_ident =
2939 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2940
2941 match builders_or_callback {
2942 BuildersOrCallback::Builders(graph_builders) => {
2943 graph_builders.batch(
2944 inner_ident,
2945 &inner.metadata().location_id,
2946 &inner.metadata().collection_kind,
2947 &batch_ident,
2948 &out_location,
2949 &metadata.op,
2950 fold_hooked_idents,
2951 );
2952 }
2953 BuildersOrCallback::Callback(_, node_callback) => {
2954 node_callback(node, next_stmt_id);
2955 }
2956 }
2957
2958 *next_stmt_id += 1;
2959
2960 ident_stack.push(batch_ident);
2961 }
2962
2963 HydroNode::YieldConcat { inner, .. } => {
2964 let inner_ident = ident_stack.pop().unwrap();
2965
2966 let yield_ident =
2967 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2968
2969 match builders_or_callback {
2970 BuildersOrCallback::Builders(graph_builders) => {
2971 graph_builders.yield_from_tick(
2972 inner_ident,
2973 &inner.metadata().location_id,
2974 &inner.metadata().collection_kind,
2975 &yield_ident,
2976 &out_location,
2977 );
2978 }
2979 BuildersOrCallback::Callback(_, node_callback) => {
2980 node_callback(node, next_stmt_id);
2981 }
2982 }
2983
2984 *next_stmt_id += 1;
2985
2986 ident_stack.push(yield_ident);
2987 }
2988
2989 HydroNode::BeginAtomic { inner, metadata } => {
2990 let inner_ident = ident_stack.pop().unwrap();
2991
2992 let begin_ident =
2993 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2994
2995 match builders_or_callback {
2996 BuildersOrCallback::Builders(graph_builders) => {
2997 graph_builders.begin_atomic(
2998 inner_ident,
2999 &inner.metadata().location_id,
3000 &inner.metadata().collection_kind,
3001 &begin_ident,
3002 &out_location,
3003 &metadata.op,
3004 );
3005 }
3006 BuildersOrCallback::Callback(_, node_callback) => {
3007 node_callback(node, next_stmt_id);
3008 }
3009 }
3010
3011 *next_stmt_id += 1;
3012
3013 ident_stack.push(begin_ident);
3014 }
3015
3016 HydroNode::EndAtomic { inner, .. } => {
3017 let inner_ident = ident_stack.pop().unwrap();
3018
3019 let end_ident =
3020 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3021
3022 match builders_or_callback {
3023 BuildersOrCallback::Builders(graph_builders) => {
3024 graph_builders.end_atomic(
3025 inner_ident,
3026 &inner.metadata().location_id,
3027 &inner.metadata().collection_kind,
3028 &end_ident,
3029 );
3030 }
3031 BuildersOrCallback::Callback(_, node_callback) => {
3032 node_callback(node, next_stmt_id);
3033 }
3034 }
3035
3036 *next_stmt_id += 1;
3037
3038 ident_stack.push(end_ident);
3039 }
3040
3041 HydroNode::Source {
3042 source, metadata, ..
3043 } => {
3044 if let HydroSource::ExternalNetwork() = source {
3045 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3046 } else {
3047 let source_ident =
3048 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3049
3050 let source_stmt = match source {
3051 HydroSource::Stream(expr) => {
3052 debug_assert!(metadata.location_id.is_top_level());
3053 parse_quote! {
3054 #source_ident = source_stream(#expr);
3055 }
3056 }
3057
3058 HydroSource::ExternalNetwork() => {
3059 unreachable!()
3060 }
3061
3062 HydroSource::Iter(expr) => {
3063 if metadata.location_id.is_top_level() {
3064 parse_quote! {
3065 #source_ident = source_iter(#expr);
3066 }
3067 } else {
3068 parse_quote! {
3070 #source_ident = source_iter(#expr) -> persist::<'static>();
3071 }
3072 }
3073 }
3074
3075 HydroSource::Spin() => {
3076 debug_assert!(metadata.location_id.is_top_level());
3077 parse_quote! {
3078 #source_ident = spin();
3079 }
3080 }
3081
3082 HydroSource::ClusterMembers(target_loc, state) => {
3083 debug_assert!(metadata.location_id.is_top_level());
3084
3085 let members_tee_ident = syn::Ident::new(
3086 &format!(
3087 "__cluster_members_tee_{}_{}",
3088 metadata.location_id.root().key(),
3089 target_loc.key(),
3090 ),
3091 Span::call_site(),
3092 );
3093
3094 match state {
3095 ClusterMembersState::Stream(d) => {
3096 parse_quote! {
3097 #members_tee_ident = source_stream(#d) -> tee();
3098 #source_ident = #members_tee_ident;
3099 }
3100 },
3101 ClusterMembersState::Uninit => syn::parse_quote! {
3102 #source_ident = source_stream(DUMMY);
3103 },
3104 ClusterMembersState::Tee(..) => parse_quote! {
3105 #source_ident = #members_tee_ident;
3106 },
3107 }
3108 }
3109
3110 HydroSource::Embedded(ident) => {
3111 parse_quote! {
3112 #source_ident = source_stream(#ident);
3113 }
3114 }
3115
3116 HydroSource::EmbeddedSingleton(ident) => {
3117 parse_quote! {
3118 #source_ident = source_iter([#ident]);
3119 }
3120 }
3121 };
3122
3123 match builders_or_callback {
3124 BuildersOrCallback::Builders(graph_builders) => {
3125 let builder = graph_builders.get_dfir_mut(&out_location);
3126 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3127 }
3128 BuildersOrCallback::Callback(_, node_callback) => {
3129 node_callback(node, next_stmt_id);
3130 }
3131 }
3132
3133 *next_stmt_id += 1;
3134
3135 ident_stack.push(source_ident);
3136 }
3137 }
3138
3139 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3140 let source_ident =
3141 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3142
3143 match builders_or_callback {
3144 BuildersOrCallback::Builders(graph_builders) => {
3145 let builder = graph_builders.get_dfir_mut(&out_location);
3146
3147 if *first_tick_only {
3148 assert!(
3149 !metadata.location_id.is_top_level(),
3150 "first_tick_only SingletonSource must be inside a tick"
3151 );
3152 }
3153
3154 if *first_tick_only
3155 || (metadata.location_id.is_top_level()
3156 && metadata.collection_kind.is_bounded())
3157 {
3158 builder.add_dfir(
3159 parse_quote! {
3160 #source_ident = source_iter([#value]);
3161 },
3162 None,
3163 Some(&next_stmt_id.to_string()),
3164 );
3165 } else {
3166 builder.add_dfir(
3167 parse_quote! {
3168 #source_ident = source_iter([#value]) -> persist::<'static>();
3169 },
3170 None,
3171 Some(&next_stmt_id.to_string()),
3172 );
3173 }
3174 }
3175 BuildersOrCallback::Callback(_, node_callback) => {
3176 node_callback(node, next_stmt_id);
3177 }
3178 }
3179
3180 *next_stmt_id += 1;
3181
3182 ident_stack.push(source_ident);
3183 }
3184
3185 HydroNode::CycleSource { cycle_id, .. } => {
3186 let ident = cycle_id.as_ident();
3187
3188 match builders_or_callback {
3189 BuildersOrCallback::Builders(_) => {}
3190 BuildersOrCallback::Callback(_, node_callback) => {
3191 node_callback(node, next_stmt_id);
3192 }
3193 }
3194
3195 *next_stmt_id += 1;
3197
3198 ident_stack.push(ident);
3199 }
3200
3201 HydroNode::Tee { inner, .. } => {
3202 let ret_ident = if let Some(built_idents) =
3203 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3204 {
3205 match builders_or_callback {
3206 BuildersOrCallback::Builders(_) => {}
3207 BuildersOrCallback::Callback(_, node_callback) => {
3208 node_callback(node, next_stmt_id);
3209 }
3210 }
3211
3212 built_idents[0].clone()
3213 } else {
3214 let inner_ident = ident_stack.pop().unwrap();
3217
3218 let tee_ident =
3219 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3220
3221 built_tees.insert(
3222 inner.0.as_ref() as *const RefCell<HydroNode>,
3223 vec![tee_ident.clone()],
3224 );
3225
3226 match builders_or_callback {
3227 BuildersOrCallback::Builders(graph_builders) => {
3228 if fold_hooked_idents.contains(&inner_ident.to_string()) {
3240 fold_hooked_idents.insert(tee_ident.to_string());
3241 }
3242 let builder = graph_builders.get_dfir_mut(&out_location);
3243 builder.add_dfir(
3244 parse_quote! {
3245 #tee_ident = #inner_ident -> tee();
3246 },
3247 None,
3248 Some(&next_stmt_id.to_string()),
3249 );
3250 }
3251 BuildersOrCallback::Callback(_, node_callback) => {
3252 node_callback(node, next_stmt_id);
3253 }
3254 }
3255
3256 tee_ident
3257 };
3258
3259 *next_stmt_id += 1;
3263 ident_stack.push(ret_ident);
3264 }
3265
3266 HydroNode::Singleton { inner, .. } => {
3267 let ret_ident = if let Some(built_idents) =
3268 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3269 {
3270 built_idents[0].clone()
3271 } else {
3272 let inner_ident = ident_stack.pop().unwrap();
3273
3274 let singleton_ident =
3275 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3276
3277 built_tees.insert(
3278 inner.0.as_ref() as *const RefCell<HydroNode>,
3279 vec![singleton_ident.clone()],
3280 );
3281
3282 match builders_or_callback {
3283 BuildersOrCallback::Builders(graph_builders) => {
3284 let builder = graph_builders.get_dfir_mut(&out_location);
3285 builder.add_dfir(
3286 parse_quote! {
3287 #singleton_ident = #inner_ident -> singleton();
3288 },
3289 None,
3290 Some(&next_stmt_id.to_string()),
3291 );
3292 }
3293 BuildersOrCallback::Callback(_, node_callback) => {
3294 node_callback(node, next_stmt_id);
3295 }
3296 }
3297
3298 singleton_ident
3299 };
3300
3301 *next_stmt_id += 1;
3304 ident_stack.push(ret_ident);
3305 }
3306
3307 HydroNode::Partition {
3308 inner, f, is_true, ..
3309 } => {
3310 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3312 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3313 match builders_or_callback {
3314 BuildersOrCallback::Builders(_) => {}
3315 BuildersOrCallback::Callback(_, node_callback) => {
3316 node_callback(node, next_stmt_id);
3317 }
3318 }
3319
3320 let idx = if is_true { 0 } else { 1 };
3321 built_idents[idx].clone()
3322 } else {
3323 let inner_ident = ident_stack.pop().unwrap();
3326
3327 let partition_ident = syn::Ident::new(
3328 &format!("stream_{}_partition", *next_stmt_id),
3329 Span::call_site(),
3330 );
3331 let true_ident = syn::Ident::new(
3332 &format!("stream_{}_true", *next_stmt_id),
3333 Span::call_site(),
3334 );
3335 let false_ident = syn::Ident::new(
3336 &format!("stream_{}_false", *next_stmt_id),
3337 Span::call_site(),
3338 );
3339
3340 built_tees.insert(
3341 ptr,
3342 vec![true_ident.clone(), false_ident.clone()],
3343 );
3344
3345 match builders_or_callback {
3346 BuildersOrCallback::Builders(graph_builders) => {
3347 let builder = graph_builders.get_dfir_mut(&out_location);
3348 builder.add_dfir(
3349 parse_quote! {
3350 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
3351 #true_ident = #partition_ident[0];
3352 #false_ident = #partition_ident[1];
3353 },
3354 None,
3355 Some(&next_stmt_id.to_string()),
3356 );
3357 }
3358 BuildersOrCallback::Callback(_, node_callback) => {
3359 node_callback(node, next_stmt_id);
3360 }
3361 }
3362
3363 if is_true { true_ident } else { false_ident }
3364 };
3365
3366 *next_stmt_id += 1;
3367 ident_stack.push(ret_ident);
3368 }
3369
3370 HydroNode::Chain { .. } => {
3371 let second_ident = ident_stack.pop().unwrap();
3373 let first_ident = ident_stack.pop().unwrap();
3374
3375 let chain_ident =
3376 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3377
3378 match builders_or_callback {
3379 BuildersOrCallback::Builders(graph_builders) => {
3380 let builder = graph_builders.get_dfir_mut(&out_location);
3381 builder.add_dfir(
3382 parse_quote! {
3383 #chain_ident = chain();
3384 #first_ident -> [0]#chain_ident;
3385 #second_ident -> [1]#chain_ident;
3386 },
3387 None,
3388 Some(&next_stmt_id.to_string()),
3389 );
3390 }
3391 BuildersOrCallback::Callback(_, node_callback) => {
3392 node_callback(node, next_stmt_id);
3393 }
3394 }
3395
3396 *next_stmt_id += 1;
3397
3398 ident_stack.push(chain_ident);
3399 }
3400
3401 HydroNode::MergeOrdered { first, metadata, .. } => {
3402 let second_ident = ident_stack.pop().unwrap();
3403 let first_ident = ident_stack.pop().unwrap();
3404
3405 let merge_ident =
3406 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3407
3408 match builders_or_callback {
3409 BuildersOrCallback::Builders(graph_builders) => {
3410 graph_builders.merge_ordered(
3411 &first.metadata().location_id,
3412 first_ident,
3413 second_ident,
3414 &merge_ident,
3415 &first.metadata().collection_kind,
3416 &metadata.op,
3417 Some(&next_stmt_id.to_string()),
3418 );
3419 }
3420 BuildersOrCallback::Callback(_, node_callback) => {
3421 node_callback(node, next_stmt_id);
3422 }
3423 }
3424
3425 *next_stmt_id += 1;
3426
3427 ident_stack.push(merge_ident);
3428 }
3429
3430 HydroNode::ChainFirst { .. } => {
3431 let second_ident = ident_stack.pop().unwrap();
3432 let first_ident = ident_stack.pop().unwrap();
3433
3434 let chain_ident =
3435 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3436
3437 match builders_or_callback {
3438 BuildersOrCallback::Builders(graph_builders) => {
3439 let builder = graph_builders.get_dfir_mut(&out_location);
3440 builder.add_dfir(
3441 parse_quote! {
3442 #chain_ident = chain_first_n(1);
3443 #first_ident -> [0]#chain_ident;
3444 #second_ident -> [1]#chain_ident;
3445 },
3446 None,
3447 Some(&next_stmt_id.to_string()),
3448 );
3449 }
3450 BuildersOrCallback::Callback(_, node_callback) => {
3451 node_callback(node, next_stmt_id);
3452 }
3453 }
3454
3455 *next_stmt_id += 1;
3456
3457 ident_stack.push(chain_ident);
3458 }
3459
3460 HydroNode::CrossSingleton { right, .. } => {
3461 let right_ident = ident_stack.pop().unwrap();
3462 let left_ident = ident_stack.pop().unwrap();
3463
3464 let cross_ident =
3465 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3466
3467 match builders_or_callback {
3468 BuildersOrCallback::Builders(graph_builders) => {
3469 let builder = graph_builders.get_dfir_mut(&out_location);
3470
3471 if right.metadata().location_id.is_top_level()
3472 && right.metadata().collection_kind.is_bounded()
3473 {
3474 builder.add_dfir(
3475 parse_quote! {
3476 #cross_ident = cross_singleton();
3477 #left_ident -> [input]#cross_ident;
3478 #right_ident -> persist::<'static>() -> [single]#cross_ident;
3479 },
3480 None,
3481 Some(&next_stmt_id.to_string()),
3482 );
3483 } else {
3484 builder.add_dfir(
3485 parse_quote! {
3486 #cross_ident = cross_singleton();
3487 #left_ident -> [input]#cross_ident;
3488 #right_ident -> [single]#cross_ident;
3489 },
3490 None,
3491 Some(&next_stmt_id.to_string()),
3492 );
3493 }
3494 }
3495 BuildersOrCallback::Callback(_, node_callback) => {
3496 node_callback(node, next_stmt_id);
3497 }
3498 }
3499
3500 *next_stmt_id += 1;
3501
3502 ident_stack.push(cross_ident);
3503 }
3504
3505 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3506 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3507 parse_quote!(cross_join_multiset)
3508 } else {
3509 parse_quote!(join_multiset)
3510 };
3511
3512 let (HydroNode::CrossProduct { left, right, .. }
3513 | HydroNode::Join { left, right, .. }) = node
3514 else {
3515 unreachable!()
3516 };
3517
3518 let is_top_level = left.metadata().location_id.is_top_level()
3519 && right.metadata().location_id.is_top_level();
3520 let left_lifetime = if left.metadata().location_id.is_top_level() {
3521 quote!('static)
3522 } else {
3523 quote!('tick)
3524 };
3525
3526 let right_lifetime = if right.metadata().location_id.is_top_level() {
3527 quote!('static)
3528 } else {
3529 quote!('tick)
3530 };
3531
3532 let right_ident = ident_stack.pop().unwrap();
3533 let left_ident = ident_stack.pop().unwrap();
3534
3535 let stream_ident =
3536 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3537
3538 match builders_or_callback {
3539 BuildersOrCallback::Builders(graph_builders) => {
3540 let builder = graph_builders.get_dfir_mut(&out_location);
3541 builder.add_dfir(
3542 if is_top_level {
3543 parse_quote! {
3546 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3547 #left_ident -> [0]#stream_ident;
3548 #right_ident -> [1]#stream_ident;
3549 }
3550 } else {
3551 parse_quote! {
3552 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3553 #left_ident -> [0]#stream_ident;
3554 #right_ident -> [1]#stream_ident;
3555 }
3556 }
3557 ,
3558 None,
3559 Some(&next_stmt_id.to_string()),
3560 );
3561 }
3562 BuildersOrCallback::Callback(_, node_callback) => {
3563 node_callback(node, next_stmt_id);
3564 }
3565 }
3566
3567 *next_stmt_id += 1;
3568
3569 ident_stack.push(stream_ident);
3570 }
3571
3572 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3573 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3574 parse_quote!(difference)
3575 } else {
3576 parse_quote!(anti_join)
3577 };
3578
3579 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3580 node
3581 else {
3582 unreachable!()
3583 };
3584
3585 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3586 quote!('static)
3587 } else {
3588 quote!('tick)
3589 };
3590
3591 let neg_ident = ident_stack.pop().unwrap();
3592 let pos_ident = ident_stack.pop().unwrap();
3593
3594 let stream_ident =
3595 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3596
3597 match builders_or_callback {
3598 BuildersOrCallback::Builders(graph_builders) => {
3599 let builder = graph_builders.get_dfir_mut(&out_location);
3600 builder.add_dfir(
3601 parse_quote! {
3602 #stream_ident = #operator::<'tick, #neg_lifetime>();
3603 #pos_ident -> [pos]#stream_ident;
3604 #neg_ident -> [neg]#stream_ident;
3605 },
3606 None,
3607 Some(&next_stmt_id.to_string()),
3608 );
3609 }
3610 BuildersOrCallback::Callback(_, node_callback) => {
3611 node_callback(node, next_stmt_id);
3612 }
3613 }
3614
3615 *next_stmt_id += 1;
3616
3617 ident_stack.push(stream_ident);
3618 }
3619
3620 HydroNode::JoinHalf { .. } => {
3621 let HydroNode::JoinHalf { right, .. } = node else {
3622 unreachable!()
3623 };
3624
3625 assert!(
3626 right.metadata().collection_kind.is_bounded(),
3627 "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3628 right.metadata().collection_kind
3629 );
3630
3631 let build_lifetime = if right.metadata().location_id.is_top_level() {
3632 quote!('static)
3633 } else {
3634 quote!('tick)
3635 };
3636
3637 let build_ident = ident_stack.pop().unwrap();
3638 let probe_ident = ident_stack.pop().unwrap();
3639
3640 let stream_ident =
3641 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3642
3643 match builders_or_callback {
3644 BuildersOrCallback::Builders(graph_builders) => {
3645 let builder = graph_builders.get_dfir_mut(&out_location);
3646 builder.add_dfir(
3647 parse_quote! {
3648 #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3649 #probe_ident -> [probe]#stream_ident;
3650 #build_ident -> [build]#stream_ident;
3651 },
3652 None,
3653 Some(&next_stmt_id.to_string()),
3654 );
3655 }
3656 BuildersOrCallback::Callback(_, node_callback) => {
3657 node_callback(node, next_stmt_id);
3658 }
3659 }
3660
3661 *next_stmt_id += 1;
3662
3663 ident_stack.push(stream_ident);
3664 }
3665
3666 HydroNode::ResolveFutures { .. } => {
3667 let input_ident = ident_stack.pop().unwrap();
3668
3669 let futures_ident =
3670 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3671
3672 match builders_or_callback {
3673 BuildersOrCallback::Builders(graph_builders) => {
3674 let builder = graph_builders.get_dfir_mut(&out_location);
3675 builder.add_dfir(
3676 parse_quote! {
3677 #futures_ident = #input_ident -> resolve_futures();
3678 },
3679 None,
3680 Some(&next_stmt_id.to_string()),
3681 );
3682 }
3683 BuildersOrCallback::Callback(_, node_callback) => {
3684 node_callback(node, next_stmt_id);
3685 }
3686 }
3687
3688 *next_stmt_id += 1;
3689
3690 ident_stack.push(futures_ident);
3691 }
3692
3693 HydroNode::ResolveFuturesBlocking { .. } => {
3694 let input_ident = ident_stack.pop().unwrap();
3695
3696 let futures_ident =
3697 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3698
3699 match builders_or_callback {
3700 BuildersOrCallback::Builders(graph_builders) => {
3701 let builder = graph_builders.get_dfir_mut(&out_location);
3702 builder.add_dfir(
3703 parse_quote! {
3704 #futures_ident = #input_ident -> resolve_futures_blocking();
3705 },
3706 None,
3707 Some(&next_stmt_id.to_string()),
3708 );
3709 }
3710 BuildersOrCallback::Callback(_, node_callback) => {
3711 node_callback(node, next_stmt_id);
3712 }
3713 }
3714
3715 *next_stmt_id += 1;
3716
3717 ident_stack.push(futures_ident);
3718 }
3719
3720 HydroNode::ResolveFuturesOrdered { .. } => {
3721 let input_ident = ident_stack.pop().unwrap();
3722
3723 let futures_ident =
3724 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3725
3726 match builders_or_callback {
3727 BuildersOrCallback::Builders(graph_builders) => {
3728 let builder = graph_builders.get_dfir_mut(&out_location);
3729 builder.add_dfir(
3730 parse_quote! {
3731 #futures_ident = #input_ident -> resolve_futures_ordered();
3732 },
3733 None,
3734 Some(&next_stmt_id.to_string()),
3735 );
3736 }
3737 BuildersOrCallback::Callback(_, node_callback) => {
3738 node_callback(node, next_stmt_id);
3739 }
3740 }
3741
3742 *next_stmt_id += 1;
3743
3744 ident_stack.push(futures_ident);
3745 }
3746
3747 HydroNode::Map { f, singleton_refs, .. } => {
3748 let input_ident = ident_stack.pop().unwrap();
3750 let ref_idents = (0..singleton_refs.len())
3752 .map(|_| ident_stack.pop().unwrap())
3753 .collect::<Vec<_>>()
3754 .into_iter()
3755 .rev()
3756 .collect::<Vec<_>>();
3757
3758 let map_ident =
3759 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3760
3761 match builders_or_callback {
3762 BuildersOrCallback::Builders(graph_builders) => {
3763 let f_tokens = if singleton_refs.is_empty() {
3766 f.0.to_token_stream()
3767 } else {
3768 let local_idents = singleton_refs
3769 .iter()
3770 .map(|(local_ident, _)| local_ident);
3771 let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
3772 let expr = &f.0;
3773 quote! {
3774 {
3775 #(
3776 let #local_idents = #hash #ref_idents;
3777 )*
3778 #expr
3779 }
3780 }
3781 };
3782
3783 let builder = graph_builders.get_dfir_mut(&out_location);
3784 builder.add_dfir(
3785 parse_quote! {
3786 #map_ident = #input_ident -> map(#f_tokens);
3787 },
3788 None,
3789 Some(&next_stmt_id.to_string()),
3790 );
3791 }
3792 BuildersOrCallback::Callback(_, node_callback) => {
3793 node_callback(node, next_stmt_id);
3794 }
3795 }
3796
3797 *next_stmt_id += 1;
3798
3799 ident_stack.push(map_ident);
3800 }
3801
3802 HydroNode::FlatMap { f, .. } => {
3803 let input_ident = ident_stack.pop().unwrap();
3804
3805 let flat_map_ident =
3806 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3807
3808 match builders_or_callback {
3809 BuildersOrCallback::Builders(graph_builders) => {
3810 let builder = graph_builders.get_dfir_mut(&out_location);
3811 builder.add_dfir(
3812 parse_quote! {
3813 #flat_map_ident = #input_ident -> flat_map(#f);
3814 },
3815 None,
3816 Some(&next_stmt_id.to_string()),
3817 );
3818 }
3819 BuildersOrCallback::Callback(_, node_callback) => {
3820 node_callback(node, next_stmt_id);
3821 }
3822 }
3823
3824 *next_stmt_id += 1;
3825
3826 ident_stack.push(flat_map_ident);
3827 }
3828
3829 HydroNode::FlatMapStreamBlocking { f, .. } => {
3830 let input_ident = ident_stack.pop().unwrap();
3831
3832 let flat_map_stream_blocking_ident =
3833 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3834
3835 match builders_or_callback {
3836 BuildersOrCallback::Builders(graph_builders) => {
3837 let builder = graph_builders.get_dfir_mut(&out_location);
3838 builder.add_dfir(
3839 parse_quote! {
3840 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f);
3841 },
3842 None,
3843 Some(&next_stmt_id.to_string()),
3844 );
3845 }
3846 BuildersOrCallback::Callback(_, node_callback) => {
3847 node_callback(node, next_stmt_id);
3848 }
3849 }
3850
3851 *next_stmt_id += 1;
3852
3853 ident_stack.push(flat_map_stream_blocking_ident);
3854 }
3855
3856 HydroNode::Filter { f, .. } => {
3857 let input_ident = ident_stack.pop().unwrap();
3858
3859 let filter_ident =
3860 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3861
3862 match builders_or_callback {
3863 BuildersOrCallback::Builders(graph_builders) => {
3864 let builder = graph_builders.get_dfir_mut(&out_location);
3865 builder.add_dfir(
3866 parse_quote! {
3867 #filter_ident = #input_ident -> filter(#f);
3868 },
3869 None,
3870 Some(&next_stmt_id.to_string()),
3871 );
3872 }
3873 BuildersOrCallback::Callback(_, node_callback) => {
3874 node_callback(node, next_stmt_id);
3875 }
3876 }
3877
3878 *next_stmt_id += 1;
3879
3880 ident_stack.push(filter_ident);
3881 }
3882
3883 HydroNode::FilterMap { f, .. } => {
3884 let input_ident = ident_stack.pop().unwrap();
3885
3886 let filter_map_ident =
3887 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3888
3889 match builders_or_callback {
3890 BuildersOrCallback::Builders(graph_builders) => {
3891 let builder = graph_builders.get_dfir_mut(&out_location);
3892 builder.add_dfir(
3893 parse_quote! {
3894 #filter_map_ident = #input_ident -> filter_map(#f);
3895 },
3896 None,
3897 Some(&next_stmt_id.to_string()),
3898 );
3899 }
3900 BuildersOrCallback::Callback(_, node_callback) => {
3901 node_callback(node, next_stmt_id);
3902 }
3903 }
3904
3905 *next_stmt_id += 1;
3906
3907 ident_stack.push(filter_map_ident);
3908 }
3909
3910 HydroNode::Sort { .. } => {
3911 let input_ident = ident_stack.pop().unwrap();
3912
3913 let sort_ident =
3914 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3915
3916 match builders_or_callback {
3917 BuildersOrCallback::Builders(graph_builders) => {
3918 let builder = graph_builders.get_dfir_mut(&out_location);
3919 builder.add_dfir(
3920 parse_quote! {
3921 #sort_ident = #input_ident -> sort();
3922 },
3923 None,
3924 Some(&next_stmt_id.to_string()),
3925 );
3926 }
3927 BuildersOrCallback::Callback(_, node_callback) => {
3928 node_callback(node, next_stmt_id);
3929 }
3930 }
3931
3932 *next_stmt_id += 1;
3933
3934 ident_stack.push(sort_ident);
3935 }
3936
3937 HydroNode::DeferTick { .. } => {
3938 let input_ident = ident_stack.pop().unwrap();
3939
3940 let defer_tick_ident =
3941 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3942
3943 match builders_or_callback {
3944 BuildersOrCallback::Builders(graph_builders) => {
3945 let builder = graph_builders.get_dfir_mut(&out_location);
3946 builder.add_dfir(
3947 parse_quote! {
3948 #defer_tick_ident = #input_ident -> defer_tick_lazy();
3949 },
3950 None,
3951 Some(&next_stmt_id.to_string()),
3952 );
3953 }
3954 BuildersOrCallback::Callback(_, node_callback) => {
3955 node_callback(node, next_stmt_id);
3956 }
3957 }
3958
3959 *next_stmt_id += 1;
3960
3961 ident_stack.push(defer_tick_ident);
3962 }
3963
3964 HydroNode::Enumerate { input, .. } => {
3965 let input_ident = ident_stack.pop().unwrap();
3966
3967 let enumerate_ident =
3968 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3969
3970 match builders_or_callback {
3971 BuildersOrCallback::Builders(graph_builders) => {
3972 let builder = graph_builders.get_dfir_mut(&out_location);
3973 let lifetime = if input.metadata().location_id.is_top_level() {
3974 quote!('static)
3975 } else {
3976 quote!('tick)
3977 };
3978 builder.add_dfir(
3979 parse_quote! {
3980 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3981 },
3982 None,
3983 Some(&next_stmt_id.to_string()),
3984 );
3985 }
3986 BuildersOrCallback::Callback(_, node_callback) => {
3987 node_callback(node, next_stmt_id);
3988 }
3989 }
3990
3991 *next_stmt_id += 1;
3992
3993 ident_stack.push(enumerate_ident);
3994 }
3995
3996 HydroNode::Inspect { f, .. } => {
3997 let input_ident = ident_stack.pop().unwrap();
3998
3999 let inspect_ident =
4000 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4001
4002 match builders_or_callback {
4003 BuildersOrCallback::Builders(graph_builders) => {
4004 let builder = graph_builders.get_dfir_mut(&out_location);
4005 builder.add_dfir(
4006 parse_quote! {
4007 #inspect_ident = #input_ident -> inspect(#f);
4008 },
4009 None,
4010 Some(&next_stmt_id.to_string()),
4011 );
4012 }
4013 BuildersOrCallback::Callback(_, node_callback) => {
4014 node_callback(node, next_stmt_id);
4015 }
4016 }
4017
4018 *next_stmt_id += 1;
4019
4020 ident_stack.push(inspect_ident);
4021 }
4022
4023 HydroNode::Unique { input, .. } => {
4024 let input_ident = ident_stack.pop().unwrap();
4025
4026 let unique_ident =
4027 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4028
4029 match builders_or_callback {
4030 BuildersOrCallback::Builders(graph_builders) => {
4031 let builder = graph_builders.get_dfir_mut(&out_location);
4032 let lifetime = if input.metadata().location_id.is_top_level() {
4033 quote!('static)
4034 } else {
4035 quote!('tick)
4036 };
4037
4038 builder.add_dfir(
4039 parse_quote! {
4040 #unique_ident = #input_ident -> unique::<#lifetime>();
4041 },
4042 None,
4043 Some(&next_stmt_id.to_string()),
4044 );
4045 }
4046 BuildersOrCallback::Callback(_, node_callback) => {
4047 node_callback(node, next_stmt_id);
4048 }
4049 }
4050
4051 *next_stmt_id += 1;
4052
4053 ident_stack.push(unique_ident);
4054 }
4055
4056 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4057 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4058 if input.metadata().location_id.is_top_level()
4059 && input.metadata().collection_kind.is_bounded()
4060 {
4061 parse_quote!(fold_no_replay)
4062 } else {
4063 parse_quote!(fold)
4064 }
4065 } else if matches!(node, HydroNode::Scan { .. }) {
4066 parse_quote!(scan)
4067 } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4068 parse_quote!(scan_async_blocking)
4069 } else if let HydroNode::FoldKeyed { input, .. } = node {
4070 if input.metadata().location_id.is_top_level()
4071 && input.metadata().collection_kind.is_bounded()
4072 {
4073 todo!("Fold keyed on a top-level bounded collection is not yet supported")
4074 } else {
4075 parse_quote!(fold_keyed)
4076 }
4077 } else {
4078 unreachable!()
4079 };
4080
4081 let (HydroNode::Fold { input, .. }
4082 | HydroNode::FoldKeyed { input, .. }
4083 | HydroNode::Scan { input, .. }
4084 | HydroNode::ScanAsyncBlocking { input, .. }) = node
4085 else {
4086 unreachable!()
4087 };
4088
4089 let lifetime = if input.metadata().location_id.is_top_level() {
4090 quote!('static)
4091 } else {
4092 quote!('tick)
4093 };
4094
4095 let input_ident = ident_stack.pop().unwrap();
4096
4097 let (HydroNode::Fold { init, acc, .. }
4098 | HydroNode::FoldKeyed { init, acc, .. }
4099 | HydroNode::Scan { init, acc, .. }
4100 | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4101 else {
4102 unreachable!()
4103 };
4104
4105 let fold_ident =
4106 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4107
4108 match builders_or_callback {
4109 BuildersOrCallback::Builders(graph_builders) => {
4110 if matches!(node, HydroNode::Fold { .. })
4111 && node.metadata().location_id.is_top_level()
4112 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4113 && graph_builders.singleton_intermediates()
4114 && !node.metadata().collection_kind.is_bounded()
4115 {
4116 let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4117 let hooked_input_ident = graph_builders.emit_fold_hook(
4118 &input.metadata().location_id,
4119 &input_ident,
4120 &input.metadata().collection_kind,
4121 &node.metadata().op,
4122 );
4123
4124 let (effective_input, acc) = if let Some(ref hooked) = hooked_input_ident {
4125 let acc: syn::Expr = parse_quote!({
4126 let mut __inner = #acc;
4127 move |__state, __batch: Vec<_>| {
4128 if __batch.is_empty() {
4129 return None;
4130 }
4131 for __value in __batch {
4132 __inner(__state, __value);
4133 }
4134 Some(__state.clone())
4135 }
4136 });
4137 (hooked, acc)
4138 } else {
4139 let acc: syn::Expr = parse_quote!({
4140 let mut __inner = #acc;
4141 move |__state, __value| {
4142 __inner(__state, __value);
4143 Some(__state.clone())
4144 }
4145 });
4146 (&input_ident, acc)
4147 };
4148
4149 let builder = graph_builders.get_dfir_mut(&out_location);
4150 builder.add_dfir(
4151 parse_quote! {
4152 source_iter([(#init)()]) -> [0]#fold_ident;
4153 #effective_input -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
4154 #fold_ident = chain();
4155 },
4156 None,
4157 Some(&next_stmt_id.to_string()),
4158 );
4159
4160 if hooked_input_ident.is_some() {
4161 fold_hooked_idents.insert(fold_ident.to_string());
4162 }
4163 } else if matches!(node, HydroNode::FoldKeyed { .. })
4164 && node.metadata().location_id.is_top_level()
4165 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4166 && graph_builders.singleton_intermediates()
4167 && !node.metadata().collection_kind.is_bounded()
4168 {
4169 let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4170 let hooked_input_ident = graph_builders.emit_fold_hook(
4171 &input.metadata().location_id,
4172 &input_ident,
4173 &input.metadata().collection_kind,
4174 &node.metadata().op,
4175 );
4176 let builder = graph_builders.get_dfir_mut(&out_location);
4177
4178 let acc: syn::Expr = parse_quote!({
4179 let mut __init = #init;
4180 let mut __inner = #acc;
4181 move |__state, __kv: (_, _)| {
4182 let __state = __state
4184 .entry(::std::clone::Clone::clone(&__kv.0))
4185 .or_insert_with(|| (__init)());
4186 __inner(__state, __kv.1);
4187 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4188 }
4189 });
4190
4191 if let Some(hooked_input_ident) = hooked_input_ident {
4192 builder.add_dfir(
4193 parse_quote! {
4194 #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
4195 },
4196 None,
4197 Some(&next_stmt_id.to_string()),
4198 );
4199
4200 fold_hooked_idents.insert(fold_ident.to_string());
4201 } else {
4202 builder.add_dfir(
4203 parse_quote! {
4204 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
4205 },
4206 None,
4207 Some(&next_stmt_id.to_string()),
4208 );
4209 }
4210 } else if (matches!(node, HydroNode::Fold { .. })
4211 || matches!(node, HydroNode::FoldKeyed { .. }))
4212 && !node.metadata().location_id.is_top_level()
4213 && graph_builders.singleton_intermediates()
4214 {
4215 let input_ref = match &*node {
4216 HydroNode::Fold { input, .. } => input,
4217 HydroNode::FoldKeyed { input, .. } => input,
4218 _ => unreachable!(),
4219 };
4220 let hooked_input_ident = graph_builders.emit_fold_hook(
4221 &input_ref.metadata().location_id,
4222 &input_ident,
4223 &input_ref.metadata().collection_kind,
4224 &node.metadata().op,
4225 );
4226
4227 let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4228 let builder = graph_builders.get_dfir_mut(&out_location);
4229 builder.add_dfir(
4230 parse_quote! {
4231 #fold_ident = #actual_input -> #operator::<#lifetime>(#init, #acc);
4232 },
4233 None,
4234 Some(&next_stmt_id.to_string()),
4235 );
4236 } else {
4237 let builder = graph_builders.get_dfir_mut(&out_location);
4238 builder.add_dfir(
4239 parse_quote! {
4240 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
4241 },
4242 None,
4243 Some(&next_stmt_id.to_string()),
4244 );
4245 }
4246 }
4247 BuildersOrCallback::Callback(_, node_callback) => {
4248 node_callback(node, next_stmt_id);
4249 }
4250 }
4251
4252 *next_stmt_id += 1;
4253
4254 ident_stack.push(fold_ident);
4255 }
4256
4257 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4258 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4259 if input.metadata().location_id.is_top_level()
4260 && input.metadata().collection_kind.is_bounded()
4261 {
4262 parse_quote!(reduce_no_replay)
4263 } else {
4264 parse_quote!(reduce)
4265 }
4266 } else if let HydroNode::ReduceKeyed { input, .. } = node {
4267 if input.metadata().location_id.is_top_level()
4268 && input.metadata().collection_kind.is_bounded()
4269 {
4270 todo!(
4271 "Calling keyed reduce on a top-level bounded collection is not supported"
4272 )
4273 } else {
4274 parse_quote!(reduce_keyed)
4275 }
4276 } else {
4277 unreachable!()
4278 };
4279
4280 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4281 else {
4282 unreachable!()
4283 };
4284
4285 let lifetime = if input.metadata().location_id.is_top_level() {
4286 quote!('static)
4287 } else {
4288 quote!('tick)
4289 };
4290
4291 let input_ident = ident_stack.pop().unwrap();
4292
4293 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4294 else {
4295 unreachable!()
4296 };
4297
4298 let reduce_ident =
4299 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4300
4301 match builders_or_callback {
4302 BuildersOrCallback::Builders(graph_builders) => {
4303 if matches!(node, HydroNode::Reduce { .. })
4304 && node.metadata().location_id.is_top_level()
4305 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4306 && graph_builders.singleton_intermediates()
4307 && !node.metadata().collection_kind.is_bounded()
4308 {
4309 todo!(
4310 "Reduce with optional intermediates is not yet supported in simulator"
4311 );
4312 } else if matches!(node, HydroNode::ReduceKeyed { .. })
4313 && node.metadata().location_id.is_top_level()
4314 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4315 && graph_builders.singleton_intermediates()
4316 && !node.metadata().collection_kind.is_bounded()
4317 {
4318 todo!(
4319 "Reduce keyed with optional intermediates is not yet supported in simulator"
4320 );
4321 } else {
4322 let builder = graph_builders.get_dfir_mut(&out_location);
4323 builder.add_dfir(
4324 parse_quote! {
4325 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
4326 },
4327 None,
4328 Some(&next_stmt_id.to_string()),
4329 );
4330 }
4331 }
4332 BuildersOrCallback::Callback(_, node_callback) => {
4333 node_callback(node, next_stmt_id);
4334 }
4335 }
4336
4337 *next_stmt_id += 1;
4338
4339 ident_stack.push(reduce_ident);
4340 }
4341
4342 HydroNode::ReduceKeyedWatermark {
4343 f,
4344 input,
4345 metadata,
4346 ..
4347 } => {
4348 let lifetime = if input.metadata().location_id.is_top_level() {
4349 quote!('static)
4350 } else {
4351 quote!('tick)
4352 };
4353
4354 let watermark_ident = ident_stack.pop().unwrap();
4356 let input_ident = ident_stack.pop().unwrap();
4357
4358 let chain_ident = syn::Ident::new(
4359 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4360 Span::call_site(),
4361 );
4362
4363 let fold_ident =
4364 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4365
4366 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4367 && input.metadata().collection_kind.is_bounded()
4368 {
4369 parse_quote!(fold_no_replay)
4370 } else {
4371 parse_quote!(fold)
4372 };
4373
4374 match builders_or_callback {
4375 BuildersOrCallback::Builders(graph_builders) => {
4376 if metadata.location_id.is_top_level()
4377 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4378 && graph_builders.singleton_intermediates()
4379 && !metadata.collection_kind.is_bounded()
4380 {
4381 todo!(
4382 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4383 )
4384 } else {
4385 let builder = graph_builders.get_dfir_mut(&out_location);
4386 builder.add_dfir(
4387 parse_quote! {
4388 #chain_ident = chain();
4389 #input_ident
4390 -> map(|x| (Some(x), None))
4391 -> [0]#chain_ident;
4392 #watermark_ident
4393 -> map(|watermark| (None, Some(watermark)))
4394 -> [1]#chain_ident;
4395
4396 #fold_ident = #chain_ident
4397 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4398 let __reduce_keyed_fn = #f;
4399 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4400 if let Some((k, v)) = opt_payload {
4401 if let Some(curr_watermark) = *opt_curr_watermark {
4402 if k < curr_watermark {
4403 return;
4404 }
4405 }
4406 match map.entry(k) {
4407 ::std::collections::hash_map::Entry::Vacant(e) => {
4408 e.insert(v);
4409 }
4410 ::std::collections::hash_map::Entry::Occupied(mut e) => {
4411 __reduce_keyed_fn(e.get_mut(), v);
4412 }
4413 }
4414 } else {
4415 let watermark = opt_watermark.unwrap();
4416 if let Some(curr_watermark) = *opt_curr_watermark {
4417 if watermark <= curr_watermark {
4418 return;
4419 }
4420 }
4421 map.retain(|k, _| *k >= watermark);
4422 *opt_curr_watermark = Some(watermark);
4423 }
4424 }
4425 })
4426 -> flat_map(|(map, _curr_watermark)| map);
4427 },
4428 None,
4429 Some(&next_stmt_id.to_string()),
4430 );
4431 }
4432 }
4433 BuildersOrCallback::Callback(_, node_callback) => {
4434 node_callback(node, next_stmt_id);
4435 }
4436 }
4437
4438 *next_stmt_id += 1;
4439
4440 ident_stack.push(fold_ident);
4441 }
4442
4443 HydroNode::Network {
4444 networking_info,
4445 serialize_fn: serialize_pipeline,
4446 instantiate_fn,
4447 deserialize_fn: deserialize_pipeline,
4448 input,
4449 ..
4450 } => {
4451 let input_ident = ident_stack.pop().unwrap();
4452
4453 let receiver_stream_ident =
4454 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4455
4456 match builders_or_callback {
4457 BuildersOrCallback::Builders(graph_builders) => {
4458 let (sink_expr, source_expr) = match instantiate_fn {
4459 DebugInstantiate::Building => (
4460 syn::parse_quote!(DUMMY_SINK),
4461 syn::parse_quote!(DUMMY_SOURCE),
4462 ),
4463
4464 DebugInstantiate::Finalized(finalized) => {
4465 (finalized.sink.clone(), finalized.source.clone())
4466 }
4467 };
4468
4469 graph_builders.create_network(
4470 &input.metadata().location_id,
4471 &out_location,
4472 input_ident,
4473 &receiver_stream_ident,
4474 serialize_pipeline.as_ref(),
4475 sink_expr,
4476 source_expr,
4477 deserialize_pipeline.as_ref(),
4478 *next_stmt_id,
4479 networking_info,
4480 );
4481 }
4482 BuildersOrCallback::Callback(_, node_callback) => {
4483 node_callback(node, next_stmt_id);
4484 }
4485 }
4486
4487 *next_stmt_id += 1;
4488
4489 ident_stack.push(receiver_stream_ident);
4490 }
4491
4492 HydroNode::ExternalInput {
4493 instantiate_fn,
4494 deserialize_fn: deserialize_pipeline,
4495 ..
4496 } => {
4497 let receiver_stream_ident =
4498 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4499
4500 match builders_or_callback {
4501 BuildersOrCallback::Builders(graph_builders) => {
4502 let (_, source_expr) = match instantiate_fn {
4503 DebugInstantiate::Building => (
4504 syn::parse_quote!(DUMMY_SINK),
4505 syn::parse_quote!(DUMMY_SOURCE),
4506 ),
4507
4508 DebugInstantiate::Finalized(finalized) => {
4509 (finalized.sink.clone(), finalized.source.clone())
4510 }
4511 };
4512
4513 graph_builders.create_external_source(
4514 &out_location,
4515 source_expr,
4516 &receiver_stream_ident,
4517 deserialize_pipeline.as_ref(),
4518 *next_stmt_id,
4519 );
4520 }
4521 BuildersOrCallback::Callback(_, node_callback) => {
4522 node_callback(node, next_stmt_id);
4523 }
4524 }
4525
4526 *next_stmt_id += 1;
4527
4528 ident_stack.push(receiver_stream_ident);
4529 }
4530
4531 HydroNode::Counter {
4532 tag,
4533 duration,
4534 prefix,
4535 ..
4536 } => {
4537 let input_ident = ident_stack.pop().unwrap();
4538
4539 let counter_ident =
4540 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4541
4542 match builders_or_callback {
4543 BuildersOrCallback::Builders(graph_builders) => {
4544 let arg = format!("{}({})", prefix, tag);
4545 let builder = graph_builders.get_dfir_mut(&out_location);
4546 builder.add_dfir(
4547 parse_quote! {
4548 #counter_ident = #input_ident -> _counter(#arg, #duration);
4549 },
4550 None,
4551 Some(&next_stmt_id.to_string()),
4552 );
4553 }
4554 BuildersOrCallback::Callback(_, node_callback) => {
4555 node_callback(node, next_stmt_id);
4556 }
4557 }
4558
4559 *next_stmt_id += 1;
4560
4561 ident_stack.push(counter_ident);
4562 }
4563 }
4564 },
4565 seen_tees,
4566 false,
4567 );
4568
4569 ident_stack
4570 .pop()
4571 .expect("ident_stack should have exactly one element after traversal")
4572 }
4573
4574 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4575 match self {
4576 HydroNode::Placeholder => {
4577 panic!()
4578 }
4579 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
4580 HydroNode::Source { source, .. } => match source {
4581 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4582 HydroSource::ExternalNetwork()
4583 | HydroSource::Spin()
4584 | HydroSource::ClusterMembers(_, _)
4585 | HydroSource::Embedded(_)
4586 | HydroSource::EmbeddedSingleton(_) => {} },
4588 HydroNode::SingletonSource { value, .. } => {
4589 transform(value);
4590 }
4591 HydroNode::CycleSource { .. }
4592 | HydroNode::Tee { .. }
4593 | HydroNode::Singleton { .. }
4594 | HydroNode::YieldConcat { .. }
4595 | HydroNode::BeginAtomic { .. }
4596 | HydroNode::EndAtomic { .. }
4597 | HydroNode::Batch { .. }
4598 | HydroNode::Chain { .. }
4599 | HydroNode::MergeOrdered { .. }
4600 | HydroNode::ChainFirst { .. }
4601 | HydroNode::CrossProduct { .. }
4602 | HydroNode::CrossSingleton { .. }
4603 | HydroNode::ResolveFutures { .. }
4604 | HydroNode::ResolveFuturesBlocking { .. }
4605 | HydroNode::ResolveFuturesOrdered { .. }
4606 | HydroNode::Join { .. }
4607 | HydroNode::JoinHalf { .. }
4608 | HydroNode::Difference { .. }
4609 | HydroNode::AntiJoin { .. }
4610 | HydroNode::DeferTick { .. }
4611 | HydroNode::Enumerate { .. }
4612 | HydroNode::Unique { .. }
4613 | HydroNode::Sort { .. } => {}
4614 HydroNode::Map { f, .. }
4615 | HydroNode::FlatMap { f, .. }
4616 | HydroNode::FlatMapStreamBlocking { f, .. }
4617 | HydroNode::Filter { f, .. }
4618 | HydroNode::FilterMap { f, .. }
4619 | HydroNode::Inspect { f, .. }
4620 | HydroNode::Partition { f, .. }
4621 | HydroNode::Reduce { f, .. }
4622 | HydroNode::ReduceKeyed { f, .. }
4623 | HydroNode::ReduceKeyedWatermark { f, .. } => {
4624 transform(f);
4625 }
4626 HydroNode::Fold { init, acc, .. }
4627 | HydroNode::Scan { init, acc, .. }
4628 | HydroNode::ScanAsyncBlocking { init, acc, .. }
4629 | HydroNode::FoldKeyed { init, acc, .. } => {
4630 transform(init);
4631 transform(acc);
4632 }
4633 HydroNode::Network {
4634 serialize_fn,
4635 deserialize_fn,
4636 ..
4637 } => {
4638 if let Some(serialize_fn) = serialize_fn {
4639 transform(serialize_fn);
4640 }
4641 if let Some(deserialize_fn) = deserialize_fn {
4642 transform(deserialize_fn);
4643 }
4644 }
4645 HydroNode::ExternalInput { deserialize_fn, .. } => {
4646 if let Some(deserialize_fn) = deserialize_fn {
4647 transform(deserialize_fn);
4648 }
4649 }
4650 HydroNode::Counter { duration, .. } => {
4651 transform(duration);
4652 }
4653 }
4654 }
4655
4656 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4657 &self.metadata().op
4658 }
4659
4660 pub fn metadata(&self) -> &HydroIrMetadata {
4661 match self {
4662 HydroNode::Placeholder => {
4663 panic!()
4664 }
4665 HydroNode::Cast { metadata, .. }
4666 | HydroNode::ObserveNonDet { metadata, .. }
4667 | HydroNode::Source { metadata, .. }
4668 | HydroNode::SingletonSource { metadata, .. }
4669 | HydroNode::CycleSource { metadata, .. }
4670 | HydroNode::Tee { metadata, .. }
4671 | HydroNode::Singleton { metadata, .. }
4672 | HydroNode::Partition { metadata, .. }
4673 | HydroNode::YieldConcat { metadata, .. }
4674 | HydroNode::BeginAtomic { metadata, .. }
4675 | HydroNode::EndAtomic { metadata, .. }
4676 | HydroNode::Batch { metadata, .. }
4677 | HydroNode::Chain { metadata, .. }
4678 | HydroNode::MergeOrdered { metadata, .. }
4679 | HydroNode::ChainFirst { metadata, .. }
4680 | HydroNode::CrossProduct { metadata, .. }
4681 | HydroNode::CrossSingleton { metadata, .. }
4682 | HydroNode::Join { metadata, .. }
4683 | HydroNode::JoinHalf { metadata, .. }
4684 | HydroNode::Difference { metadata, .. }
4685 | HydroNode::AntiJoin { metadata, .. }
4686 | HydroNode::ResolveFutures { metadata, .. }
4687 | HydroNode::ResolveFuturesBlocking { metadata, .. }
4688 | HydroNode::ResolveFuturesOrdered { metadata, .. }
4689 | HydroNode::Map { metadata, .. }
4690 | HydroNode::FlatMap { metadata, .. }
4691 | HydroNode::FlatMapStreamBlocking { metadata, .. }
4692 | HydroNode::Filter { metadata, .. }
4693 | HydroNode::FilterMap { metadata, .. }
4694 | HydroNode::DeferTick { metadata, .. }
4695 | HydroNode::Enumerate { metadata, .. }
4696 | HydroNode::Inspect { metadata, .. }
4697 | HydroNode::Unique { metadata, .. }
4698 | HydroNode::Sort { metadata, .. }
4699 | HydroNode::Scan { metadata, .. }
4700 | HydroNode::ScanAsyncBlocking { metadata, .. }
4701 | HydroNode::Fold { metadata, .. }
4702 | HydroNode::FoldKeyed { metadata, .. }
4703 | HydroNode::Reduce { metadata, .. }
4704 | HydroNode::ReduceKeyed { metadata, .. }
4705 | HydroNode::ReduceKeyedWatermark { metadata, .. }
4706 | HydroNode::ExternalInput { metadata, .. }
4707 | HydroNode::Network { metadata, .. }
4708 | HydroNode::Counter { metadata, .. } => metadata,
4709 }
4710 }
4711
4712 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4713 &mut self.metadata_mut().op
4714 }
4715
4716 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4717 match self {
4718 HydroNode::Placeholder => {
4719 panic!()
4720 }
4721 HydroNode::Cast { metadata, .. }
4722 | HydroNode::ObserveNonDet { metadata, .. }
4723 | HydroNode::Source { metadata, .. }
4724 | HydroNode::SingletonSource { metadata, .. }
4725 | HydroNode::CycleSource { metadata, .. }
4726 | HydroNode::Tee { metadata, .. }
4727 | HydroNode::Singleton { metadata, .. }
4728 | HydroNode::Partition { metadata, .. }
4729 | HydroNode::YieldConcat { metadata, .. }
4730 | HydroNode::BeginAtomic { metadata, .. }
4731 | HydroNode::EndAtomic { metadata, .. }
4732 | HydroNode::Batch { metadata, .. }
4733 | HydroNode::Chain { metadata, .. }
4734 | HydroNode::MergeOrdered { metadata, .. }
4735 | HydroNode::ChainFirst { metadata, .. }
4736 | HydroNode::CrossProduct { metadata, .. }
4737 | HydroNode::CrossSingleton { metadata, .. }
4738 | HydroNode::Join { metadata, .. }
4739 | HydroNode::JoinHalf { metadata, .. }
4740 | HydroNode::Difference { metadata, .. }
4741 | HydroNode::AntiJoin { metadata, .. }
4742 | HydroNode::ResolveFutures { metadata, .. }
4743 | HydroNode::ResolveFuturesBlocking { metadata, .. }
4744 | HydroNode::ResolveFuturesOrdered { metadata, .. }
4745 | HydroNode::Map { metadata, .. }
4746 | HydroNode::FlatMap { metadata, .. }
4747 | HydroNode::FlatMapStreamBlocking { metadata, .. }
4748 | HydroNode::Filter { metadata, .. }
4749 | HydroNode::FilterMap { metadata, .. }
4750 | HydroNode::DeferTick { metadata, .. }
4751 | HydroNode::Enumerate { metadata, .. }
4752 | HydroNode::Inspect { metadata, .. }
4753 | HydroNode::Unique { metadata, .. }
4754 | HydroNode::Sort { metadata, .. }
4755 | HydroNode::Scan { metadata, .. }
4756 | HydroNode::ScanAsyncBlocking { metadata, .. }
4757 | HydroNode::Fold { metadata, .. }
4758 | HydroNode::FoldKeyed { metadata, .. }
4759 | HydroNode::Reduce { metadata, .. }
4760 | HydroNode::ReduceKeyed { metadata, .. }
4761 | HydroNode::ReduceKeyedWatermark { metadata, .. }
4762 | HydroNode::ExternalInput { metadata, .. }
4763 | HydroNode::Network { metadata, .. }
4764 | HydroNode::Counter { metadata, .. } => metadata,
4765 }
4766 }
4767
4768 pub fn input(&self) -> Vec<&HydroNode> {
4769 match self {
4770 HydroNode::Placeholder => {
4771 panic!()
4772 }
4773 HydroNode::Source { .. }
4774 | HydroNode::SingletonSource { .. }
4775 | HydroNode::ExternalInput { .. }
4776 | HydroNode::CycleSource { .. }
4777 | HydroNode::Tee { .. }
4778 | HydroNode::Singleton { .. }
4779 | HydroNode::Partition { .. } => {
4780 vec![]
4782 }
4783 HydroNode::Cast { inner, .. }
4784 | HydroNode::ObserveNonDet { inner, .. }
4785 | HydroNode::YieldConcat { inner, .. }
4786 | HydroNode::BeginAtomic { inner, .. }
4787 | HydroNode::EndAtomic { inner, .. }
4788 | HydroNode::Batch { inner, .. } => {
4789 vec![inner]
4790 }
4791 HydroNode::Chain { first, second, .. } => {
4792 vec![first, second]
4793 }
4794 HydroNode::MergeOrdered { first, second, .. } => {
4795 vec![first, second]
4796 }
4797 HydroNode::ChainFirst { first, second, .. } => {
4798 vec![first, second]
4799 }
4800 HydroNode::CrossProduct { left, right, .. }
4801 | HydroNode::CrossSingleton { left, right, .. }
4802 | HydroNode::Join { left, right, .. }
4803 | HydroNode::JoinHalf { left, right, .. } => {
4804 vec![left, right]
4805 }
4806 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4807 vec![pos, neg]
4808 }
4809 HydroNode::Map { input, .. }
4810 | HydroNode::FlatMap { input, .. }
4811 | HydroNode::FlatMapStreamBlocking { input, .. }
4812 | HydroNode::Filter { input, .. }
4813 | HydroNode::FilterMap { input, .. }
4814 | HydroNode::Sort { input, .. }
4815 | HydroNode::DeferTick { input, .. }
4816 | HydroNode::Enumerate { input, .. }
4817 | HydroNode::Inspect { input, .. }
4818 | HydroNode::Unique { input, .. }
4819 | HydroNode::Network { input, .. }
4820 | HydroNode::Counter { input, .. }
4821 | HydroNode::ResolveFutures { input, .. }
4822 | HydroNode::ResolveFuturesBlocking { input, .. }
4823 | HydroNode::ResolveFuturesOrdered { input, .. }
4824 | HydroNode::Fold { input, .. }
4825 | HydroNode::FoldKeyed { input, .. }
4826 | HydroNode::Reduce { input, .. }
4827 | HydroNode::ReduceKeyed { input, .. }
4828 | HydroNode::Scan { input, .. }
4829 | HydroNode::ScanAsyncBlocking { input, .. } => {
4830 vec![input]
4831 }
4832 HydroNode::ReduceKeyedWatermark {
4833 input, watermark, ..
4834 } => {
4835 vec![input, watermark]
4836 }
4837 }
4838 }
4839
4840 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4841 self.input()
4842 .iter()
4843 .map(|input_node| input_node.metadata())
4844 .collect()
4845 }
4846
4847 pub fn is_shared_with_others(&self) -> bool {
4851 match self {
4852 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4853 Rc::strong_count(&inner.0) > 1
4854 }
4855 HydroNode::Singleton { .. } => false,
4858 _ => false,
4859 }
4860 }
4861
4862 pub fn print_root(&self) -> String {
4863 match self {
4864 HydroNode::Placeholder => {
4865 panic!()
4866 }
4867 HydroNode::Cast { .. } => "Cast()".to_owned(),
4868 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4869 HydroNode::Source { source, .. } => format!("Source({:?})", source),
4870 HydroNode::SingletonSource {
4871 value,
4872 first_tick_only,
4873 ..
4874 } => format!(
4875 "SingletonSource({:?}, first_tick_only={})",
4876 value, first_tick_only
4877 ),
4878 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4879 HydroNode::Tee { inner, .. } => {
4880 format!("Tee({})", inner.0.borrow().print_root())
4881 }
4882 HydroNode::Singleton { inner, .. } => {
4883 format!("Singleton({})", inner.0.borrow().print_root())
4884 }
4885 HydroNode::Partition { f, is_true, .. } => {
4886 format!("Partition({:?}, is_true={})", f, is_true)
4887 }
4888 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4889 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4890 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4891 HydroNode::Batch { .. } => "Batch()".to_owned(),
4892 HydroNode::Chain { first, second, .. } => {
4893 format!("Chain({}, {})", first.print_root(), second.print_root())
4894 }
4895 HydroNode::MergeOrdered { first, second, .. } => {
4896 format!(
4897 "MergeOrdered({}, {})",
4898 first.print_root(),
4899 second.print_root()
4900 )
4901 }
4902 HydroNode::ChainFirst { first, second, .. } => {
4903 format!(
4904 "ChainFirst({}, {})",
4905 first.print_root(),
4906 second.print_root()
4907 )
4908 }
4909 HydroNode::CrossProduct { left, right, .. } => {
4910 format!(
4911 "CrossProduct({}, {})",
4912 left.print_root(),
4913 right.print_root()
4914 )
4915 }
4916 HydroNode::CrossSingleton { left, right, .. } => {
4917 format!(
4918 "CrossSingleton({}, {})",
4919 left.print_root(),
4920 right.print_root()
4921 )
4922 }
4923 HydroNode::Join { left, right, .. } => {
4924 format!("Join({}, {})", left.print_root(), right.print_root())
4925 }
4926 HydroNode::JoinHalf { left, right, .. } => {
4927 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
4928 }
4929 HydroNode::Difference { pos, neg, .. } => {
4930 format!("Difference({}, {})", pos.print_root(), neg.print_root())
4931 }
4932 HydroNode::AntiJoin { pos, neg, .. } => {
4933 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4934 }
4935 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4936 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
4937 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4938 HydroNode::Map { f, .. } => format!("Map({:?})", f),
4939 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4940 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
4941 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4942 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4943 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4944 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4945 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4946 HydroNode::Unique { .. } => "Unique()".to_owned(),
4947 HydroNode::Sort { .. } => "Sort()".to_owned(),
4948 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4949 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4950 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
4951 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
4952 }
4953 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4954 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4955 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4956 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4957 HydroNode::Network { .. } => "Network()".to_owned(),
4958 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4959 HydroNode::Counter { tag, duration, .. } => {
4960 format!("Counter({:?}, {:?})", tag, duration)
4961 }
4962 }
4963 }
4964}
4965
/// Wires up one network channel between two instantiated locations.
///
/// Looks up the deployment node for `from_location` and `to_location`, takes
/// the next port from each (sender first, then receiver), and dispatches to the
/// `Deploy` hooks matching the pairing of endpoints:
///
/// | from    | to      | hooks                                |
/// |---------|---------|--------------------------------------|
/// | process | process | `o2o_sink_source` / `o2o_connect`    |
/// | process | cluster | `o2m_sink_source` / `o2m_connect`    |
/// | cluster | process | `m2o_sink_source` / `m2o_connect`    |
/// | cluster | cluster | `m2m_sink_source` / `m2m_connect`    |
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by the
/// `*_sink_source` hook followed by the boxed closure produced by the matching
/// `*_connect` hook.
///
/// # Panics
///
/// Panics if a referenced process or cluster has no entry in `processes` /
/// `clusters`, or if either location is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetches (a clone of) the instantiated process for `key`, or panics.
    let process_node = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| {
                panic!("A process used in the graph was not instantiated: {}", key)
            })
            .clone()
    };
    // Fetches (a clone of) the instantiated cluster for `key`, or panics.
    let cluster_node = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| {
                panic!("A cluster used in the graph was not instantiated: {}", key)
            })
            .clone()
    };

    match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let sender = process_node(from);
            let receiver = process_node(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let (sink, source) = D::o2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect_fn = D::o2o_connect(&sender, &sink_port, &receiver, &source_port);
            (sink, source, connect_fn)
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let sender = process_node(from);
            let receiver = cluster_node(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let (sink, source) = D::o2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect_fn = D::o2m_connect(&sender, &sink_port, &receiver, &source_port);
            (sink, source, connect_fn)
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let sender = cluster_node(from);
            let receiver = process_node(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let (sink, source) = D::m2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect_fn = D::m2o_connect(&sender, &sink_port, &receiver, &source_port);
            (sink, source, connect_fn)
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let sender = cluster_node(from);
            let receiver = cluster_node(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let (sink, source) = D::m2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect_fn = D::m2m_connect(&sender, &sink_port, &receiver, &source_port);
            (sink, source, connect_fn)
        }
        // Tick and atomic locations are not handled as network endpoints here.
        (LocationId::Tick(_, _), _)
        | (_, LocationId::Tick(_, _))
        | (LocationId::Atomic(_), _)
        | (_, LocationId::Atomic(_)) => panic!(),
    }
}
5107
// Test-only submodule. It is declared without a body, so its contents live in
// a separate source file.
#[cfg(test)]
mod serde_test;
5110
// Unit tests covering the in-memory size of the IR enums and the behavior of
// `simplify_q_macro`.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    // Pins `size_of::<HydroNode>()` at 248 bytes so that an accidental change
    // in the enum's size fails the test. Ignored without the `build` feature,
    // since the expected size assumes the feature-gated fields are present.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 248);
    }

    // Pins `size_of::<HydroRoot>()` at 136 bytes, under the same feature
    // condition as `hydro_node_size`.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 136);
    }

    // A plain expression (`x + y`) must come back from `simplify_q_macro`
    // unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    // Feeds `simplify_q_macro` an expression spliced from a real `q!` closure
    // and compares the resulting token string against the stored snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    // Same snapshot check, but the quoted code is a block that ends in a
    // `move` closure, so the quoted body does not begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}