hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick, NoAtomic};
30use crate::location::{Location, NoTick, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35 AggFuncAlgebra, ApplyMonotoneStream, ValidCommutativityFor, ValidIdempotenceFor,
36};
37
38pub mod networking;
39
40/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
41#[sealed::sealed]
42pub trait Ordering:
43 MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
44{
45 /// The [`StreamOrder`] corresponding to this type.
46 const ORDERING_KIND: StreamOrder;
47}
48
49/// Marks the stream as being totally ordered, which means that there are
50/// no sources of non-determinism (other than intentional ones) that will
51/// affect the order of elements.
52pub enum TotalOrder {}
53
54#[sealed::sealed]
55impl Ordering for TotalOrder {
56 const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
57}
58
59/// Marks the stream as having no order, which means that the order of
60/// elements may be affected by non-determinism.
61///
62/// This restricts certain operators, such as `fold` and `reduce`, to only
63/// be used with commutative aggregation functions.
64pub enum NoOrder {}
65
66#[sealed::sealed]
67impl Ordering for NoOrder {
68 const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
69}
70
71/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
72/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
73/// have `Self` guarantees instead.
74#[sealed::sealed]
75pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
76#[sealed::sealed]
77impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
78
79/// Helper trait for determining the weakest of two orderings.
80#[sealed::sealed]
81pub trait MinOrder<Other: ?Sized> {
82 /// The weaker of the two orderings.
83 type Min: Ordering;
84}
85
86#[sealed::sealed]
87impl<O: Ordering> MinOrder<O> for TotalOrder {
88 type Min = O;
89}
90
91#[sealed::sealed]
92impl<O: Ordering> MinOrder<O> for NoOrder {
93 type Min = NoOrder;
94}
95
96/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
97#[sealed::sealed]
98pub trait Retries:
99 MinRetries<Self, Min = Self>
100 + MinRetries<ExactlyOnce, Min = Self>
101 + MinRetries<AtLeastOnce, Min = AtLeastOnce>
102{
103 /// The [`StreamRetry`] corresponding to this type.
104 const RETRIES_KIND: StreamRetry;
105}
106
107/// Marks the stream as having deterministic message cardinality, with no
108/// possibility of duplicates.
109pub enum ExactlyOnce {}
110
111#[sealed::sealed]
112impl Retries for ExactlyOnce {
113 const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
114}
115
116/// Marks the stream as having non-deterministic message cardinality, which
117/// means that duplicates may occur, but messages will not be dropped.
118pub enum AtLeastOnce {}
119
120#[sealed::sealed]
121impl Retries for AtLeastOnce {
122 const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
123}
124
125/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
126/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
127/// have `Self` guarantees instead.
128#[sealed::sealed]
129pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
130#[sealed::sealed]
131impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
132
133/// Helper trait for determining the weakest of two retry guarantees.
134#[sealed::sealed]
135pub trait MinRetries<Other: ?Sized> {
136 /// The weaker of the two retry guarantees.
137 type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
138}
139
140#[sealed::sealed]
141impl<R: Retries> MinRetries<R> for ExactlyOnce {
142 type Min = R;
143}
144
145#[sealed::sealed]
146impl<R: Retries> MinRetries<R> for AtLeastOnce {
147 type Min = AtLeastOnce;
148}
149
150#[sealed::sealed]
151#[diagnostic::on_unimplemented(
152 message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
153 label = "required here",
154 note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
155)]
156/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
157pub trait IsOrdered: Ordering {}
158
159#[sealed::sealed]
160#[diagnostic::do_not_recommend]
161impl IsOrdered for TotalOrder {}
162
163#[sealed::sealed]
164#[diagnostic::on_unimplemented(
165 message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
166 label = "required here",
167 note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
168)]
169/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
170pub trait IsExactlyOnce: Retries {}
171
172#[sealed::sealed]
173#[diagnostic::do_not_recommend]
174impl IsExactlyOnce for ExactlyOnce {}
175
176/// Streaming sequence of elements with type `Type`.
177///
178/// This live collection represents a growing sequence of elements, with new elements being
179/// asynchronously appended to the end of the sequence. This can be used to model the arrival
180/// of network input, such as API requests, or streaming ingestion.
181///
182/// By default, all streams have deterministic ordering and each element is materialized exactly
183/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
184/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
185/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
186///
187/// Type Parameters:
188/// - `Type`: the type of elements in the stream
189/// - `Loc`: the location where the stream is being materialized
190/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
191/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
192/// (default is [`TotalOrder`])
193/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
194/// [`AtLeastOnce`] (default is [`ExactlyOnce`])
195pub struct Stream<
196 Type,
197 Loc,
198 Bound: Boundedness = Unbounded,
199 Order: Ordering = TotalOrder,
200 Retry: Retries = ExactlyOnce,
201> {
202 pub(crate) location: Loc,
203 pub(crate) ir_node: RefCell<HydroNode>,
204 pub(crate) flow_state: FlowState,
205
206 _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
207}
208
209impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
210 fn drop(&mut self) {
211 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
212 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
213 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
214 input: Box::new(ir_node),
215 op_metadata: HydroIrOpMetadata::new(),
216 });
217 }
218 }
219}
220
221impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
222 for Stream<T, L, Unbounded, O, R>
223where
224 L: Location<'a>,
225{
226 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
227 let new_meta = stream
228 .location
229 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
230
231 Stream {
232 location: stream.location.clone(),
233 flow_state: stream.flow_state.clone(),
234 ir_node: RefCell::new(HydroNode::Cast {
235 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
236 metadata: new_meta,
237 }),
238 _phantom: PhantomData,
239 }
240 }
241}
242
243impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
244 for Stream<T, L, B, NoOrder, R>
245where
246 L: Location<'a>,
247{
248 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
249 stream.weaken_ordering()
250 }
251}
252
253impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
254 for Stream<T, L, B, O, AtLeastOnce>
255where
256 L: Location<'a>,
257{
258 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
259 stream.weaken_retries()
260 }
261}
262
263impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
264where
265 L: Location<'a>,
266{
267 fn defer_tick(self) -> Self {
268 Stream::defer_tick(self)
269 }
270}
271
272impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
273 for Stream<T, Tick<L>, Bounded, O, R>
274where
275 L: Location<'a>,
276{
277 type Location = Tick<L>;
278
279 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
280 Stream::new(
281 location.clone(),
282 HydroNode::CycleSource {
283 cycle_id,
284 metadata: location.new_node_metadata(Self::collection_kind()),
285 },
286 )
287 }
288}
289
290impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
291 for Stream<T, Tick<L>, Bounded, O, R>
292where
293 L: Location<'a>,
294{
295 type Location = Tick<L>;
296
297 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
298 let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
299 location.clone(),
300 HydroNode::DeferTick {
301 input: Box::new(HydroNode::CycleSource {
302 cycle_id,
303 metadata: location.new_node_metadata(Self::collection_kind()),
304 }),
305 metadata: location.new_node_metadata(Self::collection_kind()),
306 },
307 );
308
309 from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
310 }
311}
312
313impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
314 for Stream<T, Tick<L>, Bounded, O, R>
315where
316 L: Location<'a>,
317{
318 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
319 assert_eq!(
320 Location::id(&self.location),
321 expected_location,
322 "locations do not match"
323 );
324 self.location
325 .flow_state()
326 .borrow_mut()
327 .push_root(HydroRoot::CycleSink {
328 cycle_id,
329 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
330 op_metadata: HydroIrOpMetadata::new(),
331 });
332 }
333}
334
335impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
336 for Stream<T, L, B, O, R>
337where
338 L: Location<'a> + NoTick,
339{
340 type Location = L;
341
342 fn create_source(cycle_id: CycleId, location: L) -> Self {
343 Stream::new(
344 location.clone(),
345 HydroNode::CycleSource {
346 cycle_id,
347 metadata: location.new_node_metadata(Self::collection_kind()),
348 },
349 )
350 }
351}
352
353impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
354 for Stream<T, L, B, O, R>
355where
356 L: Location<'a> + NoTick,
357{
358 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
359 assert_eq!(
360 Location::id(&self.location),
361 expected_location,
362 "locations do not match"
363 );
364 self.location
365 .flow_state()
366 .borrow_mut()
367 .push_root(HydroRoot::CycleSink {
368 cycle_id,
369 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
370 op_metadata: HydroIrOpMetadata::new(),
371 });
372 }
373}
374
375impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
376where
377 T: Clone,
378 L: Location<'a>,
379{
380 fn clone(&self) -> Self {
381 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
382 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
383 *self.ir_node.borrow_mut() = HydroNode::Tee {
384 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
385 metadata: self.location.new_node_metadata(Self::collection_kind()),
386 };
387 }
388
389 let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
390 unreachable!()
391 };
392 Stream {
393 location: self.location.clone(),
394 flow_state: self.flow_state.clone(),
395 ir_node: HydroNode::Tee {
396 inner: SharedNode(inner.0.clone()),
397 metadata: metadata.clone(),
398 }
399 .into(),
400 _phantom: PhantomData,
401 }
402 }
403}
404
405impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
406where
407 L: Location<'a>,
408{
409 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
410 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
411 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
412
413 let flow_state = location.flow_state().clone();
414 Stream {
415 location,
416 flow_state,
417 ir_node: RefCell::new(ir_node),
418 _phantom: PhantomData,
419 }
420 }
421
422 /// Returns the [`Location`] where this stream is being materialized.
423 pub fn location(&self) -> &L {
424 &self.location
425 }
426
427 pub(crate) fn collection_kind() -> CollectionKind {
428 CollectionKind::Stream {
429 bound: B::BOUND_KIND,
430 order: O::ORDERING_KIND,
431 retry: R::RETRIES_KIND,
432 element_type: quote_type::<T>().into(),
433 }
434 }
435
436 /// Produces a stream based on invoking `f` on each element.
437 /// If you do not want to modify the stream and instead only want to view
438 /// each item use [`Stream::inspect`] instead.
439 ///
440 /// # Example
441 /// ```rust
442 /// # #[cfg(feature = "deploy")] {
443 /// # use hydro_lang::prelude::*;
444 /// # use futures::StreamExt;
445 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
446 /// let words = process.source_iter(q!(vec!["hello", "world"]));
447 /// words.map(q!(|x| x.to_uppercase()))
448 /// # }, |mut stream| async move {
449 /// # for w in vec!["HELLO", "WORLD"] {
450 /// # assert_eq!(stream.next().await.unwrap(), w);
451 /// # }
452 /// # }));
453 /// # }
454 /// ```
455 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
456 where
457 F: Fn(T) -> U + 'a,
458 {
459 let (f, singleton_refs) = crate::singleton_ref::with_singleton_capture(|| {
460 f.splice_fn1_ctx(&self.location).into()
461 });
462 Stream::new(
463 self.location.clone(),
464 HydroNode::Map {
465 f,
466 singleton_refs,
467 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
468 metadata: self
469 .location
470 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
471 },
472 )
473 }
474
475 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
476 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
477 /// for the output type `U` must produce items in a **deterministic** order.
478 ///
479 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
480 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
481 ///
482 /// # Example
483 /// ```rust
484 /// # #[cfg(feature = "deploy")] {
485 /// # use hydro_lang::prelude::*;
486 /// # use futures::StreamExt;
487 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
488 /// process
489 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
490 /// .flat_map_ordered(q!(|x| x))
491 /// # }, |mut stream| async move {
492 /// // 1, 2, 3, 4
493 /// # for w in (1..5) {
494 /// # assert_eq!(stream.next().await.unwrap(), w);
495 /// # }
496 /// # }));
497 /// # }
498 /// ```
499 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
500 where
501 I: IntoIterator<Item = U>,
502 F: Fn(T) -> I + 'a,
503 {
504 let f = f.splice_fn1_ctx(&self.location).into();
505 Stream::new(
506 self.location.clone(),
507 HydroNode::FlatMap {
508 f,
509 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
510 metadata: self
511 .location
512 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
513 },
514 )
515 }
516
517 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
518 /// for the output type `U` to produce items in any order.
519 ///
520 /// # Example
521 /// ```rust
522 /// # #[cfg(feature = "deploy")] {
523 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
524 /// # use futures::StreamExt;
525 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
526 /// process
527 /// .source_iter(q!(vec![
528 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
529 /// std::collections::HashSet::from_iter(vec![3, 4]),
530 /// ]))
531 /// .flat_map_unordered(q!(|x| x))
532 /// # }, |mut stream| async move {
533 /// // 1, 2, 3, 4, but in no particular order
534 /// # let mut results = Vec::new();
535 /// # for w in (1..5) {
536 /// # results.push(stream.next().await.unwrap());
537 /// # }
538 /// # results.sort();
539 /// # assert_eq!(results, vec![1, 2, 3, 4]);
540 /// # }));
541 /// # }
542 /// ```
543 pub fn flat_map_unordered<U, I, F>(
544 self,
545 f: impl IntoQuotedMut<'a, F, L>,
546 ) -> Stream<U, L, B, NoOrder, R>
547 where
548 I: IntoIterator<Item = U>,
549 F: Fn(T) -> I + 'a,
550 {
551 let f = f.splice_fn1_ctx(&self.location).into();
552 Stream::new(
553 self.location.clone(),
554 HydroNode::FlatMap {
555 f,
556 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
557 metadata: self
558 .location
559 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
560 },
561 )
562 }
563
564 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
565 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
566 ///
567 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
568 /// not deterministic, use [`Stream::flatten_unordered`] instead.
569 ///
570 /// ```rust
571 /// # #[cfg(feature = "deploy")] {
572 /// # use hydro_lang::prelude::*;
573 /// # use futures::StreamExt;
574 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
575 /// process
576 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
577 /// .flatten_ordered()
578 /// # }, |mut stream| async move {
579 /// // 1, 2, 3, 4
580 /// # for w in (1..5) {
581 /// # assert_eq!(stream.next().await.unwrap(), w);
582 /// # }
583 /// # }));
584 /// # }
585 /// ```
586 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
587 where
588 T: IntoIterator<Item = U>,
589 {
590 self.flat_map_ordered(q!(|d| d))
591 }
592
593 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
594 /// for the element type `T` to produce items in any order.
595 ///
596 /// # Example
597 /// ```rust
598 /// # #[cfg(feature = "deploy")] {
599 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
600 /// # use futures::StreamExt;
601 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
602 /// process
603 /// .source_iter(q!(vec![
604 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
605 /// std::collections::HashSet::from_iter(vec![3, 4]),
606 /// ]))
607 /// .flatten_unordered()
608 /// # }, |mut stream| async move {
609 /// // 1, 2, 3, 4, but in no particular order
610 /// # let mut results = Vec::new();
611 /// # for w in (1..5) {
612 /// # results.push(stream.next().await.unwrap());
613 /// # }
614 /// # results.sort();
615 /// # assert_eq!(results, vec![1, 2, 3, 4]);
616 /// # }));
617 /// # }
618 /// ```
619 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
620 where
621 T: IntoIterator<Item = U>,
622 {
623 self.flat_map_unordered(q!(|d| d))
624 }
625
626 /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
627 /// then emit the elements of that stream one by one. When the inner stream yields
628 /// `Pending`, this operator yields as well.
629 pub fn flat_map_stream_blocking<U, S, F>(
630 self,
631 f: impl IntoQuotedMut<'a, F, L>,
632 ) -> Stream<U, L, B, O, R>
633 where
634 S: futures::Stream<Item = U>,
635 F: Fn(T) -> S + 'a,
636 {
637 let f = f.splice_fn1_ctx(&self.location).into();
638 Stream::new(
639 self.location.clone(),
640 HydroNode::FlatMapStreamBlocking {
641 f,
642 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
643 metadata: self
644 .location
645 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
646 },
647 )
648 }
649
650 /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
651 /// emit its elements one by one. When the inner stream yields `Pending`, this operator
652 /// yields as well.
653 pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
654 where
655 T: futures::Stream<Item = U>,
656 {
657 self.flat_map_stream_blocking(q!(|d| d))
658 }
659
660 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
661 /// `f`, preserving the order of the elements.
662 ///
663 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
664 /// not modify or take ownership of the values. If you need to modify the values while filtering
665 /// use [`Stream::filter_map`] instead.
666 ///
667 /// # Example
668 /// ```rust
669 /// # #[cfg(feature = "deploy")] {
670 /// # use hydro_lang::prelude::*;
671 /// # use futures::StreamExt;
672 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
673 /// process
674 /// .source_iter(q!(vec![1, 2, 3, 4]))
675 /// .filter(q!(|&x| x > 2))
676 /// # }, |mut stream| async move {
677 /// // 3, 4
678 /// # for w in (3..5) {
679 /// # assert_eq!(stream.next().await.unwrap(), w);
680 /// # }
681 /// # }));
682 /// # }
683 /// ```
684 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
685 where
686 F: Fn(&T) -> bool + 'a,
687 {
688 let f = f.splice_fn1_borrow_ctx(&self.location).into();
689 Stream::new(
690 self.location.clone(),
691 HydroNode::Filter {
692 f,
693 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
694 metadata: self.location.new_node_metadata(Self::collection_kind()),
695 },
696 )
697 }
698
699 /// Splits the stream into two streams based on a predicate, without cloning elements.
700 ///
701 /// Elements for which `f` returns `true` are sent to the first output stream,
702 /// and elements for which `f` returns `false` are sent to the second output stream.
703 ///
704 /// Unlike using `filter` twice, this only evaluates the predicate once per element
705 /// and does not require `T: Clone`.
706 ///
707 /// The closure `f` receives a reference `&T` rather than an owned value `T` because
708 /// the predicate is only used for routing; the element itself is moved to the
709 /// appropriate output stream.
710 ///
711 /// # Example
712 /// ```rust
713 /// # #[cfg(feature = "deploy")] {
714 /// # use hydro_lang::prelude::*;
715 /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
716 /// # use futures::StreamExt;
717 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
718 /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
719 /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
720 /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
721 /// evens.map(q!(|x| (x, true)))
722 /// .merge_unordered(odds.map(q!(|x| (x, false))))
723 /// # }, |mut stream| async move {
724 /// # let mut results = Vec::new();
725 /// # for _ in 0..6 {
726 /// # results.push(stream.next().await.unwrap());
727 /// # }
728 /// # results.sort();
729 /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
730 /// # }));
731 /// # }
732 /// ```
733 #[expect(
734 clippy::type_complexity,
735 reason = "return type mirrors the input stream type"
736 )]
737 pub fn partition<F>(
738 self,
739 f: impl IntoQuotedMut<'a, F, L>,
740 ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
741 where
742 F: Fn(&T) -> bool + 'a,
743 {
744 let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
745 let shared = SharedNode(Rc::new(RefCell::new(
746 self.ir_node.replace(HydroNode::Placeholder),
747 )));
748
749 let true_stream = Stream::new(
750 self.location.clone(),
751 HydroNode::Partition {
752 inner: SharedNode(shared.0.clone()),
753 f: f.clone(),
754 is_true: true,
755 metadata: self.location.new_node_metadata(Self::collection_kind()),
756 },
757 );
758
759 let false_stream = Stream::new(
760 self.location.clone(),
761 HydroNode::Partition {
762 inner: SharedNode(shared.0),
763 f,
764 is_true: false,
765 metadata: self.location.new_node_metadata(Self::collection_kind()),
766 },
767 );
768
769 (true_stream, false_stream)
770 }
771
772 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
773 ///
774 /// # Example
775 /// ```rust
776 /// # #[cfg(feature = "deploy")] {
777 /// # use hydro_lang::prelude::*;
778 /// # use futures::StreamExt;
779 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
780 /// process
781 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
782 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
783 /// # }, |mut stream| async move {
784 /// // 1, 2
785 /// # for w in (1..3) {
786 /// # assert_eq!(stream.next().await.unwrap(), w);
787 /// # }
788 /// # }));
789 /// # }
790 /// ```
791 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
792 where
793 F: Fn(T) -> Option<U> + 'a,
794 {
795 let f = f.splice_fn1_ctx(&self.location).into();
796 Stream::new(
797 self.location.clone(),
798 HydroNode::FilterMap {
799 f,
800 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
801 metadata: self
802 .location
803 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
804 },
805 )
806 }
807
808 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
809 /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
810 /// If `other` is an empty [`Optional`], no values will be produced.
811 ///
812 /// # Example
813 /// ```rust
814 /// # #[cfg(feature = "deploy")] {
815 /// # use hydro_lang::prelude::*;
816 /// # use futures::StreamExt;
817 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
818 /// let tick = process.tick();
819 /// let batch = process
820 /// .source_iter(q!(vec![1, 2, 3, 4]))
821 /// .batch(&tick, nondet!(/** test */));
822 /// let count = batch.clone().count(); // `count()` returns a singleton
823 /// batch.cross_singleton(count).all_ticks()
824 /// # }, |mut stream| async move {
825 /// // (1, 4), (2, 4), (3, 4), (4, 4)
826 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
827 /// # assert_eq!(stream.next().await.unwrap(), w);
828 /// # }
829 /// # }));
830 /// # }
831 /// ```
832 pub fn cross_singleton<O2>(
833 self,
834 other: impl Into<Optional<O2, L, Bounded>>,
835 ) -> Stream<(T, O2), L, B, O, R>
836 where
837 O2: Clone,
838 {
839 let other: Optional<O2, L, Bounded> = other.into();
840 check_matching_location(&self.location, &other.location);
841
842 Stream::new(
843 self.location.clone(),
844 HydroNode::CrossSingleton {
845 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
846 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
847 metadata: self
848 .location
849 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
850 },
851 )
852 }
853
854 /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
855 ///
856 /// # Example
857 /// ```rust
858 /// # #[cfg(feature = "deploy")] {
859 /// # use hydro_lang::prelude::*;
860 /// # use futures::StreamExt;
861 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
862 /// let tick = process.tick();
863 /// // ticks are lazy by default, forces the second tick to run
864 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
865 ///
866 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
867 /// let batch_first_tick = process
868 /// .source_iter(q!(vec![1, 2, 3, 4]))
869 /// .batch(&tick, nondet!(/** test */));
870 /// let batch_second_tick = process
871 /// .source_iter(q!(vec![5, 6, 7, 8]))
872 /// .batch(&tick, nondet!(/** test */))
873 /// .defer_tick();
874 /// batch_first_tick.chain(batch_second_tick)
875 /// .filter_if(signal)
876 /// .all_ticks()
877 /// # }, |mut stream| async move {
878 /// // [1, 2, 3, 4]
879 /// # for w in vec![1, 2, 3, 4] {
880 /// # assert_eq!(stream.next().await.unwrap(), w);
881 /// # }
882 /// # }));
883 /// # }
884 /// ```
885 pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
886 self.cross_singleton(signal.filter(q!(|b| *b)))
887 .map(q!(|(d, _)| d))
888 }
889
890 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
891 ///
892 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
893 /// leader of a cluster.
894 ///
895 /// # Example
896 /// ```rust
897 /// # #[cfg(feature = "deploy")] {
898 /// # use hydro_lang::prelude::*;
899 /// # use futures::StreamExt;
900 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
901 /// let tick = process.tick();
902 /// // ticks are lazy by default, forces the second tick to run
903 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
904 ///
905 /// let batch_first_tick = process
906 /// .source_iter(q!(vec![1, 2, 3, 4]))
907 /// .batch(&tick, nondet!(/** test */));
908 /// let batch_second_tick = process
909 /// .source_iter(q!(vec![5, 6, 7, 8]))
910 /// .batch(&tick, nondet!(/** test */))
911 /// .defer_tick(); // appears on the second tick
912 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
913 /// batch_first_tick.chain(batch_second_tick)
914 /// .filter_if_some(some_on_first_tick)
915 /// .all_ticks()
916 /// # }, |mut stream| async move {
917 /// // [1, 2, 3, 4]
918 /// # for w in vec![1, 2, 3, 4] {
919 /// # assert_eq!(stream.next().await.unwrap(), w);
920 /// # }
921 /// # }));
922 /// # }
923 /// ```
924 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
925 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
926 self.filter_if(signal.is_some())
927 }
928
929 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is empty.
930 ///
931 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
932 /// some local state.
933 ///
934 /// # Example
935 /// ```rust
936 /// # #[cfg(feature = "deploy")] {
937 /// # use hydro_lang::prelude::*;
938 /// # use futures::StreamExt;
939 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
940 /// let tick = process.tick();
941 /// // ticks are lazy by default, forces the second tick to run
942 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
943 ///
944 /// let batch_first_tick = process
945 /// .source_iter(q!(vec![1, 2, 3, 4]))
946 /// .batch(&tick, nondet!(/** test */));
947 /// let batch_second_tick = process
948 /// .source_iter(q!(vec![5, 6, 7, 8]))
949 /// .batch(&tick, nondet!(/** test */))
950 /// .defer_tick(); // appears on the second tick
951 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
952 /// batch_first_tick.chain(batch_second_tick)
953 /// .filter_if_none(some_on_first_tick)
954 /// .all_ticks()
955 /// # }, |mut stream| async move {
956 /// // [5, 6, 7, 8]
957 /// # for w in vec![5, 6, 7, 8] {
958 /// # assert_eq!(stream.next().await.unwrap(), w);
959 /// # }
960 /// # }));
961 /// # }
962 /// ```
963 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
964 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
965 self.filter_if(other.is_none())
966 }
967
968 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
969 /// returning all tupled pairs.
970 ///
971 /// When the right side is [`Bounded`], it is accumulated first and the left side streams
972 /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
973 /// symmetric hash join is used and ordering is [`NoOrder`].
974 ///
975 /// # Example
976 /// ```rust
977 /// # #[cfg(feature = "deploy")] {
978 /// # use hydro_lang::prelude::*;
979 /// # use std::collections::HashSet;
980 /// # use futures::StreamExt;
981 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
982 /// let tick = process.tick();
983 /// let stream1 = process.source_iter(q!(vec![1, 2]));
984 /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
985 /// stream1.cross_product(stream2)
986 /// # }, |mut stream| async move {
987 /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
988 /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
989 /// # stream.map(|i| assert!(expected.contains(&i)));
990 /// # }));
991 /// # }
992 pub fn cross_product<T2, B2: Boundedness, O2: Ordering>(
993 self,
994 other: Stream<T2, L, B2, O2, R>,
995 ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, R>
996 where
997 T: Clone,
998 T2: Clone,
999 {
1000 self.map(q!(|v| ((), v)))
1001 .join(other.map(q!(|v| ((), v))))
1002 .map(q!(|((), (v1, v2))| (v1, v2)))
1003 }
1004
1005 /// Takes one stream as input and filters out any duplicate occurrences. The output
1006 /// contains all unique values from the input.
1007 ///
1008 /// # Example
1009 /// ```rust
1010 /// # #[cfg(feature = "deploy")] {
1011 /// # use hydro_lang::prelude::*;
1012 /// # use futures::StreamExt;
1013 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1014 /// let tick = process.tick();
1015 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1016 /// # }, |mut stream| async move {
1017 /// # for w in vec![1, 2, 3, 4] {
1018 /// # assert_eq!(stream.next().await.unwrap(), w);
1019 /// # }
1020 /// # }));
1021 /// # }
1022 /// ```
1023 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1024 where
1025 T: Eq + Hash,
1026 {
1027 Stream::new(
1028 self.location.clone(),
1029 HydroNode::Unique {
1030 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1031 metadata: self
1032 .location
1033 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1034 },
1035 )
1036 }
1037
1038 /// Outputs everything in this stream that is *not* contained in the `other` stream.
1039 ///
1040 /// The `other` stream must be [`Bounded`], since this function will wait until
1041 /// all its elements are available before producing any output.
1042 /// # Example
1043 /// ```rust
1044 /// # #[cfg(feature = "deploy")] {
1045 /// # use hydro_lang::prelude::*;
1046 /// # use futures::StreamExt;
1047 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1048 /// let tick = process.tick();
1049 /// let stream = process
1050 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1051 /// .batch(&tick, nondet!(/** test */));
1052 /// let batch = process
1053 /// .source_iter(q!(vec![1, 2]))
1054 /// .batch(&tick, nondet!(/** test */));
1055 /// stream.filter_not_in(batch).all_ticks()
1056 /// # }, |mut stream| async move {
1057 /// # for w in vec![3, 4] {
1058 /// # assert_eq!(stream.next().await.unwrap(), w);
1059 /// # }
1060 /// # }));
1061 /// # }
1062 /// ```
1063 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1064 where
1065 T: Eq + Hash,
1066 B2: IsBounded,
1067 {
1068 check_matching_location(&self.location, &other.location);
1069
1070 Stream::new(
1071 self.location.clone(),
1072 HydroNode::Difference {
1073 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1074 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1075 metadata: self
1076 .location
1077 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1078 },
1079 )
1080 }
1081
1082 /// An operator which allows you to "inspect" each element of a stream without
1083 /// modifying it. The closure `f` is called on a reference to each item. This is
1084 /// mainly useful for debugging, and should not be used to generate side-effects.
1085 ///
1086 /// # Example
1087 /// ```rust
1088 /// # #[cfg(feature = "deploy")] {
1089 /// # use hydro_lang::prelude::*;
1090 /// # use futures::StreamExt;
1091 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1092 /// let nums = process.source_iter(q!(vec![1, 2]));
1093 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1094 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1095 /// # }, |mut stream| async move {
1096 /// # for w in vec![1, 2] {
1097 /// # assert_eq!(stream.next().await.unwrap(), w);
1098 /// # }
1099 /// # }));
1100 /// # }
1101 /// ```
1102 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1103 where
1104 F: Fn(&T) + 'a,
1105 {
1106 let f = f.splice_fn1_borrow_ctx(&self.location).into();
1107
1108 Stream::new(
1109 self.location.clone(),
1110 HydroNode::Inspect {
1111 f,
1112 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1113 metadata: self.location.new_node_metadata(Self::collection_kind()),
1114 },
1115 )
1116 }
1117
1118 /// Executes the provided closure for every element in this stream.
1119 ///
1120 /// Because the closure may have side effects, the stream must have deterministic order
1121 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1122 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1123 /// [`Stream::assume_retries`] with an explanation for why this is the case.
1124 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1125 where
1126 O: IsOrdered,
1127 R: IsExactlyOnce,
1128 {
1129 let f = f.splice_fn1_ctx(&self.location).into();
1130 self.location
1131 .flow_state()
1132 .borrow_mut()
1133 .push_root(HydroRoot::ForEach {
1134 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1135 f,
1136 op_metadata: HydroIrOpMetadata::new(),
1137 });
1138 }
1139
1140 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1141 /// TCP socket to some other server. You should _not_ use this API for interacting with
1142 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1143 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1144 /// interaction with asynchronous sinks.
1145 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1146 where
1147 O: IsOrdered,
1148 R: IsExactlyOnce,
1149 S: 'a + futures::Sink<T> + Unpin,
1150 {
1151 self.location
1152 .flow_state()
1153 .borrow_mut()
1154 .push_root(HydroRoot::DestSink {
1155 sink: sink.splice_typed_ctx(&self.location).into(),
1156 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1157 op_metadata: HydroIrOpMetadata::new(),
1158 });
1159 }
1160
1161 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1162 ///
1163 /// # Example
1164 /// ```rust
1165 /// # #[cfg(feature = "deploy")] {
1166 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1167 /// # use futures::StreamExt;
1168 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1169 /// let tick = process.tick();
1170 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1171 /// numbers.enumerate()
1172 /// # }, |mut stream| async move {
1173 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1174 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1175 /// # assert_eq!(stream.next().await.unwrap(), w);
1176 /// # }
1177 /// # }));
1178 /// # }
1179 /// ```
1180 pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1181 where
1182 O: IsOrdered,
1183 R: IsExactlyOnce,
1184 {
1185 Stream::new(
1186 self.location.clone(),
1187 HydroNode::Enumerate {
1188 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1189 metadata: self.location.new_node_metadata(Stream::<
1190 (usize, T),
1191 L,
1192 B,
1193 TotalOrder,
1194 ExactlyOnce,
1195 >::collection_kind()),
1196 },
1197 )
1198 }
1199
1200 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1201 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1202 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1203 ///
1204 /// Depending on the input stream guarantees, the closure may need to be commutative
1205 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1206 ///
1207 /// # Example
1208 /// ```rust
1209 /// # #[cfg(feature = "deploy")] {
1210 /// # use hydro_lang::prelude::*;
1211 /// # use futures::StreamExt;
1212 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1213 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1214 /// words
1215 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1216 /// .into_stream()
1217 /// # }, |mut stream| async move {
1218 /// // "HELLOWORLD"
1219 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1220 /// # }));
1221 /// # }
1222 /// ```
1223 pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1224 self,
1225 init: impl IntoQuotedMut<'a, I, L>,
1226 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1227 ) -> Singleton<A, L, B2>
1228 where
1229 I: Fn() -> A + 'a,
1230 F: Fn(&mut A, T),
1231 C: ValidCommutativityFor<O>,
1232 Idemp: ValidIdempotenceFor<R>,
1233 B: ApplyMonotoneStream<M, B2>,
1234 {
1235 let init = init.splice_fn0_ctx(&self.location).into();
1236 let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1237 proof.register_proof(&comb);
1238
1239 // Only assume_retries (for idempotence), not assume_ordering.
1240 // The fold hook in the simulator handles ordering non-determinism directly.
1241 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1242 let retried: Stream<T, L, B, O, ExactlyOnce> = self.assume_retries(nondet);
1243
1244 let core = HydroNode::Fold {
1245 init,
1246 acc: comb.into(),
1247 input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1248 metadata: retried
1249 .location
1250 .new_node_metadata(Singleton::<A, L, B2>::collection_kind()),
1251 };
1252
1253 Singleton::new(retried.location.clone(), core)
1254 }
1255
1256 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1257 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1258 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1259 /// reference, so that it can be modified in place.
1260 ///
1261 /// Depending on the input stream guarantees, the closure may need to be commutative
1262 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1263 ///
1264 /// # Example
1265 /// ```rust
1266 /// # #[cfg(feature = "deploy")] {
1267 /// # use hydro_lang::prelude::*;
1268 /// # use futures::StreamExt;
1269 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1270 /// let bools = process.source_iter(q!(vec![false, true, false]));
1271 /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1272 /// # }, |mut stream| async move {
1273 /// // true
1274 /// # assert_eq!(stream.next().await.unwrap(), true);
1275 /// # }));
1276 /// # }
1277 /// ```
1278 pub fn reduce<F, C, Idemp>(
1279 self,
1280 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1281 ) -> Optional<T, L, B>
1282 where
1283 F: Fn(&mut T, T) + 'a,
1284 C: ValidCommutativityFor<O>,
1285 Idemp: ValidIdempotenceFor<R>,
1286 {
1287 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1288 proof.register_proof(&f);
1289
1290 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1291 let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1292
1293 let core = HydroNode::Reduce {
1294 f: f.into(),
1295 input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1296 metadata: ordered_etc
1297 .location
1298 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1299 };
1300
1301 Optional::new(ordered_etc.location.clone(), core)
1302 }
1303
1304 /// Computes the maximum element in the stream as an [`Optional`], which
1305 /// will be empty until the first element in the input arrives.
1306 ///
1307 /// # Example
1308 /// ```rust
1309 /// # #[cfg(feature = "deploy")] {
1310 /// # use hydro_lang::prelude::*;
1311 /// # use futures::StreamExt;
1312 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1313 /// let tick = process.tick();
1314 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1315 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1316 /// batch.max().all_ticks()
1317 /// # }, |mut stream| async move {
1318 /// // 4
1319 /// # assert_eq!(stream.next().await.unwrap(), 4);
1320 /// # }));
1321 /// # }
1322 /// ```
1323 pub fn max(self) -> Optional<T, L, B>
1324 where
1325 T: Ord,
1326 {
1327 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1328 .assume_ordering_trusted_bounded::<TotalOrder>(
1329 nondet!(/** max is commutative, but order affects intermediates */),
1330 )
1331 .reduce(q!(|curr, new| {
1332 if new > *curr {
1333 *curr = new;
1334 }
1335 }))
1336 }
1337
1338 /// Computes the minimum element in the stream as an [`Optional`], which
1339 /// will be empty until the first element in the input arrives.
1340 ///
1341 /// # Example
1342 /// ```rust
1343 /// # #[cfg(feature = "deploy")] {
1344 /// # use hydro_lang::prelude::*;
1345 /// # use futures::StreamExt;
1346 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1347 /// let tick = process.tick();
1348 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1349 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1350 /// batch.min().all_ticks()
1351 /// # }, |mut stream| async move {
1352 /// // 1
1353 /// # assert_eq!(stream.next().await.unwrap(), 1);
1354 /// # }));
1355 /// # }
1356 /// ```
1357 pub fn min(self) -> Optional<T, L, B>
1358 where
1359 T: Ord,
1360 {
1361 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1362 .assume_ordering_trusted_bounded::<TotalOrder>(
1363 nondet!(/** max is commutative, but order affects intermediates */),
1364 )
1365 .reduce(q!(|curr, new| {
1366 if new < *curr {
1367 *curr = new;
1368 }
1369 }))
1370 }
1371
1372 /// Computes the first element in the stream as an [`Optional`], which
1373 /// will be empty until the first element in the input arrives.
1374 ///
1375 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1376 /// re-ordering of elements may cause the first element to change.
1377 ///
1378 /// # Example
1379 /// ```rust
1380 /// # #[cfg(feature = "deploy")] {
1381 /// # use hydro_lang::prelude::*;
1382 /// # use futures::StreamExt;
1383 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1384 /// let tick = process.tick();
1385 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1386 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1387 /// batch.first().all_ticks()
1388 /// # }, |mut stream| async move {
1389 /// // 1
1390 /// # assert_eq!(stream.next().await.unwrap(), 1);
1391 /// # }));
1392 /// # }
1393 /// ```
1394 pub fn first(self) -> Optional<T, L, B>
1395 where
1396 O: IsOrdered,
1397 {
1398 self.make_totally_ordered()
1399 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1400 .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1401 .reduce(q!(|_, _| {}))
1402 }
1403
1404 /// Computes the last element in the stream as an [`Optional`], which
1405 /// will be empty until an element in the input arrives.
1406 ///
1407 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1408 /// re-ordering of elements may cause the last element to change.
1409 ///
1410 /// # Example
1411 /// ```rust
1412 /// # #[cfg(feature = "deploy")] {
1413 /// # use hydro_lang::prelude::*;
1414 /// # use futures::StreamExt;
1415 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1416 /// let tick = process.tick();
1417 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1418 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1419 /// batch.last().all_ticks()
1420 /// # }, |mut stream| async move {
1421 /// // 4
1422 /// # assert_eq!(stream.next().await.unwrap(), 4);
1423 /// # }));
1424 /// # }
1425 /// ```
1426 pub fn last(self) -> Optional<T, L, B>
1427 where
1428 O: IsOrdered,
1429 {
1430 self.make_totally_ordered()
1431 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1432 .reduce(q!(|curr, new| *curr = new))
1433 }
1434
1435 /// Returns a stream containing at most the first `n` elements of the input stream,
1436 /// preserving the original order. Similar to `LIMIT` in SQL.
1437 ///
1438 /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1439 /// retries, since the result depends on the order and cardinality of elements.
1440 ///
1441 /// # Example
1442 /// ```rust
1443 /// # #[cfg(feature = "deploy")] {
1444 /// # use hydro_lang::prelude::*;
1445 /// # use futures::StreamExt;
1446 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1447 /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1448 /// numbers.limit(q!(3))
1449 /// # }, |mut stream| async move {
1450 /// // 10, 20, 30
1451 /// # for w in vec![10, 20, 30] {
1452 /// # assert_eq!(stream.next().await.unwrap(), w);
1453 /// # }
1454 /// # }));
1455 /// # }
1456 /// ```
1457 pub fn limit(
1458 self,
1459 n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1460 ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1461 where
1462 O: IsOrdered,
1463 R: IsExactlyOnce,
1464 {
1465 self.generator(
1466 q!(|| 0usize),
1467 q!(move |count, item| {
1468 if *count == n {
1469 Generate::Break
1470 } else {
1471 *count += 1;
1472 if *count == n {
1473 Generate::Return(item)
1474 } else {
1475 Generate::Yield(item)
1476 }
1477 }
1478 }),
1479 )
1480 }
1481
1482 /// Collects all the elements of this stream into a single [`Vec`] element.
1483 ///
1484 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1485 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1486 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1487 /// the vector at an arbitrary point in time.
1488 ///
1489 /// # Example
1490 /// ```rust
1491 /// # #[cfg(feature = "deploy")] {
1492 /// # use hydro_lang::prelude::*;
1493 /// # use futures::StreamExt;
1494 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1495 /// let tick = process.tick();
1496 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1497 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1498 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1499 /// # }, |mut stream| async move {
1500 /// // [ vec![1, 2, 3, 4] ]
1501 /// # for w in vec![vec![1, 2, 3, 4]] {
1502 /// # assert_eq!(stream.next().await.unwrap(), w);
1503 /// # }
1504 /// # }));
1505 /// # }
1506 /// ```
1507 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1508 where
1509 O: IsOrdered,
1510 R: IsExactlyOnce,
1511 {
1512 self.make_totally_ordered().make_exactly_once().fold(
1513 q!(|| vec![]),
1514 q!(|acc, v| {
1515 acc.push(v);
1516 }),
1517 )
1518 }
1519
1520 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1521 /// and emitting each intermediate result.
1522 ///
1523 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1524 /// containing all intermediate accumulated values. The scan operation can also terminate early
1525 /// by returning `None`.
1526 ///
1527 /// The function takes a mutable reference to the accumulator and the current element, and returns
1528 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1529 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1530 ///
1531 /// # Examples
1532 ///
1533 /// Basic usage - running sum:
1534 /// ```rust
1535 /// # #[cfg(feature = "deploy")] {
1536 /// # use hydro_lang::prelude::*;
1537 /// # use futures::StreamExt;
1538 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1539 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1540 /// q!(|| 0),
1541 /// q!(|acc, x| {
1542 /// *acc += x;
1543 /// Some(*acc)
1544 /// }),
1545 /// )
1546 /// # }, |mut stream| async move {
1547 /// // Output: 1, 3, 6, 10
1548 /// # for w in vec![1, 3, 6, 10] {
1549 /// # assert_eq!(stream.next().await.unwrap(), w);
1550 /// # }
1551 /// # }));
1552 /// # }
1553 /// ```
1554 ///
1555 /// Early termination example:
1556 /// ```rust
1557 /// # #[cfg(feature = "deploy")] {
1558 /// # use hydro_lang::prelude::*;
1559 /// # use futures::StreamExt;
1560 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1561 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1562 /// q!(|| 1),
1563 /// q!(|state, x| {
1564 /// *state = *state * x;
1565 /// if *state > 6 {
1566 /// None // Terminate the stream
1567 /// } else {
1568 /// Some(-*state)
1569 /// }
1570 /// }),
1571 /// )
1572 /// # }, |mut stream| async move {
1573 /// // Output: -1, -2, -6
1574 /// # for w in vec![-1, -2, -6] {
1575 /// # assert_eq!(stream.next().await.unwrap(), w);
1576 /// # }
1577 /// # }));
1578 /// # }
1579 /// ```
1580 pub fn scan<A, U, I, F>(
1581 self,
1582 init: impl IntoQuotedMut<'a, I, L>,
1583 f: impl IntoQuotedMut<'a, F, L>,
1584 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1585 where
1586 O: IsOrdered,
1587 R: IsExactlyOnce,
1588 I: Fn() -> A + 'a,
1589 F: Fn(&mut A, T) -> Option<U> + 'a,
1590 {
1591 let init = init.splice_fn0_ctx(&self.location).into();
1592 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1593
1594 Stream::new(
1595 self.location.clone(),
1596 HydroNode::Scan {
1597 init,
1598 acc: f,
1599 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1600 metadata: self.location.new_node_metadata(
1601 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1602 ),
1603 },
1604 )
1605 }
1606
1607 /// Async version of [`Stream::scan`]. Applies an async function to each element of the
1608 /// stream, maintaining an internal state (accumulator) and emitting the values returned
1609 /// by the function.
1610 ///
1611 /// The closure runs synchronously (so it can mutate the accumulator), then returns a
1612 /// future. The future is polled to completion. If it resolves to `Some`, the value is
1613 /// emitted. If it resolves to `None`, the item is filtered out.
1614 ///
1615 /// # Examples
1616 ///
1617 /// ```rust
1618 /// # #[cfg(feature = "deploy")] {
1619 /// # use hydro_lang::prelude::*;
1620 /// # use futures::StreamExt;
1621 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1622 /// process
1623 /// .source_iter(q!(vec![1, 2, 3, 4]))
1624 /// .scan_async_blocking(
1625 /// q!(|| 0),
1626 /// q!(|acc, x| {
1627 /// *acc += x;
1628 /// let val = *acc;
1629 /// async move { Some(val) }
1630 /// }),
1631 /// )
1632 /// # }, |mut stream| async move {
1633 /// // Output: 1, 3, 6, 10
1634 /// # for w in vec![1, 3, 6, 10] {
1635 /// # assert_eq!(stream.next().await.unwrap(), w);
1636 /// # }
1637 /// # }));
1638 /// # }
1639 /// ```
1640 pub fn scan_async_blocking<A, U, I, F, Fut>(
1641 self,
1642 init: impl IntoQuotedMut<'a, I, L>,
1643 f: impl IntoQuotedMut<'a, F, L>,
1644 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1645 where
1646 O: IsOrdered,
1647 R: IsExactlyOnce,
1648 I: Fn() -> A + 'a,
1649 F: Fn(&mut A, T) -> Fut + 'a,
1650 Fut: Future<Output = Option<U>> + 'a,
1651 {
1652 let init = init.splice_fn0_ctx(&self.location).into();
1653 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1654
1655 Stream::new(
1656 self.location.clone(),
1657 HydroNode::ScanAsyncBlocking {
1658 init,
1659 acc: f,
1660 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1661 metadata: self.location.new_node_metadata(
1662 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1663 ),
1664 },
1665 )
1666 }
1667
1668 /// Iteratively processes the elements of the stream using a state machine that can yield
1669 /// elements as it processes its inputs. This is designed to mirror the unstable generator
1670 /// syntax in Rust, without requiring special syntax.
1671 ///
1672 /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1673 /// state. The second argument defines the processing logic, taking in a mutable reference
1674 /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1675 /// variants define what is emitted and whether further inputs should be processed.
1676 ///
1677 /// # Example
1678 /// ```rust
1679 /// # #[cfg(feature = "deploy")] {
1680 /// # use hydro_lang::prelude::*;
1681 /// # use futures::StreamExt;
1682 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1683 /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1684 /// q!(|| 0),
1685 /// q!(|acc, x| {
1686 /// *acc += x;
1687 /// if *acc > 100 {
1688 /// hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1689 /// } else if *acc % 2 == 0 {
1690 /// hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1691 /// } else {
1692 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
1693 /// }
1694 /// }),
1695 /// )
1696 /// # }, |mut stream| async move {
1697 /// // Output: "even", "done!"
1698 /// # let mut results = Vec::new();
1699 /// # for _ in 0..2 {
1700 /// # results.push(stream.next().await.unwrap());
1701 /// # }
1702 /// # results.sort();
1703 /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1704 /// # }));
1705 /// # }
1706 /// ```
1707 pub fn generator<A, U, I, F>(
1708 self,
1709 init: impl IntoQuotedMut<'a, I, L> + Copy,
1710 f: impl IntoQuotedMut<'a, F, L> + Copy,
1711 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1712 where
1713 O: IsOrdered,
1714 R: IsExactlyOnce,
1715 I: Fn() -> A + 'a,
1716 F: Fn(&mut A, T) -> Generate<U> + 'a,
1717 {
1718 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1719 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1720
1721 let this = self.make_totally_ordered().make_exactly_once();
1722
1723 // State is Option<Option<A>>:
1724 // None = not yet initialized
1725 // Some(Some(a)) = active with state a
1726 // Some(None) = terminated
1727 let scan_init = q!(|| None)
1728 .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1729 .into();
1730 let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1731 if state.is_none() {
1732 *state = Some(Some(init()));
1733 }
1734 match state {
1735 Some(Some(state_value)) => match f(state_value, v) {
1736 Generate::Yield(out) => Some(Some(out)),
1737 Generate::Return(out) => {
1738 *state = Some(None);
1739 Some(Some(out))
1740 }
1741 // Unlike KeyedStream, we can terminate the scan directly on
1742 // Break/Return because there is only one state (no other keys
1743 // that still need processing).
1744 Generate::Break => None,
1745 Generate::Continue => Some(None),
1746 },
1747 // State is Some(None) after Return; terminate the scan.
1748 _ => None,
1749 }
1750 })
1751 .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1752 .into();
1753
1754 let scan_node = HydroNode::Scan {
1755 init: scan_init,
1756 acc: scan_f,
1757 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1758 metadata: this.location.new_node_metadata(Stream::<
1759 Option<U>,
1760 L,
1761 B,
1762 TotalOrder,
1763 ExactlyOnce,
1764 >::collection_kind()),
1765 };
1766
1767 let flatten_f = q!(|d| d)
1768 .splice_fn1_ctx::<Option<U>, _>(&this.location)
1769 .into();
1770 let flatten_node = HydroNode::FlatMap {
1771 f: flatten_f,
1772 input: Box::new(scan_node),
1773 metadata: this
1774 .location
1775 .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1776 };
1777
1778 Stream::new(this.location.clone(), flatten_node)
1779 }
1780
1781 /// Given a time interval, returns a stream corresponding to samples taken from the
1782 /// stream roughly at that interval. The output will have elements in the same order
1783 /// as the input, but with arbitrary elements skipped between samples. There is also
1784 /// no guarantee on the exact timing of the samples.
1785 ///
1786 /// # Non-Determinism
1787 /// The output stream is non-deterministic in which elements are sampled, since this
1788 /// is controlled by a clock.
1789 pub fn sample_every(
1790 self,
1791 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1792 nondet: NonDet,
1793 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1794 where
1795 L: NoTick + NoAtomic,
1796 {
1797 let samples = self.location.source_interval(interval);
1798
1799 let tick = self.location.tick();
1800 self.batch(&tick, nondet)
1801 .filter_if(samples.batch(&tick, nondet).first().is_some())
1802 .all_ticks()
1803 .weaken_retries()
1804 }
1805
1806 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
1807 /// stream has not emitted a value since that duration.
1808 ///
1809 /// # Non-Determinism
1810 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1811 /// samples take place, timeouts may be non-deterministically generated or missed,
1812 /// and the notification of the timeout may be delayed as well. There is also no
1813 /// guarantee on how long the [`Optional`] will have a value after the timeout is
1814 /// detected based on when the next sample is taken.
1815 pub fn timeout(
1816 self,
1817 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
1818 nondet: NonDet,
1819 ) -> Optional<(), L, Unbounded>
1820 where
1821 L: NoTick + NoAtomic,
1822 {
1823 let tick = self.location.tick();
1824
1825 let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1826 q!(|| None),
1827 q!(
1828 |latest, _| {
1829 *latest = Some(Instant::now());
1830 },
1831 commutative = manual_proof!(/** TODO */)
1832 ),
1833 );
1834
1835 latest_received
1836 .snapshot(&tick, nondet)
1837 .filter_map(q!(move |latest_received| {
1838 if let Some(latest_received) = latest_received {
1839 if Instant::now().duration_since(latest_received) > duration {
1840 Some(())
1841 } else {
1842 None
1843 }
1844 } else {
1845 Some(())
1846 }
1847 }))
1848 .latest()
1849 }
1850
1851 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1852 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1853 ///
1854 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1855 /// processed before an acknowledgement is emitted.
1856 pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1857 let id = self.location.flow_state().borrow_mut().next_clock_id();
1858 let out_location = Atomic {
1859 tick: Tick {
1860 id,
1861 l: self.location.clone(),
1862 },
1863 };
1864 Stream::new(
1865 out_location.clone(),
1866 HydroNode::BeginAtomic {
1867 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1868 metadata: out_location
1869 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1870 },
1871 )
1872 }
1873
1874 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1875 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1876 /// the order of the input. The output stream will execute in the [`Tick`] that was
1877 /// used to create the atomic section.
1878 ///
1879 /// # Non-Determinism
1880 /// The batch boundaries are non-deterministic and may change across executions.
1881 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1882 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1883 Stream::new(
1884 tick.clone(),
1885 HydroNode::Batch {
1886 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1887 metadata: tick
1888 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1889 },
1890 )
1891 }
1892
1893 /// An operator which allows you to "name" a `HydroNode`.
1894 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1895 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1896 {
1897 let mut node = self.ir_node.borrow_mut();
1898 let metadata = node.metadata_mut();
1899 metadata.tag = Some(name.to_owned());
1900 }
1901 self
1902 }
1903
1904 /// Turns this [`Stream`] into a [`Optional`], under the invariant assumption that there is at
1905 /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
1906 /// so uses must be carefully vetted.
1907 pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
1908 where
1909 B: IsBounded,
1910 {
1911 Optional::new(
1912 self.location.clone(),
1913 HydroNode::Cast {
1914 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1915 metadata: self
1916 .location
1917 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1918 },
1919 )
1920 }
1921
1922 pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
1923 if O::ORDERING_KIND == O2::ORDERING_KIND {
1924 Stream::new(
1925 self.location.clone(),
1926 self.ir_node.replace(HydroNode::Placeholder),
1927 )
1928 } else {
1929 panic!(
1930 "Runtime ordering {:?} did not match requested cast {:?}.",
1931 O::ORDERING_KIND,
1932 O2::ORDERING_KIND
1933 )
1934 }
1935 }
1936
1937 /// Explicitly "casts" the stream to a type with a different ordering
1938 /// guarantee. Useful in unsafe code where the ordering cannot be proven
1939 /// by the type-system.
1940 ///
1941 /// # Non-Determinism
1942 /// This function is used as an escape hatch, and any mistakes in the
1943 /// provided ordering guarantee will propagate into the guarantees
1944 /// for the rest of the program.
1945 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1946 if O::ORDERING_KIND == O2::ORDERING_KIND {
1947 self.use_ordering_type()
1948 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1949 // We can always weaken the ordering guarantee
1950 Stream::new(
1951 self.location.clone(),
1952 HydroNode::Cast {
1953 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1954 metadata: self
1955 .location
1956 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1957 },
1958 )
1959 } else {
1960 Stream::new(
1961 self.location.clone(),
1962 HydroNode::ObserveNonDet {
1963 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1964 trusted: false,
1965 metadata: self
1966 .location
1967 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1968 },
1969 )
1970 }
1971 }
1972
1973 // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1974 // intermediate states will not be revealed
1975 fn assume_ordering_trusted_bounded<O2: Ordering>(
1976 self,
1977 nondet: NonDet,
1978 ) -> Stream<T, L, B, O2, R> {
1979 if B::BOUNDED {
1980 self.assume_ordering_trusted(nondet)
1981 } else {
1982 self.assume_ordering(nondet)
1983 }
1984 }
1985
1986 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1987 // is not observable
1988 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1989 self,
1990 _nondet: NonDet,
1991 ) -> Stream<T, L, B, O2, R> {
1992 if O::ORDERING_KIND == O2::ORDERING_KIND {
1993 Stream::new(
1994 self.location.clone(),
1995 self.ir_node.replace(HydroNode::Placeholder),
1996 )
1997 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1998 // We can always weaken the ordering guarantee
1999 Stream::new(
2000 self.location.clone(),
2001 HydroNode::Cast {
2002 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2003 metadata: self
2004 .location
2005 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2006 },
2007 )
2008 } else {
2009 Stream::new(
2010 self.location.clone(),
2011 HydroNode::ObserveNonDet {
2012 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2013 trusted: true,
2014 metadata: self
2015 .location
2016 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2017 },
2018 )
2019 }
2020 }
2021
2022 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2023 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2024 /// which is always safe because that is the weakest possible guarantee.
2025 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2026 self.weaken_ordering::<NoOrder>()
2027 }
2028
2029 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2030 /// enforcing that `O2` is weaker than the input ordering guarantee.
2031 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2032 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2033 self.assume_ordering::<O2>(nondet)
2034 }
2035
2036 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2037 /// implies that `O == TotalOrder`.
2038 pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2039 where
2040 O: IsOrdered,
2041 {
2042 self.assume_ordering(nondet!(/** no-op */))
2043 }
2044
2045 /// Explicitly "casts" the stream to a type with a different retries
2046 /// guarantee. Useful in unsafe code where the lack of retries cannot
2047 /// be proven by the type-system.
2048 ///
2049 /// # Non-Determinism
2050 /// This function is used as an escape hatch, and any mistakes in the
2051 /// provided retries guarantee will propagate into the guarantees
2052 /// for the rest of the program.
2053 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2054 if R::RETRIES_KIND == R2::RETRIES_KIND {
2055 Stream::new(
2056 self.location.clone(),
2057 self.ir_node.replace(HydroNode::Placeholder),
2058 )
2059 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2060 // We can always weaken the retries guarantee
2061 Stream::new(
2062 self.location.clone(),
2063 HydroNode::Cast {
2064 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2065 metadata: self
2066 .location
2067 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2068 },
2069 )
2070 } else {
2071 Stream::new(
2072 self.location.clone(),
2073 HydroNode::ObserveNonDet {
2074 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2075 trusted: false,
2076 metadata: self
2077 .location
2078 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2079 },
2080 )
2081 }
2082 }
2083
2084 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2085 // is not observable
2086 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2087 if R::RETRIES_KIND == R2::RETRIES_KIND {
2088 Stream::new(
2089 self.location.clone(),
2090 self.ir_node.replace(HydroNode::Placeholder),
2091 )
2092 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2093 // We can always weaken the retries guarantee
2094 Stream::new(
2095 self.location.clone(),
2096 HydroNode::Cast {
2097 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2098 metadata: self
2099 .location
2100 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2101 },
2102 )
2103 } else {
2104 Stream::new(
2105 self.location.clone(),
2106 HydroNode::ObserveNonDet {
2107 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2108 trusted: true,
2109 metadata: self
2110 .location
2111 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2112 },
2113 )
2114 }
2115 }
2116
2117 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2118 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2119 /// which is always safe because that is the weakest possible guarantee.
2120 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2121 self.weaken_retries::<AtLeastOnce>()
2122 }
2123
2124 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2125 /// enforcing that `R2` is weaker than the input retries guarantee.
2126 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2127 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2128 self.assume_retries::<R2>(nondet)
2129 }
2130
2131 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2132 /// implies that `R == ExactlyOnce`.
2133 pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2134 where
2135 R: IsExactlyOnce,
2136 {
2137 self.assume_retries(nondet!(/** no-op */))
2138 }
2139
2140 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2141 /// implies that `B == Bounded`.
2142 pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2143 where
2144 B: IsBounded,
2145 {
2146 self.weaken_boundedness()
2147 }
2148
2149 /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
2150 /// which implies that `B == Bounded`.
2151 pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2152 if B::BOUNDED == B2::BOUNDED {
2153 Stream::new(
2154 self.location.clone(),
2155 self.ir_node.replace(HydroNode::Placeholder),
2156 )
2157 } else {
2158 // We can always weaken the boundedness
2159 Stream::new(
2160 self.location.clone(),
2161 HydroNode::Cast {
2162 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2163 metadata: self
2164 .location
2165 .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2166 },
2167 )
2168 }
2169 }
2170}
2171
2172impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2173where
2174 L: Location<'a>,
2175{
2176 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2177 ///
2178 /// # Example
2179 /// ```rust
2180 /// # #[cfg(feature = "deploy")] {
2181 /// # use hydro_lang::prelude::*;
2182 /// # use futures::StreamExt;
2183 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2184 /// process.source_iter(q!(&[1, 2, 3])).cloned()
2185 /// # }, |mut stream| async move {
2186 /// // 1, 2, 3
2187 /// # for w in vec![1, 2, 3] {
2188 /// # assert_eq!(stream.next().await.unwrap(), w);
2189 /// # }
2190 /// # }));
2191 /// # }
2192 /// ```
2193 pub fn cloned(self) -> Stream<T, L, B, O, R>
2194 where
2195 T: Clone,
2196 {
2197 self.map(q!(|d| d.clone()))
2198 }
2199}
2200
2201impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2202where
2203 L: Location<'a>,
2204{
2205 /// Computes the number of elements in the stream as a [`Singleton`].
2206 ///
2207 /// # Example
2208 /// ```rust
2209 /// # #[cfg(feature = "deploy")] {
2210 /// # use hydro_lang::prelude::*;
2211 /// # use futures::StreamExt;
2212 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2213 /// let tick = process.tick();
2214 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2215 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2216 /// batch.count().all_ticks()
2217 /// # }, |mut stream| async move {
2218 /// // 4
2219 /// # assert_eq!(stream.next().await.unwrap(), 4);
2220 /// # }));
2221 /// # }
2222 /// ```
2223 pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2224 self.assume_ordering_trusted::<TotalOrder>(nondet!(
2225 /// Order does not affect eventual count, and also does not affect intermediate states.
2226 ))
2227 .fold(
2228 q!(|| 0usize),
2229 q!(
2230 |count, _| *count += 1,
2231 monotone = manual_proof!(/** += 1 is monotone */)
2232 ),
2233 )
2234 }
2235}
2236
2237impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2238 /// Produces a new stream that merges the elements of the two input streams.
2239 /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2240 ///
2241 /// Currently, both input streams must be [`Unbounded`]. When the streams are
2242 /// [`Bounded`], you can use [`Stream::chain`] instead.
2243 ///
2244 /// # Example
2245 /// ```rust
2246 /// # #[cfg(feature = "deploy")] {
2247 /// # use hydro_lang::prelude::*;
2248 /// # use futures::StreamExt;
2249 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2250 /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2251 /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2252 /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2253 /// # }, |mut stream| async move {
2254 /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2255 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2256 /// # assert_eq!(stream.next().await.unwrap(), w);
2257 /// # }
2258 /// # }));
2259 /// # }
2260 /// ```
2261 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2262 self,
2263 other: Stream<T, L, Unbounded, O2, R2>,
2264 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2265 where
2266 R: MinRetries<R2>,
2267 {
2268 Stream::new(
2269 self.location.clone(),
2270 HydroNode::Chain {
2271 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2272 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2273 metadata: self.location.new_node_metadata(Stream::<
2274 T,
2275 L,
2276 Unbounded,
2277 NoOrder,
2278 <R as MinRetries<R2>>::Min,
2279 >::collection_kind()),
2280 },
2281 )
2282 }
2283
2284 /// Deprecated: use [`Stream::merge_unordered`] instead.
2285 #[deprecated(note = "use `merge_unordered` instead")]
2286 pub fn interleave<O2: Ordering, R2: Retries>(
2287 self,
2288 other: Stream<T, L, Unbounded, O2, R2>,
2289 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2290 where
2291 R: MinRetries<R2>,
2292 {
2293 self.merge_unordered(other)
2294 }
2295}
2296
2297impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2298 /// Produces a new stream that combines the elements of the two input streams,
2299 /// preserving the relative order of elements within each input.
2300 ///
2301 /// # Non-Determinism
2302 /// The order in which elements *across* the two streams will be interleaved is
2303 /// non-deterministic, so the order of elements will vary across runs. If the output
2304 /// order is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic
2305 /// but emits an unordered stream. For deterministic first-then-second ordering on
2306 /// bounded streams, use [`Stream::chain`].
2307 ///
2308 /// # Example
2309 /// ```rust
2310 /// # #[cfg(feature = "deploy")] {
2311 /// # use hydro_lang::prelude::*;
2312 /// # use futures::StreamExt;
2313 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2314 /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2315 /// # process.source_iter(q!(vec![1, 3])).into();
2316 /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2317 /// # }, |mut stream| async move {
2318 /// // 1, 3 and 2, 4 in some order, preserving the original local order
2319 /// # for w in vec![1, 3, 2, 4] {
2320 /// # assert_eq!(stream.next().await.unwrap(), w);
2321 /// # }
2322 /// # }));
2323 /// # }
2324 /// ```
2325 pub fn merge_ordered<R2: Retries>(
2326 self,
2327 other: Stream<T, L, B, TotalOrder, R2>,
2328 _nondet: NonDet,
2329 ) -> Stream<T, L, B, TotalOrder, <R as MinRetries<R2>>::Min>
2330 where
2331 R: MinRetries<R2>,
2332 {
2333 Stream::new(
2334 self.location.clone(),
2335 HydroNode::MergeOrdered {
2336 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2337 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2338 metadata: self.location.new_node_metadata(Stream::<
2339 T,
2340 L,
2341 B,
2342 TotalOrder,
2343 <R as MinRetries<R2>>::Min,
2344 >::collection_kind()),
2345 },
2346 )
2347 }
2348}
2349
2350impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2351where
2352 L: Location<'a>,
2353{
2354 /// Produces a new stream that emits the input elements in sorted order.
2355 ///
2356 /// The input stream can have any ordering guarantee, but the output stream
2357 /// will have a [`TotalOrder`] guarantee. This operator will block until all
2358 /// elements in the input stream are available, so it requires the input stream
2359 /// to be [`Bounded`].
2360 ///
2361 /// # Example
2362 /// ```rust
2363 /// # #[cfg(feature = "deploy")] {
2364 /// # use hydro_lang::prelude::*;
2365 /// # use futures::StreamExt;
2366 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2367 /// let tick = process.tick();
2368 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2369 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2370 /// batch.sort().all_ticks()
2371 /// # }, |mut stream| async move {
2372 /// // 1, 2, 3, 4
2373 /// # for w in (1..5) {
2374 /// # assert_eq!(stream.next().await.unwrap(), w);
2375 /// # }
2376 /// # }));
2377 /// # }
2378 /// ```
2379 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2380 where
2381 B: IsBounded,
2382 T: Ord,
2383 {
2384 let this = self.make_bounded();
2385 Stream::new(
2386 this.location.clone(),
2387 HydroNode::Sort {
2388 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2389 metadata: this
2390 .location
2391 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2392 },
2393 )
2394 }
2395
2396 /// Produces a new stream that first emits the elements of the `self` stream,
2397 /// and then emits the elements of the `other` stream. The output stream has
2398 /// a [`TotalOrder`] guarantee if and only if both input streams have a
2399 /// [`TotalOrder`] guarantee.
2400 ///
2401 /// Currently, both input streams must be [`Bounded`]. This operator will block
2402 /// on the first stream until all its elements are available. In a future version,
2403 /// we will relax the requirement on the `other` stream.
2404 ///
2405 /// # Example
2406 /// ```rust
2407 /// # #[cfg(feature = "deploy")] {
2408 /// # use hydro_lang::prelude::*;
2409 /// # use futures::StreamExt;
2410 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2411 /// let tick = process.tick();
2412 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2413 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2414 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2415 /// # }, |mut stream| async move {
2416 /// // 2, 3, 4, 5, 1, 2, 3, 4
2417 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2418 /// # assert_eq!(stream.next().await.unwrap(), w);
2419 /// # }
2420 /// # }));
2421 /// # }
2422 /// ```
2423 pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2424 self,
2425 other: Stream<T, L, B2, O2, R2>,
2426 ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2427 where
2428 B: IsBounded,
2429 O: MinOrder<O2>,
2430 R: MinRetries<R2>,
2431 {
2432 check_matching_location(&self.location, &other.location);
2433
2434 Stream::new(
2435 self.location.clone(),
2436 HydroNode::Chain {
2437 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2438 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2439 metadata: self.location.new_node_metadata(Stream::<
2440 T,
2441 L,
2442 B2,
2443 <O as MinOrder<O2>>::Min,
2444 <R as MinRetries<R2>>::Min,
2445 >::collection_kind()),
2446 },
2447 )
2448 }
2449
2450 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2451 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2452 /// because this is compiled into a nested loop.
2453 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2454 self,
2455 other: Stream<T2, L, Bounded, O2, R>,
2456 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2457 where
2458 B: IsBounded,
2459 T: Clone,
2460 T2: Clone,
2461 {
2462 let this = self.make_bounded();
2463 check_matching_location(&this.location, &other.location);
2464
2465 Stream::new(
2466 this.location.clone(),
2467 HydroNode::CrossProduct {
2468 left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2469 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2470 metadata: this.location.new_node_metadata(Stream::<
2471 (T, T2),
2472 L,
2473 Bounded,
2474 <O2 as MinOrder<O>>::Min,
2475 R,
2476 >::collection_kind()),
2477 },
2478 )
2479 }
2480
2481 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2482 /// `self` used as the values for *each* key.
2483 ///
2484 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2485 /// values. For example, it can be used to send the same set of elements to several cluster
2486 /// members, if the membership information is available as a [`KeyedSingleton`].
2487 ///
2488 /// # Example
2489 /// ```rust
2490 /// # #[cfg(feature = "deploy")] {
2491 /// # use hydro_lang::prelude::*;
2492 /// # use futures::StreamExt;
2493 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2494 /// # let tick = process.tick();
2495 /// let keyed_singleton = // { 1: (), 2: () }
2496 /// # process
2497 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
2498 /// # .into_keyed()
2499 /// # .batch(&tick, nondet!(/** test */))
2500 /// # .first();
2501 /// let stream = // [ "a", "b" ]
2502 /// # process
2503 /// # .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2504 /// # .batch(&tick, nondet!(/** test */));
2505 /// stream.repeat_with_keys(keyed_singleton)
2506 /// # .entries().all_ticks()
2507 /// # }, |mut stream| async move {
2508 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2509 /// # let mut results = Vec::new();
2510 /// # for _ in 0..4 {
2511 /// # results.push(stream.next().await.unwrap());
2512 /// # }
2513 /// # results.sort();
2514 /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2515 /// # }));
2516 /// # }
2517 /// ```
2518 pub fn repeat_with_keys<K, V2>(
2519 self,
2520 keys: KeyedSingleton<K, V2, L, Bounded>,
2521 ) -> KeyedStream<K, T, L, Bounded, O, R>
2522 where
2523 B: IsBounded,
2524 K: Clone,
2525 T: Clone,
2526 {
2527 keys.keys()
2528 .weaken_retries()
2529 .assume_ordering_trusted::<TotalOrder>(
2530 nondet!(/** keyed stream does not depend on ordering of keys */),
2531 )
2532 .cross_product_nested_loop(self.make_bounded())
2533 .into_keyed()
2534 }
2535
2536 /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2537 /// execution until all results are available. The output order is based on when futures
2538 /// complete, and may be different than the input order.
2539 ///
2540 /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2541 /// while futures are pending, this variant blocks until the futures resolve.
2542 ///
2543 /// # Example
2544 /// ```rust
2545 /// # #[cfg(feature = "deploy")] {
2546 /// # use std::collections::HashSet;
2547 /// # use futures::StreamExt;
2548 /// # use hydro_lang::prelude::*;
2549 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2550 /// process
2551 /// .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2552 /// .map(q!(|x| async move {
2553 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2554 /// x
2555 /// }))
2556 /// .resolve_futures_blocking()
2557 /// # },
2558 /// # |mut stream| async move {
2559 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2560 /// # let mut output = HashSet::new();
2561 /// # for _ in 1..10 {
2562 /// # output.insert(stream.next().await.unwrap());
2563 /// # }
2564 /// # assert_eq!(
2565 /// # output,
2566 /// # HashSet::<i32>::from_iter(1..10)
2567 /// # );
2568 /// # },
2569 /// # ));
2570 /// # }
2571 /// ```
2572 pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2573 where
2574 T: Future,
2575 {
2576 Stream::new(
2577 self.location.clone(),
2578 HydroNode::ResolveFuturesBlocking {
2579 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2580 metadata: self
2581 .location
2582 .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2583 },
2584 )
2585 }
2586
2587 /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2588 ///
2589 /// # Example
2590 /// ```rust
2591 /// # #[cfg(feature = "deploy")] {
2592 /// # use hydro_lang::prelude::*;
2593 /// # use futures::StreamExt;
2594 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2595 /// let tick = process.tick();
2596 /// let empty: Stream<i32, _, Bounded> = process
2597 /// .source_iter(q!(Vec::<i32>::new()))
2598 /// .batch(&tick, nondet!(/** test */));
2599 /// empty.is_empty().all_ticks()
2600 /// # }, |mut stream| async move {
2601 /// // true
2602 /// # assert_eq!(stream.next().await.unwrap(), true);
2603 /// # }));
2604 /// # }
2605 /// ```
2606 #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2607 pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2608 where
2609 B: IsBounded,
2610 {
2611 self.make_bounded()
2612 .assume_ordering_trusted::<TotalOrder>(
2613 nondet!(/** is_empty intermediates unaffected by order */),
2614 )
2615 .first()
2616 .is_none()
2617 }
2618}
2619
2620impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2621where
2622 L: Location<'a>,
2623{
2624 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2625 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2626 /// by equi-joining the two streams on the key attribute `K`.
2627 ///
2628 /// When the right-hand side is [`Bounded`], the join accumulates the right side first
2629 /// and streams the left side through, preserving the left side's ordering. When both
2630 /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
2631 ///
2632 /// # Example
2633 /// ```rust
2634 /// # #[cfg(feature = "deploy")] {
2635 /// # use hydro_lang::prelude::*;
2636 /// # use std::collections::HashSet;
2637 /// # use futures::StreamExt;
2638 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2639 /// let tick = process.tick();
2640 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2641 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2642 /// stream1.join(stream2)
2643 /// # }, |mut stream| async move {
2644 /// // (1, ('a', 'x')), (2, ('b', 'y'))
2645 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2646 /// # stream.map(|i| assert!(expected.contains(&i)));
2647 /// # }));
2648 /// # }
2649 pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2650 self,
2651 n: Stream<(K, V2), L, B2, O2, R2>,
2652 ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2653 where
2654 K: Eq + Hash + Clone,
2655 R: MinRetries<R2>,
2656 V1: Clone,
2657 V2: Clone,
2658 {
2659 check_matching_location(&self.location, &n.location);
2660
2661 let ir_node = if B2::BOUNDED {
2662 HydroNode::JoinHalf {
2663 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2664 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2665 metadata: self.location.new_node_metadata(Stream::<
2666 (K, (V1, V2)),
2667 L,
2668 B,
2669 B2::PreserveOrderIfBounded<O>,
2670 <R as MinRetries<R2>>::Min,
2671 >::collection_kind()),
2672 }
2673 } else {
2674 HydroNode::Join {
2675 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2676 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2677 metadata: self.location.new_node_metadata(Stream::<
2678 (K, (V1, V2)),
2679 L,
2680 B,
2681 B2::PreserveOrderIfBounded<O>,
2682 <R as MinRetries<R2>>::Min,
2683 >::collection_kind()),
2684 }
2685 };
2686
2687 Stream::new(self.location.clone(), ir_node)
2688 }
2689
2690 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2691 /// computes the anti-join of the items in the input -- i.e. returns
2692 /// unique items in the first input that do not have a matching key
2693 /// in the second input.
2694 ///
2695 /// # Example
2696 /// ```rust
2697 /// # #[cfg(feature = "deploy")] {
2698 /// # use hydro_lang::prelude::*;
2699 /// # use futures::StreamExt;
2700 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2701 /// let tick = process.tick();
2702 /// let stream = process
2703 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2704 /// .batch(&tick, nondet!(/** test */));
2705 /// let batch = process
2706 /// .source_iter(q!(vec![1, 2]))
2707 /// .batch(&tick, nondet!(/** test */));
2708 /// stream.anti_join(batch).all_ticks()
2709 /// # }, |mut stream| async move {
2710 /// # for w in vec![(3, 'c'), (4, 'd')] {
2711 /// # assert_eq!(stream.next().await.unwrap(), w);
2712 /// # }
2713 /// # }));
2714 /// # }
2715 pub fn anti_join<O2: Ordering, R2: Retries>(
2716 self,
2717 n: Stream<K, L, Bounded, O2, R2>,
2718 ) -> Stream<(K, V1), L, B, O, R>
2719 where
2720 K: Eq + Hash,
2721 {
2722 check_matching_location(&self.location, &n.location);
2723
2724 Stream::new(
2725 self.location.clone(),
2726 HydroNode::AntiJoin {
2727 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2728 neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2729 metadata: self
2730 .location
2731 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2732 },
2733 )
2734 }
2735}
2736
2737impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2738 Stream<(K, V), L, B, O, R>
2739{
2740 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2741 /// is used as the key and the second element is added to the entries associated with that key.
2742 ///
2743 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2744 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2745 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2746 /// total ordering _within_ each group but no ordering _across_ groups.
2747 ///
2748 /// # Example
2749 /// ```rust
2750 /// # #[cfg(feature = "deploy")] {
2751 /// # use hydro_lang::prelude::*;
2752 /// # use futures::StreamExt;
2753 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2754 /// process
2755 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2756 /// .into_keyed()
2757 /// # .entries()
2758 /// # }, |mut stream| async move {
2759 /// // { 1: [2, 3], 2: [4] }
2760 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2761 /// # assert_eq!(stream.next().await.unwrap(), w);
2762 /// # }
2763 /// # }));
2764 /// # }
2765 /// ```
2766 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2767 KeyedStream::new(
2768 self.location.clone(),
2769 HydroNode::Cast {
2770 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2771 metadata: self
2772 .location
2773 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2774 },
2775 )
2776 }
2777}
2778
2779impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2780where
2781 K: Eq + Hash,
2782 L: Location<'a>,
2783{
2784 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2785 /// # Example
2786 /// ```rust
2787 /// # #[cfg(feature = "deploy")] {
2788 /// # use hydro_lang::prelude::*;
2789 /// # use futures::StreamExt;
2790 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2791 /// let tick = process.tick();
2792 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2793 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2794 /// batch.keys().all_ticks()
2795 /// # }, |mut stream| async move {
2796 /// // 1, 2
2797 /// # assert_eq!(stream.next().await.unwrap(), 1);
2798 /// # assert_eq!(stream.next().await.unwrap(), 2);
2799 /// # }));
2800 /// # }
2801 /// ```
2802 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2803 self.into_keyed()
2804 .fold(
2805 q!(|| ()),
2806 q!(
2807 |_, _| {},
2808 commutative = manual_proof!(/** values are ignored */),
2809 idempotent = manual_proof!(/** values are ignored */)
2810 ),
2811 )
2812 .keys()
2813 }
2814}
2815
2816impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2817where
2818 L: Location<'a> + NoTick,
2819{
2820 /// Returns a stream corresponding to the latest batch of elements being atomically
2821 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2822 /// the order of the input.
2823 ///
2824 /// # Non-Determinism
2825 /// The batch boundaries are non-deterministic and may change across executions.
2826 pub fn batch_atomic(
2827 self,
2828 tick: &Tick<L>,
2829 _nondet: NonDet,
2830 ) -> Stream<T, Tick<L>, Bounded, O, R> {
2831 Stream::new(
2832 tick.clone(),
2833 HydroNode::Batch {
2834 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2835 metadata: tick
2836 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2837 },
2838 )
2839 }
2840
2841 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2842 /// See [`Stream::atomic`] for more details.
2843 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2844 Stream::new(
2845 self.location.tick.l.clone(),
2846 HydroNode::EndAtomic {
2847 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2848 metadata: self
2849 .location
2850 .tick
2851 .l
2852 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2853 },
2854 )
2855 }
2856}
2857
2858impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2859where
2860 L: Location<'a> + NoTick + NoAtomic,
2861 F: Future<Output = T>,
2862{
2863 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2864 /// Future outputs are produced as available, regardless of input arrival order.
2865 ///
2866 /// # Example
2867 /// ```rust
2868 /// # #[cfg(feature = "deploy")] {
2869 /// # use std::collections::HashSet;
2870 /// # use futures::StreamExt;
2871 /// # use hydro_lang::prelude::*;
2872 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2873 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2874 /// .map(q!(|x| async move {
2875 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2876 /// x
2877 /// }))
2878 /// .resolve_futures()
2879 /// # },
2880 /// # |mut stream| async move {
2881 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2882 /// # let mut output = HashSet::new();
2883 /// # for _ in 1..10 {
2884 /// # output.insert(stream.next().await.unwrap());
2885 /// # }
2886 /// # assert_eq!(
2887 /// # output,
2888 /// # HashSet::<i32>::from_iter(1..10)
2889 /// # );
2890 /// # },
2891 /// # ));
2892 /// # }
2893 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2894 Stream::new(
2895 self.location.clone(),
2896 HydroNode::ResolveFutures {
2897 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2898 metadata: self
2899 .location
2900 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2901 },
2902 )
2903 }
2904
2905 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2906 /// Future outputs are produced in the same order as the input stream.
2907 ///
2908 /// # Example
2909 /// ```rust
2910 /// # #[cfg(feature = "deploy")] {
2911 /// # use std::collections::HashSet;
2912 /// # use futures::StreamExt;
2913 /// # use hydro_lang::prelude::*;
2914 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2915 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2916 /// .map(q!(|x| async move {
2917 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2918 /// x
2919 /// }))
2920 /// .resolve_futures_ordered()
2921 /// # },
2922 /// # |mut stream| async move {
2923 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2924 /// # let mut output = Vec::new();
2925 /// # for _ in 1..10 {
2926 /// # output.push(stream.next().await.unwrap());
2927 /// # }
2928 /// # assert_eq!(
2929 /// # output,
2930 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2931 /// # );
2932 /// # },
2933 /// # ));
2934 /// # }
2935 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2936 Stream::new(
2937 self.location.clone(),
2938 HydroNode::ResolveFuturesOrdered {
2939 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2940 metadata: self
2941 .location
2942 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2943 },
2944 )
2945 }
2946}
2947
2948impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2949where
2950 L: Location<'a>,
2951{
2952 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2953 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2954 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2955 Stream::new(
2956 self.location.outer().clone(),
2957 HydroNode::YieldConcat {
2958 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2959 metadata: self
2960 .location
2961 .outer()
2962 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2963 },
2964 )
2965 }
2966
2967 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2968 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2969 ///
2970 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2971 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2972 /// stream's [`Tick`] context.
2973 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2974 let out_location = Atomic {
2975 tick: self.location.clone(),
2976 };
2977
2978 Stream::new(
2979 out_location.clone(),
2980 HydroNode::YieldConcat {
2981 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2982 metadata: out_location
2983 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2984 },
2985 )
2986 }
2987
2988 /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2989 /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2990 /// input.
2991 ///
2992 /// This API is particularly useful for stateful computation on batches of data, such as
2993 /// maintaining an accumulated state that is up to date with the current batch.
2994 ///
2995 /// # Example
2996 /// ```rust
2997 /// # #[cfg(feature = "deploy")] {
2998 /// # use hydro_lang::prelude::*;
2999 /// # use futures::StreamExt;
3000 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3001 /// let tick = process.tick();
3002 /// # // ticks are lazy by default, forces the second tick to run
3003 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3004 /// # let batch_first_tick = process
3005 /// # .source_iter(q!(vec![1, 2, 3, 4]))
3006 /// # .batch(&tick, nondet!(/** test */));
3007 /// # let batch_second_tick = process
3008 /// # .source_iter(q!(vec![5, 6, 7]))
3009 /// # .batch(&tick, nondet!(/** test */))
3010 /// # .defer_tick(); // appears on the second tick
3011 /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3012 /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3013 ///
3014 /// input.batch(&tick, nondet!(/** test */))
3015 /// .across_ticks(|s| s.count()).all_ticks()
3016 /// # }, |mut stream| async move {
3017 /// // [4, 7]
3018 /// assert_eq!(stream.next().await.unwrap(), 4);
3019 /// assert_eq!(stream.next().await.unwrap(), 7);
3020 /// # }));
3021 /// # }
3022 /// ```
3023 pub fn across_ticks<Out: BatchAtomic>(
3024 self,
3025 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3026 ) -> Out::Batched {
3027 thunk(self.all_ticks_atomic()).batched_atomic()
3028 }
3029
3030 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3031 /// always has the elements of `self` at tick `T - 1`.
3032 ///
3033 /// At tick `0`, the output stream is empty, since there is no previous tick.
3034 ///
3035 /// This operator enables stateful iterative processing with ticks, by sending data from one
3036 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3037 ///
3038 /// # Example
3039 /// ```rust
3040 /// # #[cfg(feature = "deploy")] {
3041 /// # use hydro_lang::prelude::*;
3042 /// # use futures::StreamExt;
3043 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3044 /// let tick = process.tick();
3045 /// // ticks are lazy by default, forces the second tick to run
3046 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3047 ///
3048 /// let batch_first_tick = process
3049 /// .source_iter(q!(vec![1, 2, 3, 4]))
3050 /// .batch(&tick, nondet!(/** test */));
3051 /// let batch_second_tick = process
3052 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
3053 /// .batch(&tick, nondet!(/** test */))
3054 /// .defer_tick(); // appears on the second tick
3055 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3056 ///
3057 /// changes_across_ticks.clone().filter_not_in(
3058 /// changes_across_ticks.defer_tick() // the elements from the previous tick
3059 /// ).all_ticks()
3060 /// # }, |mut stream| async move {
3061 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3062 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3063 /// # assert_eq!(stream.next().await.unwrap(), w);
3064 /// # }
3065 /// # }));
3066 /// # }
3067 /// ```
3068 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3069 Stream::new(
3070 self.location.clone(),
3071 HydroNode::DeferTick {
3072 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3073 metadata: self
3074 .location
3075 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3076 },
3077 )
3078 }
3079}
3080
3081#[cfg(test)]
3082mod tests {
3083 #[cfg(feature = "deploy")]
3084 use futures::{SinkExt, StreamExt};
3085 #[cfg(feature = "deploy")]
3086 use hydro_deploy::Deployment;
3087 #[cfg(feature = "deploy")]
3088 use serde::{Deserialize, Serialize};
3089 #[cfg(any(feature = "deploy", feature = "sim"))]
3090 use stageleft::q;
3091
3092 #[cfg(any(feature = "deploy", feature = "sim"))]
3093 use crate::compile::builder::FlowBuilder;
3094 #[cfg(feature = "deploy")]
3095 use crate::live_collections::sliced::sliced;
3096 #[cfg(feature = "deploy")]
3097 use crate::live_collections::stream::ExactlyOnce;
3098 #[cfg(feature = "sim")]
3099 use crate::live_collections::stream::NoOrder;
3100 #[cfg(any(feature = "deploy", feature = "sim"))]
3101 use crate::live_collections::stream::TotalOrder;
3102 #[cfg(any(feature = "deploy", feature = "sim"))]
3103 use crate::location::Location;
3104 #[cfg(feature = "sim")]
3105 use crate::networking::TCP;
3106 #[cfg(any(feature = "deploy", feature = "sim"))]
3107 use crate::nondet::nondet;
3108
3109 mod backtrace_chained_ops;
3110
/// Type tag for the first process created in `first_ten_distributed`.
#[cfg(feature = "deploy")]
struct P1 {}
/// Type tag for the second process (and the external endpoint) created in
/// `first_ten_distributed`.
#[cfg(feature = "deploy")]
struct P2 {}
3115
/// Serializable payload that `first_ten_distributed` sends between processes and out to
/// the external endpoint.
#[cfg(feature = "deploy")]
#[derive(Serialize, Deserialize, Debug)]
struct SendOverNetwork {
    // The number carried by this message.
    n: u32,
}
3121
/// Sends `0..10` from one process through a second one to an external endpoint and
/// checks that the values arrive in order.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn first_ten_distributed() {
    use crate::networking::TCP;

    let mut deploy_env = Deployment::new();

    let mut builder = FlowBuilder::new();
    let sender = builder.process::<P1>();
    let relay = builder.process::<P2>();
    let client = builder.external::<P2>();

    let out_port = sender
        .source_iter(q!(0..10))
        .map(q!(|n| SendOverNetwork { n }))
        .send(&relay, TCP.fail_stop().bincode())
        .send_bincode_external(&client);

    let deployed = builder
        .with_process(&sender, deploy_env.Localhost())
        .with_process(&relay, deploy_env.Localhost())
        .with_external(&client, deploy_env.Localhost())
        .deploy(&mut deploy_env);

    deploy_env.deploy().await.unwrap();

    let mut received = deployed.connect(out_port).await;

    deploy_env.start().await.unwrap();

    for expected in 0..10 {
        let message = received.next().await.unwrap();
        assert_eq!(message.n, expected);
    }
}
3156
/// Checks that taking `first()` of a three-element stream and counting the result
/// yields `1`.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn first_cardinality() {
    let mut deploy_env = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let tick = process.tick();
    let first_item = tick
        .singleton(q!([1, 2, 3]))
        .into_stream()
        .flatten_ordered()
        .first();
    let count_port = first_item
        .into_stream()
        .count()
        .all_ticks()
        .send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deploy_env.Localhost())
        .with_external(&client, deploy_env.Localhost())
        .deploy(&mut deploy_env);

    deploy_env.deploy().await.unwrap();

    let mut counts = deployed.connect(count_port).await;

    deploy_env.start().await.unwrap();

    let observed = counts.next().await.unwrap();
    assert_eq!(observed, 1);
}
3190
/// Feeds values one at a time into a top-level `reduce` and checks that each sampled
/// output reflects the running sum of everything sent so far.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_reduce_remembers_state() {
    let mut deploy_env = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let (in_port, requests) = process.source_external_bincode(&client);
    let running_sum = requests.reduce(q!(|acc, v| *acc += v));
    let out_port = running_sum
        .sample_eager(nondet!(/** test */))
        .send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deploy_env.Localhost())
        .with_external(&client, deploy_env.Localhost())
        .deploy(&mut deploy_env);

    deploy_env.deploy().await.unwrap();

    let mut to_process = deployed.connect(in_port).await;
    let mut from_process = deployed.connect(out_port).await;

    deploy_env.start().await.unwrap();

    // (value sent, sum expected afterwards)
    for (sent, expected) in [(1, 1), (2, 3)] {
        to_process.send(sent).await.unwrap();
        assert_eq!(from_process.next().await.unwrap(), expected);
    }
}
3224
/// Crosses an external input stream with the fold of a bounded top-level source and
/// checks that every input is paired with the full sum (`6`).
#[cfg(feature = "deploy")]
#[tokio::test]
async fn top_level_bounded_cross_singleton() {
    let mut deploy_env = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let (in_port, requests) =
        process.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&client);

    let total = process
        .source_iter(q!(vec![1, 2, 3]))
        .fold(q!(|| 0), q!(|acc, v| *acc += v));
    let out_port = requests
        .cross_singleton(total)
        .send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deploy_env.Localhost())
        .with_external(&client, deploy_env.Localhost())
        .deploy(&mut deploy_env);

    deploy_env.deploy().await.unwrap();

    let mut to_process = deployed.connect(in_port).await;
    let mut from_process = deployed.connect(out_port).await;

    deploy_env.start().await.unwrap();

    // (value sent, pair expected back)
    for (sent, expected) in [(1, (1, 6)), (2, (2, 6))] {
        to_process.send(sent).await.unwrap();
        assert_eq!(from_process.next().await.unwrap(), expected);
    }
}
3262
/// Inside a `sliced!` block, counts the elements of a bounded top-level `reduce` result
/// and checks that every input is paired with a count of `1`.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn top_level_bounded_reduce_cardinality() {
    let mut deploy_env = Deployment::new();

    let mut builder = FlowBuilder::new();
    let node = builder.process::<()>();
    let client = builder.external::<()>();

    let (in_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&client);

    let paired = sliced! {
        let input = use(input, nondet!(/** test */));
        let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
        input.cross_singleton(v.into_stream().count())
    };
    let out_port = paired.send_bincode_external(&client);

    let deployed = builder
        .with_process(&node, deploy_env.Localhost())
        .with_external(&client, deploy_env.Localhost())
        .deploy(&mut deploy_env);

    deploy_env.deploy().await.unwrap();

    let mut to_process = deployed.connect(in_port).await;
    let mut from_process = deployed.connect(out_port).await;

    deploy_env.start().await.unwrap();

    // (value sent, pair expected back)
    for (sent, expected) in [(1, (1, 1)), (2, (2, 1))] {
        to_process.send(sent).await.unwrap();
        assert_eq!(from_process.next().await.unwrap(), expected);
    }
}
3300
/// Same as the bounded `reduce` cardinality test, but converts the reduced value with
/// `into_singleton()` first; every input must still be paired with a count of `1`.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn top_level_bounded_into_singleton_cardinality() {
    let mut deploy_env = Deployment::new();

    let mut builder = FlowBuilder::new();
    let node = builder.process::<()>();
    let client = builder.external::<()>();

    let (in_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&client);

    let paired = sliced! {
        let input = use(input, nondet!(/** test */));
        let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
        input.cross_singleton(v.into_stream().count())
    };
    let out_port = paired.send_bincode_external(&client);

    let deployed = builder
        .with_process(&node, deploy_env.Localhost())
        .with_external(&client, deploy_env.Localhost())
        .deploy(&mut deploy_env);

    deploy_env.deploy().await.unwrap();

    let mut to_process = deployed.connect(in_port).await;
    let mut from_process = deployed.connect(out_port).await;

    deploy_env.start().await.unwrap();

    // (value sent, pair expected back)
    for (sent, expected) in [(1, (1, 1)), (2, (2, 1))] {
        to_process.send(sent).await.unwrap();
        assert_eq!(from_process.next().await.unwrap(), expected);
    }
}
3338
/// Snapshots an atomic fold of a bounded source into a tick and checks that each
/// batched input is paired with the full sum (`6`), on the first and on later sends.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn atomic_fold_replays_each_tick() {
    let mut deploy_env = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let (in_port, requests) =
        process.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&client);
    let tick = process.tick();

    let batched = requests.batch(&tick, nondet!(/** test */));
    let total_snapshot = process
        .source_iter(q!(vec![1, 2, 3]))
        .atomic()
        .fold(q!(|| 0), q!(|acc, v| *acc += v))
        .snapshot_atomic(&tick, nondet!(/** test */));
    let out_port = batched
        .cross_singleton(total_snapshot)
        .all_ticks()
        .send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deploy_env.Localhost())
        .with_external(&client, deploy_env.Localhost())
        .deploy(&mut deploy_env);

    deploy_env.deploy().await.unwrap();

    let mut to_process = deployed.connect(in_port).await;
    let mut from_process = deployed.connect(out_port).await;

    deploy_env.start().await.unwrap();

    // (value sent, pair expected back)
    for (sent, expected) in [(1, (1, 6)), (2, (2, 6))] {
        to_process.send(sent).await.unwrap();
        assert_eq!(from_process.next().await.unwrap(), expected);
    }
}
3381
/// Feeds values one at a time into a top-level `scan` and checks that the emitted
/// running sum carries over between inputs.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_scan_remembers_state() {
    let mut deploy_env = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let (in_port, requests) = process.source_external_bincode(&client);
    let running_sums = requests.scan(
        q!(|| 0),
        q!(|acc, v| {
            *acc += v;
            Some(*acc)
        }),
    );
    let out_port = running_sums.send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deploy_env.Localhost())
        .with_external(&client, deploy_env.Localhost())
        .deploy(&mut deploy_env);

    deploy_env.deploy().await.unwrap();

    let mut to_process = deployed.connect(in_port).await;
    let mut from_process = deployed.connect(out_port).await;

    deploy_env.start().await.unwrap();

    // (value sent, sum expected afterwards)
    for (sent, expected) in [(1, 1), (2, 3)] {
        to_process.send(sent).await.unwrap();
        assert_eq!(from_process.next().await.unwrap(), expected);
    }
}
3420
/// Feeds values one at a time into a top-level `enumerate` and checks that the index
/// keeps counting up across inputs.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_enumerate_remembers_state() {
    let mut deploy_env = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let (in_port, requests) = process.source_external_bincode(&client);
    let indexed = requests.enumerate();
    let out_port = indexed.send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deploy_env.Localhost())
        .with_external(&client, deploy_env.Localhost())
        .deploy(&mut deploy_env);

    deploy_env.deploy().await.unwrap();

    let mut to_process = deployed.connect(in_port).await;
    let mut from_process = deployed.connect(out_port).await;

    deploy_env.start().await.unwrap();

    // (value sent, (index, value) expected back)
    for (sent, expected) in [(1, (0, 1)), (2, (1, 2))] {
        to_process.send(sent).await.unwrap();
        assert_eq!(from_process.next().await.unwrap(), expected);
    }
}
3451
/// Feeds values into a top-level `unique` and checks that a value sent a second time
/// is not emitted again.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_unique_remembers_state() {
    let mut deploy_env = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let (in_port, requests) =
        process.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&client);
    let deduplicated = requests.unique();
    let out_port = deduplicated.send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deploy_env.Localhost())
        .with_external(&client, deploy_env.Localhost())
        .deploy(&mut deploy_env);

    deploy_env.deploy().await.unwrap();

    let mut to_process = deployed.connect(in_port).await;
    let mut from_process = deployed.connect(out_port).await;

    deploy_env.start().await.unwrap();

    // Fresh values pass straight through.
    for fresh in [1, 2] {
        to_process.send(fresh).await.unwrap();
        assert_eq!(from_process.next().await.unwrap(), fresh);
    }

    // `1` was already sent above, so the next output read must be the `3`.
    to_process.send(1).await.unwrap();
    to_process.send(3).await.unwrap();
    assert_eq!(from_process.next().await.unwrap(), 3);
}
3487
/// Expected to panic: asserting that the first batch always holds all three inputs
/// does not survive exhaustive simulation.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_batch_nondet_size() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_send, input) = process.sim_input::<_, TotalOrder, _>();

    let tick = process.tick();
    let batch_sizes = input.batch(&tick, nondet!(/** test */)).count();
    let out_recv = batch_sizes.all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        for _ in 0..3 {
            in_send.send(());
        }

        assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
    });
}
3512
/// Batching and re-concatenating an input stream must yield exactly `[1, 2, 3]` in
/// every simulated instance.
#[cfg(feature = "sim")]
#[test]
fn sim_batch_preserves_order() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_send, input) = process.sim_input();

    let tick = process.tick();
    let batched = input.batch(&tick, nondet!(/** test */));
    let out_recv = batched.all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        for value in [1, 2, 3] {
            in_send.send(value);
        }

        out_recv.assert_yields_only([1, 2, 3]).await;
    });
}
3535
/// Expected to panic: for an unordered input, some simulated instance must produce the
/// per-batch (min, max) pairs `(1, 3)` then `(2, 2)`.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_batch_unordered_shuffles() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_send, input) = process.sim_input::<_, NoOrder, _>();

    let tick = process.tick();
    let batch = input.batch(&tick, nondet!(/** test */));
    let smallest = batch.clone().min();
    let largest = batch.max();
    let out_recv = smallest.zip(largest).all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        in_send.send_many_unordered([1, 2, 3]);

        let observed = out_recv.collect::<Vec<_>>().await;
        if observed == vec![(1, 3), (2, 2)] {
            panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
        }
    });
}
3562
/// Pins the number of instances the exhaustive simulator reports when batching four
/// unordered inputs.
#[cfg(feature = "sim")]
#[test]
fn sim_batch_unordered_shuffles_count() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_send, input) = process.sim_input::<_, NoOrder, _>();

    let tick = process.tick();
    let out_recv = input
        .batch(&tick, nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let explored = builder.sim().exhaustive(async || {
        in_send.send_many_unordered([1, 2, 3, 4]);
        out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
    });

    assert_eq!(
        explored,
        75 // ∑ (k=1 to 4) S(4,k) × k! = 75
    )
}
3585
/// Expected to panic: after `assume_ordering` on an unordered batch, the output cannot
/// be asserted to be `[1, 2, 3, 4]` in every simulated instance.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_observe_order_batched() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_send, input) = process.sim_input::<_, NoOrder, _>();

    let tick = process.tick();
    let batch = input.batch(&tick, nondet!(/** test */));
    let observed_order = batch.assume_ordering::<TotalOrder>(nondet!(/** test */));
    let out_recv = observed_order.all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        in_send.send_many_unordered([1, 2, 3, 4]);
        out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
    });
}
3607
/// Pins the number of instances the exhaustive simulator reports when the order of an
/// unordered batch of four inputs is observed via `assume_ordering`.
#[cfg(feature = "sim")]
#[test]
fn sim_observe_order_batched_count() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_send, input) = process.sim_input::<_, NoOrder, _>();

    let tick = process.tick();
    let batch = input.batch(&tick, nondet!(/** test */));
    let observed_order = batch.assume_ordering::<TotalOrder>(nondet!(/** test */));
    let out_recv = observed_order.all_ticks().sim_output();

    let explored = builder.sim().exhaustive(async || {
        in_send.send_many_unordered([1, 2, 3, 4]);
        // Only the number of explored instances matters; the output is drained and dropped.
        let _ = out_recv.collect::<Vec<_>>().await;
    });

    assert_eq!(
        explored,
        192 // 4! * 2^{4 - 1}
    )
}
3633
/// Snapshots a running count over four unordered inputs, checks that the last snapshot
/// is `4`, and pins the number of instances the simulator reports.
#[cfg(feature = "sim")]
#[test]
fn sim_unordered_count_instance_count() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_send, input) = process.sim_input::<_, NoOrder, _>();

    let tick = process.tick();
    let running_count = input.count();
    let out_recv = running_count
        .snapshot(&tick, nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let explored = builder.sim().exhaustive(async || {
        in_send.send_many_unordered([1, 2, 3, 4]);
        let snapshots = out_recv.collect::<Vec<_>>().await;
        assert!(snapshots.last().unwrap() == &4);
    });

    assert_eq!(
        explored,
        16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
    )
}
3659
/// Applies `assume_ordering` to a top-level unordered input of three elements, checks
/// that all elements arrive, and pins the reported instance count at `6`.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_send, input) = process.sim_input::<_, NoOrder, _>();

    let ordered = input.assume_ordering::<TotalOrder>(nondet!(/** test */));
    let out_recv = ordered.sim_output();

    let explored = builder.sim().exhaustive(async || {
        in_send.send_many_unordered([1, 2, 3]);
        let mut received = out_recv.collect::<Vec<_>>().await;
        // The arrival order varies per instance, so compare the sorted contents.
        received.sort();
        assert_eq!(received, vec![1, 2, 3]);
    });

    assert_eq!(explored, 6)
}
3681
/// Feeds the ordered stream back into itself through a second process and checks that
/// some simulated instance outputs `0, 1, 2` first; also pins the instance count at `6`.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_cycle_back() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let peer = builder.process::<()>();

    let (in_send, input) = process.sim_input::<_, NoOrder, _>();

    let (complete_cycle_back, cycle_back) =
        process.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let merged = input.merge_unordered(cycle_back);
    let ordered = merged.assume_ordering::<TotalOrder>(nondet!(/** test */));

    // Increment each value, keep the odd results, and route them via `peer` back home.
    let round_trip = ordered
        .clone()
        .map(q!(|v| v + 1))
        .filter(q!(|v| v % 2 == 1))
        .send(&peer, TCP.fail_stop().bincode())
        .send(&process, TCP.fail_stop().bincode());
    complete_cycle_back.complete(round_trip);

    let out_recv = ordered.sim_output();

    let mut saw = false;
    let explored = builder.sim().exhaustive(async || {
        in_send.send_many_unordered([0, 2]);
        let received = out_recv.collect::<Vec<_>>().await;

        if received.starts_with(&[0, 1, 2]) {
            saw = true;
        }
    });

    assert!(saw, "did not see an instance with 0, 1, 2 in order");
    assert_eq!(explored, 6);
}
3720
/// Variant of the cycle-back test in which the ordered stream is batched into
/// a tick (and flattened again with `all_ticks`) before being incremented,
/// filtered to odd values, and routed back through a second process.
///
/// At least one explored instance must begin with `[0, 1, 2]`.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_cycle_back_tick() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let relay = builder.process::<()>();

    let (sender, external_input) = process.sim_input::<_, NoOrder, _>();

    let (cycle_handle, cycled) = process.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let ordered = external_input
        .merge_unordered(cycled)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    // Batch inside a tick, then round-trip through `relay` to close the cycle.
    let feedback = ordered
        .clone()
        .batch(&process.tick(), nondet!(/** test */))
        .all_ticks()
        .map(q!(|v| v + 1))
        .filter(q!(|v| v % 2 == 1))
        .send(&relay, TCP.fail_stop().bincode())
        .send(&process, TCP.fail_stop().bincode());
    cycle_handle.complete(feedback);

    let receiver = ordered.sim_output();

    let mut saw_interleaved = false;
    let explored = builder.sim().exhaustive(async || {
        sender.send_many_unordered([0, 2]);
        let observed = receiver.collect::<Vec<_>>().await;

        saw_interleaved |= observed.starts_with(&[0, 1, 2]);
    });

    assert!(
        saw_interleaved,
        "did not see an instance with 0, 1, 2 in order"
    );
    assert_eq!(explored, 58);
}
3761
/// Chains two top-level `assume_ordering` points with a feedback edge between
/// them: the first ordered stream is shifted by 3, merged with a second
/// (never-fed) input, re-ordered, filtered to the value 3, and routed back
/// into the first merge through a second process.
///
/// At least one explored instance must begin with `[0, 3, 1]`.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_multiple() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let relay = builder.process::<()>();

    let (sender, first_input) = process.sim_input::<_, NoOrder, _>();
    // The sender half of the second input is discarded right away.
    let (_, second_input) = process.sim_input::<_, NoOrder, _>();

    let (cycle_handle, cycled) = process.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let first_ordered = first_input
        .clone()
        .merge_unordered(cycled)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));
    let second_ordered = first_ordered
        .clone()
        .map(q!(|v| v + 3))
        .weaken_ordering::<NoOrder>()
        .merge_unordered(second_input)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    let feedback = second_ordered
        .filter(q!(|v| *v == 3))
        .send(&relay, TCP.fail_stop().bincode())
        .send(&process, TCP.fail_stop().bincode());
    cycle_handle.complete(feedback);

    let receiver = first_ordered.sim_output();

    let mut saw_interleaved = false;
    let explored = builder.sim().exhaustive(async || {
        sender.send_many_unordered([0, 1]);
        let observed = receiver.collect::<Vec<_>>().await;

        saw_interleaved |= observed.starts_with(&[0, 3, 1]);
    });

    assert!(
        saw_interleaved,
        "did not see an instance with 0, 3, 1 in order"
    );
    assert_eq!(explored, 24);
}
3806
/// Same feedback topology as the top-level cycle-back test, but the
/// `assume_ordering` call is wrapped in an `atomic()` / `end_atomic()` pair.
/// Every explored instance must deliver exactly four elements.
#[cfg(feature = "sim")]
#[test]
fn sim_atomic_assume_ordering_cycle_back() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let relay = builder.process::<()>();

    let (sender, external_input) = process.sim_input::<_, NoOrder, _>();

    let (cycle_handle, cycled) = process.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let ordered = external_input
        .merge_unordered(cycled)
        .atomic()
        .assume_ordering::<TotalOrder>(nondet!(/** test */))
        .end_atomic();

    // Increment, keep odd values, and round-trip through `relay`.
    let feedback = ordered
        .clone()
        .map(q!(|v| v + 1))
        .filter(q!(|v| v % 2 == 1))
        .send(&relay, TCP.fail_stop().bincode())
        .send(&process, TCP.fail_stop().bincode());
    cycle_handle.complete(feedback);

    let receiver = ordered.sim_output();

    let explored = builder.sim().exhaustive(async || {
        sender.send_many_unordered([0, 2]);
        let observed = receiver.collect::<Vec<_>>().await;
        // Two external inputs plus their two incremented (odd) echoes.
        assert_eq!(observed.len(), 4);
    });
    assert_eq!(explored, 22);
}
3841
/// Deploys a single process that partitions `1..=6` by parity and checks that
/// each output port receives exactly its three matching values.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn partition_evens_odds() {
    let mut deployment = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let external = builder.external::<()>();

    let (evens, odds) = process
        .source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]))
        .partition(q!(|x: &i32| x % 2 == 0));
    let evens_port = evens.send_bincode_external(&external);
    let odds_port = odds.send_bincode_external(&external);

    let nodes = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut evens_stream = nodes.connect(evens_port).await;
    let mut odds_stream = nodes.connect(odds_port).await;

    deployment.start().await.unwrap();

    // Arrival order is not asserted; sort before comparing.
    let mut received_evens = Vec::new();
    while received_evens.len() < 3 {
        received_evens.push(evens_stream.next().await.unwrap());
    }
    received_evens.sort();
    assert_eq!(received_evens, vec![2, 4, 6]);

    let mut received_odds = Vec::new();
    while received_odds.len() < 3 {
        received_odds.push(odds_stream.next().await.unwrap());
    }
    received_odds.sort();
    assert_eq!(received_odds, vec![1, 3, 5]);
}
3882
/// Verifies that an `inspect` whose resulting stream is never consumed still
/// executes: the deployed process must print one line per source element.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unconsumed_inspect_still_runs() {
    use crate::deploy::DeployCrateWrapper;

    let mut deployment = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    // The return value of .inspect() is intentionally dropped.
    // Before the Null-root fix, this would silently do nothing.
    process
        .source_iter(q!(0..5))
        .inspect(q!(|x| println!("inspect: {}", x)));

    let nodes = builder
        .with_process(&process, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    // Subscribe to stdout before starting so no output is missed.
    let mut process_stdout = nodes.get_process(&process).stdout();

    deployment.start().await.unwrap();

    let mut printed = Vec::new();
    while printed.len() < 5 {
        printed.push(process_stdout.recv().await.unwrap());
    }
    printed.sort();

    let expected = vec![
        "inspect: 0",
        "inspect: 1",
        "inspect: 2",
        "inspect: 3",
        "inspect: 4",
    ];
    assert_eq!(printed, expected);
}
3924
/// `limit(3)` must pass through only the first three of five sent elements.
#[cfg(feature = "sim")]
#[test]
fn sim_limit() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, source) = process.sim_input();
    let receiver = source.limit(q!(3)).sim_output();

    builder.sim().exhaustive(async || {
        for value in 1..=5 {
            sender.send(value);
        }

        receiver.assert_yields_only([1, 2, 3]).await;
    });
}
3945
/// `limit(0)` must yield nothing, regardless of how many elements are sent.
#[cfg(feature = "sim")]
#[test]
fn sim_limit_zero() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, source) = process.sim_input();
    let receiver = source.limit(q!(0)).sim_output();

    builder.sim().exhaustive(async || {
        for value in 1..=2 {
            sender.send(value);
        }

        receiver.assert_yields_only::<i32, _>([]).await;
    });
}
3963
/// Merges two ordered inputs with `merge_ordered` and checks, across all
/// explored interleavings, that each input's elements keep their relative
/// order and that nothing is lost. The interleaving `[1, 3, 2, 4]` must be
/// observed at least once.
#[cfg(feature = "sim")]
#[test]
fn sim_merge_ordered() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (first_sender, first_input) = process.sim_input();
    let (second_sender, second_input) = process.sim_input();

    let receiver = first_input
        .merge_ordered(second_input, nondet!(/** test */))
        .sim_output();

    let mut saw_interleaved = false;
    let explored = builder.sim().exhaustive(async || {
        first_sender.send(1);
        first_sender.send(2);
        second_sender.send(3);
        second_sender.send(4);

        let out = receiver.collect::<Vec<_>>().await;

        saw_interleaved |= out == [1, 3, 2, 4];

        // Assert ordering preservation: elements from each input must
        // appear in their original relative order. Values <= 2 came from the
        // first input, the rest from the second.
        let (mut from_first, mut from_second): (Vec<_>, Vec<_>) =
            out.iter().copied().partition(|v| *v <= 2);
        assert_eq!(
            from_first,
            vec![1, 2],
            "first input order violated: {:?}",
            out
        );
        assert_eq!(
            from_second,
            vec![3, 4],
            "second input order violated: {:?}",
            out
        );

        // Together the two halves must account for every sent element.
        from_first.append(&mut from_second);
        from_first.sort();
        assert_eq!(from_first, vec![1, 2, 3, 4]);
    });

    assert!(saw_interleaved);
    assert_eq!(explored, 6);
}
4015
/// With data on only one side of `merge_ordered`, the populated input's
/// elements must pass through unchanged and in order.
#[cfg(feature = "sim")]
#[test]
fn sim_merge_ordered_one_empty() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, populated) = process.sim_input();
    // Kept alive for the whole test but never used to send anything.
    let (_idle_sender, idle) = process.sim_input();

    let merged = populated.merge_ordered(idle, nondet!(/** test */));
    let receiver = merged.sim_output();

    let explored = builder.sim().exhaustive(async || {
        sender.send(1);
        sender.send(2);

        let observed = receiver.collect::<Vec<_>>().await;
        assert_eq!(observed, vec![1, 2]);
    });

    // Only one possible interleaving when one input is empty
    assert_eq!(explored, 1);
}
4042
/// Exercises `merge_ordered` inside a feedback loop.
///
/// The merged output is filtered for the value 1, mapped to 10, and fed back
/// as the second input of the same merge. The cycled-back 10 must always
/// appear after the 1 that produced it, and in at least one explored instance
/// it must appear before the external element 2.
#[cfg(feature = "sim")]
#[test]
fn sim_merge_ordered_cycle_back() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, external_input) = process.sim_input();

    // Forward reference standing in for the fed-back stream.
    let (cycle_handle, cycled) = process.forward_ref::<super::Stream<_, _, _, TotalOrder>>();

    let merged = external_input.merge_ordered(cycled, nondet!(/** test */));

    // Close the loop: 1 -> 10.
    let feedback = merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10));
    cycle_handle.complete(feedback);

    let receiver = merged.sim_output();

    let mut cycled_beat_second = false;
    builder.sim().exhaustive(async || {
        sender.send(1);
        sender.send(2);

        let observed = receiver.collect::<Vec<_>>().await;

        // 10 is derived from 1, so it can never precede it.
        let index_of_1 = observed.iter().position(|v| *v == 1).unwrap();
        let index_of_10 = observed.iter().position(|v| *v == 10).unwrap();
        assert!(
            index_of_1 < index_of_10,
            "causal order violated: {:?}",
            observed
        );

        // [1, 10, 2] means the cycled element overtook the second input element.
        cycled_beat_second |= observed == [1, 10, 2];

        let mut sorted = observed;
        sorted.sort();
        assert_eq!(sorted, vec![1, 2, 10]);
    });

    assert!(
        cycled_beat_second,
        "never saw the cycled element arrive before the second input element"
    );
}
4097
/// Checks `merge_ordered` when one input delivers an element late.
///
/// Input `a` sends 1, input `b` sends 3 and 4, the output is collected, and
/// only then does `a` send 2. The combined output `[1, 3, 4, 2]` must be seen
/// in at least one explored instance, and no element may be lost.
#[cfg(feature = "sim")]
#[test]
fn sim_merge_ordered_delayed() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender_a, input_a) = process.sim_input();
    let (sender_b, input_b) = process.sim_input();

    let merged = input_a.merge_ordered(input_b, nondet!(/** test */));
    let receiver = merged.sim_output();

    let mut saw_late_arrival = false;
    builder.sim().exhaustive(async || {
        // Initial wave: 1 on `a`; 3 and 4 on `b`.
        sender_a.send(1);
        sender_b.send(3);
        sender_b.send(4);

        let before_delay = receiver.collect::<Vec<_>>().await;

        // The delayed element on `a`.
        sender_a.send(2);
        let after_delay = receiver.collect::<Vec<_>>().await;

        let mut combined = Vec::new();
        combined.extend(before_delay.iter().copied());
        combined.extend(after_delay.iter().copied());

        // [1, 3, 4, 2] is the interleaving where 2 lands after all of `b`.
        saw_late_arrival |= combined == [1, 3, 4, 2];

        combined.sort();
        assert_eq!(combined, vec![1, 2, 3, 4]);
    });

    assert!(saw_late_arrival);
}
4145
/// Deploy test: merge_ordered with a delayed element on one input.
///
/// Sends a=1, b=3, b=4, waits for three outputs, and only then sends a=2.
/// The first three outputs must be exactly {1, 3, 4} (in any interleaving)
/// and the fourth must be 2. This demonstrates that both inputs are pulled
/// without waiting for the delayed element, which arrives later.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn deploy_merge_ordered_delayed() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_a_port, input_a) = node.source_external_bincode(&external);
    let (input_b_port, input_b) = node.source_external_bincode(&external);

    let out = input_a
        .assume_ordering(nondet!(/** test */))
        .merge_ordered(
            input_b.assume_ordering(nondet!(/** test */)),
            nondet!(/** test */),
        )
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut ext_a = nodes.connect(input_a_port).await;
    let mut ext_b = nodes.connect(input_b_port).await;
    let mut ext_out = nodes.connect(out).await;

    deployment.start().await.unwrap();

    // Send a=1, b=3, b=4
    ext_a.send(1).await.unwrap();
    ext_b.send(3).await.unwrap();
    ext_b.send(4).await.unwrap();

    // Collect the first 3 elements. a=2 has not been sent yet, so these can
    // only be 1, 3 and 4; the interleaving between the two inputs is not
    // fixed, hence the sort before comparing.
    let mut first_batch = Vec::new();
    for _ in 0..3 {
        first_batch.push(ext_out.next().await.unwrap());
    }
    first_batch.sort();
    assert_eq!(
        first_batch,
        vec![1, 3, 4],
        "elements sent before the delay must all arrive before the delayed one"
    );

    // Now send the delayed a=2; it must be the only remaining output.
    ext_a.send(2).await.unwrap();
    let delayed = ext_out.next().await.unwrap();
    assert_eq!(delayed, 2, "the delayed element must arrive last");
}
4202
/// Folds an unbounded stream of `1..=6` into a running sum that is declared
/// monotone, then checks that `threshold_greater_or_equal(7)` emits 7.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn monotone_fold_threshold() {
    use crate::properties::manual_proof;

    let mut deployment = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let external = builder.external::<()>();

    let numbers: super::Stream<_, _> = process
        .source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]))
        .into();
    let running_sum = numbers.fold(
        q!(|| 0),
        q!(
            |sum, v| {
                *sum += v;
            },
            monotone = manual_proof!(/** test */)
        ),
    );

    let limit = process.singleton(q!(7));
    let threshold_port = running_sum
        .threshold_greater_or_equal(limit)
        .send_bincode_external(&external);

    let nodes = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut threshold_stream = nodes.connect(threshold_port).await;

    deployment.start().await.unwrap();

    let crossed = threshold_stream.next().await.unwrap();
    assert_eq!(crossed, 7);
}
4243
/// Counts an unbounded stream of six elements and checks that
/// `threshold_greater_or_equal(3)` on the count emits 3.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn monotone_count_threshold() {
    let mut deployment = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let external = builder.external::<()>();

    let numbers: super::Stream<_, _> = process
        .source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]))
        .into();
    let element_count = numbers.count();

    let limit = process.singleton(q!(3));
    let threshold_port = element_count
        .threshold_greater_or_equal(limit)
        .send_bincode_external(&external);

    let nodes = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut threshold_stream = nodes.connect(threshold_port).await;

    deployment.start().await.unwrap();

    let crossed = threshold_stream.next().await.unwrap();
    assert_eq!(crossed, 3);
}
4274
/// Checks that a monotone running sum can still be thresholded after a `map`
/// declared `order_preserving`: the doubled sum is compared against 14 and
/// the threshold must emit 14.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn monotone_map_order_preserving_threshold() {
    use crate::properties::manual_proof;

    let mut deployment = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let external = builder.external::<()>();

    let numbers: super::Stream<_, _> = process
        .source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]))
        .into();
    let running_sum = numbers.fold(
        q!(|| 0),
        q!(
            |sum, v| {
                *sum += v;
            },
            monotone = manual_proof!(/** test */)
        ),
    );

    // map with order_preserving should preserve monotonicity
    let doubled_sum = running_sum.map(q!(
        |v| v * 2,
        order_preserving = manual_proof!(/** doubling preserves order */)
    ));

    let limit = process.singleton(q!(14));
    let threshold_port = doubled_sum
        .threshold_greater_or_equal(limit)
        .send_bincode_external(&external);

    let nodes = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut threshold_stream = nodes.connect(threshold_port).await;

    deployment.start().await.unwrap();

    let crossed = threshold_stream.next().await.unwrap();
    assert_eq!(crossed, 14);
}
4321
4322 // === Compile-time type tests for join/cross_product ordering ===
4323
/// Compile-time checks of the ordering and boundedness that `join` and
/// `cross_product` produce for various input combinations.
///
/// None of these functions is ever called; each one only has to type-check
/// against its declared return type.
#[cfg(any(feature = "deploy", feature = "sim"))]
mod join_ordering_type_tests {
    use crate::live_collections::boundedness::{Bounded, Unbounded};
    use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
    use crate::location::{Location, Process};

