Skip to main content

hydro_lang/compile/
built.rs

1use std::marker::PhantomData;
2
3use dfir_lang::graph::{
4    DfirGraph, FlatGraphBuilderOutput, eliminate_extra_unions_tees, partition_graph,
5};
6use slotmap::{SecondaryMap, SlotMap};
7
8use super::compiled::CompiledFlow;
9use super::deploy::{DeployFlow, DeployResult};
10use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
11use super::ir::{HydroRoot, emit};
12use crate::location::{Cluster, External, LocationKey, LocationType, Process};
13#[cfg(stageleft_runtime)]
14#[cfg(feature = "sim")]
15use crate::sim::{flow::SimFlow, graph::SimNode};
16use crate::staging_util::Invariant;
17#[cfg(stageleft_runtime)]
18#[cfg(feature = "viz")]
19use crate::viz::api::GraphApi;
20
21pub struct BuiltFlow<'a> {
22    pub(super) ir: Vec<HydroRoot>,
23    pub(super) locations: SlotMap<LocationKey, LocationType>,
24    pub(super) location_names: SecondaryMap<LocationKey, String>,
25
26    /// Compile-time sidecar directives extracted from the flow state.
27    pub(super) sidecars: Vec<super::builder::Sidecar>,
28
29    /// Application name used in telemetry.
30    pub(super) flow_name: String,
31
32    pub(super) _phantom: Invariant<'a>,
33}
34
35pub(crate) fn build_inner(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, DfirGraph> {
36    emit(ir)
37        .into_iter()
38        .map(|(k, v)| {
39            let FlatGraphBuilderOutput { mut flat_graph, .. } =
40                v.build().expect("Failed to build DFIR flat graph.");
41            eliminate_extra_unions_tees(&mut flat_graph);
42            let partitioned_graph =
43                partition_graph(flat_graph).expect("Failed to partition (cycle detected).");
44            (k, partitioned_graph)
45        })
46        .collect()
47}
48
49impl<'a> BuiltFlow<'a> {
50    /// Returns all [`HydroRoot`]s in the IR.
51    pub fn ir(&self) -> &[HydroRoot] {
52        &self.ir
53    }
54
55    /// Serialize the IR as JSON.
56    #[cfg(feature = "runtime_support")]
57    pub fn ir_json(&self) -> Result<String, serde_json::Error> {
58        super::ir::serialize_dedup_shared(|| serde_json::to_string_pretty(&self.ir))
59    }
60
61    /// Returns all raw location ID -> location name mappings.
62    pub fn location_names(&self) -> &SecondaryMap<LocationKey, String> {
63        &self.location_names
64    }
65
66    /// Get a GraphApi instance for this built flow
67    #[cfg(stageleft_runtime)]
68    #[cfg(feature = "viz")]
69    pub fn graph_api(&self) -> GraphApi<'_> {
70        GraphApi::new(&self.ir, self.location_names())
71    }
72
73    /// Render graph to string in the given format.
74    #[cfg(feature = "viz")]
75    pub fn render_graph(
76        &self,
77        format: crate::viz::config::GraphType,
78        use_short_labels: bool,
79        show_metadata: bool,
80    ) -> String {
81        self.graph_api()
82            .render(format, use_short_labels, show_metadata)
83    }
84
85    /// Write graph to file.
86    #[cfg(feature = "viz")]
87    pub fn write_graph_to_file(
88        &self,
89        format: crate::viz::config::GraphType,
90        filename: &str,
91        use_short_labels: bool,
92        show_metadata: bool,
93    ) -> Result<(), Box<dyn std::error::Error>> {
94        self.graph_api()
95            .write_to_file(format, filename, use_short_labels, show_metadata)
96    }
97
98    /// Generate graph based on CLI config. Returns Some(path) if written.
99    #[cfg(feature = "viz")]
100    pub fn generate_graph(
101        &self,
102        config: &crate::viz::config::GraphConfig,
103    ) -> Result<Option<String>, Box<dyn std::error::Error>> {
104        self.graph_api().generate_graph(config)
105    }
106
107    pub fn optimize_with(mut self, f: impl FnOnce(&mut [HydroRoot])) -> Self {
108        f(&mut self.ir);
109        self
110    }
111
112    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
113        self.into_deploy()
114    }
115
116    #[cfg(feature = "sim")]
117    /// Creates a simulation for this builder, which can be used to run deterministic simulations
118    /// of the Hydro program.
119    pub fn sim(self) -> SimFlow<'a> {
120        use std::cell::RefCell;
121        use std::rc::Rc;
122
123        use slotmap::SparseSecondaryMap;
124
125        use crate::sim::graph::SimNodePort;
126
127        let shared_port_counter = Rc::new(RefCell::new(SimNodePort::default()));
128
129        let mut processes = SparseSecondaryMap::new();
130        let mut clusters = SparseSecondaryMap::new();
131        let externals = SparseSecondaryMap::new();
132
133        for (key, loc) in self.locations.iter() {
134            match loc {
135                LocationType::Process => {
136                    processes.insert(
137                        key,
138                        SimNode {
139                            shared_port_counter: shared_port_counter.clone(),
140                        },
141                    );
142                }
143                LocationType::Cluster => {
144                    clusters.insert(
145                        key,
146                        SimNode {
147                            shared_port_counter: shared_port_counter.clone(),
148                        },
149                    );
150                }
151                LocationType::External => {
152                    panic!("Sim cannot have externals");
153                }
154            }
155        }
156
157        SimFlow {
158            ir: self.ir,
159            processes,
160            clusters,
161            externals,
162            cluster_max_sizes: SparseSecondaryMap::new(),
163            externals_port_registry: Default::default(),
164            test_safety_only: false,
165            unit_test_fuzz_iterations: 8192,
166            _phantom: PhantomData,
167        }
168    }
169
170    pub fn into_deploy<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
171        let (processes, clusters, externals) = Default::default();
172        DeployFlow {
173            ir: self.ir,
174            locations: self.locations,
175            location_names: self.location_names,
176            processes,
177            clusters,
178            externals,
179            sidecars: self.sidecars,
180            flow_name: self.flow_name,
181            _phantom: PhantomData,
182        }
183    }
184
185    pub fn with_process<P, D: Deploy<'a>>(
186        self,
187        process: &Process<P>,
188        spec: impl IntoProcessSpec<'a, D>,
189    ) -> DeployFlow<'a, D> {
190        self.into_deploy().with_process(process, spec)
191    }
192
193    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
194        self,
195        spec: impl Fn() -> S,
196    ) -> DeployFlow<'a, D> {
197        self.into_deploy().with_remaining_processes(spec)
198    }
199
200    pub fn with_external<P, D: Deploy<'a>>(
201        self,
202        process: &External<P>,
203        spec: impl ExternalSpec<'a, D>,
204    ) -> DeployFlow<'a, D> {
205        self.into_deploy().with_external(process, spec)
206    }
207
208    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
209        self,
210        spec: impl Fn() -> S,
211    ) -> DeployFlow<'a, D> {
212        self.into_deploy().with_remaining_externals(spec)
213    }
214
215    pub fn with_cluster<C, D: Deploy<'a>>(
216        self,
217        cluster: &Cluster<C>,
218        spec: impl ClusterSpec<'a, D>,
219    ) -> DeployFlow<'a, D> {
220        self.into_deploy().with_cluster(cluster, spec)
221    }
222
223    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
224        self,
225        spec: impl Fn() -> S,
226    ) -> DeployFlow<'a, D> {
227        self.into_deploy().with_remaining_clusters(spec)
228    }
229
230    pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
231        self.into_deploy::<D>().compile()
232    }
233
234    pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
235        self.into_deploy::<D>().deploy(env)
236    }
237}