Skip to main content

hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick, NoAtomic};
30use crate::location::{Location, NoTick, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35    AggFuncAlgebra, ApplyMonotoneStream, ValidCommutativityFor, ValidIdempotenceFor,
36};
37
38pub mod networking;
39
40/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
41#[sealed::sealed]
42pub trait Ordering:
43    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
44{
45    /// The [`StreamOrder`] corresponding to this type.
46    const ORDERING_KIND: StreamOrder;
47}
48
49/// Marks the stream as being totally ordered, which means that there are
50/// no sources of non-determinism (other than intentional ones) that will
51/// affect the order of elements.
52pub enum TotalOrder {}
53
54#[sealed::sealed]
55impl Ordering for TotalOrder {
56    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
57}
58
59/// Marks the stream as having no order, which means that the order of
60/// elements may be affected by non-determinism.
61///
62/// This restricts certain operators, such as `fold` and `reduce`, to only
63/// be used with commutative aggregation functions.
64pub enum NoOrder {}
65
66#[sealed::sealed]
67impl Ordering for NoOrder {
68    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
69}
70
71/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
72/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
73/// have `Self` guarantees instead.
74#[sealed::sealed]
75pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
76#[sealed::sealed]
77impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
78
79/// Helper trait for determining the weakest of two orderings.
80#[sealed::sealed]
81pub trait MinOrder<Other: ?Sized> {
82    /// The weaker of the two orderings.
83    type Min: Ordering;
84}
85
86#[sealed::sealed]
87impl<O: Ordering> MinOrder<O> for TotalOrder {
88    type Min = O;
89}
90
91#[sealed::sealed]
92impl<O: Ordering> MinOrder<O> for NoOrder {
93    type Min = NoOrder;
94}
95
96/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
97#[sealed::sealed]
98pub trait Retries:
99    MinRetries<Self, Min = Self>
100    + MinRetries<ExactlyOnce, Min = Self>
101    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
102{
103    /// The [`StreamRetry`] corresponding to this type.
104    const RETRIES_KIND: StreamRetry;
105}
106
107/// Marks the stream as having deterministic message cardinality, with no
108/// possibility of duplicates.
109pub enum ExactlyOnce {}
110
111#[sealed::sealed]
112impl Retries for ExactlyOnce {
113    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
114}
115
116/// Marks the stream as having non-deterministic message cardinality, which
117/// means that duplicates may occur, but messages will not be dropped.
118pub enum AtLeastOnce {}
119
120#[sealed::sealed]
121impl Retries for AtLeastOnce {
122    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
123}
124
125/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
126/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
127/// have `Self` guarantees instead.
128#[sealed::sealed]
129pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
130#[sealed::sealed]
131impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
132
133/// Helper trait for determining the weakest of two retry guarantees.
134#[sealed::sealed]
135pub trait MinRetries<Other: ?Sized> {
136    /// The weaker of the two retry guarantees.
137    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
138}
139
140#[sealed::sealed]
141impl<R: Retries> MinRetries<R> for ExactlyOnce {
142    type Min = R;
143}
144
145#[sealed::sealed]
146impl<R: Retries> MinRetries<R> for AtLeastOnce {
147    type Min = AtLeastOnce;
148}
149
150#[sealed::sealed]
151#[diagnostic::on_unimplemented(
152    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
153    label = "required here",
154    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
155)]
156/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
157pub trait IsOrdered: Ordering {}
158
159#[sealed::sealed]
160#[diagnostic::do_not_recommend]
161impl IsOrdered for TotalOrder {}
162
163#[sealed::sealed]
164#[diagnostic::on_unimplemented(
165    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
166    label = "required here",
167    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
168)]
169/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
170pub trait IsExactlyOnce: Retries {}
171
172#[sealed::sealed]
173#[diagnostic::do_not_recommend]
174impl IsExactlyOnce for ExactlyOnce {}
175
176/// Streaming sequence of elements with type `Type`.
177///
178/// This live collection represents a growing sequence of elements, with new elements being
179/// asynchronously appended to the end of the sequence. This can be used to model the arrival
180/// of network input, such as API requests, or streaming ingestion.
181///
182/// By default, all streams have deterministic ordering and each element is materialized exactly
183/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
184/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
185/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
186///
187/// Type Parameters:
188/// - `Type`: the type of elements in the stream
189/// - `Loc`: the location where the stream is being materialized
190/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
191/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
192///   (default is [`TotalOrder`])
193/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
194///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
195pub struct Stream<
196    Type,
197    Loc,
198    Bound: Boundedness = Unbounded,
199    Order: Ordering = TotalOrder,
200    Retry: Retries = ExactlyOnce,
201> {
202    pub(crate) location: Loc,
203    pub(crate) ir_node: RefCell<HydroNode>,
204    pub(crate) flow_state: FlowState,
205
206    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
207}
208
209impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
210    fn drop(&mut self) {
211        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
212        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
213            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
214                input: Box::new(ir_node),
215                op_metadata: HydroIrOpMetadata::new(),
216            });
217        }
218    }
219}
220
221impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
222    for Stream<T, L, Unbounded, O, R>
223where
224    L: Location<'a>,
225{
226    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
227        let new_meta = stream
228            .location
229            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
230
231        Stream {
232            location: stream.location.clone(),
233            flow_state: stream.flow_state.clone(),
234            ir_node: RefCell::new(HydroNode::Cast {
235                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
236                metadata: new_meta,
237            }),
238            _phantom: PhantomData,
239        }
240    }
241}
242
243impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
244    for Stream<T, L, B, NoOrder, R>
245where
246    L: Location<'a>,
247{
248    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
249        stream.weaken_ordering()
250    }
251}
252
253impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
254    for Stream<T, L, B, O, AtLeastOnce>
255where
256    L: Location<'a>,
257{
258    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
259        stream.weaken_retries()
260    }
261}
262
263impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
264where
265    L: Location<'a>,
266{
267    fn defer_tick(self) -> Self {
268        Stream::defer_tick(self)
269    }
270}
271
272impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
273    for Stream<T, Tick<L>, Bounded, O, R>
274where
275    L: Location<'a>,
276{
277    type Location = Tick<L>;
278
279    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
280        Stream::new(
281            location.clone(),
282            HydroNode::CycleSource {
283                cycle_id,
284                metadata: location.new_node_metadata(Self::collection_kind()),
285            },
286        )
287    }
288}
289
290impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
291    for Stream<T, Tick<L>, Bounded, O, R>
292where
293    L: Location<'a>,
294{
295    type Location = Tick<L>;
296
297    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
298        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
299            location.clone(),
300            HydroNode::DeferTick {
301                input: Box::new(HydroNode::CycleSource {
302                    cycle_id,
303                    metadata: location.new_node_metadata(Self::collection_kind()),
304                }),
305                metadata: location.new_node_metadata(Self::collection_kind()),
306            },
307        );
308
309        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
310    }
311}
312
313impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
314    for Stream<T, Tick<L>, Bounded, O, R>
315where
316    L: Location<'a>,
317{
318    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
319        assert_eq!(
320            Location::id(&self.location),
321            expected_location,
322            "locations do not match"
323        );
324        self.location
325            .flow_state()
326            .borrow_mut()
327            .push_root(HydroRoot::CycleSink {
328                cycle_id,
329                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
330                op_metadata: HydroIrOpMetadata::new(),
331            });
332    }
333}
334
335impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
336    for Stream<T, L, B, O, R>
337where
338    L: Location<'a> + NoTick,
339{
340    type Location = L;
341
342    fn create_source(cycle_id: CycleId, location: L) -> Self {
343        Stream::new(
344            location.clone(),
345            HydroNode::CycleSource {
346                cycle_id,
347                metadata: location.new_node_metadata(Self::collection_kind()),
348            },
349        )
350    }
351}
352
353impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
354    for Stream<T, L, B, O, R>
355where
356    L: Location<'a> + NoTick,
357{
358    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
359        assert_eq!(
360            Location::id(&self.location),
361            expected_location,
362            "locations do not match"
363        );
364        self.location
365            .flow_state()
366            .borrow_mut()
367            .push_root(HydroRoot::CycleSink {
368                cycle_id,
369                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
370                op_metadata: HydroIrOpMetadata::new(),
371            });
372    }
373}
374
375impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
376where
377    T: Clone,
378    L: Location<'a>,
379{
380    fn clone(&self) -> Self {
381        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
382            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
383            *self.ir_node.borrow_mut() = HydroNode::Tee {
384                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
385                metadata: self.location.new_node_metadata(Self::collection_kind()),
386            };
387        }
388
389        let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
390            unreachable!()
391        };
392        Stream {
393            location: self.location.clone(),
394            flow_state: self.flow_state.clone(),
395            ir_node: HydroNode::Tee {
396                inner: SharedNode(inner.0.clone()),
397                metadata: metadata.clone(),
398            }
399            .into(),
400            _phantom: PhantomData,
401        }
402    }
403}
404
405impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
406where
407    L: Location<'a>,
408{
409    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
410        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
411        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
412
413        let flow_state = location.flow_state().clone();
414        Stream {
415            location,
416            flow_state,
417            ir_node: RefCell::new(ir_node),
418            _phantom: PhantomData,
419        }
420    }
421
422    /// Returns the [`Location`] where this stream is being materialized.
423    pub fn location(&self) -> &L {
424        &self.location
425    }
426
427    pub(crate) fn collection_kind() -> CollectionKind {
428        CollectionKind::Stream {
429            bound: B::BOUND_KIND,
430            order: O::ORDERING_KIND,
431            retry: R::RETRIES_KIND,
432            element_type: quote_type::<T>().into(),
433        }
434    }
435
436    /// Produces a stream based on invoking `f` on each element.
437    /// If you do not want to modify the stream and instead only want to view
438    /// each item use [`Stream::inspect`] instead.
439    ///
440    /// # Example
441    /// ```rust
442    /// # #[cfg(feature = "deploy")] {
443    /// # use hydro_lang::prelude::*;
444    /// # use futures::StreamExt;
445    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
446    /// let words = process.source_iter(q!(vec!["hello", "world"]));
447    /// words.map(q!(|x| x.to_uppercase()))
448    /// # }, |mut stream| async move {
449    /// # for w in vec!["HELLO", "WORLD"] {
450    /// #     assert_eq!(stream.next().await.unwrap(), w);
451    /// # }
452    /// # }));
453    /// # }
454    /// ```
455    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
456    where
457        F: Fn(T) -> U + 'a,
458    {
459        let (f, singleton_refs) = crate::singleton_ref::with_singleton_capture(|| {
460            f.splice_fn1_ctx(&self.location).into()
461        });
462        Stream::new(
463            self.location.clone(),
464            HydroNode::Map {
465                f,
466                singleton_refs,
467                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
468                metadata: self
469                    .location
470                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
471            },
472        )
473    }
474
475    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
476    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
477    /// for the output type `U` must produce items in a **deterministic** order.
478    ///
479    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
480    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
481    ///
482    /// # Example
483    /// ```rust
484    /// # #[cfg(feature = "deploy")] {
485    /// # use hydro_lang::prelude::*;
486    /// # use futures::StreamExt;
487    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
488    /// process
489    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
490    ///     .flat_map_ordered(q!(|x| x))
491    /// # }, |mut stream| async move {
492    /// // 1, 2, 3, 4
493    /// # for w in (1..5) {
494    /// #     assert_eq!(stream.next().await.unwrap(), w);
495    /// # }
496    /// # }));
497    /// # }
498    /// ```
499    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
500    where
501        I: IntoIterator<Item = U>,
502        F: Fn(T) -> I + 'a,
503    {
504        let f = f.splice_fn1_ctx(&self.location).into();
505        Stream::new(
506            self.location.clone(),
507            HydroNode::FlatMap {
508                f,
509                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
510                metadata: self
511                    .location
512                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
513            },
514        )
515    }
516
517    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
518    /// for the output type `U` to produce items in any order.
519    ///
520    /// # Example
521    /// ```rust
522    /// # #[cfg(feature = "deploy")] {
523    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
524    /// # use futures::StreamExt;
525    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
526    /// process
527    ///     .source_iter(q!(vec![
528    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
529    ///         std::collections::HashSet::from_iter(vec![3, 4]),
530    ///     ]))
531    ///     .flat_map_unordered(q!(|x| x))
532    /// # }, |mut stream| async move {
533    /// // 1, 2, 3, 4, but in no particular order
534    /// # let mut results = Vec::new();
535    /// # for w in (1..5) {
536    /// #     results.push(stream.next().await.unwrap());
537    /// # }
538    /// # results.sort();
539    /// # assert_eq!(results, vec![1, 2, 3, 4]);
540    /// # }));
541    /// # }
542    /// ```
543    pub fn flat_map_unordered<U, I, F>(
544        self,
545        f: impl IntoQuotedMut<'a, F, L>,
546    ) -> Stream<U, L, B, NoOrder, R>
547    where
548        I: IntoIterator<Item = U>,
549        F: Fn(T) -> I + 'a,
550    {
551        let f = f.splice_fn1_ctx(&self.location).into();
552        Stream::new(
553            self.location.clone(),
554            HydroNode::FlatMap {
555                f,
556                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
557                metadata: self
558                    .location
559                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
560            },
561        )
562    }
563
564    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
565    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
566    ///
567    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
568    /// not deterministic, use [`Stream::flatten_unordered`] instead.
569    ///
570    /// ```rust
571    /// # #[cfg(feature = "deploy")] {
572    /// # use hydro_lang::prelude::*;
573    /// # use futures::StreamExt;
574    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
575    /// process
576    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
577    ///     .flatten_ordered()
578    /// # }, |mut stream| async move {
579    /// // 1, 2, 3, 4
580    /// # for w in (1..5) {
581    /// #     assert_eq!(stream.next().await.unwrap(), w);
582    /// # }
583    /// # }));
584    /// # }
585    /// ```
586    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
587    where
588        T: IntoIterator<Item = U>,
589    {
590        self.flat_map_ordered(q!(|d| d))
591    }
592
593    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
594    /// for the element type `T` to produce items in any order.
595    ///
596    /// # Example
597    /// ```rust
598    /// # #[cfg(feature = "deploy")] {
599    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
600    /// # use futures::StreamExt;
601    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
602    /// process
603    ///     .source_iter(q!(vec![
604    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
605    ///         std::collections::HashSet::from_iter(vec![3, 4]),
606    ///     ]))
607    ///     .flatten_unordered()
608    /// # }, |mut stream| async move {
609    /// // 1, 2, 3, 4, but in no particular order
610    /// # let mut results = Vec::new();
611    /// # for w in (1..5) {
612    /// #     results.push(stream.next().await.unwrap());
613    /// # }
614    /// # results.sort();
615    /// # assert_eq!(results, vec![1, 2, 3, 4]);
616    /// # }));
617    /// # }
618    /// ```
619    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
620    where
621        T: IntoIterator<Item = U>,
622    {
623        self.flat_map_unordered(q!(|d| d))
624    }
625
626    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
627    /// then emit the elements of that stream one by one. When the inner stream yields
628    /// `Pending`, this operator yields as well.
629    pub fn flat_map_stream_blocking<U, S, F>(
630        self,
631        f: impl IntoQuotedMut<'a, F, L>,
632    ) -> Stream<U, L, B, O, R>
633    where
634        S: futures::Stream<Item = U>,
635        F: Fn(T) -> S + 'a,
636    {
637        let f = f.splice_fn1_ctx(&self.location).into();
638        Stream::new(
639            self.location.clone(),
640            HydroNode::FlatMapStreamBlocking {
641                f,
642                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
643                metadata: self
644                    .location
645                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
646            },
647        )
648    }
649
650    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
651    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
652    /// yields as well.
653    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
654    where
655        T: futures::Stream<Item = U>,
656    {
657        self.flat_map_stream_blocking(q!(|d| d))
658    }
659
660    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
661    /// `f`, preserving the order of the elements.
662    ///
663    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
664    /// not modify or take ownership of the values. If you need to modify the values while filtering
665    /// use [`Stream::filter_map`] instead.
666    ///
667    /// # Example
668    /// ```rust
669    /// # #[cfg(feature = "deploy")] {
670    /// # use hydro_lang::prelude::*;
671    /// # use futures::StreamExt;
672    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
673    /// process
674    ///     .source_iter(q!(vec![1, 2, 3, 4]))
675    ///     .filter(q!(|&x| x > 2))
676    /// # }, |mut stream| async move {
677    /// // 3, 4
678    /// # for w in (3..5) {
679    /// #     assert_eq!(stream.next().await.unwrap(), w);
680    /// # }
681    /// # }));
682    /// # }
683    /// ```
684    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
685    where
686        F: Fn(&T) -> bool + 'a,
687    {
688        let f = f.splice_fn1_borrow_ctx(&self.location).into();
689        Stream::new(
690            self.location.clone(),
691            HydroNode::Filter {
692                f,
693                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
694                metadata: self.location.new_node_metadata(Self::collection_kind()),
695            },
696        )
697    }
698
699    /// Splits the stream into two streams based on a predicate, without cloning elements.
700    ///
701    /// Elements for which `f` returns `true` are sent to the first output stream,
702    /// and elements for which `f` returns `false` are sent to the second output stream.
703    ///
704    /// Unlike using `filter` twice, this only evaluates the predicate once per element
705    /// and does not require `T: Clone`.
706    ///
707    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
708    /// the predicate is only used for routing; the element itself is moved to the
709    /// appropriate output stream.
710    ///
711    /// # Example
712    /// ```rust
713    /// # #[cfg(feature = "deploy")] {
714    /// # use hydro_lang::prelude::*;
715    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
716    /// # use futures::StreamExt;
717    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
718    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
719    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
720    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
721    /// evens.map(q!(|x| (x, true)))
722    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
723    /// # }, |mut stream| async move {
724    /// # let mut results = Vec::new();
725    /// # for _ in 0..6 {
726    /// #     results.push(stream.next().await.unwrap());
727    /// # }
728    /// # results.sort();
729    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
730    /// # }));
731    /// # }
732    /// ```
733    #[expect(
734        clippy::type_complexity,
735        reason = "return type mirrors the input stream type"
736    )]
737    pub fn partition<F>(
738        self,
739        f: impl IntoQuotedMut<'a, F, L>,
740    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
741    where
742        F: Fn(&T) -> bool + 'a,
743    {
744        let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
745        let shared = SharedNode(Rc::new(RefCell::new(
746            self.ir_node.replace(HydroNode::Placeholder),
747        )));
748
749        let true_stream = Stream::new(
750            self.location.clone(),
751            HydroNode::Partition {
752                inner: SharedNode(shared.0.clone()),
753                f: f.clone(),
754                is_true: true,
755                metadata: self.location.new_node_metadata(Self::collection_kind()),
756            },
757        );
758
759        let false_stream = Stream::new(
760            self.location.clone(),
761            HydroNode::Partition {
762                inner: SharedNode(shared.0),
763                f,
764                is_true: false,
765                metadata: self.location.new_node_metadata(Self::collection_kind()),
766            },
767        );
768
769        (true_stream, false_stream)
770    }
771
772    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
773    ///
774    /// # Example
775    /// ```rust
776    /// # #[cfg(feature = "deploy")] {
777    /// # use hydro_lang::prelude::*;
778    /// # use futures::StreamExt;
779    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
780    /// process
781    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
782    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
783    /// # }, |mut stream| async move {
784    /// // 1, 2
785    /// # for w in (1..3) {
786    /// #     assert_eq!(stream.next().await.unwrap(), w);
787    /// # }
788    /// # }));
789    /// # }
790    /// ```
791    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
792    where
793        F: Fn(T) -> Option<U> + 'a,
794    {
795        let f = f.splice_fn1_ctx(&self.location).into();
796        Stream::new(
797            self.location.clone(),
798            HydroNode::FilterMap {
799                f,
800                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
801                metadata: self
802                    .location
803                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
804            },
805        )
806    }
807
808    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
809    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
810    /// If `other` is an empty [`Optional`], no values will be produced.
811    ///
812    /// # Example
813    /// ```rust
814    /// # #[cfg(feature = "deploy")] {
815    /// # use hydro_lang::prelude::*;
816    /// # use futures::StreamExt;
817    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
818    /// let tick = process.tick();
819    /// let batch = process
820    ///   .source_iter(q!(vec![1, 2, 3, 4]))
821    ///   .batch(&tick, nondet!(/** test */));
822    /// let count = batch.clone().count(); // `count()` returns a singleton
823    /// batch.cross_singleton(count).all_ticks()
824    /// # }, |mut stream| async move {
825    /// // (1, 4), (2, 4), (3, 4), (4, 4)
826    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
827    /// #     assert_eq!(stream.next().await.unwrap(), w);
828    /// # }
829    /// # }));
830    /// # }
831    /// ```
832    pub fn cross_singleton<O2>(
833        self,
834        other: impl Into<Optional<O2, L, Bounded>>,
835    ) -> Stream<(T, O2), L, B, O, R>
836    where
837        O2: Clone,
838    {
839        let other: Optional<O2, L, Bounded> = other.into();
840        check_matching_location(&self.location, &other.location);
841
842        Stream::new(
843            self.location.clone(),
844            HydroNode::CrossSingleton {
845                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
846                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
847                metadata: self
848                    .location
849                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
850            },
851        )
852    }
853
854    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
855    ///
856    /// # Example
857    /// ```rust
858    /// # #[cfg(feature = "deploy")] {
859    /// # use hydro_lang::prelude::*;
860    /// # use futures::StreamExt;
861    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
862    /// let tick = process.tick();
863    /// // ticks are lazy by default, forces the second tick to run
864    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
865    ///
866    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
867    /// let batch_first_tick = process
868    ///   .source_iter(q!(vec![1, 2, 3, 4]))
869    ///   .batch(&tick, nondet!(/** test */));
870    /// let batch_second_tick = process
871    ///   .source_iter(q!(vec![5, 6, 7, 8]))
872    ///   .batch(&tick, nondet!(/** test */))
873    ///   .defer_tick();
874    /// batch_first_tick.chain(batch_second_tick)
875    ///   .filter_if(signal)
876    ///   .all_ticks()
877    /// # }, |mut stream| async move {
878    /// // [1, 2, 3, 4]
879    /// # for w in vec![1, 2, 3, 4] {
880    /// #     assert_eq!(stream.next().await.unwrap(), w);
881    /// # }
882    /// # }));
883    /// # }
884    /// ```
885    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
886        self.cross_singleton(signal.filter(q!(|b| *b)))
887            .map(q!(|(d, _)| d))
888    }
889
890    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
891    ///
892    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
893    /// leader of a cluster.
894    ///
895    /// # Example
896    /// ```rust
897    /// # #[cfg(feature = "deploy")] {
898    /// # use hydro_lang::prelude::*;
899    /// # use futures::StreamExt;
900    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
901    /// let tick = process.tick();
902    /// // ticks are lazy by default, forces the second tick to run
903    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
904    ///
905    /// let batch_first_tick = process
906    ///   .source_iter(q!(vec![1, 2, 3, 4]))
907    ///   .batch(&tick, nondet!(/** test */));
908    /// let batch_second_tick = process
909    ///   .source_iter(q!(vec![5, 6, 7, 8]))
910    ///   .batch(&tick, nondet!(/** test */))
911    ///   .defer_tick(); // appears on the second tick
912    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
913    /// batch_first_tick.chain(batch_second_tick)
914    ///   .filter_if_some(some_on_first_tick)
915    ///   .all_ticks()
916    /// # }, |mut stream| async move {
917    /// // [1, 2, 3, 4]
918    /// # for w in vec![1, 2, 3, 4] {
919    /// #     assert_eq!(stream.next().await.unwrap(), w);
920    /// # }
921    /// # }));
922    /// # }
923    /// ```
924    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
925    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
926        self.filter_if(signal.is_some())
927    }
928
929    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
930    ///
931    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
932    /// some local state.
933    ///
934    /// # Example
935    /// ```rust
936    /// # #[cfg(feature = "deploy")] {
937    /// # use hydro_lang::prelude::*;
938    /// # use futures::StreamExt;
939    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
940    /// let tick = process.tick();
941    /// // ticks are lazy by default, forces the second tick to run
942    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
943    ///
944    /// let batch_first_tick = process
945    ///   .source_iter(q!(vec![1, 2, 3, 4]))
946    ///   .batch(&tick, nondet!(/** test */));
947    /// let batch_second_tick = process
948    ///   .source_iter(q!(vec![5, 6, 7, 8]))
949    ///   .batch(&tick, nondet!(/** test */))
950    ///   .defer_tick(); // appears on the second tick
951    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
952    /// batch_first_tick.chain(batch_second_tick)
953    ///   .filter_if_none(some_on_first_tick)
954    ///   .all_ticks()
955    /// # }, |mut stream| async move {
956    /// // [5, 6, 7, 8]
957    /// # for w in vec![5, 6, 7, 8] {
958    /// #     assert_eq!(stream.next().await.unwrap(), w);
959    /// # }
960    /// # }));
961    /// # }
962    /// ```
963    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
964    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
965        self.filter_if(other.is_none())
966    }
967
968    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
969    /// returning all tupled pairs.
970    ///
971    /// When the right side is [`Bounded`], it is accumulated first and the left side streams
972    /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
973    /// symmetric hash join is used and ordering is [`NoOrder`].
974    ///
975    /// # Example
976    /// ```rust
977    /// # #[cfg(feature = "deploy")] {
978    /// # use hydro_lang::prelude::*;
979    /// # use std::collections::HashSet;
980    /// # use futures::StreamExt;
981    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
982    /// let tick = process.tick();
983    /// let stream1 = process.source_iter(q!(vec![1, 2]));
984    /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
985    /// stream1.cross_product(stream2)
986    /// # }, |mut stream| async move {
987    /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
988    /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
989    /// # stream.map(|i| assert!(expected.contains(&i)));
990    /// # }));
991    /// # }
992    pub fn cross_product<T2, B2: Boundedness, O2: Ordering>(
993        self,
994        other: Stream<T2, L, B2, O2, R>,
995    ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, R>
996    where
997        T: Clone,
998        T2: Clone,
999    {
1000        self.map(q!(|v| ((), v)))
1001            .join(other.map(q!(|v| ((), v))))
1002            .map(q!(|((), (v1, v2))| (v1, v2)))
1003    }
1004
1005    /// Takes one stream as input and filters out any duplicate occurrences. The output
1006    /// contains all unique values from the input.
1007    ///
1008    /// # Example
1009    /// ```rust
1010    /// # #[cfg(feature = "deploy")] {
1011    /// # use hydro_lang::prelude::*;
1012    /// # use futures::StreamExt;
1013    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1014    /// let tick = process.tick();
1015    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1016    /// # }, |mut stream| async move {
1017    /// # for w in vec![1, 2, 3, 4] {
1018    /// #     assert_eq!(stream.next().await.unwrap(), w);
1019    /// # }
1020    /// # }));
1021    /// # }
1022    /// ```
1023    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1024    where
1025        T: Eq + Hash,
1026    {
1027        Stream::new(
1028            self.location.clone(),
1029            HydroNode::Unique {
1030                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1031                metadata: self
1032                    .location
1033                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1034            },
1035        )
1036    }
1037
1038    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1039    ///
1040    /// The `other` stream must be [`Bounded`], since this function will wait until
1041    /// all its elements are available before producing any output.
1042    /// # Example
1043    /// ```rust
1044    /// # #[cfg(feature = "deploy")] {
1045    /// # use hydro_lang::prelude::*;
1046    /// # use futures::StreamExt;
1047    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1048    /// let tick = process.tick();
1049    /// let stream = process
1050    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1051    ///   .batch(&tick, nondet!(/** test */));
1052    /// let batch = process
1053    ///   .source_iter(q!(vec![1, 2]))
1054    ///   .batch(&tick, nondet!(/** test */));
1055    /// stream.filter_not_in(batch).all_ticks()
1056    /// # }, |mut stream| async move {
1057    /// # for w in vec![3, 4] {
1058    /// #     assert_eq!(stream.next().await.unwrap(), w);
1059    /// # }
1060    /// # }));
1061    /// # }
1062    /// ```
1063    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1064    where
1065        T: Eq + Hash,
1066        B2: IsBounded,
1067    {
1068        check_matching_location(&self.location, &other.location);
1069
1070        Stream::new(
1071            self.location.clone(),
1072            HydroNode::Difference {
1073                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1074                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1075                metadata: self
1076                    .location
1077                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1078            },
1079        )
1080    }
1081
1082    /// An operator which allows you to "inspect" each element of a stream without
1083    /// modifying it. The closure `f` is called on a reference to each item. This is
1084    /// mainly useful for debugging, and should not be used to generate side-effects.
1085    ///
1086    /// # Example
1087    /// ```rust
1088    /// # #[cfg(feature = "deploy")] {
1089    /// # use hydro_lang::prelude::*;
1090    /// # use futures::StreamExt;
1091    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1092    /// let nums = process.source_iter(q!(vec![1, 2]));
1093    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1094    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1095    /// # }, |mut stream| async move {
1096    /// # for w in vec![1, 2] {
1097    /// #     assert_eq!(stream.next().await.unwrap(), w);
1098    /// # }
1099    /// # }));
1100    /// # }
1101    /// ```
1102    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1103    where
1104        F: Fn(&T) + 'a,
1105    {
1106        let f = f.splice_fn1_borrow_ctx(&self.location).into();
1107
1108        Stream::new(
1109            self.location.clone(),
1110            HydroNode::Inspect {
1111                f,
1112                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1113                metadata: self.location.new_node_metadata(Self::collection_kind()),
1114            },
1115        )
1116    }
1117
1118    /// Executes the provided closure for every element in this stream.
1119    ///
1120    /// Because the closure may have side effects, the stream must have deterministic order
1121    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1122    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1123    /// [`Stream::assume_retries`] with an explanation for why this is the case.
1124    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1125    where
1126        O: IsOrdered,
1127        R: IsExactlyOnce,
1128    {
1129        let f = f.splice_fn1_ctx(&self.location).into();
1130        self.location
1131            .flow_state()
1132            .borrow_mut()
1133            .push_root(HydroRoot::ForEach {
1134                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1135                f,
1136                op_metadata: HydroIrOpMetadata::new(),
1137            });
1138    }
1139
1140    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1141    /// TCP socket to some other server. You should _not_ use this API for interacting with
1142    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1143    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1144    /// interaction with asynchronous sinks.
1145    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1146    where
1147        O: IsOrdered,
1148        R: IsExactlyOnce,
1149        S: 'a + futures::Sink<T> + Unpin,
1150    {
1151        self.location
1152            .flow_state()
1153            .borrow_mut()
1154            .push_root(HydroRoot::DestSink {
1155                sink: sink.splice_typed_ctx(&self.location).into(),
1156                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1157                op_metadata: HydroIrOpMetadata::new(),
1158            });
1159    }
1160
1161    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1162    ///
1163    /// # Example
1164    /// ```rust
1165    /// # #[cfg(feature = "deploy")] {
1166    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1167    /// # use futures::StreamExt;
1168    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1169    /// let tick = process.tick();
1170    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1171    /// numbers.enumerate()
1172    /// # }, |mut stream| async move {
1173    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1174    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1175    /// #     assert_eq!(stream.next().await.unwrap(), w);
1176    /// # }
1177    /// # }));
1178    /// # }
1179    /// ```
1180    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1181    where
1182        O: IsOrdered,
1183        R: IsExactlyOnce,
1184    {
1185        Stream::new(
1186            self.location.clone(),
1187            HydroNode::Enumerate {
1188                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1189                metadata: self.location.new_node_metadata(Stream::<
1190                    (usize, T),
1191                    L,
1192                    B,
1193                    TotalOrder,
1194                    ExactlyOnce,
1195                >::collection_kind()),
1196            },
1197        )
1198    }
1199
1200    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1201    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1202    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1203    ///
1204    /// Depending on the input stream guarantees, the closure may need to be commutative
1205    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1206    ///
1207    /// # Example
1208    /// ```rust
1209    /// # #[cfg(feature = "deploy")] {
1210    /// # use hydro_lang::prelude::*;
1211    /// # use futures::StreamExt;
1212    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1213    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1214    /// words
1215    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1216    ///     .into_stream()
1217    /// # }, |mut stream| async move {
1218    /// // "HELLOWORLD"
1219    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1220    /// # }));
1221    /// # }
1222    /// ```
1223    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1224        self,
1225        init: impl IntoQuotedMut<'a, I, L>,
1226        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1227    ) -> Singleton<A, L, B2>
1228    where
1229        I: Fn() -> A + 'a,
1230        F: Fn(&mut A, T),
1231        C: ValidCommutativityFor<O>,
1232        Idemp: ValidIdempotenceFor<R>,
1233        B: ApplyMonotoneStream<M, B2>,
1234    {
1235        let init = init.splice_fn0_ctx(&self.location).into();
1236        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1237        proof.register_proof(&comb);
1238
1239        // Only assume_retries (for idempotence), not assume_ordering.
1240        // The fold hook in the simulator handles ordering non-determinism directly.
1241        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1242        let retried: Stream<T, L, B, O, ExactlyOnce> = self.assume_retries(nondet);
1243
1244        let core = HydroNode::Fold {
1245            init,
1246            acc: comb.into(),
1247            input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1248            metadata: retried
1249                .location
1250                .new_node_metadata(Singleton::<A, L, B2>::collection_kind()),
1251        };
1252
1253        Singleton::new(retried.location.clone(), core)
1254    }
1255
1256    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1257    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1258    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1259    /// reference, so that it can be modified in place.
1260    ///
1261    /// Depending on the input stream guarantees, the closure may need to be commutative
1262    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1263    ///
1264    /// # Example
1265    /// ```rust
1266    /// # #[cfg(feature = "deploy")] {
1267    /// # use hydro_lang::prelude::*;
1268    /// # use futures::StreamExt;
1269    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1270    /// let bools = process.source_iter(q!(vec![false, true, false]));
1271    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1272    /// # }, |mut stream| async move {
1273    /// // true
1274    /// # assert_eq!(stream.next().await.unwrap(), true);
1275    /// # }));
1276    /// # }
1277    /// ```
1278    pub fn reduce<F, C, Idemp>(
1279        self,
1280        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1281    ) -> Optional<T, L, B>
1282    where
1283        F: Fn(&mut T, T) + 'a,
1284        C: ValidCommutativityFor<O>,
1285        Idemp: ValidIdempotenceFor<R>,
1286    {
1287        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1288        proof.register_proof(&f);
1289
1290        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1291        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1292
1293        let core = HydroNode::Reduce {
1294            f: f.into(),
1295            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1296            metadata: ordered_etc
1297                .location
1298                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1299        };
1300
1301        Optional::new(ordered_etc.location.clone(), core)
1302    }
1303
1304    /// Computes the maximum element in the stream as an [`Optional`], which
1305    /// will be empty until the first element in the input arrives.
1306    ///
1307    /// # Example
1308    /// ```rust
1309    /// # #[cfg(feature = "deploy")] {
1310    /// # use hydro_lang::prelude::*;
1311    /// # use futures::StreamExt;
1312    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1313    /// let tick = process.tick();
1314    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1315    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1316    /// batch.max().all_ticks()
1317    /// # }, |mut stream| async move {
1318    /// // 4
1319    /// # assert_eq!(stream.next().await.unwrap(), 4);
1320    /// # }));
1321    /// # }
1322    /// ```
1323    pub fn max(self) -> Optional<T, L, B>
1324    where
1325        T: Ord,
1326    {
1327        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1328            .assume_ordering_trusted_bounded::<TotalOrder>(
1329                nondet!(/** max is commutative, but order affects intermediates */),
1330            )
1331            .reduce(q!(|curr, new| {
1332                if new > *curr {
1333                    *curr = new;
1334                }
1335            }))
1336    }
1337
1338    /// Computes the minimum element in the stream as an [`Optional`], which
1339    /// will be empty until the first element in the input arrives.
1340    ///
1341    /// # Example
1342    /// ```rust
1343    /// # #[cfg(feature = "deploy")] {
1344    /// # use hydro_lang::prelude::*;
1345    /// # use futures::StreamExt;
1346    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1347    /// let tick = process.tick();
1348    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1349    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1350    /// batch.min().all_ticks()
1351    /// # }, |mut stream| async move {
1352    /// // 1
1353    /// # assert_eq!(stream.next().await.unwrap(), 1);
1354    /// # }));
1355    /// # }
1356    /// ```
1357    pub fn min(self) -> Optional<T, L, B>
1358    where
1359        T: Ord,
1360    {
1361        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1362            .assume_ordering_trusted_bounded::<TotalOrder>(
1363                nondet!(/** max is commutative, but order affects intermediates */),
1364            )
1365            .reduce(q!(|curr, new| {
1366                if new < *curr {
1367                    *curr = new;
1368                }
1369            }))
1370    }
1371
1372    /// Computes the first element in the stream as an [`Optional`], which
1373    /// will be empty until the first element in the input arrives.
1374    ///
1375    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1376    /// re-ordering of elements may cause the first element to change.
1377    ///
1378    /// # Example
1379    /// ```rust
1380    /// # #[cfg(feature = "deploy")] {
1381    /// # use hydro_lang::prelude::*;
1382    /// # use futures::StreamExt;
1383    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1384    /// let tick = process.tick();
1385    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1386    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1387    /// batch.first().all_ticks()
1388    /// # }, |mut stream| async move {
1389    /// // 1
1390    /// # assert_eq!(stream.next().await.unwrap(), 1);
1391    /// # }));
1392    /// # }
1393    /// ```
1394    pub fn first(self) -> Optional<T, L, B>
1395    where
1396        O: IsOrdered,
1397    {
1398        self.make_totally_ordered()
1399            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1400            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1401            .reduce(q!(|_, _| {}))
1402    }
1403
1404    /// Computes the last element in the stream as an [`Optional`], which
1405    /// will be empty until an element in the input arrives.
1406    ///
1407    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1408    /// re-ordering of elements may cause the last element to change.
1409    ///
1410    /// # Example
1411    /// ```rust
1412    /// # #[cfg(feature = "deploy")] {
1413    /// # use hydro_lang::prelude::*;
1414    /// # use futures::StreamExt;
1415    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1416    /// let tick = process.tick();
1417    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1418    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1419    /// batch.last().all_ticks()
1420    /// # }, |mut stream| async move {
1421    /// // 4
1422    /// # assert_eq!(stream.next().await.unwrap(), 4);
1423    /// # }));
1424    /// # }
1425    /// ```
1426    pub fn last(self) -> Optional<T, L, B>
1427    where
1428        O: IsOrdered,
1429    {
1430        self.make_totally_ordered()
1431            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1432            .reduce(q!(|curr, new| *curr = new))
1433    }
1434
1435    /// Returns a stream containing at most the first `n` elements of the input stream,
1436    /// preserving the original order. Similar to `LIMIT` in SQL.
1437    ///
1438    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1439    /// retries, since the result depends on the order and cardinality of elements.
1440    ///
1441    /// # Example
1442    /// ```rust
1443    /// # #[cfg(feature = "deploy")] {
1444    /// # use hydro_lang::prelude::*;
1445    /// # use futures::StreamExt;
1446    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1447    /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1448    /// numbers.limit(q!(3))
1449    /// # }, |mut stream| async move {
1450    /// // 10, 20, 30
1451    /// # for w in vec![10, 20, 30] {
1452    /// #     assert_eq!(stream.next().await.unwrap(), w);
1453    /// # }
1454    /// # }));
1455    /// # }
1456    /// ```
1457    pub fn limit(
1458        self,
1459        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1460    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1461    where
1462        O: IsOrdered,
1463        R: IsExactlyOnce,
1464    {
1465        self.generator(
1466            q!(|| 0usize),
1467            q!(move |count, item| {
1468                if *count == n {
1469                    Generate::Break
1470                } else {
1471                    *count += 1;
1472                    if *count == n {
1473                        Generate::Return(item)
1474                    } else {
1475                        Generate::Yield(item)
1476                    }
1477                }
1478            }),
1479        )
1480    }
1481
1482    /// Collects all the elements of this stream into a single [`Vec`] element.
1483    ///
1484    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1485    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1486    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1487    /// the vector at an arbitrary point in time.
1488    ///
1489    /// # Example
1490    /// ```rust
1491    /// # #[cfg(feature = "deploy")] {
1492    /// # use hydro_lang::prelude::*;
1493    /// # use futures::StreamExt;
1494    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1495    /// let tick = process.tick();
1496    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1497    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1498    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1499    /// # }, |mut stream| async move {
1500    /// // [ vec![1, 2, 3, 4] ]
1501    /// # for w in vec![vec![1, 2, 3, 4]] {
1502    /// #     assert_eq!(stream.next().await.unwrap(), w);
1503    /// # }
1504    /// # }));
1505    /// # }
1506    /// ```
1507    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1508    where
1509        O: IsOrdered,
1510        R: IsExactlyOnce,
1511    {
1512        self.make_totally_ordered().make_exactly_once().fold(
1513            q!(|| vec![]),
1514            q!(|acc, v| {
1515                acc.push(v);
1516            }),
1517        )
1518    }
1519
1520    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1521    /// and emitting each intermediate result.
1522    ///
1523    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1524    /// containing all intermediate accumulated values. The scan operation can also terminate early
1525    /// by returning `None`.
1526    ///
1527    /// The function takes a mutable reference to the accumulator and the current element, and returns
1528    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1529    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1530    ///
1531    /// # Examples
1532    ///
1533    /// Basic usage - running sum:
1534    /// ```rust
1535    /// # #[cfg(feature = "deploy")] {
1536    /// # use hydro_lang::prelude::*;
1537    /// # use futures::StreamExt;
1538    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1539    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1540    ///     q!(|| 0),
1541    ///     q!(|acc, x| {
1542    ///         *acc += x;
1543    ///         Some(*acc)
1544    ///     }),
1545    /// )
1546    /// # }, |mut stream| async move {
1547    /// // Output: 1, 3, 6, 10
1548    /// # for w in vec![1, 3, 6, 10] {
1549    /// #     assert_eq!(stream.next().await.unwrap(), w);
1550    /// # }
1551    /// # }));
1552    /// # }
1553    /// ```
1554    ///
1555    /// Early termination example:
1556    /// ```rust
1557    /// # #[cfg(feature = "deploy")] {
1558    /// # use hydro_lang::prelude::*;
1559    /// # use futures::StreamExt;
1560    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1561    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1562    ///     q!(|| 1),
1563    ///     q!(|state, x| {
1564    ///         *state = *state * x;
1565    ///         if *state > 6 {
1566    ///             None // Terminate the stream
1567    ///         } else {
1568    ///             Some(-*state)
1569    ///         }
1570    ///     }),
1571    /// )
1572    /// # }, |mut stream| async move {
1573    /// // Output: -1, -2, -6
1574    /// # for w in vec![-1, -2, -6] {
1575    /// #     assert_eq!(stream.next().await.unwrap(), w);
1576    /// # }
1577    /// # }));
1578    /// # }
1579    /// ```
1580    pub fn scan<A, U, I, F>(
1581        self,
1582        init: impl IntoQuotedMut<'a, I, L>,
1583        f: impl IntoQuotedMut<'a, F, L>,
1584    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1585    where
1586        O: IsOrdered,
1587        R: IsExactlyOnce,
1588        I: Fn() -> A + 'a,
1589        F: Fn(&mut A, T) -> Option<U> + 'a,
1590    {
1591        let init = init.splice_fn0_ctx(&self.location).into();
1592        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1593
1594        Stream::new(
1595            self.location.clone(),
1596            HydroNode::Scan {
1597                init,
1598                acc: f,
1599                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1600                metadata: self.location.new_node_metadata(
1601                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1602                ),
1603            },
1604        )
1605    }
1606
1607    /// Async version of [`Stream::scan`]. Applies an async function to each element of the
1608    /// stream, maintaining an internal state (accumulator) and emitting the values returned
1609    /// by the function.
1610    ///
1611    /// The closure runs synchronously (so it can mutate the accumulator), then returns a
1612    /// future. The future is polled to completion. If it resolves to `Some`, the value is
1613    /// emitted. If it resolves to `None`, the item is filtered out.
1614    ///
1615    /// # Examples
1616    ///
1617    /// ```rust
1618    /// # #[cfg(feature = "deploy")] {
1619    /// # use hydro_lang::prelude::*;
1620    /// # use futures::StreamExt;
1621    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1622    /// process
1623    ///     .source_iter(q!(vec![1, 2, 3, 4]))
1624    ///     .scan_async_blocking(
1625    ///         q!(|| 0),
1626    ///         q!(|acc, x| {
1627    ///             *acc += x;
1628    ///             let val = *acc;
1629    ///             async move { Some(val) }
1630    ///         }),
1631    ///     )
1632    /// # }, |mut stream| async move {
1633    /// // Output: 1, 3, 6, 10
1634    /// # for w in vec![1, 3, 6, 10] {
1635    /// #     assert_eq!(stream.next().await.unwrap(), w);
1636    /// # }
1637    /// # }));
1638    /// # }
1639    /// ```
1640    pub fn scan_async_blocking<A, U, I, F, Fut>(
1641        self,
1642        init: impl IntoQuotedMut<'a, I, L>,
1643        f: impl IntoQuotedMut<'a, F, L>,
1644    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1645    where
1646        O: IsOrdered,
1647        R: IsExactlyOnce,
1648        I: Fn() -> A + 'a,
1649        F: Fn(&mut A, T) -> Fut + 'a,
1650        Fut: Future<Output = Option<U>> + 'a,
1651    {
1652        let init = init.splice_fn0_ctx(&self.location).into();
1653        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1654
1655        Stream::new(
1656            self.location.clone(),
1657            HydroNode::ScanAsyncBlocking {
1658                init,
1659                acc: f,
1660                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1661                metadata: self.location.new_node_metadata(
1662                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1663                ),
1664            },
1665        )
1666    }
1667
1668    /// Iteratively processes the elements of the stream using a state machine that can yield
1669    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1670    /// syntax in Rust, without requiring special syntax.
1671    ///
1672    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1673    /// state. The second argument defines the processing logic, taking in a mutable reference
1674    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1675    /// variants define what is emitted and whether further inputs should be processed.
1676    ///
1677    /// # Example
1678    /// ```rust
1679    /// # #[cfg(feature = "deploy")] {
1680    /// # use hydro_lang::prelude::*;
1681    /// # use futures::StreamExt;
1682    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1683    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1684    ///     q!(|| 0),
1685    ///     q!(|acc, x| {
1686    ///         *acc += x;
1687    ///         if *acc > 100 {
1688    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1689    ///         } else if *acc % 2 == 0 {
1690    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1691    ///         } else {
1692    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
1693    ///         }
1694    ///     }),
1695    /// )
1696    /// # }, |mut stream| async move {
1697    /// // Output: "even", "done!"
1698    /// # let mut results = Vec::new();
1699    /// # for _ in 0..2 {
1700    /// #     results.push(stream.next().await.unwrap());
1701    /// # }
1702    /// # results.sort();
1703    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1704    /// # }));
1705    /// # }
1706    /// ```
1707    pub fn generator<A, U, I, F>(
1708        self,
1709        init: impl IntoQuotedMut<'a, I, L> + Copy,
1710        f: impl IntoQuotedMut<'a, F, L> + Copy,
1711    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1712    where
1713        O: IsOrdered,
1714        R: IsExactlyOnce,
1715        I: Fn() -> A + 'a,
1716        F: Fn(&mut A, T) -> Generate<U> + 'a,
1717    {
1718        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1719        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1720
1721        let this = self.make_totally_ordered().make_exactly_once();
1722
1723        // State is Option<Option<A>>:
1724        //   None = not yet initialized
1725        //   Some(Some(a)) = active with state a
1726        //   Some(None) = terminated
1727        let scan_init = q!(|| None)
1728            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1729            .into();
1730        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1731            if state.is_none() {
1732                *state = Some(Some(init()));
1733            }
1734            match state {
1735                Some(Some(state_value)) => match f(state_value, v) {
1736                    Generate::Yield(out) => Some(Some(out)),
1737                    Generate::Return(out) => {
1738                        *state = Some(None);
1739                        Some(Some(out))
1740                    }
1741                    // Unlike KeyedStream, we can terminate the scan directly on
1742                    // Break/Return because there is only one state (no other keys
1743                    // that still need processing).
1744                    Generate::Break => None,
1745                    Generate::Continue => Some(None),
1746                },
1747                // State is Some(None) after Return; terminate the scan.
1748                _ => None,
1749            }
1750        })
1751        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1752        .into();
1753
1754        let scan_node = HydroNode::Scan {
1755            init: scan_init,
1756            acc: scan_f,
1757            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1758            metadata: this.location.new_node_metadata(Stream::<
1759                Option<U>,
1760                L,
1761                B,
1762                TotalOrder,
1763                ExactlyOnce,
1764            >::collection_kind()),
1765        };
1766
1767        let flatten_f = q!(|d| d)
1768            .splice_fn1_ctx::<Option<U>, _>(&this.location)
1769            .into();
1770        let flatten_node = HydroNode::FlatMap {
1771            f: flatten_f,
1772            input: Box::new(scan_node),
1773            metadata: this
1774                .location
1775                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1776        };
1777
1778        Stream::new(this.location.clone(), flatten_node)
1779    }
1780
1781    /// Given a time interval, returns a stream corresponding to samples taken from the
1782    /// stream roughly at that interval. The output will have elements in the same order
1783    /// as the input, but with arbitrary elements skipped between samples. There is also
1784    /// no guarantee on the exact timing of the samples.
1785    ///
1786    /// # Non-Determinism
1787    /// The output stream is non-deterministic in which elements are sampled, since this
1788    /// is controlled by a clock.
1789    pub fn sample_every(
1790        self,
1791        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1792        nondet: NonDet,
1793    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1794    where
1795        L: NoTick + NoAtomic,
1796    {
1797        let samples = self.location.source_interval(interval);
1798
1799        let tick = self.location.tick();
1800        self.batch(&tick, nondet)
1801            .filter_if(samples.batch(&tick, nondet).first().is_some())
1802            .all_ticks()
1803            .weaken_retries()
1804    }
1805
1806    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
1807    /// stream has not emitted a value since that duration.
1808    ///
1809    /// # Non-Determinism
1810    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1811    /// samples take place, timeouts may be non-deterministically generated or missed,
1812    /// and the notification of the timeout may be delayed as well. There is also no
1813    /// guarantee on how long the [`Optional`] will have a value after the timeout is
1814    /// detected based on when the next sample is taken.
1815    pub fn timeout(
1816        self,
1817        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
1818        nondet: NonDet,
1819    ) -> Optional<(), L, Unbounded>
1820    where
1821        L: NoTick + NoAtomic,
1822    {
1823        let tick = self.location.tick();
1824
1825        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1826            q!(|| None),
1827            q!(
1828                |latest, _| {
1829                    *latest = Some(Instant::now());
1830                },
1831                commutative = manual_proof!(/** TODO */)
1832            ),
1833        );
1834
1835        latest_received
1836            .snapshot(&tick, nondet)
1837            .filter_map(q!(move |latest_received| {
1838                if let Some(latest_received) = latest_received {
1839                    if Instant::now().duration_since(latest_received) > duration {
1840                        Some(())
1841                    } else {
1842                        None
1843                    }
1844                } else {
1845                    Some(())
1846                }
1847            }))
1848            .latest()
1849    }
1850
1851    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1852    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1853    ///
1854    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1855    /// processed before an acknowledgement is emitted.
1856    pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1857        let id = self.location.flow_state().borrow_mut().next_clock_id();
1858        let out_location = Atomic {
1859            tick: Tick {
1860                id,
1861                l: self.location.clone(),
1862            },
1863        };
1864        Stream::new(
1865            out_location.clone(),
1866            HydroNode::BeginAtomic {
1867                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1868                metadata: out_location
1869                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1870            },
1871        )
1872    }
1873
1874    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1875    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1876    /// the order of the input. The output stream will execute in the [`Tick`] that was
1877    /// used to create the atomic section.
1878    ///
1879    /// # Non-Determinism
1880    /// The batch boundaries are non-deterministic and may change across executions.
1881    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1882        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1883        Stream::new(
1884            tick.clone(),
1885            HydroNode::Batch {
1886                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1887                metadata: tick
1888                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1889            },
1890        )
1891    }
1892
1893    /// An operator which allows you to "name" a `HydroNode`.
1894    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1895    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1896        {
1897            let mut node = self.ir_node.borrow_mut();
1898            let metadata = node.metadata_mut();
1899            metadata.tag = Some(name.to_owned());
1900        }
1901        self
1902    }
1903
1904    /// Turns this [`Stream`] into a [`Optional`], under the invariant assumption that there is at
1905    /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
1906    /// so uses must be carefully vetted.
1907    pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
1908    where
1909        B: IsBounded,
1910    {
1911        Optional::new(
1912            self.location.clone(),
1913            HydroNode::Cast {
1914                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1915                metadata: self
1916                    .location
1917                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1918            },
1919        )
1920    }
1921
1922    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
1923        if O::ORDERING_KIND == O2::ORDERING_KIND {
1924            Stream::new(
1925                self.location.clone(),
1926                self.ir_node.replace(HydroNode::Placeholder),
1927            )
1928        } else {
1929            panic!(
1930                "Runtime ordering {:?} did not match requested cast {:?}.",
1931                O::ORDERING_KIND,
1932                O2::ORDERING_KIND
1933            )
1934        }
1935    }
1936
1937    /// Explicitly "casts" the stream to a type with a different ordering
1938    /// guarantee. Useful in unsafe code where the ordering cannot be proven
1939    /// by the type-system.
1940    ///
1941    /// # Non-Determinism
1942    /// This function is used as an escape hatch, and any mistakes in the
1943    /// provided ordering guarantee will propagate into the guarantees
1944    /// for the rest of the program.
1945    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1946        if O::ORDERING_KIND == O2::ORDERING_KIND {
1947            self.use_ordering_type()
1948        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1949            // We can always weaken the ordering guarantee
1950            Stream::new(
1951                self.location.clone(),
1952                HydroNode::Cast {
1953                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1954                    metadata: self
1955                        .location
1956                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1957                },
1958            )
1959        } else {
1960            Stream::new(
1961                self.location.clone(),
1962                HydroNode::ObserveNonDet {
1963                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1964                    trusted: false,
1965                    metadata: self
1966                        .location
1967                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1968                },
1969            )
1970        }
1971    }
1972
1973    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1974    // intermediate states will not be revealed
1975    fn assume_ordering_trusted_bounded<O2: Ordering>(
1976        self,
1977        nondet: NonDet,
1978    ) -> Stream<T, L, B, O2, R> {
1979        if B::BOUNDED {
1980            self.assume_ordering_trusted(nondet)
1981        } else {
1982            self.assume_ordering(nondet)
1983        }
1984    }
1985
1986    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1987    // is not observable
1988    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1989        self,
1990        _nondet: NonDet,
1991    ) -> Stream<T, L, B, O2, R> {
1992        if O::ORDERING_KIND == O2::ORDERING_KIND {
1993            Stream::new(
1994                self.location.clone(),
1995                self.ir_node.replace(HydroNode::Placeholder),
1996            )
1997        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1998            // We can always weaken the ordering guarantee
1999            Stream::new(
2000                self.location.clone(),
2001                HydroNode::Cast {
2002                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2003                    metadata: self
2004                        .location
2005                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2006                },
2007            )
2008        } else {
2009            Stream::new(
2010                self.location.clone(),
2011                HydroNode::ObserveNonDet {
2012                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2013                    trusted: true,
2014                    metadata: self
2015                        .location
2016                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2017                },
2018            )
2019        }
2020    }
2021
2022    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2023    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2024    /// which is always safe because that is the weakest possible guarantee.
2025    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2026        self.weaken_ordering::<NoOrder>()
2027    }
2028
2029    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2030    /// enforcing that `O2` is weaker than the input ordering guarantee.
2031    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2032        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2033        self.assume_ordering::<O2>(nondet)
2034    }
2035
2036    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2037    /// implies that `O == TotalOrder`.
2038    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2039    where
2040        O: IsOrdered,
2041    {
2042        self.assume_ordering(nondet!(/** no-op */))
2043    }
2044
2045    /// Explicitly "casts" the stream to a type with a different retries
2046    /// guarantee. Useful in unsafe code where the lack of retries cannot
2047    /// be proven by the type-system.
2048    ///
2049    /// # Non-Determinism
2050    /// This function is used as an escape hatch, and any mistakes in the
2051    /// provided retries guarantee will propagate into the guarantees
2052    /// for the rest of the program.
2053    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2054        if R::RETRIES_KIND == R2::RETRIES_KIND {
2055            Stream::new(
2056                self.location.clone(),
2057                self.ir_node.replace(HydroNode::Placeholder),
2058            )
2059        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2060            // We can always weaken the retries guarantee
2061            Stream::new(
2062                self.location.clone(),
2063                HydroNode::Cast {
2064                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2065                    metadata: self
2066                        .location
2067                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2068                },
2069            )
2070        } else {
2071            Stream::new(
2072                self.location.clone(),
2073                HydroNode::ObserveNonDet {
2074                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2075                    trusted: false,
2076                    metadata: self
2077                        .location
2078                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2079                },
2080            )
2081        }
2082    }
2083
2084    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2085    // is not observable
2086    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2087        if R::RETRIES_KIND == R2::RETRIES_KIND {
2088            Stream::new(
2089                self.location.clone(),
2090                self.ir_node.replace(HydroNode::Placeholder),
2091            )
2092        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2093            // We can always weaken the retries guarantee
2094            Stream::new(
2095                self.location.clone(),
2096                HydroNode::Cast {
2097                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2098                    metadata: self
2099                        .location
2100                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2101                },
2102            )
2103        } else {
2104            Stream::new(
2105                self.location.clone(),
2106                HydroNode::ObserveNonDet {
2107                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2108                    trusted: true,
2109                    metadata: self
2110                        .location
2111                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2112                },
2113            )
2114        }
2115    }
2116
2117    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2118    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2119    /// which is always safe because that is the weakest possible guarantee.
2120    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2121        self.weaken_retries::<AtLeastOnce>()
2122    }
2123
2124    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2125    /// enforcing that `R2` is weaker than the input retries guarantee.
2126    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2127        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2128        self.assume_retries::<R2>(nondet)
2129    }
2130
2131    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2132    /// implies that `R == ExactlyOnce`.
2133    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2134    where
2135        R: IsExactlyOnce,
2136    {
2137        self.assume_retries(nondet!(/** no-op */))
2138    }
2139
2140    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2141    /// implies that `B == Bounded`.
2142    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2143    where
2144        B: IsBounded,
2145    {
2146        self.weaken_boundedness()
2147    }
2148
2149    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
2150    /// which implies that `B == Bounded`.
2151    pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2152        if B::BOUNDED == B2::BOUNDED {
2153            Stream::new(
2154                self.location.clone(),
2155                self.ir_node.replace(HydroNode::Placeholder),
2156            )
2157        } else {
2158            // We can always weaken the boundedness
2159            Stream::new(
2160                self.location.clone(),
2161                HydroNode::Cast {
2162                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2163                    metadata: self
2164                        .location
2165                        .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2166                },
2167            )
2168        }
2169    }
2170}
2171
2172impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2173where
2174    L: Location<'a>,
2175{
2176    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2177    ///
2178    /// # Example
2179    /// ```rust
2180    /// # #[cfg(feature = "deploy")] {
2181    /// # use hydro_lang::prelude::*;
2182    /// # use futures::StreamExt;
2183    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2184    /// process.source_iter(q!(&[1, 2, 3])).cloned()
2185    /// # }, |mut stream| async move {
2186    /// // 1, 2, 3
2187    /// # for w in vec![1, 2, 3] {
2188    /// #     assert_eq!(stream.next().await.unwrap(), w);
2189    /// # }
2190    /// # }));
2191    /// # }
2192    /// ```
2193    pub fn cloned(self) -> Stream<T, L, B, O, R>
2194    where
2195        T: Clone,
2196    {
2197        self.map(q!(|d| d.clone()))
2198    }
2199}
2200
2201impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2202where
2203    L: Location<'a>,
2204{
2205    /// Computes the number of elements in the stream as a [`Singleton`].
2206    ///
2207    /// # Example
2208    /// ```rust
2209    /// # #[cfg(feature = "deploy")] {
2210    /// # use hydro_lang::prelude::*;
2211    /// # use futures::StreamExt;
2212    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2213    /// let tick = process.tick();
2214    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2215    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2216    /// batch.count().all_ticks()
2217    /// # }, |mut stream| async move {
2218    /// // 4
2219    /// # assert_eq!(stream.next().await.unwrap(), 4);
2220    /// # }));
2221    /// # }
2222    /// ```
2223    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2224        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2225            /// Order does not affect eventual count, and also does not affect intermediate states.
2226        ))
2227        .fold(
2228            q!(|| 0usize),
2229            q!(
2230                |count, _| *count += 1,
2231                monotone = manual_proof!(/** += 1 is monotone */)
2232            ),
2233        )
2234    }
2235}
2236
2237impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2238    /// Produces a new stream that merges the elements of the two input streams.
2239    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2240    ///
2241    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2242    /// [`Bounded`], you can use [`Stream::chain`] instead.
2243    ///
2244    /// # Example
2245    /// ```rust
2246    /// # #[cfg(feature = "deploy")] {
2247    /// # use hydro_lang::prelude::*;
2248    /// # use futures::StreamExt;
2249    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2250    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2251    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2252    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2253    /// # }, |mut stream| async move {
2254    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2255    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2256    /// #     assert_eq!(stream.next().await.unwrap(), w);
2257    /// # }
2258    /// # }));
2259    /// # }
2260    /// ```
2261    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2262        self,
2263        other: Stream<T, L, Unbounded, O2, R2>,
2264    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2265    where
2266        R: MinRetries<R2>,
2267    {
2268        Stream::new(
2269            self.location.clone(),
2270            HydroNode::Chain {
2271                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2272                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2273                metadata: self.location.new_node_metadata(Stream::<
2274                    T,
2275                    L,
2276                    Unbounded,
2277                    NoOrder,
2278                    <R as MinRetries<R2>>::Min,
2279                >::collection_kind()),
2280            },
2281        )
2282    }
2283
2284    /// Deprecated: use [`Stream::merge_unordered`] instead.
2285    #[deprecated(note = "use `merge_unordered` instead")]
2286    pub fn interleave<O2: Ordering, R2: Retries>(
2287        self,
2288        other: Stream<T, L, Unbounded, O2, R2>,
2289    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2290    where
2291        R: MinRetries<R2>,
2292    {
2293        self.merge_unordered(other)
2294    }
2295}
2296
2297impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2298    /// Produces a new stream that combines the elements of the two input streams,
2299    /// preserving the relative order of elements within each input.
2300    ///
2301    /// # Non-Determinism
2302    /// The order in which elements *across* the two streams will be interleaved is
2303    /// non-deterministic, so the order of elements will vary across runs. If the output
2304    /// order is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic
2305    /// but emits an unordered stream. For deterministic first-then-second ordering on
2306    /// bounded streams, use [`Stream::chain`].
2307    ///
2308    /// # Example
2309    /// ```rust
2310    /// # #[cfg(feature = "deploy")] {
2311    /// # use hydro_lang::prelude::*;
2312    /// # use futures::StreamExt;
2313    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2314    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2315    /// # process.source_iter(q!(vec![1, 3])).into();
2316    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2317    /// # }, |mut stream| async move {
2318    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2319    /// # for w in vec![1, 3, 2, 4] {
2320    /// #     assert_eq!(stream.next().await.unwrap(), w);
2321    /// # }
2322    /// # }));
2323    /// # }
2324    /// ```
2325    pub fn merge_ordered<R2: Retries>(
2326        self,
2327        other: Stream<T, L, B, TotalOrder, R2>,
2328        _nondet: NonDet,
2329    ) -> Stream<T, L, B, TotalOrder, <R as MinRetries<R2>>::Min>
2330    where
2331        R: MinRetries<R2>,
2332    {
2333        Stream::new(
2334            self.location.clone(),
2335            HydroNode::MergeOrdered {
2336                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2337                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2338                metadata: self.location.new_node_metadata(Stream::<
2339                    T,
2340                    L,
2341                    B,
2342                    TotalOrder,
2343                    <R as MinRetries<R2>>::Min,
2344                >::collection_kind()),
2345            },
2346        )
2347    }
2348}
2349
2350impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2351where
2352    L: Location<'a>,
2353{
2354    /// Produces a new stream that emits the input elements in sorted order.
2355    ///
2356    /// The input stream can have any ordering guarantee, but the output stream
2357    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2358    /// elements in the input stream are available, so it requires the input stream
2359    /// to be [`Bounded`].
2360    ///
2361    /// # Example
2362    /// ```rust
2363    /// # #[cfg(feature = "deploy")] {
2364    /// # use hydro_lang::prelude::*;
2365    /// # use futures::StreamExt;
2366    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2367    /// let tick = process.tick();
2368    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2369    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2370    /// batch.sort().all_ticks()
2371    /// # }, |mut stream| async move {
2372    /// // 1, 2, 3, 4
2373    /// # for w in (1..5) {
2374    /// #     assert_eq!(stream.next().await.unwrap(), w);
2375    /// # }
2376    /// # }));
2377    /// # }
2378    /// ```
2379    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2380    where
2381        B: IsBounded,
2382        T: Ord,
2383    {
2384        let this = self.make_bounded();
2385        Stream::new(
2386            this.location.clone(),
2387            HydroNode::Sort {
2388                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2389                metadata: this
2390                    .location
2391                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2392            },
2393        )
2394    }
2395
2396    /// Produces a new stream that first emits the elements of the `self` stream,
2397    /// and then emits the elements of the `other` stream. The output stream has
2398    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2399    /// [`TotalOrder`] guarantee.
2400    ///
    /// The `self` stream must be [`Bounded`], because this operator blocks on it until all of
    /// its elements are available. The `other` stream may have any boundedness, which the
    /// output stream inherits.
2404    ///
2405    /// # Example
2406    /// ```rust
2407    /// # #[cfg(feature = "deploy")] {
2408    /// # use hydro_lang::prelude::*;
2409    /// # use futures::StreamExt;
2410    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2411    /// let tick = process.tick();
2412    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2413    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2414    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2415    /// # }, |mut stream| async move {
2416    /// // 2, 3, 4, 5, 1, 2, 3, 4
2417    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2418    /// #     assert_eq!(stream.next().await.unwrap(), w);
2419    /// # }
2420    /// # }));
2421    /// # }
2422    /// ```
2423    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2424        self,
2425        other: Stream<T, L, B2, O2, R2>,
2426    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2427    where
2428        B: IsBounded,
2429        O: MinOrder<O2>,
2430        R: MinRetries<R2>,
2431    {
2432        check_matching_location(&self.location, &other.location);
2433
2434        Stream::new(
2435            self.location.clone(),
2436            HydroNode::Chain {
2437                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2438                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2439                metadata: self.location.new_node_metadata(Stream::<
2440                    T,
2441                    L,
2442                    B2,
2443                    <O as MinOrder<O2>>::Min,
2444                    <R as MinRetries<R2>>::Min,
2445                >::collection_kind()),
2446            },
2447        )
2448    }
2449
    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
    /// Unlike [`Stream::cross_product`], the output is totally ordered when both inputs are,
    /// because this is compiled into a nested loop.
2453    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2454        self,
2455        other: Stream<T2, L, Bounded, O2, R>,
2456    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2457    where
2458        B: IsBounded,
2459        T: Clone,
2460        T2: Clone,
2461    {
2462        let this = self.make_bounded();
2463        check_matching_location(&this.location, &other.location);
2464
2465        Stream::new(
2466            this.location.clone(),
2467            HydroNode::CrossProduct {
2468                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2469                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2470                metadata: this.location.new_node_metadata(Stream::<
2471                    (T, T2),
2472                    L,
2473                    Bounded,
2474                    <O2 as MinOrder<O>>::Min,
2475                    R,
2476                >::collection_kind()),
2477            },
2478        )
2479    }
2480
2481    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2482    /// `self` used as the values for *each* key.
2483    ///
2484    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2485    /// values. For example, it can be used to send the same set of elements to several cluster
2486    /// members, if the membership information is available as a [`KeyedSingleton`].
2487    ///
2488    /// # Example
2489    /// ```rust
2490    /// # #[cfg(feature = "deploy")] {
2491    /// # use hydro_lang::prelude::*;
2492    /// # use futures::StreamExt;
2493    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2494    /// # let tick = process.tick();
2495    /// let keyed_singleton = // { 1: (), 2: () }
2496    /// # process
2497    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2498    /// #     .into_keyed()
2499    /// #     .batch(&tick, nondet!(/** test */))
2500    /// #     .first();
2501    /// let stream = // [ "a", "b" ]
2502    /// # process
2503    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2504    /// #     .batch(&tick, nondet!(/** test */));
2505    /// stream.repeat_with_keys(keyed_singleton)
2506    /// # .entries().all_ticks()
2507    /// # }, |mut stream| async move {
2508    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2509    /// # let mut results = Vec::new();
2510    /// # for _ in 0..4 {
2511    /// #     results.push(stream.next().await.unwrap());
2512    /// # }
2513    /// # results.sort();
2514    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2515    /// # }));
2516    /// # }
2517    /// ```
2518    pub fn repeat_with_keys<K, V2>(
2519        self,
2520        keys: KeyedSingleton<K, V2, L, Bounded>,
2521    ) -> KeyedStream<K, T, L, Bounded, O, R>
2522    where
2523        B: IsBounded,
2524        K: Clone,
2525        T: Clone,
2526    {
2527        keys.keys()
2528            .weaken_retries()
2529            .assume_ordering_trusted::<TotalOrder>(
2530                nondet!(/** keyed stream does not depend on ordering of keys */),
2531            )
2532            .cross_product_nested_loop(self.make_bounded())
2533            .into_keyed()
2534    }
2535
2536    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2537    /// execution until all results are available. The output order is based on when futures
2538    /// complete, and may be different than the input order.
2539    ///
2540    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2541    /// while futures are pending, this variant blocks until the futures resolve.
2542    ///
2543    /// # Example
2544    /// ```rust
2545    /// # #[cfg(feature = "deploy")] {
2546    /// # use std::collections::HashSet;
2547    /// # use futures::StreamExt;
2548    /// # use hydro_lang::prelude::*;
2549    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2550    /// process
2551    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2552    ///     .map(q!(|x| async move {
2553    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2554    ///         x
2555    ///     }))
2556    ///     .resolve_futures_blocking()
2557    /// #   },
2558    /// #   |mut stream| async move {
2559    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2560    /// #       let mut output = HashSet::new();
2561    /// #       for _ in 1..10 {
2562    /// #           output.insert(stream.next().await.unwrap());
2563    /// #       }
2564    /// #       assert_eq!(
2565    /// #           output,
2566    /// #           HashSet::<i32>::from_iter(1..10)
2567    /// #       );
2568    /// #   },
2569    /// # ));
2570    /// # }
2571    /// ```
2572    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2573    where
2574        T: Future,
2575    {
2576        Stream::new(
2577            self.location.clone(),
2578            HydroNode::ResolveFuturesBlocking {
2579                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2580                metadata: self
2581                    .location
2582                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2583            },
2584        )
2585    }
2586
2587    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2588    ///
2589    /// # Example
2590    /// ```rust
2591    /// # #[cfg(feature = "deploy")] {
2592    /// # use hydro_lang::prelude::*;
2593    /// # use futures::StreamExt;
2594    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2595    /// let tick = process.tick();
2596    /// let empty: Stream<i32, _, Bounded> = process
2597    ///   .source_iter(q!(Vec::<i32>::new()))
2598    ///   .batch(&tick, nondet!(/** test */));
2599    /// empty.is_empty().all_ticks()
2600    /// # }, |mut stream| async move {
2601    /// // true
2602    /// # assert_eq!(stream.next().await.unwrap(), true);
2603    /// # }));
2604    /// # }
2605    /// ```
2606    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2607    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2608    where
2609        B: IsBounded,
2610    {
2611        self.make_bounded()
2612            .assume_ordering_trusted::<TotalOrder>(
2613                nondet!(/** is_empty intermediates unaffected by order */),
2614            )
2615            .first()
2616            .is_none()
2617    }
2618}
2619
2620impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2621where
2622    L: Location<'a>,
2623{
2624    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2625    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2626    /// by equi-joining the two streams on the key attribute `K`.
2627    ///
2628    /// When the right-hand side is [`Bounded`], the join accumulates the right side first
2629    /// and streams the left side through, preserving the left side's ordering. When both
2630    /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
2631    ///
2632    /// # Example
2633    /// ```rust
2634    /// # #[cfg(feature = "deploy")] {
2635    /// # use hydro_lang::prelude::*;
2636    /// # use std::collections::HashSet;
2637    /// # use futures::StreamExt;
2638    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2639    /// let tick = process.tick();
2640    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2641    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2642    /// stream1.join(stream2)
2643    /// # }, |mut stream| async move {
2644    /// // (1, ('a', 'x')), (2, ('b', 'y'))
    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
    /// # let mut results = HashSet::new();
    /// # for _ in 0..2 {
    /// #     results.insert(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(results, expected);
    /// # }));
    /// # }
    /// ```
2649    pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2650        self,
2651        n: Stream<(K, V2), L, B2, O2, R2>,
2652    ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2653    where
2654        K: Eq + Hash + Clone,
2655        R: MinRetries<R2>,
2656        V1: Clone,
2657        V2: Clone,
2658    {
2659        check_matching_location(&self.location, &n.location);
2660
2661        let ir_node = if B2::BOUNDED {
2662            HydroNode::JoinHalf {
2663                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2664                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2665                metadata: self.location.new_node_metadata(Stream::<
2666                    (K, (V1, V2)),
2667                    L,
2668                    B,
2669                    B2::PreserveOrderIfBounded<O>,
2670                    <R as MinRetries<R2>>::Min,
2671                >::collection_kind()),
2672            }
2673        } else {
2674            HydroNode::Join {
2675                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2676                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2677                metadata: self.location.new_node_metadata(Stream::<
2678                    (K, (V1, V2)),
2679                    L,
2680                    B,
2681                    B2::PreserveOrderIfBounded<O>,
2682                    <R as MinRetries<R2>>::Min,
2683                >::collection_kind()),
2684            }
2685        };
2686
2687        Stream::new(self.location.clone(), ir_node)
2688    }
2689
2690    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2691    /// computes the anti-join of the items in the input -- i.e. returns
2692    /// unique items in the first input that do not have a matching key
2693    /// in the second input.
2694    ///
2695    /// # Example
2696    /// ```rust
2697    /// # #[cfg(feature = "deploy")] {
2698    /// # use hydro_lang::prelude::*;
2699    /// # use futures::StreamExt;
2700    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2701    /// let tick = process.tick();
2702    /// let stream = process
2703    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2704    ///   .batch(&tick, nondet!(/** test */));
2705    /// let batch = process
2706    ///   .source_iter(q!(vec![1, 2]))
2707    ///   .batch(&tick, nondet!(/** test */));
2708    /// stream.anti_join(batch).all_ticks()
2709    /// # }, |mut stream| async move {
2710    /// # for w in vec![(3, 'c'), (4, 'd')] {
2711    /// #     assert_eq!(stream.next().await.unwrap(), w);
2712    /// # }
2713    /// # }));
    /// # }
    /// ```
2715    pub fn anti_join<O2: Ordering, R2: Retries>(
2716        self,
2717        n: Stream<K, L, Bounded, O2, R2>,
2718    ) -> Stream<(K, V1), L, B, O, R>
2719    where
2720        K: Eq + Hash,
2721    {
2722        check_matching_location(&self.location, &n.location);
2723
2724        Stream::new(
2725            self.location.clone(),
2726            HydroNode::AntiJoin {
2727                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2728                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2729                metadata: self
2730                    .location
2731                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2732            },
2733        )
2734    }
2735}
2736
2737impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2738    Stream<(K, V), L, B, O, R>
2739{
2740    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2741    /// is used as the key and the second element is added to the entries associated with that key.
2742    ///
2743    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2744    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2745    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2746    /// total ordering _within_ each group but no ordering _across_ groups.
2747    ///
2748    /// # Example
2749    /// ```rust
2750    /// # #[cfg(feature = "deploy")] {
2751    /// # use hydro_lang::prelude::*;
2752    /// # use futures::StreamExt;
2753    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2754    /// process
2755    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2756    ///     .into_keyed()
2757    /// #   .entries()
2758    /// # }, |mut stream| async move {
2759    /// // { 1: [2, 3], 2: [4] }
2760    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2761    /// #     assert_eq!(stream.next().await.unwrap(), w);
2762    /// # }
2763    /// # }));
2764    /// # }
2765    /// ```
2766    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2767        KeyedStream::new(
2768            self.location.clone(),
2769            HydroNode::Cast {
2770                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2771                metadata: self
2772                    .location
2773                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2774            },
2775        )
2776    }
2777}
2778
2779impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2780where
2781    K: Eq + Hash,
2782    L: Location<'a>,
2783{
2784    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2785    /// # Example
2786    /// ```rust
2787    /// # #[cfg(feature = "deploy")] {
2788    /// # use hydro_lang::prelude::*;
2789    /// # use futures::StreamExt;
2790    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2791    /// let tick = process.tick();
2792    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2793    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2794    /// batch.keys().all_ticks()
2795    /// # }, |mut stream| async move {
2796    /// // 1, 2
2797    /// # assert_eq!(stream.next().await.unwrap(), 1);
2798    /// # assert_eq!(stream.next().await.unwrap(), 2);
2799    /// # }));
2800    /// # }
2801    /// ```
2802    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2803        self.into_keyed()
2804            .fold(
2805                q!(|| ()),
2806                q!(
2807                    |_, _| {},
2808                    commutative = manual_proof!(/** values are ignored */),
2809                    idempotent = manual_proof!(/** values are ignored */)
2810                ),
2811            )
2812            .keys()
2813    }
2814}
2815
2816impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2817where
2818    L: Location<'a> + NoTick,
2819{
2820    /// Returns a stream corresponding to the latest batch of elements being atomically
2821    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2822    /// the order of the input.
2823    ///
2824    /// # Non-Determinism
2825    /// The batch boundaries are non-deterministic and may change across executions.
2826    pub fn batch_atomic(
2827        self,
2828        tick: &Tick<L>,
2829        _nondet: NonDet,
2830    ) -> Stream<T, Tick<L>, Bounded, O, R> {
2831        Stream::new(
2832            tick.clone(),
2833            HydroNode::Batch {
2834                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2835                metadata: tick
2836                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2837            },
2838        )
2839    }
2840
2841    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2842    /// See [`Stream::atomic`] for more details.
2843    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2844        Stream::new(
2845            self.location.tick.l.clone(),
2846            HydroNode::EndAtomic {
2847                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2848                metadata: self
2849                    .location
2850                    .tick
2851                    .l
2852                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2853            },
2854        )
2855    }
2856}
2857
2858impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2859where
2860    L: Location<'a> + NoTick + NoAtomic,
2861    F: Future<Output = T>,
2862{
2863    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2864    /// Future outputs are produced as available, regardless of input arrival order.
2865    ///
2866    /// # Example
2867    /// ```rust
2868    /// # #[cfg(feature = "deploy")] {
2869    /// # use std::collections::HashSet;
2870    /// # use futures::StreamExt;
2871    /// # use hydro_lang::prelude::*;
2872    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2873    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2874    ///     .map(q!(|x| async move {
2875    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2876    ///         x
2877    ///     }))
2878    ///     .resolve_futures()
2879    /// #   },
2880    /// #   |mut stream| async move {
2881    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2882    /// #       let mut output = HashSet::new();
2883    /// #       for _ in 1..10 {
2884    /// #           output.insert(stream.next().await.unwrap());
2885    /// #       }
2886    /// #       assert_eq!(
2887    /// #           output,
2888    /// #           HashSet::<i32>::from_iter(1..10)
2889    /// #       );
2890    /// #   },
2891    /// # ));
    /// # }
    /// ```
2893    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2894        Stream::new(
2895            self.location.clone(),
2896            HydroNode::ResolveFutures {
2897                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2898                metadata: self
2899                    .location
2900                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2901            },
2902        )
2903    }
2904
2905    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2906    /// Future outputs are produced in the same order as the input stream.
2907    ///
2908    /// # Example
2909    /// ```rust
2910    /// # #[cfg(feature = "deploy")] {
2911    /// # use std::collections::HashSet;
2912    /// # use futures::StreamExt;
2913    /// # use hydro_lang::prelude::*;
2914    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2915    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2916    ///     .map(q!(|x| async move {
2917    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2918    ///         x
2919    ///     }))
2920    ///     .resolve_futures_ordered()
2921    /// #   },
2922    /// #   |mut stream| async move {
2923    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2924    /// #       let mut output = Vec::new();
2925    /// #       for _ in 1..10 {
2926    /// #           output.push(stream.next().await.unwrap());
2927    /// #       }
2928    /// #       assert_eq!(
2929    /// #           output,
2930    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2931    /// #       );
2932    /// #   },
2933    /// # ));
    /// # }
    /// ```