    /// Unbounded ⋈ bounded (both totally ordered) stays `TotalOrder`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_unbounded_with_bounded_preserves_order<'a>(
        lhs: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    /// Unbounded ⋈ unbounded degrades to `NoOrder`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_unbounded_with_unbounded_is_no_order<'a>(
        lhs: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    /// Bounded ⋈ bounded stays `TotalOrder` and `Bounded`, for any location.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
        lhs: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    /// Unordered inputs yield an unordered result.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_unbounded_noorder_with_bounded<'a>(
        lhs: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    // === Compile-time type tests for cross_product ordering ===

    /// Unbounded × bounded (both totally ordered) stays `TotalOrder`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn cross_product_unbounded_with_bounded_preserves_order<'a>(
        lhs: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
        lhs.cross_product(rhs)
    }

    /// Bounded × bounded stays `TotalOrder` and `Bounded`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn cross_product_bounded_with_bounded_preserves_order<'a>(
        lhs: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
        lhs.cross_product(rhs)
    }

    /// Unbounded × unbounded degrades to `NoOrder`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
        lhs: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
        lhs.cross_product(rhs)
    }
} // mod join_ordering_type_tests
4388
4389 // === Runtime correctness tests for bounded join/cross_product ===
4390
/// Runtime check of `cross_product` when the right-hand side has passed
/// through `batch` + `all_ticks`: all four pairings of `{1, 2}` with
/// `{'a', 'b'}` must be produced, in any order.
#[cfg(feature = "sim")]
#[test]
fn cross_product_mixed_boundedness_correctness() {
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::nondet::nondet;

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let tick = process.tick();

    let numbers = process.source_iter(q!(vec![1, 2]));
    let letters = process
        .source_iter(q!(vec!['a', 'b']))
        .batch(&tick, nondet!(/** test */))
        .all_ticks();

    let receiver = numbers.cross_product(letters).sim_output();

    builder.sim().exhaustive(async || {
        let expected = vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')];
        receiver.assert_yields_only_unordered(expected).await;
    });
}
4416
/// Runtime check of `join` when the right-hand side has passed through
/// `batch` + `all_ticks`: each key must be matched with its counterpart and
/// nothing else may be produced.
#[cfg(feature = "sim")]
#[test]
fn join_mixed_boundedness_correctness() {
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::nondet::nondet;

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let tick = process.tick();

    let lhs = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    let rhs = process
        .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
        .batch(&tick, nondet!(/** test */))
        .all_ticks();

    let receiver = lhs.join(rhs).sim_output();

    builder.sim().exhaustive(async || {
        let expected = vec![(1, ('a', 'x')), (2, ('b', 'y'))];
        receiver.assert_yields_only_unordered(expected).await;
    });
}
4442}