Skip to main content

hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::{Atomic, NoAtomic};
25use crate::location::{Location, NoTick, Tick, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::properties::{
28    ApplyMonotoneStream, ApplyOrderPreservingSingleton, MapFuncAlgebra, Proved,
29};
30
/// A marker trait indicating which components of a [`Singleton`] may change.
///
/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
/// includes an additional variant [`Monotonic`], which means that the value will only grow.
///
/// Each implementor names the plain [`Boundedness`] it erases to and reports a
/// [`SingletonBoundKind`], which is what gets recorded in the singleton's collection kind.
pub trait SingletonBound {
    /// The [`Boundedness`] that this [`Singleton`] would be erased to.
    ///
    /// The additional [`ApplyMonotoneStream`] requirement ties this bound to
    /// [`Self::StreamToMonotone`] at the type level.
    type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;

    /// The bound of a [`Singleton`] if it is produced from a [`Stream`] with [`Self`] boundedness.
    ///
    /// It must erase to the same [`Self::UnderlyingBound`] as `Self` does.
    type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;

    /// Returns the [`SingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> SingletonBoundKind;
}
45
46impl SingletonBound for Unbounded {
47    type UnderlyingBound = Unbounded;
48
49    type StreamToMonotone = Monotonic;
50
51    fn bound_kind() -> SingletonBoundKind {
52        SingletonBoundKind::Unbounded
53    }
54}
55
56impl SingletonBound for Bounded {
57    type UnderlyingBound = Bounded;
58
59    type StreamToMonotone = Bounded;
60
61    fn bound_kind() -> SingletonBoundKind {
62        SingletonBoundKind::Bounded
63    }
64}
65
/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
///
/// This is a type-level marker only: it carries no data and is used as the `Bound` parameter
/// of a [`Singleton`]. Its [`SingletonBound::UnderlyingBound`] is [`Unbounded`].
pub struct Monotonic;
68
69impl SingletonBound for Monotonic {
70    type UnderlyingBound = Unbounded;
71
72    type StreamToMonotone = Monotonic;
73
74    fn bound_kind() -> SingletonBoundKind {
75        SingletonBoundKind::Monotonic
76    }
77}
78
#[sealed]
#[diagnostic::on_unimplemented(
    message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait for the monotonicity guarantee: it is implemented for [`Monotonic`] itself and
/// for every bound that satisfies [`IsBounded`].
///
/// APIs that need this guarantee can require `B: IsMonotonic`; the `on_unimplemented`
/// diagnostic above explains the failure when a caller passes any other bound.
pub trait IsMonotonic: SingletonBound {}
87
// `Monotonic` trivially satisfies the monotonicity marker.
#[sealed]
#[diagnostic::do_not_recommend]
impl IsMonotonic for Monotonic {}
91
// Every bound that satisfies `IsBounded` is also accepted wherever `IsMonotonic` is required.
#[sealed]
#[diagnostic::do_not_recommend]
impl<B: IsBounded> IsMonotonic for B {}
95
/// A single Rust value that can asynchronously change over time.
///
/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
/// [`Unbounded`], the value will asynchronously change over time.
///
/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
/// a single number that will asynchronously change as events are processed. Singletons also appear
/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
/// such as getting the length of a batch of requests.
///
/// Type Parameters:
/// - `Type`: the type of the value in this singleton
/// - `Loc`: the [`Location`] where the singleton is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed), [`Unbounded`] (changing
///   asynchronously), or [`Monotonic`] (changing, but only ever growing)
pub struct Singleton<Type, Loc, Bound: SingletonBound> {
    /// The location at which this singleton is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this singleton. Operators that consume the singleton swap
    /// this for [`HydroNode::Placeholder`] when they take the node.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Handle to the flow state; the `Drop` impl pushes a root into it for a node that was
    /// never consumed.
    pub(crate) flow_state: FlowState,

    /// Ties the otherwise unused type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
117
118impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
119    fn drop(&mut self) {
120        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
121        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
122            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
123                input: Box::new(ir_node),
124                op_metadata: HydroIrOpMetadata::new(),
125            });
126        }
127    }
128}
129
130impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
131where
132    T: Clone,
133    L: Location<'a> + NoTick,
134{
135    fn from(value: Singleton<T, L, Bounded>) -> Self {
136        let tick = value.location().tick();
137        value.clone_into_tick(&tick).latest()
138    }
139}
140
141impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
142where
143    L: Location<'a>,
144{
145    type Location = Tick<L>;
146
147    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
148        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
149            location.clone(),
150            HydroNode::DeferTick {
151                input: Box::new(HydroNode::CycleSource {
152                    cycle_id,
153                    metadata: location.new_node_metadata(Self::collection_kind()),
154                }),
155                metadata: location
156                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
157            },
158        );
159
160        from_previous_tick.unwrap_or(initial)
161    }
162}
163
164impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
165where
166    L: Location<'a>,
167{
168    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
169        assert_eq!(
170            Location::id(&self.location),
171            expected_location,
172            "locations do not match"
173        );
174        self.location
175            .flow_state()
176            .borrow_mut()
177            .push_root(HydroRoot::CycleSink {
178                cycle_id,
179                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
180                op_metadata: HydroIrOpMetadata::new(),
181            });
182    }
183}
184
185impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
186where
187    L: Location<'a>,
188{
189    type Location = Tick<L>;
190
191    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
192        Singleton::new(
193            location.clone(),
194            HydroNode::CycleSource {
195                cycle_id,
196                metadata: location.new_node_metadata(Self::collection_kind()),
197            },
198        )
199    }
200}
201
202impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
203where
204    L: Location<'a>,
205{
206    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
207        assert_eq!(
208            Location::id(&self.location),
209            expected_location,
210            "locations do not match"
211        );
212        self.location
213            .flow_state()
214            .borrow_mut()
215            .push_root(HydroRoot::CycleSink {
216                cycle_id,
217                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
218                op_metadata: HydroIrOpMetadata::new(),
219            });
220    }
221}
222
223impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
224where
225    L: Location<'a> + NoTick,
226{
227    type Location = L;
228
229    fn create_source(cycle_id: CycleId, location: L) -> Self {
230        Singleton::new(
231            location.clone(),
232            HydroNode::CycleSource {
233                cycle_id,
234                metadata: location.new_node_metadata(Self::collection_kind()),
235            },
236        )
237    }
238}
239
240impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
241where
242    L: Location<'a> + NoTick,
243{
244    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
245        assert_eq!(
246            Location::id(&self.location),
247            expected_location,
248            "locations do not match"
249        );
250        self.location
251            .flow_state()
252            .borrow_mut()
253            .push_root(HydroRoot::CycleSink {
254                cycle_id,
255                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
256                op_metadata: HydroIrOpMetadata::new(),
257            });
258    }
259}
260
261impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
262where
263    T: Clone,
264    L: Location<'a>,
265{
266    fn clone(&self) -> Self {
267        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
268            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
269            *self.ir_node.borrow_mut() = HydroNode::Tee {
270                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
271                metadata: self.location.new_node_metadata(Self::collection_kind()),
272            };
273        }
274
275        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
276            Singleton {
277                location: self.location.clone(),
278                flow_state: self.flow_state.clone(),
279                ir_node: HydroNode::Tee {
280                    inner: SharedNode(inner.0.clone()),
281                    metadata: metadata.clone(),
282                }
283                .into(),
284                _phantom: PhantomData,
285            }
286        } else {
287            unreachable!()
288        }
289    }
290}
291
/// Zips a tick-local [`Singleton`] with an [`Optional`] by first converting the singleton into
/// an [`Optional`] and then delegating to the optional-level `zip_inside_tick`.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
    me: Singleton<T, Tick<L>, B>,
    other: Optional<O, Tick<L>, B::UnderlyingBound>,
) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
    super::optional::zip_inside_tick(
        Into::<Optional<T, Tick<L>, B::UnderlyingBound>>::into(me),
        other,
    )
}
300
301impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
302where
303    L: Location<'a>,
304{
305    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
306        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
307        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
308        let flow_state = location.flow_state().clone();
309        Singleton {
310            location,
311            flow_state,
312            ir_node: RefCell::new(ir_node),
313            _phantom: PhantomData,
314        }
315    }
316
317    pub(crate) fn collection_kind() -> CollectionKind {
318        CollectionKind::Singleton {
319            bound: B::bound_kind(),
320            element_type: stageleft::quote_type::<T>().into(),
321        }
322    }
323
    /// Returns the [`Location`] where this singleton is being materialized.
    ///
    /// The location is borrowed; clone it if an owned handle is needed.
    pub fn location(&self) -> &L {
        &self.location
    }
