Skip to main content

hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
15
16use std::fmt::Debug;
17use std::future::Future;
18use std::marker::PhantomData;
19use std::num::ParseIntError;
20use std::time::Duration;
21
22use bytes::{Bytes, BytesMut};
23use futures::stream::Stream as FuturesStream;
24use proc_macro2::Span;
25use quote::quote;
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use slotmap::{Key, new_key_type};
29use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
30use stageleft::{QuotedWithContext, q, quote_type};
31use syn::parse_quote;
32use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
33
34use crate::compile::ir::{
35    ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
36};
37use crate::forward_handle::ForwardRef;
38#[cfg(stageleft_runtime)]
39use crate::forward_handle::{CycleCollection, ForwardHandle};
40use crate::live_collections::boundedness::{Bounded, Unbounded};
41use crate::live_collections::keyed_stream::KeyedStream;
42use crate::live_collections::singleton::Singleton;
43use crate::live_collections::stream::{
44    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
45};
46use crate::location::dynamic::LocationId;
47use crate::location::external_process::{
48    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
49};
50use crate::nondet::NonDet;
51#[cfg(feature = "sim")]
52use crate::sim::SimSender;
53use crate::staging_util::get_this_crate;
54
55pub mod dynamic;
56
57pub mod external_process;
58pub use external_process::External;
59
60pub mod process;
61pub use process::Process;
62
63pub mod cluster;
64pub use cluster::Cluster;
65
66pub mod member_id;
67pub use member_id::{MemberId, TaglessMemberId};
68
69pub mod tick;
70pub use tick::{Atomic, NoTick, Tick};
71
72/// An event indicating a change in membership status of a location in a group
73/// (e.g. a node in a [`Cluster`] or an external client connection).
74#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
75pub enum MembershipEvent {
76    /// The member has joined the group and is now active.
77    Joined,
78    /// The member has left the group and is no longer active.
79    Left,
80}
81
/// Configuration hint for the network transport behind an external connection.
///
/// The hint is passed to methods such as [`Location::bind_single_client`] and
/// [`Location::bidi_external_many_bytes`], where it controls how the underlying TCP
/// listener for external client connections is set up.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// Let the network configuration be selected automatically (e.g. an ephemeral port).
    Auto,
    /// Listen on a TCP port.
    ///
    /// - `None`: an available port is chosen automatically.
    /// - `Some(port)`: the given fixed port number is used.
    TcpPort(Option<u16>),
}
97
98pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
99    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
100}
101
#[stageleft::export(LocationKey)]
new_key_type! {
    /// A unique slotmap key identifying a location.
    pub struct LocationKey;
}
107
108impl std::fmt::Display for LocationKey {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        write!(f, "loc{:?}", self.data()) // `"loc1v1"``
111    }
112}
113
114/// This is used for the ECS membership stream.
115/// TODO(mingwei): Make this more robust?
116impl std::str::FromStr for LocationKey {
117    type Err = Option<ParseIntError>;
118
119    fn from_str(s: &str) -> Result<Self, Self::Err> {
120        let nvn = s.strip_prefix("loc").ok_or(None)?;
121        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
122        let idx: u64 = idx.parse()?;
123        let ver: u64 = ver.parse()?;
124        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
125    }
126}
127
128impl LocationKey {
129    /// TODO(minwgei): Remove this and avoid magic key for simulator external.
130    /// The first location key, used by the simulator as the default external location.
131    pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); // `1v1`
132
133    /// A key for testing with index 1.
134    #[cfg(test)]
135    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000001)); // `1v255`
136
137    /// A key for testing with index 2.
138    #[cfg(test)]
139    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000002)); // `2v255`
140}
141
142/// This is used within `q!` code in docker and ECS.
143impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
144    type O = LocationKey;
145
146    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
147    where
148        Self: Sized,
149    {
150        let root = get_this_crate();
151        let n = Key::data(&self).as_ffi();
152        (
153            QuoteTokens {
154                prelude: None,
155                expr: Some(quote! {
156                    #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
157                }),
158            },
159            (),
160        )
161    }
162}
163
164/// A simple enum for the type of a root location.
165#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
166pub enum LocationType {
167    /// A process (single node).
168    Process,
169    /// A cluster (multiple nodes).
170    Cluster,
171    /// An external client.
172    External,
173}
174
175/// A location where data can be materialized and computation can be executed.
176///
177/// Hydro is a **global**, **distributed** programming model. This means that the data
178/// and computation in a Hydro program can be spread across multiple machines, data
179/// centers, and even continents. To achieve this, Hydro uses the concept of
180/// **locations** to keep track of _where_ data is located and computation is executed.
181///
182/// Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
183/// which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
184/// and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
185/// to allow live collections to be _moved_ between locations via network send/receive.
186///
187/// See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
188#[expect(
189    private_bounds,
190    reason = "only internal Hydro code can define location types"
191)]
192pub trait Location<'a>: dynamic::DynLocation {
    /// The root location type for this location, which must itself be a [`Location`].
    ///
    /// For top-level locations like [`Process`] and [`Cluster`], this is `Self`.
    /// For nested locations like [`Tick`], this is the root location that contains it.
    type Root: Location<'a>;
198
    /// Returns the root location (of type [`Location::Root`]) for this location.
    ///
    /// For top-level locations like [`Process`] and [`Cluster`], this returns `self`.
    /// For nested locations like [`Tick`], this returns the root location that contains it.
    fn root(&self) -> Self::Root;