2935    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2936        Stream::new(
2937            self.location.clone(),
2938            HydroNode::ResolveFuturesOrdered {
2939                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2940                metadata: self
2941                    .location
2942                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2943            },
2944        )
2945    }
2946}
2947
2948impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2949where
2950    L: Location<'a>,
2951{
2952    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2953    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2954    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2955        Stream::new(
2956            self.location.outer().clone(),
2957            HydroNode::YieldConcat {
2958                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2959                metadata: self
2960                    .location
2961                    .outer()
2962                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2963            },
2964        )
2965    }
2966
2967    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2968    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2969    ///
2970    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2971    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2972    /// stream's [`Tick`] context.
2973    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2974        let out_location = Atomic {
2975            tick: self.location.clone(),
2976        };
2977
2978        Stream::new(
2979            out_location.clone(),
2980            HydroNode::YieldConcat {
2981                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2982                metadata: out_location
2983                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2984            },
2985        )
2986    }
2987
2988    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
    /// such as `fold` retain their memory across ticks rather than resetting across batches of
2990    /// input.
2991    ///
2992    /// This API is particularly useful for stateful computation on batches of data, such as
2993    /// maintaining an accumulated state that is up to date with the current batch.
2994    ///
2995    /// # Example
2996    /// ```rust
2997    /// # #[cfg(feature = "deploy")] {
2998    /// # use hydro_lang::prelude::*;
2999    /// # use futures::StreamExt;
3000    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3001    /// let tick = process.tick();
3002    /// # // ticks are lazy by default, forces the second tick to run
3003    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3004    /// # let batch_first_tick = process
3005    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
3006    /// #  .batch(&tick, nondet!(/** test */));
3007    /// # let batch_second_tick = process
3008    /// #   .source_iter(q!(vec![5, 6, 7]))
3009    /// #   .batch(&tick, nondet!(/** test */))
3010    /// #   .defer_tick(); // appears on the second tick
3011    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3012    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3013    ///
3014    /// input.batch(&tick, nondet!(/** test */))
3015    ///     .across_ticks(|s| s.count()).all_ticks()
3016    /// # }, |mut stream| async move {
3017    /// // [4, 7]
3018    /// assert_eq!(stream.next().await.unwrap(), 4);
3019    /// assert_eq!(stream.next().await.unwrap(), 7);
3020    /// # }));
3021    /// # }
3022    /// ```
3023    pub fn across_ticks<Out: BatchAtomic>(
3024        self,
3025        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3026    ) -> Out::Batched {
3027        thunk(self.all_ticks_atomic()).batched_atomic()
3028    }
3029
3030    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3031    /// always has the elements of `self` at tick `T - 1`.
3032    ///
3033    /// At tick `0`, the output stream is empty, since there is no previous tick.
3034    ///
3035    /// This operator enables stateful iterative processing with ticks, by sending data from one
3036    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3037    ///
3038    /// # Example
3039    /// ```rust
3040    /// # #[cfg(feature = "deploy")] {
3041    /// # use hydro_lang::prelude::*;
3042    /// # use futures::StreamExt;
3043    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3044    /// let tick = process.tick();
3045    /// // ticks are lazy by default, forces the second tick to run
3046    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3047    ///
3048    /// let batch_first_tick = process
3049    ///   .source_iter(q!(vec![1, 2, 3, 4]))
3050    ///   .batch(&tick, nondet!(/** test */));
3051    /// let batch_second_tick = process
3052    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
3053    ///   .batch(&tick, nondet!(/** test */))
3054    ///   .defer_tick(); // appears on the second tick
3055    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3056    ///
3057    /// changes_across_ticks.clone().filter_not_in(
3058    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
3059    /// ).all_ticks()
3060    /// # }, |mut stream| async move {
3061    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3062    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3063    /// #     assert_eq!(stream.next().await.unwrap(), w);
3064    /// # }
3065    /// # }));
3066    /// # }
3067    /// ```
3068    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3069        Stream::new(
3070            self.location.clone(),
3071            HydroNode::DeferTick {
3072                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3073                metadata: self
3074                    .location
3075                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3076            },
3077        )
3078    }
3079}
3080
3081#[cfg(test)]
3082mod tests {
3083    #[cfg(feature = "deploy")]
3084    use futures::{SinkExt, StreamExt};
3085    #[cfg(feature = "deploy")]
3086    use hydro_deploy::Deployment;
3087    #[cfg(feature = "deploy")]
3088    use serde::{Deserialize, Serialize};
3089    #[cfg(any(feature = "deploy", feature = "sim"))]
3090    use stageleft::q;
3091
3092    #[cfg(any(feature = "deploy", feature = "sim"))]
3093    use crate::compile::builder::FlowBuilder;
3094    #[cfg(feature = "deploy")]
3095    use crate::live_collections::sliced::sliced;
3096    #[cfg(feature = "deploy")]
3097    use crate::live_collections::stream::ExactlyOnce;
3098    #[cfg(feature = "sim")]
3099    use crate::live_collections::stream::NoOrder;
3100    #[cfg(any(feature = "deploy", feature = "sim"))]
3101    use crate::live_collections::stream::TotalOrder;
3102    #[cfg(any(feature = "deploy", feature = "sim"))]
3103    use crate::location::Location;
3104    #[cfg(feature = "sim")]
3105    use crate::networking::TCP;
3106    #[cfg(any(feature = "deploy", feature = "sim"))]
3107    use crate::nondet::nondet;
3108
3109    mod backtrace_chained_ops;
3110
    /// Marker type used as the tag of the first process in `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    struct P1 {}
    /// Marker type used as the tag of the second process (and of the external
    /// endpoint) in `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    struct P2 {}
3115
    /// Serializable message wrapping a single number; it is the element type sent
    /// between processes in `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        /// The number carried by the message.
        n: u32,
    }