328
329    /// Creates a lightweight reference handle to this singleton that can be captured
330    /// inside `q!()` closures. The handle resolves to `&T` at runtime.
331    ///
332    /// The singleton must be bounded, otherwise reading it would be non-deterministic.
333    ///
334    /// ```rust
335    /// # #[cfg(feature = "deploy")] {
336    /// # use hydro_lang::prelude::*;
337    /// # use futures::StreamExt;
338    /// # tokio_test::block_on(async {
339    /// # let mut deployment = hydro_deploy::Deployment::new();
340    /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
341    /// # let process = builder.process::<()>();
342    /// # let external = builder.external::<()>();
343    /// let my_count = process
344    ///     .source_iter(q!(0..5i32))
345    ///     .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
346    /// let count_ref = my_count.by_ref();
347    /// let out_port = process
348    ///     .source_iter(q!(1..=3i32))
349    ///     .map(q!(|x| x + *count_ref))
350    ///     .send_bincode_external(&external);
351    /// # let nodes = builder
352    /// #     .with_default_optimize()
353    /// #     .with_process(&process, deployment.Localhost())
354    /// #     .with_external(&external, deployment.Localhost())
355    /// #     .deploy(&mut deployment);
356    /// # deployment.deploy().await.unwrap();
357    /// # let mut out_recv = nodes.connect(out_port).await;
358    /// # deployment.start().await.unwrap();
359    /// # let mut results = Vec::new();
360    /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
361    /// # results.sort();
362    /// // fold(0..5) = 10, so results are 11, 12, 13
363    /// # assert_eq!(results, vec![11, 12, 13]);
364    /// # });
365    /// # }
366    /// ```
367    pub fn by_ref(&self) -> crate::singleton_ref::SingletonRef<'a, T, L>
368    where
369        B: IsBounded,
370    {
371        // Wrap in HydroNode::Singleton for materialization + identity tracking.
372        // If already a Singleton node, reuse it. If a Tee (from clone), wrap inner.
373        if !matches!(&*self.ir_node.borrow(), HydroNode::Singleton { .. }) {
374            let orig = self.ir_node.replace(HydroNode::Placeholder);
375            *self.ir_node.borrow_mut() = HydroNode::Singleton {
376                inner: SharedNode(Rc::new(RefCell::new(orig))),
377                metadata: self.location.new_node_metadata(Self::collection_kind()),
378            };
379        }
380        let borrow = self.ir_node.borrow();
381        let HydroNode::Singleton { inner, .. } = &*borrow else {
382            unreachable!()
383        };
384        crate::singleton_ref::SingletonRef::new(Rc::clone(&inner.0))
385    }
386
387    /// Drops the monotonicity property of the [`Singleton`].
388    pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
389        if B::bound_kind() == B::UnderlyingBound::bound_kind() {
390            Singleton::new(
391                self.location.clone(),
392                self.ir_node.replace(HydroNode::Placeholder),
393            )
394        } else {
395            Singleton::new(
396                self.location.clone(),
397                HydroNode::Cast {
398                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
399                    metadata:
400                        self.location.new_node_metadata(
401                            Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
402                        ),
403                },
404            )
405        }
406    }
407
408    /// Transforms the singleton value by applying a function `f` to it,
409    /// continuously as the input is updated.
410    ///
411    /// # Example
412    /// ```rust
413    /// # #[cfg(feature = "deploy")] {
414    /// # use hydro_lang::prelude::*;
415    /// # use futures::StreamExt;
416    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
417    /// let tick = process.tick();
418    /// let singleton = tick.singleton(q!(5));
419    /// singleton.map(q!(|v| v * 2)).all_ticks()
420    /// # }, |mut stream| async move {
421    /// // 10
422    /// # assert_eq!(stream.next().await.unwrap(), 10);
423    /// # }));
424    /// # }
425    /// ```
426    pub fn map<U, F, OP, B2: SingletonBound>(
427        self,
428        f: impl IntoQuotedMut<'a, F, L, MapFuncAlgebra<OP>>,
429    ) -> Singleton<U, L, B2>
430    where
431        F: Fn(T) -> U + 'a,
432        B: ApplyOrderPreservingSingleton<OP, B2>,
433    {
434        let (f, proof) = f.splice_fn1_ctx_props(&self.location);
435        proof.register_proof(&f);
436        let f = f.into();
437        Singleton::new(
438            self.location.clone(),
439            HydroNode::Map {
440                f,
441                singleton_refs: Vec::new(),
442                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
443                metadata: self
444                    .location
445                    .new_node_metadata(Singleton::<U, L, B2>::collection_kind()),
446            },
447        )
448    }
449
450    /// Transforms the singleton value by applying a function `f` to it and then flattening
451    /// the result into a stream, preserving the order of elements.
452    ///
453    /// The function `f` is applied to the singleton value to produce an iterator, and all items
454    /// from that iterator are emitted in the output stream in deterministic order.
455    ///
456    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
457    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
458    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
459    ///
460    /// # Example
461    /// ```rust
462    /// # #[cfg(feature = "deploy")] {
463    /// # use hydro_lang::prelude::*;
464    /// # use futures::StreamExt;
465    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
466    /// let tick = process.tick();
467    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
468    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
469    /// # }, |mut stream| async move {
470    /// // 1, 2, 3
471    /// # for w in vec![1, 2, 3] {
472    /// #     assert_eq!(stream.next().await.unwrap(), w);
473    /// # }
474    /// # }));
475    /// # }
476    /// ```
477    pub fn flat_map_ordered<U, I, F>(
478        self,
479        f: impl IntoQuotedMut<'a, F, L>,
480    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
481    where
482        B: IsBounded,
483        I: IntoIterator<Item = U>,
484        F: Fn(T) -> I + 'a,
485    {
486        self.into_stream().flat_map_ordered(f)
487    }
488
489    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
490    /// for the output type `I` to produce items in any order.
491    ///
492    /// The function `f` is applied to the singleton value to produce an iterator, and all items
493    /// from that iterator are emitted in the output stream in non-deterministic order.
494    ///
495    /// # Example
496    /// ```rust
497    /// # #[cfg(feature = "deploy")] {
498    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
499    /// # use futures::StreamExt;
500    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
501    /// let tick = process.tick();
502    /// let singleton = tick.singleton(q!(
503    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
504    /// ));
505    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
506    /// # }, |mut stream| async move {
507    /// // 1, 2, 3, but in no particular order
508    /// # let mut results = Vec::new();
509    /// # for _ in 0..3 {
510    /// #     results.push(stream.next().await.unwrap());
511    /// # }
512    /// # results.sort();
513    /// # assert_eq!(results, vec![1, 2, 3]);
514    /// # }));
515    /// # }
516    /// ```
517    pub fn flat_map_unordered<U, I, F>(
518        self,
519        f: impl IntoQuotedMut<'a, F, L>,
520    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
521    where
522        B: IsBounded,
523        I: IntoIterator<Item = U>,
524        F: Fn(T) -> I + 'a,
525    {
526        self.into_stream().flat_map_unordered(f)
527    }
528
529    /// Flattens the singleton value into a stream, preserving the order of elements.
530    ///
531    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
532    /// are emitted in the output stream in deterministic order.
533    ///
534    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
535    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
536    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
537    ///
538    /// # Example
539    /// ```rust
540    /// # #[cfg(feature = "deploy")] {
541    /// # use hydro_lang::prelude::*;
542    /// # use futures::StreamExt;
543    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
544    /// let tick = process.tick();
545    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
546    /// singleton.flatten_ordered().all_ticks()
547    /// # }, |mut stream| async move {
548    /// // 1, 2, 3
549    /// # for w in vec![1, 2, 3] {
550    /// #     assert_eq!(stream.next().await.unwrap(), w);
551    /// # }
552    /// # }));
553    /// # }
554    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
    where
        B: IsBounded,
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: the singleton's value is itself
        // the iterable whose items are emitted.
        self.flat_map_ordered(q!(|x| x))
    }
562
563    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
564    /// for the element type `T` to produce items in any order.
565    ///
566    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
567    /// are emitted in the output stream in non-deterministic order.
568    ///
569    /// # Example
570    /// ```rust
571    /// # #[cfg(feature = "deploy")] {
572    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
573    /// # use futures::StreamExt;
574    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
575    /// let tick = process.tick();
576    /// let singleton = tick.singleton(q!(
577    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
578    /// ));
579    /// singleton.flatten_unordered().all_ticks()
580    /// # }, |mut stream| async move {
581    /// // 1, 2, 3, but in no particular order
582    /// # let mut results = Vec::new();
583    /// # for _ in 0..3 {
584    /// #     results.push(stream.next().await.unwrap());
585    /// # }
586    /// # results.sort();
587    /// # assert_eq!(results, vec![1, 2, 3]);
588    /// # }));
589    /// # }
590    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
    where
        B: IsBounded,
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: the singleton's value is itself
        // the iterable whose items are emitted.
        self.flat_map_unordered(q!(|x| x))
    }