204
205    /// Attempts to create a new [`Tick`] clock domain at this location.
206    ///
207    /// Returns `Some(Tick)` if this is a top-level location (like [`Process`] or [`Cluster`]),
208    /// or `None` if this location is already inside a tick (nested ticks are not supported).
209    ///
210    /// Prefer using [`Location::tick`] when you know the location is top-level.
211    fn try_tick(&self) -> Option<Tick<Self>> {
212        if Self::is_top_level() {
213            let id = self.flow_state().borrow_mut().next_clock_id();
214            Some(Tick {
215                id,
216                l: self.clone(),
217            })
218        } else {
219            None
220        }
221    }
222
223    /// Returns the unique identifier for this location.
224    fn id(&self) -> LocationId {
225        dynamic::DynLocation::id(self)
226    }
227
228    /// Creates a new [`Tick`] clock domain at this location.
229    ///
230    /// A tick represents a logical clock that can be used to batch streaming data
231    /// into discrete time steps. This is useful for implementing iterative algorithms
232    /// or for synchronizing data across multiple streams.
233    ///
234    /// # Example
235    /// ```rust
236    /// # #[cfg(feature = "deploy")] {
237    /// # use hydro_lang::prelude::*;
238    /// # use futures::StreamExt;
239    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
240    /// let tick = process.tick();
241    /// let inside_tick = process
242    ///     .source_iter(q!(vec![1, 2, 3, 4]))
243    ///     .batch(&tick, nondet!(/** test */));
244    /// inside_tick.all_ticks()
245    /// # }, |mut stream| async move {
246    /// // 1, 2, 3, 4
247    /// # for w in vec![1, 2, 3, 4] {
248    /// #     assert_eq!(stream.next().await.unwrap(), w);
249    /// # }
250    /// # }));
251    /// # }
252    /// ```
253    fn tick(&self) -> Tick<Self>
254    where
255        Self: NoTick,
256    {
257        let id = self.flow_state().borrow_mut().next_clock_id();
258        Tick {
259            id,
260            l: self.clone(),
261        }
262    }
263
264    /// Creates an unbounded stream that continuously emits unit values `()`.
265    ///
266    /// This is useful for driving computations that need to run continuously,
267    /// such as polling or heartbeat mechanisms.
268    ///
269    /// # Example
270    /// ```rust
271    /// # #[cfg(feature = "deploy")] {
272    /// # use hydro_lang::prelude::*;
273    /// # use futures::StreamExt;
274    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
275    /// let tick = process.tick();
276    /// process.spin()
277    ///     .batch(&tick, nondet!(/** test */))
278    ///     .map(q!(|_| 42))
279    ///     .all_ticks()
280    /// # }, |mut stream| async move {
281    /// // 42, 42, 42, ...
282    /// # assert_eq!(stream.next().await.unwrap(), 42);
283    /// # assert_eq!(stream.next().await.unwrap(), 42);
284    /// # assert_eq!(stream.next().await.unwrap(), 42);
285    /// # }));
286    /// # }
287    /// ```
288    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
289    where
290        Self: Sized + NoTick,
291    {
292        Stream::new(
293            self.clone(),
294            HydroNode::Source {
295                source: HydroSource::Spin(),
296                metadata: self.new_node_metadata(Stream::<
297                    (),
298                    Self,
299                    Unbounded,
300                    TotalOrder,
301                    ExactlyOnce,
302                >::collection_kind()),
303            },
304        )
305    }
306
307    /// Creates a stream from an async [`FuturesStream`].
308    ///
309    /// This is useful for integrating with external async data sources,
310    /// such as network connections or file readers.
311    ///
312    /// # Example
313    /// ```rust
314    /// # #[cfg(feature = "deploy")] {
315    /// # use hydro_lang::prelude::*;
316    /// # use futures::StreamExt;
317    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
318    /// process.source_stream(q!(futures::stream::iter(vec![1, 2, 3])))
319    /// # }, |mut stream| async move {
320    /// // 1, 2, 3
321    /// # for w in vec![1, 2, 3] {
322    /// #     assert_eq!(stream.next().await.unwrap(), w);
323    /// # }
324    /// # }));
325    /// # }
326    /// ```
327    fn source_stream<T, E>(
328        &self,
329        e: impl QuotedWithContext<'a, E, Self>,
330    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
331    where
332        E: FuturesStream<Item = T> + Unpin,
333        Self: Sized + NoTick,
334    {
335        let e = e.splice_untyped_ctx(self);
336
337        Stream::new(
338            self.clone(),
339            HydroNode::Source {
340                source: HydroSource::Stream(e.into()),
341                metadata: self.new_node_metadata(Stream::<
342                    T,
343                    Self,
344                    Unbounded,
345                    TotalOrder,
346                    ExactlyOnce,
347                >::collection_kind()),
348            },
349        )
350    }
351
352    /// Creates a bounded stream from an iterator.
353    ///
354    /// The iterator is evaluated once at runtime, and all elements are emitted
355    /// in order. This is useful for creating streams from static data or
356    /// for testing.
357    ///
358    /// # Example
359    /// ```rust
360    /// # #[cfg(feature = "deploy")] {
361    /// # use hydro_lang::prelude::*;
362    /// # use futures::StreamExt;
363    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
364    /// process.source_iter(q!(vec![1, 2, 3, 4]))
365    /// # }, |mut stream| async move {
366    /// // 1, 2, 3, 4
367    /// # for w in vec![1, 2, 3, 4] {
368    /// #     assert_eq!(stream.next().await.unwrap(), w);
369    /// # }
370    /// # }));
371    /// # }
372    /// ```
373    fn source_iter<T, E>(
374        &self,
375        e: impl QuotedWithContext<'a, E, Self>,
376    ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
377    where
378        E: IntoIterator<Item = T>,
379        Self: Sized,
380    {
381        let e = e.splice_typed_ctx(self);
382
383        Stream::new(
384            self.clone(),
385            HydroNode::Source {
386                source: HydroSource::Iter(e.into()),
387                metadata: self.new_node_metadata(
388                    Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
389                ),
390            },
391        )
392    }
393
394    /// Creates a stream of membership events for a cluster.
395    ///
396    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
397    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
398    /// keyed by the [`MemberId`] of the cluster member.
399    ///
400    /// This is useful for implementing protocols that need to track cluster membership,
401    /// such as broadcasting to all members or detecting failures.
402    ///
403    /// # Example
404    /// ```rust
405    /// # #[cfg(feature = "deploy")] {
406    /// # use hydro_lang::prelude::*;
407    /// # use futures::StreamExt;
408    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
409    /// let p1 = flow.process::<()>();
410    /// let workers: Cluster<()> = flow.cluster::<()>();
411    /// # // do nothing on each worker
412    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
413    /// let cluster_members = p1.source_cluster_members(&workers);
414    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
415    /// // if there are 4 members in the cluster, we would see a join event for each
416    /// // { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
417    /// # }, |mut stream| async move {
418    /// # let mut results = Vec::new();
419    /// # for w in 0..4 {
420    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
421    /// # }
422    /// # results.sort();
423    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
424    /// # }));
425    /// # }
426    /// ```
427    fn source_cluster_members<C: 'a>(
428        &self,
429        cluster: &Cluster<'a, C>,
430    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
431    where
432        Self: Sized + NoTick,
433    {
434        Stream::new(
435            self.clone(),
436            HydroNode::Source {
437                source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
438                metadata: self.new_node_metadata(Stream::<
439                    (TaglessMemberId, MembershipEvent),
440                    Self,
441                    Unbounded,
442                    TotalOrder,
443                    ExactlyOnce,
444                >::collection_kind()),
445            },
446        )
447        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
448        .into_keyed()
449    }
450
451    /// Creates a one-way connection from an external process to receive raw bytes.
452    ///
453    /// Returns a port handle for the external process to connect to, and a stream
454    /// of received byte buffers.
455    ///
456    /// For bidirectional communication or typed data, see [`Location::bind_single_client`]
457    /// or [`Location::source_external_bincode`].
458    fn source_external_bytes<L>(
459        &self,
460        from: &External<L>,
461    ) -> (
462        ExternalBytesPort,
463        Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
464    )
465    where
466        Self: Sized + NoTick,
467    {
468        let (port, stream, sink) =
469            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
470
471        sink.complete(self.source_iter(q!([])));
472
473        (port, stream)
474    }
475
476    /// Creates a one-way connection from an external process to receive bincode-serialized data.
477    ///
478    /// Returns a sink handle for the external process to send data to, and a stream
479    /// of received values.
480    ///
481    /// For bidirectional communication, see [`Location::bind_single_client_bincode`].
482    #[expect(clippy::type_complexity, reason = "stream markers")]
483    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
484        &self,
485        from: &External<L>,
486    ) -> (
487        ExternalBincodeSink<T, NotMany, O, R>,
488        Stream<T, Self, Unbounded, O, R>,
489    )
490    where
491        Self: Sized + NoTick,
492        T: Serialize + DeserializeOwned,
493    {
494        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
495        sink.complete(self.source_iter(q!([])));
496
497        (
498            ExternalBincodeSink {
499                process_key: from.key,
500                port_id: port.port_id,
501                _phantom: PhantomData,
502            },
503            stream.weaken_ordering().weaken_retries(),
504        )
505    }
506
507    /// Sets up a simulated input port on this location for testing.
508    ///
509    /// Returns a handle to send messages to the location as well as a stream
510    /// of received messages. This is only available when the `sim` feature is enabled.
511    #[cfg(feature = "sim")]
512    #[expect(clippy::type_complexity, reason = "stream markers")]
513    fn sim_input<T, O: Ordering, R: Retries>(
514        &self,
515    ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
516    where
517        Self: Sized + NoTick,
518        T: Serialize + DeserializeOwned,
519    {
520        let external_location: External<'a, ()> = External {
521            key: LocationKey::FIRST,
522            flow_state: self.flow_state().clone(),
523            _phantom: PhantomData,
524        };
525
526        let (external, stream) = self.source_external_bincode(&external_location);
527
528        (SimSender(external.port_id, PhantomData), stream)
529    }
530
531    /// Creates an external input stream for embedded deployment mode.
532    ///
533    /// The `name` parameter specifies the name of the generated function parameter
534    /// that will supply data to this stream at runtime. The generated function will
535    /// accept an `impl Stream<Item = T> + Unpin` argument with this name.
536    fn embedded_input<T>(
537        &self,
538        name: impl Into<String>,
539    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
540    where
541        Self: Sized + NoTick,
542    {
543        let ident = syn::Ident::new(&name.into(), Span::call_site());
544
545        Stream::new(
546            self.clone(),
547            HydroNode::Source {
548                source: HydroSource::Embedded(ident),
549                metadata: self.new_node_metadata(Stream::<
550                    T,
551                    Self,
552                    Unbounded,
553                    TotalOrder,
554                    ExactlyOnce,
555                >::collection_kind()),
556            },
557        )
558    }
559
560    /// Creates an embedded singleton input for embedded deployment mode.
561    ///
562    /// The `name` parameter specifies the name of the generated function parameter
563    /// that will supply data to this singleton at runtime. The generated function will
564    /// accept a plain `T` parameter with this name.
565    fn embedded_singleton_input<T>(&self, name: impl Into<String>) -> Singleton<T, Self, Bounded>
566    where
567        Self: Sized + NoTick,
568    {
569        let ident = syn::Ident::new(&name.into(), Span::call_site());
570
571        Singleton::new(
572            self.clone(),
573            HydroNode::Source {
574                source: HydroSource::EmbeddedSingleton(ident),
575                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
576            },
577        )
578    }
579
580    /// Establishes a server on this location to receive a bidirectional connection from a single
581    /// client, identified by the given `External` handle. Returns a port handle for the external
582    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
583    /// messages.
584    ///
585    /// # Example
586    /// ```rust
587    /// # #[cfg(feature = "deploy")] {
588    /// # use hydro_lang::prelude::*;
589    /// # use hydro_deploy::Deployment;
590    /// # use futures::{SinkExt, StreamExt};
591    /// # tokio_test::block_on(async {
592    /// # use bytes::Bytes;
593    /// # use hydro_lang::location::NetworkHint;
594    /// # use tokio_util::codec::LengthDelimitedCodec;
595    /// # let mut flow = FlowBuilder::new();
596    /// let node = flow.process::<()>();
597    /// let external = flow.external::<()>();
598    /// let (port, incoming, outgoing) =
599    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
600    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
601    ///     let mut resp: Vec<u8> = data.into();
602    ///     resp.push(42);
603    ///     resp.into() // : Bytes
604    /// })));
605    ///
606    /// # let mut deployment = Deployment::new();
607    /// let nodes = flow // ... with_process and with_external
608    /// #     .with_process(&node, deployment.Localhost())
609    /// #     .with_external(&external, deployment.Localhost())
610    /// #     .deploy(&mut deployment);
611    ///
612    /// deployment.deploy().await.unwrap();
613    /// deployment.start().await.unwrap();
614    ///
615    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
616    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
617    /// assert_eq!(
618    ///     external_out.next().await.unwrap().unwrap(),
619    ///     vec![1, 2, 3, 42]
620    /// );
621    /// # });
622    /// # }
623    /// ```
624    #[expect(clippy::type_complexity, reason = "stream markers")]
625    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
626        &self,
627        from: &External<L>,
628        port_hint: NetworkHint,
629    ) -> (
630        ExternalBytesPort<NotMany>,
631        Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
632        ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
633    )
634    where
635        Self: Sized + NoTick,
636    {
637        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
638
639        let (fwd_ref, to_sink) =
640            self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
641        let mut flow_state_borrow = self.flow_state().borrow_mut();
642
643        flow_state_borrow.push_root(HydroRoot::SendExternal {
644            to_external_key: from.key,
645            to_port_id: next_external_port_id,
646            to_many: false,
647            unpaired: false,
648            serialize_fn: None,
649            instantiate_fn: DebugInstantiate::Building,
650            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
651            op_metadata: HydroIrOpMetadata::new(),
652        });
653
654        let raw_stream: Stream<
655            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
656            Self,
657            Unbounded,
658            TotalOrder,
659            ExactlyOnce,
660        > = Stream::new(
661            self.clone(),
662            HydroNode::ExternalInput {
663                from_external_key: from.key,
664                from_port_id: next_external_port_id,
665                from_many: false,
666                codec_type: quote_type::<Codec>().into(),
667                port_hint,
668                instantiate_fn: DebugInstantiate::Building,
669                deserialize_fn: None,
670                metadata: self.new_node_metadata(Stream::<
671                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
672                    Self,
673                    Unbounded,
674                    TotalOrder,
675                    ExactlyOnce,
676                >::collection_kind()),
677            },
678        );
679
680        (
681            ExternalBytesPort {
682                process_key: from.key,
683                port_id: next_external_port_id,
684                _phantom: PhantomData,
685            },
686            raw_stream.flatten_ordered(),
687            fwd_ref,
688        )
689    }
690
691    /// Establishes a bidirectional connection from a single external client using bincode serialization.
692    ///
693    /// Returns a port handle for the external process to connect to, a stream of incoming messages,
694    /// and a handle to send outgoing messages. This is a convenience wrapper around
695    /// [`Location::bind_single_client`] that uses bincode for serialization.
696    ///
697    /// # Type Parameters
698    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
699    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
700    #[expect(clippy::type_complexity, reason = "stream markers")]
701    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
702        &self,
703        from: &External<L>,
704    ) -> (
705        ExternalBincodeBidi<InT, OutT, NotMany>,
706        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
707        ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
708    )
709    where
710        Self: Sized + NoTick,
711    {
712        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
713
714        let (fwd_ref, to_sink) =
715            self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
716        let mut flow_state_borrow = self.flow_state().borrow_mut();
717
718        let root = get_this_crate();
719
720        let out_t_type = quote_type::<OutT>();
721        let ser_fn: syn::Expr = syn::parse_quote! {
722            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
723                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
724            )
725        };
726
727        flow_state_borrow.push_root(HydroRoot::SendExternal {
728            to_external_key: from.key,
729            to_port_id: next_external_port_id,
730            to_many: false,
731            unpaired: false,
732            serialize_fn: Some(ser_fn.into()),
733            instantiate_fn: DebugInstantiate::Building,
734            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
735            op_metadata: HydroIrOpMetadata::new(),
736        });
737
738        let in_t_type = quote_type::<InT>();
739
740        let deser_fn: syn::Expr = syn::parse_quote! {
741            |res| {
742                let b = res.unwrap();
743                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
744            }
745        };
746
747        let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
748            self.clone(),
749            HydroNode::ExternalInput {
750                from_external_key: from.key,
751                from_port_id: next_external_port_id,
752                from_many: false,
753                codec_type: quote_type::<LengthDelimitedCodec>().into(),
754                port_hint: NetworkHint::Auto,
755                instantiate_fn: DebugInstantiate::Building,
756                deserialize_fn: Some(deser_fn.into()),
757                metadata: self.new_node_metadata(Stream::<
758                    InT,
759                    Self,
760                    Unbounded,
761                    TotalOrder,
762                    ExactlyOnce,
763                >::collection_kind()),
764            },
765        );
766
767        (
768            ExternalBincodeBidi {
769                process_key: from.key,
770                port_id: next_external_port_id,
771                _phantom: PhantomData,
772            },
773            raw_stream,
774            fwd_ref,
775        )
776    }
777
778    /// Establishes a server on this location to receive bidirectional connections from multiple
779    /// external clients using raw bytes.
780    ///
781    /// Unlike [`Location::bind_single_client`], this method supports multiple concurrent client
782    /// connections. Each client is assigned a unique `u64` identifier.
783    ///
784    /// Returns:
785    /// - A port handle for external processes to connect to
786    /// - A keyed stream of incoming messages, keyed by client ID
787    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
788    /// - A handle to send outgoing messages, keyed by client ID
789    #[expect(clippy::type_complexity, reason = "stream markers")]
790    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
791        &self,
792        from: &External<L>,
793        port_hint: NetworkHint,
794    ) -> (
795        ExternalBytesPort<Many>,
796        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
797        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
798        ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
799    )
800    where
801        Self: Sized + NoTick,
802    {
803        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
804
805        let (fwd_ref, to_sink) =
806            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
807        let mut flow_state_borrow = self.flow_state().borrow_mut();
808
809        flow_state_borrow.push_root(HydroRoot::SendExternal {
810            to_external_key: from.key,
811            to_port_id: next_external_port_id,
812            to_many: true,
813            unpaired: false,
814            serialize_fn: None,
815            instantiate_fn: DebugInstantiate::Building,
816            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
817            op_metadata: HydroIrOpMetadata::new(),
818        });
819
820        let raw_stream: Stream<
821            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
822            Self,
823            Unbounded,
824            TotalOrder,
825            ExactlyOnce,
826        > = Stream::new(
827            self.clone(),
828            HydroNode::ExternalInput {
829                from_external_key: from.key,
830                from_port_id: next_external_port_id,
831                from_many: true,
832                codec_type: quote_type::<Codec>().into(),
833                port_hint,
834                instantiate_fn: DebugInstantiate::Building,
835                deserialize_fn: None,
836                metadata: self.new_node_metadata(Stream::<
837                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
838                    Self,
839                    Unbounded,
840                    TotalOrder,
841                    ExactlyOnce,
842                >::collection_kind()),
843            },
844        );
845
846        let membership_stream_ident = syn::Ident::new(
847            &format!(
848                "__hydro_deploy_many_{}_{}_membership",
849                from.key, next_external_port_id
850            ),
851            Span::call_site(),
852        );
853        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
854        let raw_membership_stream: KeyedStream<
855            u64,
856            bool,
857            Self,
858            Unbounded,
859            TotalOrder,
860            ExactlyOnce,
861        > = KeyedStream::new(
862            self.clone(),
863            HydroNode::Source {
864                source: HydroSource::Stream(membership_stream_expr.into()),
865                metadata: self.new_node_metadata(KeyedStream::<
866                    u64,
867                    bool,
868                    Self,
869                    Unbounded,
870                    TotalOrder,
871                    ExactlyOnce,
872                >::collection_kind()),
873            },
874        );
875
876        (
877            ExternalBytesPort {
878                process_key: from.key,
879                port_id: next_external_port_id,
880                _phantom: PhantomData,
881            },
882            raw_stream
883                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
884                .into_keyed(),
885            raw_membership_stream.map(q!(|join| {
886                if join {
887                    MembershipEvent::Joined
888                } else {
889                    MembershipEvent::Left
890                }
891            })),
892            fwd_ref,
893        )
894    }
895
896    /// Establishes a server on this location to receive bidirectional connections from multiple
897    /// external clients using bincode serialization.
898    ///
899    /// Unlike [`Location::bind_single_client_bincode`], this method supports multiple concurrent
900    /// client connections. Each client is assigned a unique `u64` identifier.
901    ///
902    /// Returns:
903    /// - A port handle for external processes to connect to
904    /// - A keyed stream of incoming messages, keyed by client ID
905    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
906    /// - A handle to send outgoing messages, keyed by client ID
907    ///
908    /// # Type Parameters
909    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
910    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
911    #[expect(clippy::type_complexity, reason = "stream markers")]
912    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
913        &self,
914        from: &External<L>,
915    ) -> (
916        ExternalBincodeBidi<InT, OutT, Many>,
917        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
918        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
919        ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
920    )
921    where
922        Self: Sized + NoTick,
923    {
924        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
925
926        let (fwd_ref, to_sink) =
927            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
928        let mut flow_state_borrow = self.flow_state().borrow_mut();
929
930        let root = get_this_crate();
931
932        let out_t_type = quote_type::<OutT>();
933        let ser_fn: syn::Expr = syn::parse_quote! {
934            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
935                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
936            )
937        };
938
939        flow_state_borrow.push_root(HydroRoot::SendExternal {
940            to_external_key: from.key,
941            to_port_id: next_external_port_id,
942            to_many: true,
943            unpaired: false,
944            serialize_fn: Some(ser_fn.into()),
945            instantiate_fn: DebugInstantiate::Building,
946            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
947            op_metadata: HydroIrOpMetadata::new(),
948        });
949
950        let in_t_type = quote_type::<InT>();
951
952        let deser_fn: syn::Expr = syn::parse_quote! {
953            |res| {
954                let (id, b) = res.unwrap();
955                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
956            }
957        };
958
959        let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
960            KeyedStream::new(
961                self.clone(),
962                HydroNode::ExternalInput {
963                    from_external_key: from.key,
964                    from_port_id: next_external_port_id,
965                    from_many: true,
966                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
967                    port_hint: NetworkHint::Auto,
968                    instantiate_fn: DebugInstantiate::Building,
969                    deserialize_fn: Some(deser_fn.into()),
970                    metadata: self.new_node_metadata(KeyedStream::<
971                        u64,
972                        InT,
973                        Self,
974                        Unbounded,
975                        TotalOrder,
976                        ExactlyOnce,
977                    >::collection_kind()),
978                },
979            );
980
981        let membership_stream_ident = syn::Ident::new(
982            &format!(
983                "__hydro_deploy_many_{}_{}_membership",
984                from.key, next_external_port_id
985            ),
986            Span::call_site(),
987        );
988        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
989        let raw_membership_stream: KeyedStream<
990            u64,
991            bool,
992            Self,
993            Unbounded,
994            TotalOrder,
995            ExactlyOnce,
996        > = KeyedStream::new(
997            self.clone(),
998            HydroNode::Source {
999                source: HydroSource::Stream(membership_stream_expr.into()),
1000                metadata: self.new_node_metadata(KeyedStream::<
1001                    u64,
1002                    bool,
1003                    Self,
1004                    Unbounded,
1005                    TotalOrder,
1006                    ExactlyOnce,
1007                >::collection_kind()),
1008            },
1009        );
1010
1011        (
1012            ExternalBincodeBidi {
1013                process_key: from.key,
1014                port_id: next_external_port_id,
1015                _phantom: PhantomData,
1016            },
1017            raw_stream,
1018            raw_membership_stream.map(q!(|join| {
1019                if join {
1020                    MembershipEvent::Joined
1021                } else {
1022                    MembershipEvent::Left
1023                }
1024            })),
1025            fwd_ref,
1026        )
1027    }
1028
1029    /// Bridges user-owned async code to the dataflow as a **bidirectional sidecar**.
1030    ///
1031    /// The closure is called once at startup and must return a
1032    /// `(Stream<InT>, Sink<OutT>)` pair. The framework reads from the stream
1033    /// (items flowing *into* the dataflow) and writes to the sink (items flowing
1034    /// *out* to the sidecar). The user controls buffering, backpressure, and
1035    /// internal lifecycle — Hydro only sees the stream/sink interface.
1036    ///
1037    /// This will hopefully make it easy to integrate hydro with existing frameworks,
1038    /// for example grpc code generated service endpoints.
1039    ///
1040    /// # Returns
1041    /// - A `Stream<InT>` carrying items from the sidecar into the dataflow.
1042    /// - A [`ForwardHandle`] expecting a `Stream<OutT>` that the user completes
1043    ///   with items destined for the sidecar.
1044    ///
1045    /// # Example
1046    ///
1047    /// ```rust
1048    /// # #[cfg(feature = "deploy")] {
1049    /// # use hydro_lang::prelude::*;
1050    /// # use futures::StreamExt;
1051    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1052    /// // Sidecar that echoes whatever it receives back into the dataflow.
1053    /// let (inbound, response_handle) = process.sidecar_bidi::<String, String, _>(q!(|| {
1054    ///     let (to_df_tx, to_df_rx) = tokio::sync::mpsc::channel::<String>(16);
1055    ///     let (from_df_tx, mut from_df_rx) = tokio::sync::mpsc::channel::<String>(16);
1056    ///
1057    ///     // Spawn the sidecar: echoes items from the dataflow back into it.
1058    ///     tokio::spawn(async move {
1059    ///         while let Some(msg) = from_df_rx.recv().await {
1060    ///             to_df_tx.send(msg).await.ok();
1061    ///         }
1062    ///     });
1063    ///
1064    ///     // Return the framework-facing ends (concrete types, no boxing needed).
1065    ///     let stream = tokio_stream::wrappers::ReceiverStream::new(to_df_rx);
1066    ///     let sink = tokio_util::sync::PollSender::new(from_df_tx);
1067    ///     (stream, sink)
1068    /// }));
1069    ///
1070    /// // Send "hello" into the sidecar via the response channel.
1071    /// let input = process.source_stream(q!(futures::stream::iter(vec!["hello".to_string()])));
1072    /// response_handle.complete(input);
1073    ///
1074    /// // The sidecar echoes it back — assert we get "hello" out.
1075    /// inbound
1076    /// # }, |mut stream| async move {
1077    /// #     assert_eq!(stream.next().await.unwrap(), "hello");
1078    /// # }));
1079    /// # }
1080    /// ```
1081    #[expect(clippy::type_complexity, reason = "stream markers")]
1082    fn sidecar_bidi<InT: 'static, OutT: 'static, F>(
1083        &self,
1084        sidecar: impl QuotedWithContext<'a, F, Self>,
1085    ) -> (
1086        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
1087        ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1088    )
1089    where
1090        Self: Sized + NoTick,
1091    {
1092        let location_key = Location::id(self).key();
1093
1094        let sidecar_id = self.flow_state().borrow_mut().next_sidecar_id();
1095        let (stream_ident, sink_ident) = sidecar_id.idents();
1096
1097        let sidecar_closure: syn::Expr = sidecar.splice_untyped_ctx(self);
1098        self.flow_state()
1099            .borrow_mut()
1100            .sidecars
1101            .push(crate::compile::builder::Sidecar::Bidi {
1102                location_key,
1103                sidecar_id,
1104                sidecar_closure: Box::new(sidecar_closure),
1105            });
1106
1107        // Inbound stream: reads from the stream returned by the sidecar closure
1108        let source_expr: syn::Expr = parse_quote! {
1109            #stream_ident
1110        };
1111        let inbound: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
1112            self.clone(),
1113            HydroNode::Source {
1114                source: HydroSource::Stream(source_expr.into()),
1115                metadata: self.new_node_metadata(Stream::<
1116                    InT,
1117                    Self,
1118                    Unbounded,  // TODO: maybe bounded sidecars are interesting..?
1119                    TotalOrder, // TODO: NoOrder..?
1120                    ExactlyOnce,
1121                >::collection_kind()),
1122            },
1123        );
1124
1125        // Outbound: forward_ref cycle feeding the sink returned by the sidecar closure
1126        let (fwd_ref, to_sink): (
1127            ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1128            Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>,
1129        ) = self.forward_ref();
1130
1131        let sink_expr: syn::Expr = parse_quote! {
1132            #sink_ident
1133        };
1134
1135        let sink_input_ir = to_sink.ir_node.replace(HydroNode::Placeholder);
1136        self.flow_state()
1137            .borrow_mut()
1138            .try_push_root(HydroRoot::DestSink {
1139                sink: sink_expr.into(),
1140                input: Box::new(sink_input_ir),
1141                op_metadata: HydroIrOpMetadata::new(),
1142            });
1143
1144        (inbound, fwd_ref)
1145    }
1146
1147    /// Constructs a [`Singleton`] materialized at this location with the given static value.
1148    ///
1149    /// See also: [`Tick::singleton`], for creating a singleton _within_ a tick, which requires
1150    /// `T: Clone`.
1151    ///
1152    /// # Example
1153    /// ```rust
1154    /// # #[cfg(feature = "deploy")] {
1155    /// # use hydro_lang::prelude::*;
1156    /// # use futures::StreamExt;
1157    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1158    /// let singleton = process.singleton(q!(5));
1159    /// # singleton.into_stream()
1160    /// # }, |mut stream| async move {
1161    /// // 5
1162    /// # assert_eq!(stream.next().await.unwrap(), 5);
1163    /// # }));
1164    /// # }
1165    /// ```
1166    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1167    where
1168        Self: Sized + NoTick,
1169    {
1170        let e = e.splice_untyped_ctx(self);
1171
1172        Singleton::new(
1173            self.clone(),
1174            HydroNode::SingletonSource {
1175                value: e.into(),
1176                first_tick_only: false,
1177                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1178            },
1179        )
1180    }
1181
1182    /// Constructs a [`Singleton`] by resolving an async [`Future`] to completion.
1183    ///
1184    /// This is a convenience method equivalent to
1185    /// `self.singleton(future_expr).resolve_future_blocking()`, which is a common
1186    /// pattern when initializing a singleton from an async computation.
1187    ///
1188    /// # Example
1189    /// ```rust
1190    /// # #[cfg(feature = "deploy")] {
1191    /// # use hydro_lang::prelude::*;
1192    /// # use futures::StreamExt;
1193    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1194    /// let singleton = process.singleton_future(q!(async { 42 }));
1195    /// singleton.into_stream()
1196    /// # }, |mut stream| async move {
1197    /// // 42
1198    /// # assert_eq!(stream.next().await.unwrap(), 42);
1199    /// # }));
1200    /// # }
1201    /// ```
1202    ///
1203    /// [`Future`]: std::future::Future
1204    fn singleton_future<F>(
1205        &self,
1206        e: impl QuotedWithContext<'a, F, Self>,
1207    ) -> Singleton<F::Output, Self, Bounded>
1208    where
1209        F: Future,
1210        Self: Sized + NoTick,
1211    {
1212        self.singleton(e).resolve_future_blocking()
1213    }
1214
1215    /// Generates a stream that emits `()` at a fixed interval.
1216    ///
1217    /// The first tick completes immediately. Missed ticks will be scheduled
1218    /// as soon as possible.
1219    ///
1220    /// Because this only emits `()`, the non-determinism of *when* events fire
1221    /// is captured by the `AtLeastOnce` retry semantics downstream, so no
1222    /// [`NonDet`] guard is required.
1223    fn source_interval(
1224        &self,
1225        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1226    ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1227    where
1228        Self: Sized + NoTick,
1229    {
1230        self.source_stream(q!(tokio_stream::StreamExt::map(
1231            tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(interval)),
1232            |_| ()
1233        )))
1234    }
1235
1236    /// Generates a stream that emits `()` at a fixed interval, after an
1237    /// initial delay.
1238    ///
1239    /// Because this only emits `()`, the non-determinism of *when* events fire
1240    /// is captured by the `AtLeastOnce` retry semantics downstream, so no
1241    /// [`NonDet`] guard is required.
1242    fn source_interval_delayed(
1243        &self,
1244        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1245        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1246    ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1247    where
1248        Self: Sized + NoTick,
1249    {
1250        self.source_stream(q!(tokio_stream::StreamExt::map(
1251            tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
1252                tokio::time::Instant::now() + delay,
1253                interval,
1254            )),
1255            |_| ()
1256        )))
1257    }
1258
1259    /// Returns the current wall-clock time as a [`Singleton`] containing a
1260    /// [`tokio::time::Instant`].
1261    ///
1262    /// # Non-Determinism
1263    /// Reading wall-clock time is inherently non-deterministic because the
1264    /// value depends on when the tick executes. A [`NonDet`] guard is required
1265    /// to acknowledge this.
1266    fn current_tick_instant(
1267        &self,
1268        _nondet: NonDet,
1269    ) -> Singleton<tokio::time::Instant, Tick<Self>, Bounded>
1270    where
1271        Self: Sized + NoTick,
1272    {
1273        let tick = self.tick();
1274        tick.singleton(q!(tokio::time::Instant::now()))
1275    }
1276
1277    /// Creates a forward reference, allowing a stream to be used before its source is defined.
1278    ///
1279    /// Returns a `(handle, placeholder)` pair. Use the placeholder in the dataflow graph,
1280    /// then call `handle.complete(actual_stream)` to wire in the real source.
1281    ///
1282    /// This is useful for mutually-dependent dataflows or when the definition order
1283    /// doesn't match the data flow direction. For feedback loops, prefer [`Tick::cycle`]
1284    /// instead, which automatically defers values by one tick.
1285    ///
1286    /// # Panics
1287    /// Panics if the forward reference creates a synchronous cycle (i.e., the completed
1288    /// stream transitively depends on the placeholder without a `defer_tick` or network
1289    /// hop in between).
1290    ///
1291    /// # Example
1292    /// ```rust
1293    /// # #[cfg(feature = "deploy")] {
1294    /// # use hydro_lang::prelude::*;
1295    /// # use hydro_lang::live_collections::stream::NoOrder;
1296    /// # use futures::StreamExt;
1297    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1298    /// // Create a forward reference to define a stream that will be completed later
1299    /// let (complete, forward_stream) = process.forward_ref::<Stream<i32, _, _, NoOrder>>();
1300    ///
1301    /// // Use the forward reference as input to another computation
1302    /// let output: Stream<_, _, _, NoOrder> = forward_stream.map(q!(|x| x * 2));
1303    ///
1304    /// // Complete the forward reference with the actual source
1305    /// let source: Stream<_, _, Unbounded> = process.source_iter(q!([1, 2, 3])).into();
1306    /// complete.complete(source);
1307    /// output
1308    /// # }, |mut stream| async move {
1309    /// // 2, 4, 6
1310    /// # assert_eq!(stream.next().await.unwrap(), 2);
1311    /// # assert_eq!(stream.next().await.unwrap(), 4);
1312    /// # assert_eq!(stream.next().await.unwrap(), 6);
1313    /// # }));
1314    /// # }
1315    /// ```
1316    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1317    where
1318        S: CycleCollection<'a, ForwardRef, Location = Self>,
1319    {
1320        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1321        (
1322            ForwardHandle::new(cycle_id, Location::id(self)),
1323            S::create_source(cycle_id, self.clone()),
1324        )
1325    }
1326}
1327
1328#[cfg(feature = "deploy")]
1329#[cfg(test)]
1330mod tests {
1331    use std::collections::HashSet;
1332
1333    use futures::{SinkExt, StreamExt};
1334    use hydro_deploy::Deployment;
1335    use stageleft::q;
1336    use tokio_util::codec::LengthDelimitedCodec;
1337
1338    use crate::compile::builder::FlowBuilder;
1339    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1340    use crate::location::{Location, NetworkHint};
1341    use crate::nondet::nondet;
1342
1343    #[tokio::test]
1344    async fn top_level_singleton_replay_cardinality() {
1345        let mut deployment = Deployment::new();
1346
1347        let mut flow = FlowBuilder::new();
1348        let node = flow.process::<()>();
1349        let external = flow.external::<()>();
1350
1351        let (in_port, input) =
1352            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1353        let singleton = node.singleton(q!(123));
1354        let tick = node.tick();
1355        let out = input
1356            .batch(&tick, nondet!(/** test */))
1357            .cross_singleton(singleton.clone().snapshot(&tick, nondet!(/** test */)))
1358            .cross_singleton(
1359                singleton
1360                    .snapshot(&tick, nondet!(/** test */))
1361                    .into_stream()
1362                    .count(),
1363            )
1364            .all_ticks()
1365            .send_bincode_external(&external);
1366
1367        let nodes = flow
1368            .with_process(&node, deployment.Localhost())
1369            .with_external(&external, deployment.Localhost())
1370            .deploy(&mut deployment);
1371
1372        deployment.deploy().await.unwrap();
1373
1374        let mut external_in = nodes.connect(in_port).await;
1375        let mut external_out = nodes.connect(out).await;
1376
1377        deployment.start().await.unwrap();
1378
1379        external_in.send(1).await.unwrap();
1380        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1381
1382        external_in.send(2).await.unwrap();
1383        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1384    }
1385
1386    #[tokio::test]
1387    async fn tick_singleton_replay_cardinality() {
1388        let mut deployment = Deployment::new();
1389
1390        let mut flow = FlowBuilder::new();
1391        let node = flow.process::<()>();
1392        let external = flow.external::<()>();
1393
1394        let (in_port, input) =
1395            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1396        let tick = node.tick();
1397        let singleton = tick.singleton(q!(123));
1398        let out = input
1399            .batch(&tick, nondet!(/** test */))
1400            .cross_singleton(singleton.clone())
1401            .cross_singleton(singleton.into_stream().count())
1402            .all_ticks()
1403            .send_bincode_external(&external);
1404
1405        let nodes = flow
1406            .with_process(&node, deployment.Localhost())
1407            .with_external(&external, deployment.Localhost())
1408            .deploy(&mut deployment);
1409
1410        deployment.deploy().await.unwrap();
1411
1412        let mut external_in = nodes.connect(in_port).await;
1413        let mut external_out = nodes.connect(out).await;
1414
1415        deployment.start().await.unwrap();
1416
1417        external_in.send(1).await.unwrap();
1418        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1419
1420        external_in.send(2).await.unwrap();
1421        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1422    }
1423
1424    #[tokio::test]
1425    async fn external_bytes() {
1426        let mut deployment = Deployment::new();
1427
1428        let mut flow = FlowBuilder::new();
1429        let first_node = flow.process::<()>();
1430        let external = flow.external::<()>();
1431
1432        let (in_port, input) = first_node.source_external_bytes(&external);
1433        let out = input.send_bincode_external(&external);
1434
1435        let nodes = flow
1436            .with_process(&first_node, deployment.Localhost())
1437            .with_external(&external, deployment.Localhost())
1438            .deploy(&mut deployment);
1439
1440        deployment.deploy().await.unwrap();
1441
1442        let mut external_in = nodes.connect(in_port).await.1;
1443        let mut external_out = nodes.connect(out).await;
1444
1445        deployment.start().await.unwrap();
1446
1447        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1448
1449        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1450    }
1451
1452    #[tokio::test]
1453    async fn multi_external_source() {
1454        let mut deployment = Deployment::new();
1455
1456        let mut flow = FlowBuilder::new();
1457        let first_node = flow.process::<()>();
1458        let external = flow.external::<()>();
1459
1460        let (in_port, input, _membership, complete_sink) =
1461            first_node.bidi_external_many_bincode(&external);
1462        let out = input.entries().send_bincode_external(&external);
1463        complete_sink.complete(
1464            first_node
1465                .source_iter::<(u64, ()), _>(q!([]))
1466                .into_keyed()
1467                .weaken_ordering(),
1468        );
1469
1470        let nodes = flow
1471            .with_process(&first_node, deployment.Localhost())
1472            .with_external(&external, deployment.Localhost())
1473            .deploy(&mut deployment);
1474
1475        deployment.deploy().await.unwrap();
1476
1477        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1478        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1479        let external_out = nodes.connect(out).await;
1480
1481        deployment.start().await.unwrap();
1482
1483        external_in_1.send(123).await.unwrap();
1484        external_in_2.send(456).await.unwrap();
1485
1486        assert_eq!(
1487            external_out.take(2).collect::<HashSet<_>>().await,
1488            vec![(0, 123), (1, 456)].into_iter().collect()
1489        );
1490    }
1491
1492    #[tokio::test]
1493    async fn second_connection_only_multi_source() {
1494        let mut deployment = Deployment::new();
1495
1496        let mut flow = FlowBuilder::new();
1497        let first_node = flow.process::<()>();
1498        let external = flow.external::<()>();
1499
1500        let (in_port, input, _membership, complete_sink) =
1501            first_node.bidi_external_many_bincode(&external);
1502        let out = input.entries().send_bincode_external(&external);
1503        complete_sink.complete(
1504            first_node
1505                .source_iter::<(u64, ()), _>(q!([]))
1506                .into_keyed()
1507                .weaken_ordering(),
1508        );
1509
1510        let nodes = flow
1511            .with_process(&first_node, deployment.Localhost())
1512            .with_external(&external, deployment.Localhost())
1513            .deploy(&mut deployment);
1514
1515        deployment.deploy().await.unwrap();
1516
1517        // intentionally skipped to test stream waking logic
1518        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1519        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1520        let mut external_out = nodes.connect(out).await;
1521
1522        deployment.start().await.unwrap();
1523
1524        external_in_2.send(456).await.unwrap();
1525
1526        assert_eq!(external_out.next().await.unwrap(), (1, 456));
1527    }
1528
1529    #[tokio::test]
1530    async fn multi_external_bytes() {
1531        let mut deployment = Deployment::new();
1532
1533        let mut flow = FlowBuilder::new();
1534        let first_node = flow.process::<()>();
1535        let external = flow.external::<()>();
1536
1537        let (in_port, input, _membership, complete_sink) = first_node
1538            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1539        let out = input.entries().send_bincode_external(&external);
1540        complete_sink.complete(
1541            first_node
1542                .source_iter(q!([]))
1543                .into_keyed()
1544                .weaken_ordering(),
1545        );
1546
1547        let nodes = flow
1548            .with_process(&first_node, deployment.Localhost())
1549            .with_external(&external, deployment.Localhost())
1550            .deploy(&mut deployment);
1551
1552        deployment.deploy().await.unwrap();
1553
1554        let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1555        let mut external_in_2 = nodes.connect(in_port).await.1;
1556        let external_out = nodes.connect(out).await;
1557
1558        deployment.start().await.unwrap();
1559
1560        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1561        external_in_2.send(vec![4, 5].into()).await.unwrap();
1562
1563        assert_eq!(
1564            external_out.take(2).collect::<HashSet<_>>().await,
1565            vec![
1566                (0, (&[1u8, 2, 3] as &[u8]).into()),
1567                (1, (&[4u8, 5] as &[u8]).into())
1568            ]
1569            .into_iter()
1570            .collect()
1571        );
1572    }
1573
1574    #[tokio::test]
1575    async fn single_client_external_bytes() {
1576        let mut deployment = Deployment::new();
1577        let mut flow = FlowBuilder::new();
1578        let first_node = flow.process::<()>();
1579        let external = flow.external::<()>();
1580        let (port, input, complete_sink) = first_node
1581            .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1582        complete_sink.complete(input.map(q!(|data| {
1583            let mut resp: Vec<u8> = data.into();
1584            resp.push(42);
1585            resp.into() // : Bytes
1586        })));
1587
1588        let nodes = flow
1589            .with_process(&first_node, deployment.Localhost())
1590            .with_external(&external, deployment.Localhost())
1591            .deploy(&mut deployment);
1592
1593        deployment.deploy().await.unwrap();
1594        deployment.start().await.unwrap();
1595
1596        let (mut external_out, mut external_in) = nodes.connect(port).await;
1597
1598        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1599        assert_eq!(
1600            external_out.next().await.unwrap().unwrap(),
1601            vec![1, 2, 3, 42]
1602        );
1603    }
1604
1605    #[tokio::test]
1606    async fn echo_external_bytes() {
1607        let mut deployment = Deployment::new();
1608
1609        let mut flow = FlowBuilder::new();
1610        let first_node = flow.process::<()>();
1611        let external = flow.external::<()>();
1612
1613        let (port, input, _membership, complete_sink) = first_node
1614            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1615        complete_sink
1616            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1617
1618        let nodes = flow
1619            .with_process(&first_node, deployment.Localhost())
1620            .with_external(&external, deployment.Localhost())
1621            .deploy(&mut deployment);
1622
1623        deployment.deploy().await.unwrap();
1624
1625        let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1626        let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1627
1628        deployment.start().await.unwrap();
1629
1630        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1631        external_in_2.send(vec![4, 5].into()).await.unwrap();
1632
1633        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1634        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1635    }
1636
1637    #[tokio::test]
1638    async fn echo_external_bincode() {
1639        let mut deployment = Deployment::new();
1640
1641        let mut flow = FlowBuilder::new();
1642        let first_node = flow.process::<()>();
1643        let external = flow.external::<()>();
1644
1645        let (port, input, _membership, complete_sink) =
1646            first_node.bidi_external_many_bincode(&external);
1647        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1648
1649        let nodes = flow
1650            .with_process(&first_node, deployment.Localhost())
1651            .with_external(&external, deployment.Localhost())
1652            .deploy(&mut deployment);
1653
1654        deployment.deploy().await.unwrap();
1655
1656        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1657        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1658
1659        deployment.start().await.unwrap();
1660
1661        external_in_1.send("hi".to_owned()).await.unwrap();
1662        external_in_2.send("hello".to_owned()).await.unwrap();
1663
1664        assert_eq!(external_out_1.next().await.unwrap(), "HI");
1665        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1666    }
1667
1668    #[tokio::test]
1669    async fn closure_location_name() {
1670        let mut deployment = Deployment::new();
1671        let mut flow = FlowBuilder::new();
1672
1673        enum ClosureProcess {}
1674
1675        let node = flow.process::<ClosureProcess>();
1676        let external = flow.external::<()>();
1677
1678        let (in_port, input) =
1679            node.source_external_bincode::<_, i32, TotalOrder, ExactlyOnce>(&external);
1680        let out = input.send_bincode_external(&external);
1681
1682        let nodes = flow
1683            .with_process(&node, deployment.Localhost())
1684            .with_external(&external, deployment.Localhost())
1685            .deploy(&mut deployment);
1686
1687        deployment.deploy().await.unwrap();
1688
1689        let mut external_in = nodes.connect(in_port).await;
1690        let mut external_out = nodes.connect(out).await;
1691
1692        deployment.start().await.unwrap();
1693
1694        external_in.send(42).await.unwrap();
1695        assert_eq!(external_out.next().await.unwrap(), 42);
1696    }
1697}