3121
3122    #[cfg(feature = "deploy")]
3123    #[tokio::test]
3124    async fn first_ten_distributed() {
3125        use crate::networking::TCP;
3126
3127        let mut deployment = Deployment::new();
3128
3129        let mut flow = FlowBuilder::new();
3130        let first_node = flow.process::<P1>();
3131        let second_node = flow.process::<P2>();
3132        let external = flow.external::<P2>();
3133
3134        let numbers = first_node.source_iter(q!(0..10));
3135        let out_port = numbers
3136            .map(q!(|n| SendOverNetwork { n }))
3137            .send(&second_node, TCP.fail_stop().bincode())
3138            .send_bincode_external(&external);
3139
3140        let nodes = flow
3141            .with_process(&first_node, deployment.Localhost())
3142            .with_process(&second_node, deployment.Localhost())
3143            .with_external(&external, deployment.Localhost())
3144            .deploy(&mut deployment);
3145
3146        deployment.deploy().await.unwrap();
3147
3148        let mut external_out = nodes.connect(out_port).await;
3149
3150        deployment.start().await.unwrap();
3151
3152        for i in 0..10 {
3153            assert_eq!(external_out.next().await.unwrap().n, i);
3154        }
3155    }
3156
3157    #[cfg(feature = "deploy")]
3158    #[tokio::test]
3159    async fn first_cardinality() {
3160        let mut deployment = Deployment::new();
3161
3162        let mut flow = FlowBuilder::new();
3163        let node = flow.process::<()>();
3164        let external = flow.external::<()>();
3165
3166        let node_tick = node.tick();
3167        let count = node_tick
3168            .singleton(q!([1, 2, 3]))
3169            .into_stream()
3170            .flatten_ordered()
3171            .first()
3172            .into_stream()
3173            .count()
3174            .all_ticks()
3175            .send_bincode_external(&external);
3176
3177        let nodes = flow
3178            .with_process(&node, deployment.Localhost())
3179            .with_external(&external, deployment.Localhost())
3180            .deploy(&mut deployment);
3181
3182        deployment.deploy().await.unwrap();
3183
3184        let mut external_out = nodes.connect(count).await;
3185
3186        deployment.start().await.unwrap();
3187
3188        assert_eq!(external_out.next().await.unwrap(), 1);
3189    }
3190
3191    #[cfg(feature = "deploy")]
3192    #[tokio::test]
3193    async fn unbounded_reduce_remembers_state() {
3194        let mut deployment = Deployment::new();
3195
3196        let mut flow = FlowBuilder::new();
3197        let node = flow.process::<()>();
3198        let external = flow.external::<()>();
3199
3200        let (input_port, input) = node.source_external_bincode(&external);
3201        let out = input
3202            .reduce(q!(|acc, v| *acc += v))
3203            .sample_eager(nondet!(/** test */))
3204            .send_bincode_external(&external);
3205
3206        let nodes = flow
3207            .with_process(&node, deployment.Localhost())
3208            .with_external(&external, deployment.Localhost())
3209            .deploy(&mut deployment);
3210
3211        deployment.deploy().await.unwrap();
3212
3213        let mut external_in = nodes.connect(input_port).await;
3214        let mut external_out = nodes.connect(out).await;
3215
3216        deployment.start().await.unwrap();
3217
3218        external_in.send(1).await.unwrap();
3219        assert_eq!(external_out.next().await.unwrap(), 1);
3220
3221        external_in.send(2).await.unwrap();
3222        assert_eq!(external_out.next().await.unwrap(), 3);
3223    }
3224
3225    #[cfg(feature = "deploy")]
3226    #[tokio::test]
3227    async fn top_level_bounded_cross_singleton() {
3228        let mut deployment = Deployment::new();
3229
3230        let mut flow = FlowBuilder::new();
3231        let node = flow.process::<()>();
3232        let external = flow.external::<()>();
3233
3234        let (input_port, input) =
3235            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3236
3237        let out = input
3238            .cross_singleton(
3239                node.source_iter(q!(vec![1, 2, 3]))
3240                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3241            )
3242            .send_bincode_external(&external);
3243
3244        let nodes = flow
3245            .with_process(&node, deployment.Localhost())
3246            .with_external(&external, deployment.Localhost())
3247            .deploy(&mut deployment);
3248
3249        deployment.deploy().await.unwrap();
3250
3251        let mut external_in = nodes.connect(input_port).await;
3252        let mut external_out = nodes.connect(out).await;
3253
3254        deployment.start().await.unwrap();
3255
3256        external_in.send(1).await.unwrap();
3257        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3258
3259        external_in.send(2).await.unwrap();
3260        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3261    }
3262
3263    #[cfg(feature = "deploy")]
3264    #[tokio::test]
3265    async fn top_level_bounded_reduce_cardinality() {
3266        let mut deployment = Deployment::new();
3267
3268        let mut flow = FlowBuilder::new();
3269        let node = flow.process::<()>();
3270        let external = flow.external::<()>();
3271
3272        let (input_port, input) =
3273            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3274
3275        let out = sliced! {
3276            let input = use(input, nondet!(/** test */));
3277            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
3278            input.cross_singleton(v.into_stream().count())
3279        }
3280        .send_bincode_external(&external);
3281
3282        let nodes = flow
3283            .with_process(&node, deployment.Localhost())
3284            .with_external(&external, deployment.Localhost())
3285            .deploy(&mut deployment);
3286
3287        deployment.deploy().await.unwrap();
3288
3289        let mut external_in = nodes.connect(input_port).await;
3290        let mut external_out = nodes.connect(out).await;
3291
3292        deployment.start().await.unwrap();
3293
3294        external_in.send(1).await.unwrap();
3295        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3296
3297        external_in.send(2).await.unwrap();
3298        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3299    }
3300
3301    #[cfg(feature = "deploy")]
3302    #[tokio::test]
3303    async fn top_level_bounded_into_singleton_cardinality() {
3304        let mut deployment = Deployment::new();
3305
3306        let mut flow = FlowBuilder::new();
3307        let node = flow.process::<()>();
3308        let external = flow.external::<()>();
3309
3310        let (input_port, input) =
3311            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3312
3313        let out = sliced! {
3314            let input = use(input, nondet!(/** test */));
3315            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
3316            input.cross_singleton(v.into_stream().count())
3317        }
3318        .send_bincode_external(&external);
3319
3320        let nodes = flow
3321            .with_process(&node, deployment.Localhost())
3322            .with_external(&external, deployment.Localhost())
3323            .deploy(&mut deployment);
3324
3325        deployment.deploy().await.unwrap();
3326
3327        let mut external_in = nodes.connect(input_port).await;
3328        let mut external_out = nodes.connect(out).await;
3329
3330        deployment.start().await.unwrap();
3331
3332        external_in.send(1).await.unwrap();
3333        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3334
3335        external_in.send(2).await.unwrap();
3336        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3337    }
3338
3339    #[cfg(feature = "deploy")]
3340    #[tokio::test]
3341    async fn atomic_fold_replays_each_tick() {
3342        let mut deployment = Deployment::new();
3343
3344        let mut flow = FlowBuilder::new();
3345        let node = flow.process::<()>();
3346        let external = flow.external::<()>();
3347
3348        let (input_port, input) =
3349            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3350        let tick = node.tick();
3351
3352        let out = input
3353            .batch(&tick, nondet!(/** test */))
3354            .cross_singleton(
3355                node.source_iter(q!(vec![1, 2, 3]))
3356                    .atomic()
3357                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
3358                    .snapshot_atomic(&tick, nondet!(/** test */)),
3359            )
3360            .all_ticks()
3361            .send_bincode_external(&external);
3362
3363        let nodes = flow
3364            .with_process(&node, deployment.Localhost())
3365            .with_external(&external, deployment.Localhost())
3366            .deploy(&mut deployment);
3367
3368        deployment.deploy().await.unwrap();
3369
3370        let mut external_in = nodes.connect(input_port).await;
3371        let mut external_out = nodes.connect(out).await;
3372
3373        deployment.start().await.unwrap();
3374
3375        external_in.send(1).await.unwrap();
3376        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3377
3378        external_in.send(2).await.unwrap();
3379        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3380    }
3381
3382    #[cfg(feature = "deploy")]
3383    #[tokio::test]
3384    async fn unbounded_scan_remembers_state() {
3385        let mut deployment = Deployment::new();
3386
3387        let mut flow = FlowBuilder::new();
3388        let node = flow.process::<()>();
3389        let external = flow.external::<()>();
3390
3391        let (input_port, input) = node.source_external_bincode(&external);
3392        let out = input
3393            .scan(
3394                q!(|| 0),
3395                q!(|acc, v| {
3396                    *acc += v;
3397                    Some(*acc)
3398                }),
3399            )
3400            .send_bincode_external(&external);
3401
3402        let nodes = flow
3403            .with_process(&node, deployment.Localhost())
3404            .with_external(&external, deployment.Localhost())
3405            .deploy(&mut deployment);
3406
3407        deployment.deploy().await.unwrap();
3408
3409        let mut external_in = nodes.connect(input_port).await;
3410        let mut external_out = nodes.connect(out).await;
3411
3412        deployment.start().await.unwrap();
3413
3414        external_in.send(1).await.unwrap();
3415        assert_eq!(external_out.next().await.unwrap(), 1);
3416
3417        external_in.send(2).await.unwrap();
3418        assert_eq!(external_out.next().await.unwrap(), 3);
3419    }
3420
3421    #[cfg(feature = "deploy")]
3422    #[tokio::test]
3423    async fn unbounded_enumerate_remembers_state() {
3424        let mut deployment = Deployment::new();
3425
3426        let mut flow = FlowBuilder::new();
3427        let node = flow.process::<()>();
3428        let external = flow.external::<()>();
3429
3430        let (input_port, input) = node.source_external_bincode(&external);
3431        let out = input.enumerate().send_bincode_external(&external);
3432
3433        let nodes = flow
3434            .with_process(&node, deployment.Localhost())
3435            .with_external(&external, deployment.Localhost())
3436            .deploy(&mut deployment);
3437
3438        deployment.deploy().await.unwrap();
3439
3440        let mut external_in = nodes.connect(input_port).await;
3441        let mut external_out = nodes.connect(out).await;
3442
3443        deployment.start().await.unwrap();
3444
3445        external_in.send(1).await.unwrap();
3446        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3447
3448        external_in.send(2).await.unwrap();
3449        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3450    }
3451
3452    #[cfg(feature = "deploy")]
3453    #[tokio::test]
3454    async fn unbounded_unique_remembers_state() {
3455        let mut deployment = Deployment::new();
3456
3457        let mut flow = FlowBuilder::new();
3458        let node = flow.process::<()>();
3459        let external = flow.external::<()>();
3460
3461        let (input_port, input) =
3462            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3463        let out = input.unique().send_bincode_external(&external);
3464
3465        let nodes = flow
3466            .with_process(&node, deployment.Localhost())
3467            .with_external(&external, deployment.Localhost())
3468            .deploy(&mut deployment);
3469
3470        deployment.deploy().await.unwrap();
3471
3472        let mut external_in = nodes.connect(input_port).await;
3473        let mut external_out = nodes.connect(out).await;
3474
3475        deployment.start().await.unwrap();
3476
3477        external_in.send(1).await.unwrap();
3478        assert_eq!(external_out.next().await.unwrap(), 1);
3479
3480        external_in.send(2).await.unwrap();
3481        assert_eq!(external_out.next().await.unwrap(), 2);
3482
3483        external_in.send(1).await.unwrap();
3484        external_in.send(3).await.unwrap();
3485        assert_eq!(external_out.next().await.unwrap(), 3);
3486    }
3487
3488    #[cfg(feature = "sim")]
3489    #[test]
3490    #[should_panic]
3491    fn sim_batch_nondet_size() {
3492        let mut flow = FlowBuilder::new();
3493        let node = flow.process::<()>();
3494
3495        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3496
3497        let tick = node.tick();
3498        let out_recv = input
3499            .batch(&tick, nondet!(/** test */))
3500            .count()
3501            .all_ticks()
3502            .sim_output();
3503
3504        flow.sim().exhaustive(async || {
3505            in_send.send(());
3506            in_send.send(());
3507            in_send.send(());
3508
3509            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3510        });
3511    }
3512
3513    #[cfg(feature = "sim")]
3514    #[test]
3515    fn sim_batch_preserves_order() {
3516        let mut flow = FlowBuilder::new();
3517        let node = flow.process::<()>();
3518
3519        let (in_send, input) = node.sim_input();
3520
3521        let tick = node.tick();
3522        let out_recv = input
3523            .batch(&tick, nondet!(/** test */))
3524            .all_ticks()
3525            .sim_output();
3526
3527        flow.sim().exhaustive(async || {
3528            in_send.send(1);
3529            in_send.send(2);
3530            in_send.send(3);
3531
3532            out_recv.assert_yields_only([1, 2, 3]).await;
3533        });
3534    }
3535
3536    #[cfg(feature = "sim")]
3537    #[test]
3538    #[should_panic]
3539    fn sim_batch_unordered_shuffles() {
3540        let mut flow = FlowBuilder::new();
3541        let node = flow.process::<()>();
3542
3543        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3544
3545        let tick = node.tick();
3546        let batch = input.batch(&tick, nondet!(/** test */));
3547        let out_recv = batch
3548            .clone()
3549            .min()
3550            .zip(batch.max())
3551            .all_ticks()
3552            .sim_output();
3553
3554        flow.sim().exhaustive(async || {
3555            in_send.send_many_unordered([1, 2, 3]);
3556
3557            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3558                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3559            }
3560        });
3561    }
3562
3563    #[cfg(feature = "sim")]
3564    #[test]
3565    fn sim_batch_unordered_shuffles_count() {
3566        let mut flow = FlowBuilder::new();
3567        let node = flow.process::<()>();
3568
3569        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3570
3571        let tick = node.tick();
3572        let batch = input.batch(&tick, nondet!(/** test */));
3573        let out_recv = batch.all_ticks().sim_output();
3574
3575        let instance_count = flow.sim().exhaustive(async || {
3576            in_send.send_many_unordered([1, 2, 3, 4]);
3577            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3578        });
3579
3580        assert_eq!(
3581            instance_count,
3582            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3583        )
3584    }
3585
3586    #[cfg(feature = "sim")]
3587    #[test]
3588    #[should_panic]
3589    fn sim_observe_order_batched() {
3590        let mut flow = FlowBuilder::new();
3591        let node = flow.process::<()>();
3592
3593        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3594
3595        let tick = node.tick();
3596        let batch = input.batch(&tick, nondet!(/** test */));
3597        let out_recv = batch
3598            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3599            .all_ticks()
3600            .sim_output();
3601
3602        flow.sim().exhaustive(async || {
3603            in_send.send_many_unordered([1, 2, 3, 4]);
3604            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3605        });
3606    }
3607
3608    #[cfg(feature = "sim")]
3609    #[test]
3610    fn sim_observe_order_batched_count() {
3611        let mut flow = FlowBuilder::new();
3612        let node = flow.process::<()>();
3613
3614        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3615
3616        let tick = node.tick();
3617        let batch = input.batch(&tick, nondet!(/** test */));
3618        let out_recv = batch
3619            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3620            .all_ticks()
3621            .sim_output();
3622
3623        let instance_count = flow.sim().exhaustive(async || {
3624            in_send.send_many_unordered([1, 2, 3, 4]);
3625            let _ = out_recv.collect::<Vec<_>>().await;
3626        });
3627
3628        assert_eq!(
3629            instance_count,
3630            192 // 4! * 2^{4 - 1}
3631        )
3632    }
3633
3634    #[cfg(feature = "sim")]
3635    #[test]
3636    fn sim_unordered_count_instance_count() {
3637        let mut flow = FlowBuilder::new();
3638        let node = flow.process::<()>();
3639
3640        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3641
3642        let tick = node.tick();
3643        let out_recv = input
3644            .count()
3645            .snapshot(&tick, nondet!(/** test */))
3646            .all_ticks()
3647            .sim_output();
3648
3649        let instance_count = flow.sim().exhaustive(async || {
3650            in_send.send_many_unordered([1, 2, 3, 4]);
3651            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3652        });
3653
3654        assert_eq!(
3655            instance_count,
3656            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3657        )
3658    }
3659
3660    #[cfg(feature = "sim")]
3661    #[test]
3662    fn sim_top_level_assume_ordering() {
3663        let mut flow = FlowBuilder::new();
3664        let node = flow.process::<()>();
3665
3666        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3667
3668        let out_recv = input
3669            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3670            .sim_output();
3671
3672        let instance_count = flow.sim().exhaustive(async || {
3673            in_send.send_many_unordered([1, 2, 3]);
3674            let mut out = out_recv.collect::<Vec<_>>().await;
3675            out.sort();
3676            assert_eq!(out, vec![1, 2, 3]);
3677        });
3678
3679        assert_eq!(instance_count, 6)
3680    }
3681
3682    #[cfg(feature = "sim")]
3683    #[test]
3684    fn sim_top_level_assume_ordering_cycle_back() {
3685        let mut flow = FlowBuilder::new();
3686        let node = flow.process::<()>();
3687        let node2 = flow.process::<()>();
3688
3689        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3690
3691        let (complete_cycle_back, cycle_back) =
3692            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3693        let ordered = input
3694            .merge_unordered(cycle_back)
3695            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3696        complete_cycle_back.complete(
3697            ordered
3698                .clone()
3699                .map(q!(|v| v + 1))
3700                .filter(q!(|v| v % 2 == 1))
3701                .send(&node2, TCP.fail_stop().bincode())
3702                .send(&node, TCP.fail_stop().bincode()),
3703        );
3704
3705        let out_recv = ordered.sim_output();
3706
3707        let mut saw = false;
3708        let instance_count = flow.sim().exhaustive(async || {
3709            in_send.send_many_unordered([0, 2]);
3710            let out = out_recv.collect::<Vec<_>>().await;
3711
3712            if out.starts_with(&[0, 1, 2]) {
3713                saw = true;
3714            }
3715        });
3716
3717        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3718        assert_eq!(instance_count, 6);
3719    }
3720
3721    #[cfg(feature = "sim")]
3722    #[test]
3723    fn sim_top_level_assume_ordering_cycle_back_tick() {
3724        let mut flow = FlowBuilder::new();
3725        let node = flow.process::<()>();
3726        let node2 = flow.process::<()>();
3727
3728        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3729
3730        let (complete_cycle_back, cycle_back) =
3731            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3732        let ordered = input
3733            .merge_unordered(cycle_back)
3734            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3735        complete_cycle_back.complete(
3736            ordered
3737                .clone()
3738                .batch(&node.tick(), nondet!(/** test */))
3739                .all_ticks()
3740                .map(q!(|v| v + 1))
3741                .filter(q!(|v| v % 2 == 1))
3742                .send(&node2, TCP.fail_stop().bincode())
3743                .send(&node, TCP.fail_stop().bincode()),
3744        );
3745
3746        let out_recv = ordered.sim_output();
3747
3748        let mut saw = false;
3749        let instance_count = flow.sim().exhaustive(async || {
3750            in_send.send_many_unordered([0, 2]);
3751            let out = out_recv.collect::<Vec<_>>().await;
3752
3753            if out.starts_with(&[0, 1, 2]) {
3754                saw = true;
3755            }
3756        });
3757
3758        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3759        assert_eq!(instance_count, 58);
3760    }
3761
3762    #[cfg(feature = "sim")]
3763    #[test]
3764    fn sim_top_level_assume_ordering_multiple() {
3765        let mut flow = FlowBuilder::new();
3766        let node = flow.process::<()>();
3767        let node2 = flow.process::<()>();
3768
3769        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3770        let (_, input2) = node.sim_input::<_, NoOrder, _>();
3771
3772        let (complete_cycle_back, cycle_back) =
3773            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3774        let input1_ordered = input
3775            .clone()
3776            .merge_unordered(cycle_back)
3777            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3778        let foo = input1_ordered
3779            .clone()
3780            .map(q!(|v| v + 3))
3781            .weaken_ordering::<NoOrder>()
3782            .merge_unordered(input2)
3783            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3784
3785        complete_cycle_back.complete(
3786            foo.filter(q!(|v| *v == 3))
3787                .send(&node2, TCP.fail_stop().bincode())
3788                .send(&node, TCP.fail_stop().bincode()),
3789        );
3790
3791        let out_recv = input1_ordered.sim_output();
3792
3793        let mut saw = false;
3794        let instance_count = flow.sim().exhaustive(async || {
3795            in_send.send_many_unordered([0, 1]);
3796            let out = out_recv.collect::<Vec<_>>().await;
3797
3798            if out.starts_with(&[0, 3, 1]) {
3799                saw = true;
3800            }
3801        });
3802
3803        assert!(saw, "did not see an instance with 0, 3, 1 in order");
3804        assert_eq!(instance_count, 24);
3805    }
3806
3807    #[cfg(feature = "sim")]
3808    #[test]
3809    fn sim_atomic_assume_ordering_cycle_back() {
3810        let mut flow = FlowBuilder::new();
3811        let node = flow.process::<()>();
3812        let node2 = flow.process::<()>();
3813
3814        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3815
3816        let (complete_cycle_back, cycle_back) =
3817            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3818        let ordered = input
3819            .merge_unordered(cycle_back)
3820            .atomic()
3821            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3822            .end_atomic();
3823        complete_cycle_back.complete(
3824            ordered
3825                .clone()
3826                .map(q!(|v| v + 1))
3827                .filter(q!(|v| v % 2 == 1))
3828                .send(&node2, TCP.fail_stop().bincode())
3829                .send(&node, TCP.fail_stop().bincode()),
3830        );
3831
3832        let out_recv = ordered.sim_output();
3833
3834        let instance_count = flow.sim().exhaustive(async || {
3835            in_send.send_many_unordered([0, 2]);
3836            let out = out_recv.collect::<Vec<_>>().await;
3837            assert_eq!(out.len(), 4);
3838        });
3839        assert_eq!(instance_count, 22);
3840    }
3841
3842    #[cfg(feature = "deploy")]
3843    #[tokio::test]
3844    async fn partition_evens_odds() {
3845        let mut deployment = Deployment::new();
3846
3847        let mut flow = FlowBuilder::new();
3848        let node = flow.process::<()>();
3849        let external = flow.external::<()>();
3850
3851        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3852        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3853        let evens_port = evens.send_bincode_external(&external);
3854        let odds_port = odds.send_bincode_external(&external);
3855
3856        let nodes = flow
3857            .with_process(&node, deployment.Localhost())
3858            .with_external(&external, deployment.Localhost())
3859            .deploy(&mut deployment);
3860
3861        deployment.deploy().await.unwrap();
3862
3863        let mut evens_out = nodes.connect(evens_port).await;
3864        let mut odds_out = nodes.connect(odds_port).await;
3865
3866        deployment.start().await.unwrap();
3867
3868        let mut even_results = Vec::new();
3869        for _ in 0..3 {
3870            even_results.push(evens_out.next().await.unwrap());
3871        }
3872        even_results.sort();
3873        assert_eq!(even_results, vec![2, 4, 6]);
3874
3875        let mut odd_results = Vec::new();
3876        for _ in 0..3 {
3877            odd_results.push(odds_out.next().await.unwrap());
3878        }
3879        odd_results.sort();
3880        assert_eq!(odd_results, vec![1, 3, 5]);
3881    }
3882
3883    #[cfg(feature = "deploy")]
3884    #[tokio::test]
3885    async fn unconsumed_inspect_still_runs() {
3886        use crate::deploy::DeployCrateWrapper;
3887
3888        let mut deployment = Deployment::new();
3889
3890        let mut flow = FlowBuilder::new();
3891        let node = flow.process::<()>();
3892
3893        // The return value of .inspect() is intentionally dropped.
3894        // Before the Null-root fix, this would silently do nothing.
3895        node.source_iter(q!(0..5))
3896            .inspect(q!(|x| println!("inspect: {}", x)));
3897
3898        let nodes = flow
3899            .with_process(&node, deployment.Localhost())
3900            .deploy(&mut deployment);
3901
3902        deployment.deploy().await.unwrap();
3903
3904        let mut stdout = nodes.get_process(&node).stdout();
3905
3906        deployment.start().await.unwrap();
3907
3908        let mut lines = Vec::new();
3909        for _ in 0..5 {
3910            lines.push(stdout.recv().await.unwrap());
3911        }
3912        lines.sort();
3913        assert_eq!(
3914            lines,
3915            vec![
3916                "inspect: 0",
3917                "inspect: 1",
3918                "inspect: 2",
3919                "inspect: 3",
3920                "inspect: 4",
3921            ]
3922        );
3923    }
3924
3925    #[cfg(feature = "sim")]
3926    #[test]
3927    fn sim_limit() {
3928        let mut flow = FlowBuilder::new();
3929        let node = flow.process::<()>();
3930
3931        let (in_send, input) = node.sim_input();
3932
3933        let out_recv = input.limit(q!(3)).sim_output();
3934
3935        flow.sim().exhaustive(async || {
3936            in_send.send(1);
3937            in_send.send(2);
3938            in_send.send(3);
3939            in_send.send(4);
3940            in_send.send(5);
3941
3942            out_recv.assert_yields_only([1, 2, 3]).await;
3943        });
3944    }
3945
3946    #[cfg(feature = "sim")]
3947    #[test]
3948    fn sim_limit_zero() {
3949        let mut flow = FlowBuilder::new();
3950        let node = flow.process::<()>();
3951
3952        let (in_send, input) = node.sim_input();
3953
3954        let out_recv = input.limit(q!(0)).sim_output();
3955
3956        flow.sim().exhaustive(async || {
3957            in_send.send(1);
3958            in_send.send(2);
3959
3960            out_recv.assert_yields_only::<i32, _>([]).await;
3961        });
3962    }
3963
3964    #[cfg(feature = "sim")]
3965    #[test]
3966    fn sim_merge_ordered() {
3967        let mut flow = FlowBuilder::new();
3968        let node = flow.process::<()>();
3969
3970        let (in_send, input) = node.sim_input();
3971        let (in_send2, input2) = node.sim_input();
3972
3973        let out_recv = input
3974            .merge_ordered(input2, nondet!(/** test */))
3975            .sim_output();
3976
3977        let mut saw_out_of_order = false;
3978        let instances = flow.sim().exhaustive(async || {
3979            in_send.send(1);
3980            in_send.send(2);
3981            in_send2.send(3);
3982            in_send2.send(4);
3983
3984            let out = out_recv.collect::<Vec<_>>().await;
3985
3986            if out == [1, 3, 2, 4] {
3987                saw_out_of_order = true;
3988            }
3989
3990            // Assert ordering preservation: elements from each input must
3991            // appear in their original relative order.
3992            let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
3993            let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
3994            assert_eq!(
3995                first_elements,
3996                vec![1, 2],
3997                "first input order violated: {:?}",
3998                out
3999            );
4000            assert_eq!(
4001                second_elements,
4002                vec![3, 4],
4003                "second input order violated: {:?}",
4004                out
4005            );
4006
4007            first_elements.append(&mut second_elements);
4008            first_elements.sort();
4009            assert_eq!(first_elements, vec![1, 2, 3, 4]);
4010        });
4011
4012        assert!(saw_out_of_order);
4013        assert_eq!(instances, 6);
4014    }
4015
4016    /// Tests that merge_ordered passes through elements when only one input
4017    /// has data.
4018    #[cfg(feature = "sim")]
4019    #[test]
4020    fn sim_merge_ordered_one_empty() {
4021        let mut flow = FlowBuilder::new();
4022        let node = flow.process::<()>();
4023
4024        let (in_send, input) = node.sim_input();
4025        let (_in_send2, input2) = node.sim_input();
4026
4027        let out_recv = input
4028            .merge_ordered(input2, nondet!(/** test */))
4029            .sim_output();
4030
4031        let instances = flow.sim().exhaustive(async || {
4032            in_send.send(1);
4033            in_send.send(2);
4034
4035            let out = out_recv.collect::<Vec<_>>().await;
4036            assert_eq!(out, vec![1, 2]);
4037        });
4038
4039        // Only one possible interleaving when one input is empty
4040        assert_eq!(instances, 1);
4041    }
4042
4043    /// Tests that merge_ordered correctly handles feedback cycles.
4044    /// An element output from merge_ordered is filtered and cycled back to
4045    /// one of its inputs. The one-at-a-time release must allow the cycled-back
4046    /// element to arrive and potentially be emitted before elements still
4047    /// waiting on the other input.
4048    #[cfg(feature = "sim")]
4049    #[test]
4050    fn sim_merge_ordered_cycle_back() {
4051        let mut flow = FlowBuilder::new();
4052        let node = flow.process::<()>();
4053
4054        let (in_send, input) = node.sim_input();
4055
4056        // Create a forward ref for the cycle back
4057        let (complete_cycle_back, cycle_back) =
4058            node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4059
4060        // merge_ordered: input (external) with cycle_back
4061        let merged = input.merge_ordered(cycle_back, nondet!(/** test */));
4062
4063        // Cycle back: elements equal to 1 get mapped to 10 and fed back
4064        complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4065
4066        let out_recv = merged.sim_output();
4067
4068        // Send 1 and 2. Element 1 should cycle back as 10.
4069        // Valid orderings must have 1 before 10 (since 10 depends on 1).
4070        let mut saw_cycle_before_second = false;
4071        flow.sim().exhaustive(async || {
4072            in_send.send(1);
4073            in_send.send(2);
4074
4075            let out = out_recv.collect::<Vec<_>>().await;
4076
4077            // 10 must always come after 1 (causal dependency)
4078            let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4079            let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4080            assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4081
4082            // Check if we see [1, 10, 2] — the cycled element beats the second input
4083            if out == [1, 10, 2] {
4084                saw_cycle_before_second = true;
4085            }
4086
4087            let mut sorted = out;
4088            sorted.sort();
4089            assert_eq!(sorted, vec![1, 2, 10]);
4090        });
4091
4092        assert!(
4093            saw_cycle_before_second,
4094            "never saw the cycled element arrive before the second input element"
4095        );
4096    }
4097
4098    /// Tests that merge_ordered correctly interleaves when one input has a
4099    /// delayed element. With a: [1, _delay_, 2] and b: [3, 4], the delayed
4100    /// element 2 should be able to appear after b's elements.
4101    #[cfg(feature = "sim")]
4102    #[test]
4103    fn sim_merge_ordered_delayed() {
4104        let mut flow = FlowBuilder::new();
4105        let node = flow.process::<()>();
4106
4107        let (in_send, input) = node.sim_input();
4108        let (in_send2, input2) = node.sim_input();
4109
4110        let out_recv = input
4111            .merge_ordered(input2, nondet!(/** test */))
4112            .sim_output();
4113
4114        let mut saw_delayed_interleaving = false;
4115        flow.sim().exhaustive(async || {
4116            // Send 1 from a, and 3, 4 from b
4117            in_send.send(1);
4118            in_send2.send(3);
4119            in_send2.send(4);
4120
4121            // Collect what's available so far
4122            let first_batch = out_recv.collect::<Vec<_>>().await;
4123
4124            // Now send the delayed element 2 from a
4125            in_send.send(2);
4126            let second_batch = out_recv.collect::<Vec<_>>().await;
4127
4128            let mut all: Vec<_> = first_batch
4129                .iter()
4130                .chain(second_batch.iter())
4131                .copied()
4132                .collect();
4133
4134            // Check if we saw [1, 3, 4, 2] — the delayed interleaving
4135            if all == [1, 3, 4, 2] {
4136                saw_delayed_interleaving = true;
4137            }
4138
4139            all.sort();
4140            assert_eq!(all, vec![1, 2, 3, 4]);
4141        });
4142
4143        assert!(saw_delayed_interleaving);
4144    }
4145
4146    /// Deploy test: merge_ordered with a delayed element on one input.
4147    /// Sends a=1, b=3, b=4, then after receiving those, sends a=2.
4148    /// Expects to see [1, 3, 4] first, then [2] — demonstrating that
4149    /// both inputs are pulled and the delayed element arrives later.
4150    #[cfg(feature = "deploy")]
4151    #[tokio::test]
4152    async fn deploy_merge_ordered_delayed() {
4153        let mut deployment = Deployment::new();
4154
4155        let mut flow = FlowBuilder::new();
4156        let node = flow.process::<()>();
4157        let external = flow.external::<()>();
4158
4159        let (input_a_port, input_a) = node.source_external_bincode(&external);
4160        let (input_b_port, input_b) = node.source_external_bincode(&external);
4161
4162        let out = input_a
4163            .assume_ordering(nondet!(/** test */))
4164            .merge_ordered(
4165                input_b.assume_ordering(nondet!(/** test */)),
4166                nondet!(/** test */),
4167            )
4168            .send_bincode_external(&external);
4169
4170        let nodes = flow
4171            .with_process(&node, deployment.Localhost())
4172            .with_external(&external, deployment.Localhost())
4173            .deploy(&mut deployment);
4174
4175        deployment.deploy().await.unwrap();
4176
4177        let mut ext_a = nodes.connect(input_a_port).await;
4178        let mut ext_b = nodes.connect(input_b_port).await;
4179        let mut ext_out = nodes.connect(out).await;
4180
4181        deployment.start().await.unwrap();
4182
4183        // Send a=1, b=3, b=4
4184        ext_a.send(1).await.unwrap();
4185        ext_b.send(3).await.unwrap();
4186        ext_b.send(4).await.unwrap();
4187
4188        // Collect the first 3 elements
4189        let mut received = Vec::new();
4190        for _ in 0..3 {
4191            received.push(ext_out.next().await.unwrap());
4192        }
4193
4194        // Now send the delayed a=2
4195        ext_a.send(2).await.unwrap();
4196        received.push(ext_out.next().await.unwrap());
4197
4198        // All elements should be present
4199        received.sort();
4200        assert_eq!(received, vec![1, 2, 3, 4]);
4201    }
4202
4203    #[cfg(feature = "deploy")]
4204    #[tokio::test]
4205    async fn monotone_fold_threshold() {
4206        use crate::properties::manual_proof;
4207
4208        let mut deployment = Deployment::new();
4209
4210        let mut flow = FlowBuilder::new();
4211        let node = flow.process::<()>();
4212        let external = flow.external::<()>();
4213
4214        let in_unbounded: super::Stream<_, _> =
4215            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4216        let sum = in_unbounded.fold(
4217            q!(|| 0),
4218            q!(
4219                |sum, v| {
4220                    *sum += v;
4221                },
4222                monotone = manual_proof!(/** test */)
4223            ),
4224        );
4225
4226        let threshold_out = sum
4227            .threshold_greater_or_equal(node.singleton(q!(7)))
4228            .send_bincode_external(&external);
4229
4230        let nodes = flow
4231            .with_process(&node, deployment.Localhost())
4232            .with_external(&external, deployment.Localhost())
4233            .deploy(&mut deployment);
4234
4235        deployment.deploy().await.unwrap();
4236
4237        let mut threshold_out = nodes.connect(threshold_out).await;
4238
4239        deployment.start().await.unwrap();
4240
4241        assert_eq!(threshold_out.next().await.unwrap(), 7);
4242    }
4243
4244    #[cfg(feature = "deploy")]
4245    #[tokio::test]
4246    async fn monotone_count_threshold() {
4247        let mut deployment = Deployment::new();
4248
4249        let mut flow = FlowBuilder::new();
4250        let node = flow.process::<()>();
4251        let external = flow.external::<()>();
4252
4253        let in_unbounded: super::Stream<_, _> =
4254            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4255        let sum = in_unbounded.count();
4256
4257        let threshold_out = sum
4258            .threshold_greater_or_equal(node.singleton(q!(3)))
4259            .send_bincode_external(&external);
4260
4261        let nodes = flow
4262            .with_process(&node, deployment.Localhost())
4263            .with_external(&external, deployment.Localhost())
4264            .deploy(&mut deployment);
4265
4266        deployment.deploy().await.unwrap();
4267
4268        let mut threshold_out = nodes.connect(threshold_out).await;
4269
4270        deployment.start().await.unwrap();
4271
4272        assert_eq!(threshold_out.next().await.unwrap(), 3);
4273    }
4274
4275    #[cfg(feature = "deploy")]
4276    #[tokio::test]
4277    async fn monotone_map_order_preserving_threshold() {
4278        use crate::properties::manual_proof;
4279
4280        let mut deployment = Deployment::new();
4281
4282        let mut flow = FlowBuilder::new();
4283        let node = flow.process::<()>();
4284        let external = flow.external::<()>();
4285
4286        let in_unbounded: super::Stream<_, _> =
4287            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4288        let sum = in_unbounded.fold(
4289            q!(|| 0),
4290            q!(
4291                |sum, v| {
4292                    *sum += v;
4293                },
4294                monotone = manual_proof!(/** test */)
4295            ),
4296        );
4297
4298        // map with order_preserving should preserve monotonicity
4299        let doubled = sum.map(q!(
4300            |v| v * 2,
4301            order_preserving = manual_proof!(/** doubling preserves order */)
4302        ));
4303
4304        let threshold_out = doubled
4305            .threshold_greater_or_equal(node.singleton(q!(14)))
4306            .send_bincode_external(&external);
4307
4308        let nodes = flow
4309            .with_process(&node, deployment.Localhost())
4310            .with_external(&external, deployment.Localhost())
4311            .deploy(&mut deployment);
4312
4313        deployment.deploy().await.unwrap();
4314
4315        let mut threshold_out = nodes.connect(threshold_out).await;
4316
4317        deployment.start().await.unwrap();
4318
4319        assert_eq!(threshold_out.next().await.unwrap(), 14);
4320    }
4321
4322    // === Compile-time type tests for join/cross_product ordering ===
4323
4324    #[cfg(any(feature = "deploy", feature = "sim"))]
4325    mod join_ordering_type_tests {
4326        use crate::live_collections::boundedness::{Bounded, Unbounded};
4327        use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
4328        use crate::location::{Location, Process};
4329
4330        #[expect(dead_code, reason = "compile-time type test")]
4331        fn join_unbounded_with_bounded_preserves_order<'a>(
4332            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4333            right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4334        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4335            left.join(right)
4336        }
4337
4338        #[expect(dead_code, reason = "compile-time type test")]
4339        fn join_unbounded_with_unbounded_is_no_order<'a>(
4340            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4341            right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4342        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4343            left.join(right)
4344        }
4345
4346        #[expect(dead_code, reason = "compile-time type test")]
4347        fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
4348            left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4349            right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4350        ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
4351            left.join(right)
4352        }
4353
4354        #[expect(dead_code, reason = "compile-time type test")]
4355        fn join_unbounded_noorder_with_bounded<'a>(
4356            left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
4357            right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
4358        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4359            left.join(right)
4360        }
4361
4362        // === Compile-time type tests for cross_product ordering ===
4363
4364        #[expect(dead_code, reason = "compile-time type test")]
4365        fn cross_product_unbounded_with_bounded_preserves_order<'a>(
4366            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4367            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4368        ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4369            left.cross_product(right)
4370        }
4371
4372        #[expect(dead_code, reason = "compile-time type test")]
4373        fn cross_product_bounded_with_bounded_preserves_order<'a>(
4374            left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4375            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4376        ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
4377            left.cross_product(right)
4378        }
4379
4380        #[expect(dead_code, reason = "compile-time type test")]
4381        fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
4382            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4383            right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4384        ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4385            left.cross_product(right)
4386        }
4387    } // mod join_ordering_type_tests
4388
4389    // === Runtime correctness tests for bounded join/cross_product ===
4390
4391    #[cfg(feature = "sim")]
4392    #[test]
4393    fn cross_product_mixed_boundedness_correctness() {
4394        use stageleft::q;
4395
4396        use crate::compile::builder::FlowBuilder;
4397        use crate::nondet::nondet;
4398
4399        let mut flow = FlowBuilder::new();
4400        let process = flow.process::<()>();
4401        let tick = process.tick();
4402
4403        let left = process.source_iter(q!(vec![1, 2]));
4404        let right = process
4405            .source_iter(q!(vec!['a', 'b']))
4406            .batch(&tick, nondet!(/** test */))
4407            .all_ticks();
4408
4409        let out = left.cross_product(right).sim_output();
4410
4411        flow.sim().exhaustive(async || {
4412            out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4413                .await;
4414        });
4415    }
4416
4417    #[cfg(feature = "sim")]
4418    #[test]
4419    fn join_mixed_boundedness_correctness() {
4420        use stageleft::q;
4421
4422        use crate::compile::builder::FlowBuilder;
4423        use crate::nondet::nondet;
4424
4425        let mut flow = FlowBuilder::new();
4426        let process = flow.process::<()>();
4427        let tick = process.tick();
4428
4429        let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4430        let right = process
4431            .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4432            .batch(&tick, nondet!(/** test */))
4433            .all_ticks();
4434
4435        let out = left.join(right).sim_output();
4436
4437        flow.sim().exhaustive(async || {
4438            out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4439                .await;
4440        });
4441    }
4442}