598
599    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
600    ///
601    /// If the predicate returns `true`, the output optional contains the same value.
602    /// If the predicate returns `false`, the output optional is empty.
603    ///
604    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
605    /// not modify or take ownership of the value. If you need to modify the value while filtering
606    /// use [`Singleton::filter_map`] instead.
607    ///
608    /// # Example
609    /// ```rust
610    /// # #[cfg(feature = "deploy")] {
611    /// # use hydro_lang::prelude::*;
612    /// # use futures::StreamExt;
613    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
614    /// let tick = process.tick();
615    /// let singleton = tick.singleton(q!(5));
616    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
617    /// # }, |mut stream| async move {
618    /// // 5
619    /// # assert_eq!(stream.next().await.unwrap(), 5);
620    /// # }));
621    /// # }
622    /// ```
623    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
624    where
625        F: Fn(&T) -> bool + 'a,
626    {
627        let f = f.splice_fn1_borrow_ctx(&self.location).into();
628        Optional::new(
629            self.location.clone(),
630            HydroNode::Filter {
631                f,
632                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
633                metadata: self
634                    .location
635                    .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
636            },
637        )
638    }
639
640    /// An operator that both filters and maps. It yields the value only if the supplied
641    /// closure `f` returns `Some(value)`.
642    ///
643    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
644    /// If the closure returns `None`, the output optional is empty.
645    ///
646    /// # Example
647    /// ```rust
648    /// # #[cfg(feature = "deploy")] {
649    /// # use hydro_lang::prelude::*;
650    /// # use futures::StreamExt;
651    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
652    /// let tick = process.tick();
653    /// let singleton = tick.singleton(q!("42"));
654    /// singleton
655    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
656    ///     .all_ticks()
657    /// # }, |mut stream| async move {
658    /// // 42
659    /// # assert_eq!(stream.next().await.unwrap(), 42);
660    /// # }));
661    /// # }
662    /// ```
663    pub fn filter_map<U, F>(
664        self,
665        f: impl IntoQuotedMut<'a, F, L>,
666    ) -> Optional<U, L, B::UnderlyingBound>
667    where
668        F: Fn(T) -> Option<U> + 'a,
669    {
670        let f = f.splice_fn1_ctx(&self.location).into();
671        Optional::new(
672            self.location.clone(),
673            HydroNode::FilterMap {
674                f,
675                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
676                metadata: self
677                    .location
678                    .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
679            },
680        )
681    }
682
683    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
684    ///
685    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
686    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
687    /// non-null. This is useful for combining several pieces of state together.
688    ///
689    /// # Example
690    /// ```rust
691    /// # #[cfg(feature = "deploy")] {
692    /// # use hydro_lang::prelude::*;
693    /// # use futures::StreamExt;
694    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
695    /// let tick = process.tick();
696    /// let numbers = process
697    ///   .source_iter(q!(vec![123, 456]))
698    ///   .batch(&tick, nondet!(/** test */));
699    /// let count = numbers.clone().count(); // Singleton
700    /// let max = numbers.max(); // Optional
701    /// count.zip(max).all_ticks()
702    /// # }, |mut stream| async move {
703    /// // [(2, 456)]
704    /// # for w in vec![(2, 456)] {
705    /// #     assert_eq!(stream.next().await.unwrap(), w);
706    /// # }
707    /// # }));
708    /// # }
709    /// ```
    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
    where
        Self: ZipResult<'a, O, Location = L>,
        B: IsBounded,
    {
        // Validate that `self` and `other` are at matching locations before building any IR.
        check_matching_location(&self.location, &Self::other_location(&other));

        // First path: the location is top-level and can hand out a tick. Both operands are
        // snapshotted into that tick, zipped there, and the result is brought back out with
        // `latest()`.
        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                // `other` may be a singleton or an optional; its IR node is cast so that it
                // can be handled uniformly as an `Optional` before snapshotting.
                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
                    other_location.clone(),
                    HydroNode::Cast {
                        inner: Box::new(Self::other_ir_node(other)),
                        metadata: other_location.new_node_metadata(Optional::<
                            <Self as ZipResult<'a, O>>::OtherType,
                            Tick<L>,
                            Bounded,
                        >::collection_kind(
                        )),
                    },
                )
                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            // Re-wrap the zipped node as the output type chosen by `ZipResult`.
            Self::make(
                out.location.clone(),
                out.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            // Second path: emit a `CrossSingleton` node over the two inputs directly. The
            // metadata always describes an optional of the zipped element type.
            Self::make(
                self.location.clone(),
                HydroNode::CrossSingleton {
                    left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    right: Box::new(Self::other_ir_node(other)),
                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
                        bound: B::BOUND_KIND,
                        element_type: stageleft::quote_type::<
                            <Self as ZipResult<'a, O>>::ElementType,
                        >()
                        .into(),
                    }),
                },
            )
        }
    }
760
761    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
762    /// boolean signal is `true`, otherwise the output is null.
763    ///
764    /// # Example
765    /// ```rust
766    /// # #[cfg(feature = "deploy")] {
767    /// # use hydro_lang::prelude::*;
768    /// # use futures::StreamExt;
769    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
770    /// let tick = process.tick();
771    /// // ticks are lazy by default, forces the second tick to run
772    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
773    ///
774    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
775    /// let batch_first_tick = process
776    ///   .source_iter(q!(vec![1]))
777    ///   .batch(&tick, nondet!(/** test */));
778    /// let batch_second_tick = process
779    ///   .source_iter(q!(vec![1, 2, 3]))
780    ///   .batch(&tick, nondet!(/** test */))
781    ///   .defer_tick();
782    /// batch_first_tick.chain(batch_second_tick).count()
783    ///   .filter_if(signal)
784    ///   .all_ticks()
785    /// # }, |mut stream| async move {
786    /// // [1]
787    /// # for w in vec![1] {
788    /// #     assert_eq!(stream.next().await.unwrap(), w);
789    /// # }
790    /// # }));
791    /// # }
792    /// ```
    pub fn filter_if(
        self,
        signal: Singleton<bool, L, B>,
    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
    where
        B: IsBounded,
    {
        // Turn the signal into an optional that holds a value only when it is `true`, zip it
        // with this singleton (the zip is null whenever the signal side is null), and then
        // drop the signal component of the resulting pair.
        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
    }
802
    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
    ///
    /// Useful for conditionally processing, such as only emitting a singleton's value outside
    /// a tick if some other condition is satisfied.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///   .source_iter(q!(vec![1]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///   .source_iter(q!(vec![1, 2, 3]))
    ///   .batch(&tick, nondet!(/** test */))
    ///   .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick).count()
    ///   .filter_if_some(some_on_first_tick)
    ///   .all_ticks()
    /// # }, |mut stream| async move {
    /// // [1]
    /// # for w in vec![1] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
    pub fn filter_if_some<U>(
        self,
        signal: Optional<U, L, B>,
    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
    where
        B: IsBounded,
    {
        // Thin wrapper kept for compatibility: convert the optional into a boolean signal and
        // defer to `filter_if`.
        self.filter_if(signal.is_some())
    }
848
849    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
850    /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
851    ///
852    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
853    /// the condition.
854    ///
855    /// # Example
856    /// ```rust
857    /// # #[cfg(feature = "deploy")] {
858    /// # use hydro_lang::prelude::*;
859    /// # use futures::StreamExt;
860    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
861    /// let tick = process.tick();
862    /// // ticks are lazy by default, forces the second tick to run
863    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
864    ///
865    /// let batch_first_tick = process
866    ///   .source_iter(q!(vec![1]))
867    ///   .batch(&tick, nondet!(/** test */));
868    /// let batch_second_tick = process
869    ///   .source_iter(q!(vec![1, 2, 3]))
870    ///   .batch(&tick, nondet!(/** test */))
871    ///   .defer_tick(); // appears on the second tick
872    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
873    /// batch_first_tick.chain(batch_second_tick).count()
874    ///   .filter_if_none(some_on_first_tick)
875    ///   .all_ticks()
876    /// # }, |mut stream| async move {
877    /// // [3]
878    /// # for w in vec![3] {
879    /// #     assert_eq!(stream.next().await.unwrap(), w);
880    /// # }
881    /// # }));
882    /// # }
883    /// ```
884    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
885    pub fn filter_if_none<U>(
886        self,
887        other: Optional<U, L, B>,
888    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
889    where
890        B: IsBounded,
891    {
892        self.filter_if(other.is_none())
893    }
894
895    /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
896    ///
897    /// # Example
898    /// ```rust
899    /// # #[cfg(feature = "deploy")] {
900    /// # use hydro_lang::prelude::*;
901    /// # use futures::StreamExt;
902    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
903    /// let tick = process.tick();
904    /// let a = tick.singleton(q!(5));
905    /// let b = tick.singleton(q!(5));
906    /// a.equals(b).all_ticks()
907    /// # }, |mut stream| async move {
908    /// // [true]
909    /// # assert_eq!(stream.next().await.unwrap(), true);
910    /// # }));
911    /// # }
912    /// ```
913    pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
914    where
915        T: PartialEq,
916        B: IsBounded,
917    {
918        self.zip(other).map(q!(|(a, b)| a == b))
919    }
920
921    /// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
922    /// greater than or equal to the provided threshold. The event will have the value of the
923    /// given threshold.
924    ///
925    /// This requires the incoming singleton to be monotonic, because otherwise the detection of
926    /// the threshold would be non-deterministic.
927    ///
928    /// # Example
929    /// ```rust
930    /// # #[cfg(feature = "deploy")] {
931    /// # use hydro_lang::prelude::*;
932    /// # use futures::StreamExt;
933    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
934    /// let a = // singleton 1 ~> 5 ~> 10
935    /// # process.singleton(q!(5));
936    /// let b = process.singleton(q!(4));
937    /// a.threshold_greater_or_equal(b)
938    /// # }, |mut stream| async move {
939    /// // [4]
940    /// # assert_eq!(stream.next().await.unwrap(), 4);
941    /// # }));
942    /// # }
943    /// ```
944    pub fn threshold_greater_or_equal<B2: IsBounded>(
945        self,
946        threshold: Singleton<T, L, B2>,
947    ) -> Stream<T, L, B::UnderlyingBound>
948    where
949        T: Clone + PartialOrd,
950        B: IsMonotonic,
951    {
952        let threshold = threshold.make_bounded();
953        match self.try_make_bounded() {
954            Ok(bounded) => {
955                let uncasted = threshold
956                    .zip(bounded)
957                    .into_stream()
958                    .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));
959
960                Stream::new(
961                    uncasted.location.clone(),
962                    uncasted.ir_node.replace(HydroNode::Placeholder),
963                )
964            }
965            Err(me) => {
966                let uncasted = sliced! {
967                    let me = use(me, nondet!(/** thresholds are deterministic */));
968                    let mut remaining_threshold = use::state(|l| {
969                        let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
970                        as_option
971                    });
972
973                    let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
974                    remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
975                    passed.map(q!(|(t, _)| t))
976                };
977
978                Stream::new(
979                    uncasted.location.clone(),
980                    uncasted.ir_node.replace(HydroNode::Placeholder),
981                )
982            }
983        }
984    }
985
986    /// An operator which allows you to "name" a `HydroNode`.
987    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
988    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
989        {
990            let mut node = self.ir_node.borrow_mut();
991            let metadata = node.metadata_mut();
992            metadata.tag = Some(name.to_owned());
993        }
994        self
995    }
996}
997
998impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
999    type Output = Singleton<bool, L, B::UnderlyingBound>;
1000
1001    fn not(self) -> Self::Output {
1002        self.map(q!(|b| !b))
1003    }
1004}
1005
1006impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
1007where
1008    L: Location<'a>,
1009{
1010    /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
1011    /// the inner `Option`.
1012    ///
1013    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
1014    /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
1015    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
1016    ///
1017    /// # Example
1018    /// ```rust
1019    /// # #[cfg(feature = "deploy")] {
1020    /// # use hydro_lang::prelude::*;
1021    /// # use futures::StreamExt;
1022    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1023    /// let tick = process.tick();
1024    /// let singleton = tick.singleton(q!(Some(42)));
1025    /// singleton.into_optional().all_ticks()
1026    /// # }, |mut stream| async move {
1027    /// // 42
1028    /// # assert_eq!(stream.next().await.unwrap(), 42);
1029    /// # }));
1030    /// # }
1031    /// ```
1032    pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
1033        self.filter_map(q!(|v| v))
1034    }
1035}
1036
1037impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
1038where
1039    L: Location<'a>,
1040{
1041    /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
1042    ///
1043    /// # Example
1044    /// ```rust
1045    /// # #[cfg(feature = "deploy")] {
1046    /// # use hydro_lang::prelude::*;
1047    /// # use futures::StreamExt;
1048    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1049    /// let tick = process.tick();
1050    /// // ticks are lazy by default, forces the second tick to run
1051    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1052    ///
1053    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1054    /// let b = tick.singleton(q!(true)); // true, true
1055    /// a.and(b).all_ticks()
1056    /// # }, |mut stream| async move {
1057    /// // [true, false]
1058    /// # for w in vec![true, false] {
1059    /// #     assert_eq!(stream.next().await.unwrap(), w);
1060    /// # }
1061    /// # }));
1062    /// # }
1063    /// ```
1064    pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1065    where
1066        B: IsBounded,
1067    {
1068        self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1069    }
1070
1071    /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1072    ///
1073    /// # Example
1074    /// ```rust
1075    /// # #[cfg(feature = "deploy")] {
1076    /// # use hydro_lang::prelude::*;
1077    /// # use futures::StreamExt;
1078    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1079    /// let tick = process.tick();
1080    /// // ticks are lazy by default, forces the second tick to run
1081    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1082    ///
1083    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1084    /// let b = tick.singleton(q!(false)); // false, false
1085    /// a.or(b).all_ticks()
1086    /// # }, |mut stream| async move {
1087    /// // [true, false]
1088    /// # for w in vec![true, false] {
1089    /// #     assert_eq!(stream.next().await.unwrap(), w);
1090    /// # }
1091    /// # }));
1092    /// # }
1093    /// ```
1094    pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1095    where
1096        B: IsBounded,
1097    {
1098        self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1099    }
1100}
1101
1102impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1103where
1104    L: Location<'a> + NoTick,
1105{
1106    /// Returns a singleton value corresponding to the latest snapshot of the singleton
1107    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1108    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1109    /// all snapshots of this singleton into the atomic-associated tick will observe the
1110    /// same value each tick.
1111    ///
1112    /// # Non-Determinism
1113    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1114    /// the output singleton has a non-deterministic value since the snapshot can be at an
1115    /// arbitrary point in time.
1116    pub fn snapshot_atomic(
1117        self,
1118        tick: &Tick<L>,
1119        _nondet: NonDet,
1120    ) -> Singleton<T, Tick<L>, Bounded> {
1121        Singleton::new(
1122            tick.clone(),
1123            HydroNode::Batch {
1124                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1125                metadata: tick
1126                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1127            },
1128        )
1129    }
1130
1131    /// Returns this singleton back into a top-level, asynchronous execution context where updates
1132    /// to the value will be asynchronously propagated.
1133    pub fn end_atomic(self) -> Singleton<T, L, B> {
1134        Singleton::new(
1135            self.location.tick.l.clone(),
1136            HydroNode::EndAtomic {
1137                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1138                metadata: self
1139                    .location
1140                    .tick
1141                    .l
1142                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1143            },
1144        )
1145    }
1146}
1147
1148impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1149where
1150    L: Location<'a>,
1151{
1152    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1153    /// will observe the same version of the value and will be executed synchronously before any
1154    /// outputs are yielded (in [`Optional::end_atomic`]).
1155    ///
1156    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1157    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1158    /// a different version).
1159    pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1160        let id = self.location.flow_state().borrow_mut().next_clock_id();
1161        let out_location = Atomic {
1162            tick: Tick {
1163                id,
1164                l: self.location.clone(),
1165            },
1166        };
1167        Singleton::new(
1168            out_location.clone(),
1169            HydroNode::BeginAtomic {
1170                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1171                metadata: out_location
1172                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1173            },
1174        )
1175    }
1176
1177    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1178    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1179    /// relevant data that contributed to the snapshot at tick `t`.
1180    ///
1181    /// # Non-Determinism
1182    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1183    /// the output singleton has a non-deterministic value since the snapshot can be at an
1184    /// arbitrary point in time.
1185    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
1186        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1187        Singleton::new(
1188            tick.clone(),
1189            HydroNode::Batch {
1190                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1191                metadata: tick
1192                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1193            },
1194        )
1195    }
1196
1197    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1198    /// with order corresponding to increasing prefixes of data contributing to the singleton.
1199    ///
1200    /// # Non-Determinism
1201    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1202    /// to non-deterministic batching and arrival of inputs, the output stream is
1203    /// non-deterministic.
1204    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1205    where
1206        L: NoTick,
1207    {
1208        sliced! {
1209            let snapshot = use(self, nondet);
1210            snapshot.into_stream()
1211        }
1212        .weaken_retries()
1213    }
1214
1215    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1216    /// value taken at various points in time. Because the input singleton may be
1217    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1218    /// represent the value of the singleton given some prefix of the streams leading up to
1219    /// it.
1220    ///
1221    /// # Non-Determinism
1222    /// The output stream is non-deterministic in which elements are sampled, since this
1223    /// is controlled by a clock.
1224    pub fn sample_every(
1225        self,
1226        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1227        nondet: NonDet,
1228    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1229    where
1230        L: NoTick + NoAtomic,
1231    {
1232        let samples = self.location.source_interval(interval);
1233        sliced! {
1234            let snapshot = use(self, nondet);
1235            let sample_batch = use(samples, nondet);
1236
1237            snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1238        }
1239        .weaken_retries()
1240    }
1241
1242    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1243    /// implies that `B == Bounded`.
1244    pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1245    where
1246        B: IsBounded,
1247    {
1248        Singleton::new(
1249            self.location.clone(),
1250            self.ir_node.replace(HydroNode::Placeholder),
1251        )
1252    }
1253
1254    #[expect(clippy::result_large_err, reason = "internal use only")]
1255    fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1256        if B::UnderlyingBound::BOUNDED {
1257            Ok(Singleton::new(
1258                self.location.clone(),
1259                self.ir_node.replace(HydroNode::Placeholder),
1260            ))
1261        } else {
1262            Err(self)
1263        }
1264    }
1265
1266    /// Clones this bounded singleton into a tick, returning a singleton that has the
1267    /// same value as the outer singleton. Because the outer singleton is bounded, this
1268    /// is deterministic because there is only a single immutable version.
1269    pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
1270    where
1271        B: IsBounded,
1272        T: Clone,
1273    {
1274        // TODO(shadaj): avoid printing simulator logs for this snapshot
1275        self.snapshot(
1276            tick,
1277            nondet!(/** bounded top-level singleton so deterministic */),
1278        )
1279    }
1280
1281    /// Converts this singleton into a [`Stream`] containing a single element, the value.
1282    ///
1283    /// # Example
1284    /// ```rust
1285    /// # #[cfg(feature = "deploy")] {
1286    /// # use hydro_lang::prelude::*;
1287    /// # use futures::StreamExt;
1288    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1289    /// let tick = process.tick();
1290    /// let batch_input = process
1291    ///   .source_iter(q!(vec![123, 456]))
1292    ///   .batch(&tick, nondet!(/** test */));
1293    /// batch_input.clone().chain(
1294    ///   batch_input.count().into_stream()
1295    /// ).all_ticks()
1296    /// # }, |mut stream| async move {
1297    /// // [123, 456, 2]
1298    /// # for w in vec![123, 456, 2] {
1299    /// #     assert_eq!(stream.next().await.unwrap(), w);
1300    /// # }
1301    /// # }));
1302    /// # }
1303    /// ```
1304    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1305    where
1306        B: IsBounded,
1307    {
1308        Stream::new(
1309            self.location.clone(),
1310            HydroNode::Cast {
1311                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1312                metadata: self.location.new_node_metadata(Stream::<
1313                    T,
1314                    Tick<L>,
1315                    Bounded,
1316                    TotalOrder,
1317                    ExactlyOnce,
1318                >::collection_kind()),
1319            },
1320        )
1321    }
1322
1323    /// Resolves the singleton's [`Future`] value by blocking until it completes,
1324    /// producing a singleton of the resolved output.
1325    ///
1326    /// This is useful when the singleton contains an async computation that must
1327    /// be awaited before further processing. The future is polled to completion
1328    /// before the output value is emitted.
1329    ///
1330    /// # Example
1331    /// ```rust
1332    /// # #[cfg(feature = "deploy")] {
1333    /// # use hydro_lang::prelude::*;
1334    /// # use futures::StreamExt;
1335    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1336    /// let tick = process.tick();
1337    /// let singleton = tick.singleton(q!(5));
1338    /// singleton
1339    ///     .map(q!(|v| async move { v * 2 }))
1340    ///     .resolve_future_blocking()
1341    ///     .all_ticks()
1342    /// # }, |mut stream| async move {
1343    /// // 10
1344    /// # assert_eq!(stream.next().await.unwrap(), 10);
1345    /// # }));
1346    /// # }
1347    /// ```
1348    pub fn resolve_future_blocking(
1349        self,
1350    ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1351    where
1352        T: Future,
1353        B: IsBounded,
1354    {
1355        Singleton::new(
1356            self.location.clone(),
1357            HydroNode::ResolveFuturesBlocking {
1358                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1359                metadata: self
1360                    .location
1361                    .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1362            },
1363        )
1364    }
1365}
1366
1367impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1368where
1369    L: Location<'a>,
1370{
1371    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1372    /// which will stream the value computed in _each_ tick as a separate stream element.
1373    ///
1374    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1375    /// producing one element in the output for each tick. This is useful for batched computations,
1376    /// where the results from each tick must be combined together.
1377    ///
1378    /// # Example
1379    /// ```rust
1380    /// # #[cfg(feature = "deploy")] {
1381    /// # use hydro_lang::prelude::*;
1382    /// # use futures::StreamExt;
1383    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1384    /// let tick = process.tick();
1385    /// # // ticks are lazy by default, forces the second tick to run
1386    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1387    /// # let batch_first_tick = process
1388    /// #   .source_iter(q!(vec![1]))
1389    /// #   .batch(&tick, nondet!(/** test */));
1390    /// # let batch_second_tick = process
1391    /// #   .source_iter(q!(vec![1, 2, 3]))
1392    /// #   .batch(&tick, nondet!(/** test */))
1393    /// #   .defer_tick(); // appears on the second tick
1394    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1395    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1396    ///     .count()
1397    ///     .all_ticks()
1398    /// # }, |mut stream| async move {
1399    /// // [1, 3]
1400    /// # for w in vec![1, 3] {
1401    /// #     assert_eq!(stream.next().await.unwrap(), w);
1402    /// # }
1403    /// # }));
1404    /// # }
1405    /// ```
1406    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1407        self.into_stream().all_ticks()
1408    }
1409
1410    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1411    /// which will stream the value computed in _each_ tick as a separate stream element.
1412    ///
1413    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1414    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1415    /// singleton's [`Tick`] context.
1416    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1417        self.into_stream().all_ticks_atomic()
1418    }
1419
1420    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1421    /// be asynchronously updated with the latest value of the singleton inside the tick.
1422    ///
1423    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1424    /// tick that tracks the inner value. This is useful for getting the value as of the
1425    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1426    ///
1427    /// # Example
1428    /// ```rust
1429    /// # #[cfg(feature = "deploy")] {
1430    /// # use hydro_lang::prelude::*;
1431    /// # use futures::StreamExt;
1432    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1433    /// let tick = process.tick();
1434    /// # // ticks are lazy by default, forces the second tick to run
1435    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1436    /// # let batch_first_tick = process
1437    /// #   .source_iter(q!(vec![1]))
1438    /// #   .batch(&tick, nondet!(/** test */));
1439    /// # let batch_second_tick = process
1440    /// #   .source_iter(q!(vec![1, 2, 3]))
1441    /// #   .batch(&tick, nondet!(/** test */))
1442    /// #   .defer_tick(); // appears on the second tick
1443    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1444    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1445    ///     .count()
1446    ///     .latest()
1447    /// # .sample_eager(nondet!(/** test */))
1448    /// # }, |mut stream| async move {
1449    /// // asynchronously changes from 1 ~> 3
1450    /// # for w in vec![1, 3] {
1451    /// #     assert_eq!(stream.next().await.unwrap(), w);
1452    /// # }
1453    /// # }));
1454    /// # }
1455    /// ```
1456    pub fn latest(self) -> Singleton<T, L, Unbounded> {
1457        Singleton::new(
1458            self.location.outer().clone(),
1459            HydroNode::YieldConcat {
1460                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1461                metadata: self
1462                    .location
1463                    .outer()
1464                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1465            },
1466        )
1467    }
1468
1469    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1470    /// be updated with the latest value of the singleton inside the tick.
1471    ///
1472    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1473    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1474    /// singleton's [`Tick`] context.
1475    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1476        let out_location = Atomic {
1477            tick: self.location.clone(),
1478        };
1479        Singleton::new(
1480            out_location.clone(),
1481            HydroNode::YieldConcat {
1482                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1483                metadata: out_location
1484                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1485            },
1486        )
1487    }
1488}
1489
1490#[doc(hidden)]
1491/// Helper trait that determines the output collection type for [`Singleton::zip`].
1492///
1493/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1494/// [`Singleton`].
1495#[sealed::sealed]
1496pub trait ZipResult<'a, Other> {
1497    /// The output collection type.
1498    type Out;
1499    /// The type of the tupled output value.
1500    type ElementType;
1501    /// The type of the other collection's value.
1502    type OtherType;
1503    /// The location where the tupled result will be materialized.
1504    type Location: Location<'a>;
1505
1506    /// The location of the second input to the `zip`.
1507    fn other_location(other: &Other) -> Self::Location;
1508    /// The IR node of the second input to the `zip`.
1509    fn other_ir_node(other: Other) -> HydroNode;
1510
1511    /// Constructs the output live collection given an IR node containing the zip result.
1512    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1513}
1514
1515#[sealed::sealed]
1516impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1517where
1518    L: Location<'a>,
1519{
1520    type Out = Singleton<(T, U), L, B>;
1521    type ElementType = (T, U);
1522    type OtherType = U;
1523    type Location = L;
1524
1525    fn other_location(other: &Singleton<U, L, B>) -> L {
1526        other.location.clone()
1527    }
1528
1529    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1530        other.ir_node.replace(HydroNode::Placeholder)
1531    }
1532
1533    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1534        Singleton::new(
1535            location.clone(),
1536            HydroNode::Cast {
1537                inner: Box::new(ir_node),
1538                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1539            },
1540        )
1541    }
1542}
1543
1544#[sealed::sealed]
1545impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
1546    for Singleton<T, L, B>
1547where
1548    L: Location<'a>,
1549{
1550    type Out = Optional<(T, U), L, B::UnderlyingBound>;
1551    type ElementType = (T, U);
1552    type OtherType = U;
1553    type Location = L;
1554
1555    fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
1556        other.location.clone()
1557    }
1558
1559    fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
1560        other.ir_node.replace(HydroNode::Placeholder)
1561    }
1562
1563    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1564        Optional::new(location, ir_node)
1565    }
1566}
1567
1568#[cfg(test)]
1569mod tests {
1570    #[cfg(feature = "deploy")]
1571    use futures::{SinkExt, StreamExt};
1572    #[cfg(feature = "deploy")]
1573    use hydro_deploy::Deployment;
1574    #[cfg(any(feature = "deploy", feature = "sim"))]
1575    use stageleft::q;
1576
1577    #[cfg(any(feature = "deploy", feature = "sim"))]
1578    use crate::compile::builder::FlowBuilder;
1579    #[cfg(feature = "deploy")]
1580    use crate::live_collections::stream::ExactlyOnce;
1581    #[cfg(any(feature = "deploy", feature = "sim"))]
1582    use crate::location::Location;
1583    #[cfg(any(feature = "deploy", feature = "sim"))]
1584    use crate::nondet::nondet;
1585
1586    #[cfg(feature = "deploy")]
1587    #[tokio::test]
1588    async fn tick_cycle_cardinality() {
1589        let mut deployment = Deployment::new();
1590
1591        let mut flow = FlowBuilder::new();
1592        let node = flow.process::<()>();
1593        let external = flow.external::<()>();
1594
1595        let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1596
1597        let node_tick = node.tick();
1598        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1599        let counts = singleton
1600            .clone()
1601            .into_stream()
1602            .count()
1603            .filter_if(
1604                input
1605                    .batch(&node_tick, nondet!(/** testing */))
1606                    .first()
1607                    .is_some(),
1608            )
1609            .all_ticks()
1610            .send_bincode_external(&external);
1611        complete_cycle.complete_next_tick(singleton);
1612
1613        let nodes = flow
1614            .with_process(&node, deployment.Localhost())
1615            .with_external(&external, deployment.Localhost())
1616            .deploy(&mut deployment);
1617
1618        deployment.deploy().await.unwrap();
1619
1620        let mut tick_trigger = nodes.connect(input_send).await;
1621        let mut external_out = nodes.connect(counts).await;
1622
1623        deployment.start().await.unwrap();
1624
1625        tick_trigger.send(()).await.unwrap();
1626
1627        assert_eq!(external_out.next().await.unwrap(), 1);
1628
1629        tick_trigger.send(()).await.unwrap();
1630
1631        assert_eq!(external_out.next().await.unwrap(), 1);
1632    }
1633
1634    #[cfg(feature = "sim")]
1635    #[test]
1636    #[should_panic]
1637    fn sim_fold_intermediate_states() {
1638        let mut flow = FlowBuilder::new();
1639        let node = flow.process::<()>();
1640
1641        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1642        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1643
1644        let tick = node.tick();
1645        let batch = folded.snapshot(&tick, nondet!(/** test */));
1646        let out_recv = batch.all_ticks().sim_output();
1647
1648        flow.sim().exhaustive(async || {
1649            assert_eq!(out_recv.next().await.unwrap(), 10);
1650        });
1651    }
1652
1653    #[cfg(feature = "sim")]
1654    #[test]
1655    fn sim_fold_intermediate_state_count() {
1656        let mut flow = FlowBuilder::new();
1657        let node = flow.process::<()>();
1658
1659        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1660        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1661
1662        let tick = node.tick();
1663        let batch = folded.snapshot(&tick, nondet!(/** test */));
1664        let out_recv = batch.all_ticks().sim_output();
1665
1666        let instance_count = flow.sim().exhaustive(async || {
1667            let out = out_recv.collect::<Vec<_>>().await;
1668            assert_eq!(out.last(), Some(&10));
1669        });
1670
1671        assert_eq!(
1672            instance_count,
1673            16 // 2^4 possible subsets of intermediates (including initial state)
1674        )
1675    }
1676
1677    #[cfg(feature = "sim")]
1678    #[test]
1679    fn sim_fold_no_repeat_initial() {
1680        // check that we don't repeat the initial state of the fold in autonomous decisions
1681
1682        let mut flow = FlowBuilder::new();
1683        let node = flow.process::<()>();
1684
1685        let (in_port, input) = node.sim_input();
1686        let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1687
1688        let tick = node.tick();
1689        let batch = folded.snapshot(&tick, nondet!(/** test */));
1690        let out_recv = batch.all_ticks().sim_output();
1691
1692        flow.sim().exhaustive(async || {
1693            assert_eq!(out_recv.next().await.unwrap(), 0);
1694
1695            in_port.send(123);
1696
1697            assert_eq!(out_recv.next().await.unwrap(), 123);
1698        });
1699    }
1700
1701    #[cfg(feature = "sim")]
1702    #[test]
1703    #[should_panic]
1704    fn sim_fold_repeats_snapshots() {
1705        // when the tick is driven by a snapshot AND something else, the snapshot can
1706        // "stutter" and repeat the same state multiple times
1707
1708        let mut flow = FlowBuilder::new();
1709        let node = flow.process::<()>();
1710
1711        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1712        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1713
1714        let tick = node.tick();
1715        let batch = source
1716            .batch(&tick, nondet!(/** test */))
1717            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1718        let out_recv = batch.all_ticks().sim_output();
1719
1720        flow.sim().exhaustive(async || {
1721            if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1722            {
1723                panic!("repeated snapshot");
1724            }
1725        });
1726    }
1727
1728    #[cfg(feature = "sim")]
1729    #[test]
1730    fn sim_fold_repeats_snapshots_count() {
1731        // check the number of instances
1732        let mut flow = FlowBuilder::new();
1733        let node = flow.process::<()>();
1734
1735        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1736        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1737
1738        let tick = node.tick();
1739        let batch = source
1740            .batch(&tick, nondet!(/** test */))
1741            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1742        let out_recv = batch.all_ticks().sim_output();
1743
1744        let count = flow.sim().exhaustive(async || {
1745            let _ = out_recv.collect::<Vec<_>>().await;
1746        });
1747
1748        assert_eq!(count, 52);
1749        // don't have a combinatorial explanation for this number yet, but checked via logs
1750    }
1751
1752    #[cfg(feature = "sim")]
1753    #[test]
1754    fn sim_top_level_singleton_exhaustive() {
1755        // ensures that top-level singletons have only one snapshot
1756        let mut flow = FlowBuilder::new();
1757        let node = flow.process::<()>();
1758
1759        let singleton = node.singleton(q!(1));
1760        let tick = node.tick();
1761        let batch = singleton.snapshot(&tick, nondet!(/** test */));
1762        let out_recv = batch.all_ticks().sim_output();
1763
1764        let count = flow.sim().exhaustive(async || {
1765            let _ = out_recv.collect::<Vec<_>>().await;
1766        });
1767
1768        assert_eq!(count, 1);
1769    }
1770
1771    #[cfg(feature = "sim")]
1772    #[test]
1773    fn sim_top_level_singleton_join_count() {
1774        // if a tick consumes a static snapshot and a stream batch, only the batch require space
1775        // exploration
1776
1777        let mut flow = FlowBuilder::new();
1778        let node = flow.process::<()>();
1779
1780        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1781        let tick = node.tick();
1782        let batch = source_iter
1783            .batch(&tick, nondet!(/** test */))
1784            .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1785        let out_recv = batch.all_ticks().sim_output();
1786
1787        let instance_count = flow.sim().exhaustive(async || {
1788            let _ = out_recv.collect::<Vec<_>>().await;
1789        });
1790
1791        assert_eq!(
1792            instance_count,
1793            16 // 2^4 ways to split up (including a possibly empty first batch)
1794        )
1795    }
1796
1797    #[cfg(feature = "sim")]
1798    #[test]
1799    fn top_level_singleton_into_stream_no_replay() {
1800        let mut flow = FlowBuilder::new();
1801        let node = flow.process::<()>();
1802
1803        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1804        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1805
1806        let out_recv = folded.into_stream().sim_output();
1807
1808        flow.sim().exhaustive(async || {
1809            out_recv.assert_yields_only([10]).await;
1810        });
1811    }
1812
1813    #[cfg(feature = "sim")]
1814    #[test]
1815    fn inside_tick_singleton_zip() {
1816        use crate::live_collections::Stream;
1817        use crate::live_collections::sliced::sliced;
1818
1819        let mut flow = FlowBuilder::new();
1820        let node = flow.process::<()>();
1821
1822        let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1823        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1824
1825        let out_recv = sliced! {
1826            let v = use(folded, nondet!(/** test */));
1827            v.clone().zip(v).into_stream()
1828        }
1829        .sim_output();
1830
1831        let count = flow.sim().exhaustive(async || {
1832            let out = out_recv.collect::<Vec<_>>().await;
1833            assert_eq!(out.last(), Some(&(3, 3)));
1834        });
1835
1836        assert_eq!(count, 4);
1837    }
1838}