Skip to main content

hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::builder::{CycleId, FlowState};
18use crate::compile::ir::{
19    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::stream::{Ordering, Retries};
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::DeferTick;
28use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
29use crate::manual_expr::ManualExpr;
30use crate::nondet::{NonDet, nondet};
31use crate::properties::manual_proof;
32
33/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
34///
35/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
36/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
37/// indicates that entries may be added over time, but once an entry is added it will never be
38/// removed and its value will never change.
39pub trait KeyedSingletonBound {
40    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
41    type UnderlyingBound: Boundedness;
42    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
43    type ValueBound: Boundedness;
44
45    /// The type of the keyed singleton if the value for each key is immutable.
46    type WithBoundedValue: KeyedSingletonBound<
47            UnderlyingBound = Self::UnderlyingBound,
48            ValueBound = Bounded,
49            EraseMonotonic = Self::WithBoundedValue,
50        >;
51
52    /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`KeyedStream`] with [`Self`] boundedness.
53    type KeyedStreamToMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
54
55    /// The type of the keyed singleton if the value for each key is no longer monotonic.
56    type EraseMonotonic: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
57
58    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
59    fn bound_kind() -> KeyedSingletonBoundKind;
60}
61
62impl KeyedSingletonBound for Unbounded {
63    type UnderlyingBound = Unbounded;
64    type ValueBound = Unbounded;
65    type WithBoundedValue = BoundedValue;
66    type KeyedStreamToMonotone = MonotonicValue;
67    type EraseMonotonic = Unbounded;
68
69    fn bound_kind() -> KeyedSingletonBoundKind {
70        KeyedSingletonBoundKind::Unbounded
71    }
72}
73
74impl KeyedSingletonBound for Bounded {
75    type UnderlyingBound = Bounded;
76    type ValueBound = Bounded;
77    type WithBoundedValue = Bounded;
78    type KeyedStreamToMonotone = Bounded;
79    type EraseMonotonic = Bounded;
80
81    fn bound_kind() -> KeyedSingletonBoundKind {
82        KeyedSingletonBoundKind::Bounded
83    }
84}
85
86/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
87/// its value is bounded and will never change, but new entries may appear asynchronously
88pub struct BoundedValue;
89
90impl KeyedSingletonBound for BoundedValue {
91    type UnderlyingBound = Unbounded;
92    type ValueBound = Bounded;
93    type WithBoundedValue = BoundedValue;
94    type KeyedStreamToMonotone = BoundedValue;
95    type EraseMonotonic = BoundedValue;
96
97    fn bound_kind() -> KeyedSingletonBoundKind {
98        KeyedSingletonBoundKind::BoundedValue
99    }
100}
101
102/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
103/// it will never be removed, and the corresponding value will only increase monotonically.
104pub struct MonotonicValue;
105
106impl KeyedSingletonBound for MonotonicValue {
107    type UnderlyingBound = Unbounded;
108    type ValueBound = Unbounded;
109    type WithBoundedValue = BoundedValue;
110    type KeyedStreamToMonotone = MonotonicValue;
111    type EraseMonotonic = Unbounded;
112
113    fn bound_kind() -> KeyedSingletonBoundKind {
114        KeyedSingletonBoundKind::MonotonicValue
115    }
116}
117
118/// Mapping from keys of type `K` to values of type `V`.
119///
120/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
121/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
122/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
123/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
124/// keys cannot be removed and the value for each key is immutable.
125///
126/// Type Parameters:
127/// - `K`: the type of the key for each entry
128/// - `V`: the type of the value for each entry
129/// - `Loc`: the [`Location`] where the keyed singleton is materialized
130/// - `Bound`: tracks whether the entries are:
131///     - [`Bounded`] (local and finite)
132///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
133///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
134pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
135    pub(crate) location: Loc,
136    pub(crate) ir_node: RefCell<HydroNode>,
137    pub(crate) flow_state: FlowState,
138
139    _phantom: PhantomData<(K, V, Loc, Bound)>,
140}
141
142impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
143    fn drop(&mut self) {
144        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
145        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
146            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
147                input: Box::new(ir_node),
148                op_metadata: HydroIrOpMetadata::new(),
149            });
150        }
151    }
152}
153
154impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
155    for KeyedSingleton<K, V, Loc, Bound>
156{
157    fn clone(&self) -> Self {
158        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
159            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
160            *self.ir_node.borrow_mut() = HydroNode::Tee {
161                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
162                metadata: self.location.new_node_metadata(Self::collection_kind()),
163            };
164        }
165
166        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
167            KeyedSingleton {
168                location: self.location.clone(),
169                flow_state: self.flow_state.clone(),
170                ir_node: HydroNode::Tee {
171                    inner: SharedNode(inner.0.clone()),
172                    metadata: metadata.clone(),
173                }
174                .into(),
175                _phantom: PhantomData,
176            }
177        } else {
178            unreachable!()
179        }
180    }
181}
182
183impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
184    for KeyedSingleton<K, V, L, B>
185where
186    L: Location<'a> + NoTick,
187{
188    type Location = L;
189
190    fn create_source(cycle_id: CycleId, location: L) -> Self {
191        KeyedSingleton {
192            flow_state: location.flow_state().clone(),
193            location: location.clone(),
194            ir_node: RefCell::new(HydroNode::CycleSource {
195                cycle_id,
196                metadata: location.new_node_metadata(Self::collection_kind()),
197            }),
198            _phantom: PhantomData,
199        }
200    }
201}
202
203impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
204where
205    L: Location<'a>,
206{
207    type Location = Tick<L>;
208
209    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
210        KeyedSingleton::new(
211            location.clone(),
212            HydroNode::CycleSource {
213                cycle_id,
214                metadata: location.new_node_metadata(Self::collection_kind()),
215            },
216        )
217    }
218}
219
220impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
221where
222    L: Location<'a>,
223{
224    fn defer_tick(self) -> Self {
225        KeyedSingleton::defer_tick(self)
226    }
227}
228
229impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
230    for KeyedSingleton<K, V, L, B>
231where
232    L: Location<'a> + NoTick,
233{
234    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
235        assert_eq!(
236            Location::id(&self.location),
237            expected_location,
238            "locations do not match"
239        );
240        self.location
241            .flow_state()
242            .borrow_mut()
243            .push_root(HydroRoot::CycleSink {
244                cycle_id,
245                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
246                op_metadata: HydroIrOpMetadata::new(),
247            });
248    }
249}
250
251impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
252where
253    L: Location<'a>,
254{
255    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
256        assert_eq!(
257            Location::id(&self.location),
258            expected_location,
259            "locations do not match"
260        );
261        self.location
262            .flow_state()
263            .borrow_mut()
264            .push_root(HydroRoot::CycleSink {
265                cycle_id,
266                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
267                op_metadata: HydroIrOpMetadata::new(),
268            });
269    }
270}
271
272impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
273    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
274        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
275        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
276
277        let flow_state = location.flow_state().clone();
278        KeyedSingleton {
279            location,
280            flow_state,
281            ir_node: RefCell::new(ir_node),
282            _phantom: PhantomData,
283        }
284    }
285
286    /// Returns the [`Location`] where this keyed singleton is being materialized.
287    pub fn location(&self) -> &L {
288        &self.location
289    }
290}
291
/// Counts the entries of a bounded keyed singleton.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let entries = me.entries();
    entries.count()
}
298
/// Folds the entries of a bounded keyed singleton into a single `HashMap`.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let ordered_entries = me.entries().assume_ordering_trusted(nondet!(
        /// There is only one element associated with each key. The closure technically
        /// isn't commutative in the case where both passed entries have the same key
        /// but different values.
        ///
        /// In the future, we may want to have an `assume!(...)` statement in the UDF that
        /// the key is never already present in the map.
    ));

    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
322
323impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
324    pub(crate) fn collection_kind() -> CollectionKind {
325        CollectionKind::KeyedSingleton {
326            bound: B::bound_kind(),
327            key_type: stageleft::quote_type::<K>().into(),
328            value_type: stageleft::quote_type::<V>().into(),
329        }
330    }
331
332    /// Transforms each value by invoking `f` on each element, with keys staying the same
333    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
334    ///
335    /// If you do not want to modify the stream and instead only want to view
336    /// each item use [`KeyedSingleton::inspect`] instead.
337    ///
338    /// # Example
339    /// ```rust
340    /// # #[cfg(feature = "deploy")] {
341    /// # use hydro_lang::prelude::*;
342    /// # use futures::StreamExt;
343    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
344    /// let keyed_singleton = // { 1: 2, 2: 4 }
345    /// # process
346    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
347    /// #     .into_keyed()
348    /// #     .first();
349    /// keyed_singleton.map(q!(|v| v + 1))
350    /// #   .entries()
351    /// # }, |mut stream| async move {
352    /// // { 1: 3, 2: 5 }
353    /// # let mut results = Vec::new();
354    /// # for _ in 0..2 {
355    /// #     results.push(stream.next().await.unwrap());
356    /// # }
357    /// # results.sort();
358    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
359    /// # }));
360    /// # }
361    /// ```
362    pub fn map<U, F>(
363        self,
364        f: impl IntoQuotedMut<'a, F, L> + Copy,
365    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
366    where
367        F: Fn(V) -> U + 'a,
368    {
369        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
370        let map_f = q!({
371            let orig = f;
372            move |(k, v)| (k, orig(v))
373        })
374        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
375        .into();
376
377        KeyedSingleton::new(
378            self.location.clone(),
379            HydroNode::Map {
380                f: map_f,
381                singleton_refs: Vec::new(),
382                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
383                metadata: self.location.new_node_metadata(KeyedSingleton::<
384                    K,
385                    U,
386                    L,
387                    B::EraseMonotonic,
388                >::collection_kind()),
389            },
390        )
391    }
392
393    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
394    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
395    ///
396    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
397    /// the new value `U`. The key remains unchanged in the output.
398    ///
399    /// # Example
400    /// ```rust
401    /// # #[cfg(feature = "deploy")] {
402    /// # use hydro_lang::prelude::*;
403    /// # use futures::StreamExt;
404    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
405    /// let keyed_singleton = // { 1: 2, 2: 4 }
406    /// # process
407    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
408    /// #     .into_keyed()
409    /// #     .first();
410    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
411    /// #   .entries()
412    /// # }, |mut stream| async move {
413    /// // { 1: 3, 2: 6 }
414    /// # let mut results = Vec::new();
415    /// # for _ in 0..2 {
416    /// #     results.push(stream.next().await.unwrap());
417    /// # }
418    /// # results.sort();
419    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
420    /// # }));
421    /// # }
422    /// ```
423    pub fn map_with_key<U, F>(
424        self,
425        f: impl IntoQuotedMut<'a, F, L> + Copy,
426    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
427    where
428        F: Fn((K, V)) -> U + 'a,
429        K: Clone,
430    {
431        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
432        let map_f = q!({
433            let orig = f;
434            move |(k, v)| {
435                let out = orig((Clone::clone(&k), v));
436                (k, out)
437            }
438        })
439        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
440        .into();
441
442        KeyedSingleton::new(
443            self.location.clone(),
444            HydroNode::Map {
445                f: map_f,
446                singleton_refs: Vec::new(),
447                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
448                metadata: self.location.new_node_metadata(KeyedSingleton::<
449                    K,
450                    U,
451                    L,
452                    B::EraseMonotonic,
453                >::collection_kind()),
454            },
455        )
456    }
457
458    /// Gets the number of keys in the keyed singleton.
459    ///
460    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
461    /// since keys may be added / removed over time. When the set of keys changes, the count will
462    /// be asynchronously updated.
463    ///
464    /// # Example
465    /// ```rust
466    /// # #[cfg(feature = "deploy")] {
467    /// # use hydro_lang::prelude::*;
468    /// # use futures::StreamExt;
469    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
470    /// # let tick = process.tick();
471    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
472    /// # process
473    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
474    /// #     .into_keyed()
475    /// #     .batch(&tick, nondet!(/** test */))
476    /// #     .first();
477    /// keyed_singleton.key_count()
478    /// # .all_ticks()
479    /// # }, |mut stream| async move {
480    /// // 3
481    /// # assert_eq!(stream.next().await.unwrap(), 3);
482    /// # }));
483    /// # }
484    /// ```
485    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
486        if B::ValueBound::BOUNDED {
487            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
488                location: self.location.clone(),
489                flow_state: self.flow_state.clone(),
490                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
491                _phantom: PhantomData,
492            };
493
494            me.entries().count().ignore_monotonic()
495        } else if L::is_top_level()
496            && let Some(tick) = self.location.try_tick()
497            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
498        {
499            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
500                self.location.clone(),
501                self.ir_node.replace(HydroNode::Placeholder),
502            );
503
504            let out =
505                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
506                    .latest();
507            Singleton::new(
508                out.location.clone(),
509                out.ir_node.replace(HydroNode::Placeholder),
510            )
511        } else {
512            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
513        }
514    }
515
516    /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
517    ///
518    /// As the values for each key are updated asynchronously, the `HashMap` will be updated
519    /// asynchronously as well.
520    ///
521    /// # Example
522    /// ```rust
523    /// # #[cfg(feature = "deploy")] {
524    /// # use hydro_lang::prelude::*;
525    /// # use futures::StreamExt;
526    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
527    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
528    /// # process
529    /// #     .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
530    /// #     .into_keyed()
531    /// #     .batch(&process.tick(), nondet!(/** test */))
532    /// #     .first();
533    /// keyed_singleton.into_singleton()
534    /// # .all_ticks()
535    /// # }, |mut stream| async move {
536    /// // { 1: "a", 2: "b", 3: "c" }
537    /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
538    /// # }));
539    /// # }
540    /// ```
541    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
542    where
543        K: Eq + Hash,
544    {
545        if B::ValueBound::BOUNDED {
546            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
547                location: self.location.clone(),
548                flow_state: self.flow_state.clone(),
549                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
550                _phantom: PhantomData,
551            };
552
553            me.entries()
554                .assume_ordering_trusted(nondet!(
555                    /// There is only one element associated with each key. The closure technically
556                    /// isn't commutative in the case where both passed entries have the same key
557                    /// but different values.
558                    ///
559                    /// In the future, we may want to have an `assume!(...)` statement in the UDF that
560                    /// the key is never already present in the map.
561                ))
562                .fold(
563                    q!(|| HashMap::new()),
564                    q!(|map, (k, v)| {
565                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
566                        map.insert(k, v);
567                    }),
568                )
569        } else if L::is_top_level()
570            && let Some(tick) = self.location.try_tick()
571            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
572        {
573            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
574                self.location.clone(),
575                self.ir_node.replace(HydroNode::Placeholder),
576            );
577
578            let out = into_singleton_inside_tick(
579                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
580            )
581            .latest();
582            Singleton::new(
583                out.location.clone(),
584                out.ir_node.replace(HydroNode::Placeholder),
585            )
586        } else {
587            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
588        }
589    }
590
591    /// An operator which allows you to "name" a `HydroNode`.
592    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
593    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
594        {
595            let mut node = self.ir_node.borrow_mut();
596            let metadata = node.metadata_mut();
597            metadata.tag = Some(name.to_owned());
598        }
599        self
600    }
601
602    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
603    /// implies that `B == Bounded`.
604    pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
605    where
606        B: IsBounded,
607    {
608        KeyedSingleton::new(
609            self.location.clone(),
610            self.ir_node.replace(HydroNode::Placeholder),
611        )
612    }
613
614    /// Gets the value associated with a specific key from the keyed singleton.
615    /// Returns `None` if the key is `None` or there is no associated value.
616    ///
617    /// # Example
618    /// ```rust
619    /// # #[cfg(feature = "deploy")] {
620    /// # use hydro_lang::prelude::*;
621    /// # use futures::StreamExt;
622    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
623    /// let tick = process.tick();
624    /// let keyed_data = process
625    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
626    ///     .into_keyed()
627    ///     .batch(&tick, nondet!(/** test */))
628    ///     .first();
629    /// let key = tick.singleton(q!(1));
630    /// keyed_data.get(key).all_ticks()
631    /// # }, |mut stream| async move {
632    /// // 2
633    /// # assert_eq!(stream.next().await.unwrap(), 2);
634    /// # }));
635    /// # }
636    /// ```
637    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
638    where
639        B: IsBounded,
640        K: Hash + Eq + Clone,
641        V: Clone,
642    {
643        self.make_bounded()
644            .into_keyed_stream()
645            .get(key)
646            .cast_at_most_one_element()
647    }
648
649    /// Emit a keyed stream containing keys shared between the keyed singleton and the
650    /// keyed stream, where each value in the output keyed stream is a tuple of
651    /// (the keyed singleton's value, the keyed stream's value).
652    ///
653    /// # Example
654    /// ```rust
655    /// # #[cfg(feature = "deploy")] {
656    /// # use hydro_lang::prelude::*;
657    /// # use futures::StreamExt;
658    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
659    /// let tick = process.tick();
660    /// let keyed_data = process
661    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
662    ///     .into_keyed()
663    ///     .batch(&tick, nondet!(/** test */))
664    ///     .first();
665    /// let other_data = process
666    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
667    ///     .into_keyed()
668    ///     .batch(&tick, nondet!(/** test */));
669    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
670    /// # }, |mut stream| async move {
671    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
672    /// # let mut results = vec![];
673    /// # for _ in 0..3 {
674    /// #     results.push(stream.next().await.unwrap());
675    /// # }
676    /// # results.sort();
677    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
678    /// # }));
679    /// # }
680    /// ```
681    pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2, B2: Boundedness>(
682        self,
683        other: KeyedStream<K, V2, L, B2, O2, R2>,
684    ) -> KeyedStream<K, (V, V2), L, B2, O2, R2>
685    where
686        B: IsBounded,
687        K: Eq + Hash + Clone,
688        V: Clone,
689        V2: Clone,
690    {
691        // TODO(shadaj): if DFIR guarantees that joining unbounded keyed stream x bounded keyed stream
692        // always produces deterministic order per key (nested loop join), this could just use
693        // `join_keyed_stream` without constructing IRs manually
694        KeyedStream::new(
695            self.location.clone(),
696            HydroNode::Join {
697                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
698                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
699                metadata: self
700                    .location
701                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B2, O2, R2>::collection_kind()),
702            },
703        )
704    }
705
706    /// Emit a keyed singleton containing all keys shared between two keyed singletons,
707    /// where each value in the output keyed singleton is a tuple of
708    /// (self.value, other.value).
709    ///
710    /// # Example
711    /// ```rust
712    /// # #[cfg(feature = "deploy")] {
713    /// # use hydro_lang::prelude::*;
714    /// # use futures::StreamExt;
715    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
716    /// # let tick = process.tick();
717    /// let requests = // { 1: 10, 2: 20, 3: 30 }
718    /// # process
719    /// #     .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
720    /// #     .into_keyed()
721    /// #     .batch(&tick, nondet!(/** test */))
722    /// #     .first();
723    /// let other = // { 1: 100, 2: 200, 4: 400 }
724    /// # process
725    /// #     .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
726    /// #     .into_keyed()
727    /// #     .batch(&tick, nondet!(/** test */))
728    /// #     .first();
729    /// requests.join_keyed_singleton(other)
730    /// # .entries().all_ticks()
731    /// # }, |mut stream| async move {
732    /// // { 1: (10, 100), 2: (20, 200) }
733    /// # let mut results = vec![];
734    /// # for _ in 0..2 {
735    /// #     results.push(stream.next().await.unwrap());
736    /// # }
737    /// # results.sort();
738    /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
739    /// # }));
740    /// # }
741    /// ```
742    pub fn join_keyed_singleton<V2: Clone>(
743        self,
744        other: KeyedSingleton<K, V2, L, Bounded>,
745    ) -> KeyedSingleton<K, (V, V2), L, Bounded>
746    where
747        B: IsBounded,
748        K: Eq + Hash + Clone,
749        V: Clone,
750    {
751        let result_stream = self
752            .make_bounded()
753            .entries()
754            .join(other.entries())
755            .into_keyed();
756
757        // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
758        result_stream.cast_at_most_one_entry_per_key()
759    }
760
761    /// For each value in `self`, find the matching key in `lookup`.
762    /// The output is a keyed singleton with the key from `self`, and a value
763    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
764    /// If the key is not present in `lookup`, the option will be [`None`].
765    ///
766    /// # Example
767    /// ```rust
768    /// # #[cfg(feature = "deploy")] {
769    /// # use hydro_lang::prelude::*;
770    /// # use futures::StreamExt;
771    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
772    /// # let tick = process.tick();
773    /// let requests = // { 1: 10, 2: 20 }
774    /// # process
775    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
776    /// #     .into_keyed()
777    /// #     .batch(&tick, nondet!(/** test */))
778    /// #     .first();
779    /// let other_data = // { 10: 100, 11: 110 }
780    /// # process
781    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
782    /// #     .into_keyed()
783    /// #     .batch(&tick, nondet!(/** test */))
784    /// #     .first();
785    /// requests.lookup_keyed_singleton(other_data)
786    /// # .entries().all_ticks()
787    /// # }, |mut stream| async move {
788    /// // { 1: (10, Some(100)), 2: (20, None) }
789    /// # let mut results = vec![];
790    /// # for _ in 0..2 {
791    /// #     results.push(stream.next().await.unwrap());
792    /// # }
793    /// # results.sort();
794    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
795    /// # }));
796    /// # }
797    /// ```
798    pub fn lookup_keyed_singleton<V2>(
799        self,
800        lookup: KeyedSingleton<V, V2, L, Bounded>,
801    ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
802    where
803        B: IsBounded,
804        K: Eq + Hash + Clone,
805        V: Eq + Hash + Clone,
806        V2: Clone,
807    {
808        let result_stream = self
809            .make_bounded()
810            .into_keyed_stream()
811            .lookup_keyed_stream(lookup.into_keyed_stream());
812
813        // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
814        result_stream.cast_at_most_one_entry_per_key()
815    }
816
817    /// For each value in `self`, find the matching key in `lookup`.
818    /// The output is a keyed stream with the key from `self`, and a value
819    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
820    /// If the key is not present in `lookup`, the option will be [`None`].
821    ///
822    /// # Example
823    /// ```rust
824    /// # #[cfg(feature = "deploy")] {
825    /// # use hydro_lang::prelude::*;
826    /// # use futures::StreamExt;
827    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
828    /// # let tick = process.tick();
829    /// let requests = // { 1: 10, 2: 20 }
830    /// # process
831    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
832    /// #     .into_keyed()
833    /// #     .batch(&tick, nondet!(/** test */))
834    /// #     .first();
835    /// let other_data = // { 10: [100, 110] }
836    /// # process
837    /// #     .source_iter(q!(vec![(10, 100), (10, 110)]))
838    /// #     .into_keyed()
839    /// #     .batch(&tick, nondet!(/** test */));
840    /// requests.lookup_keyed_stream(other_data)
841    /// # .entries().all_ticks()
842    /// # }, |mut stream| async move {
843    /// // { 1: [(10, Some(100)), (10, Some(110))], 2: [(20, None)] }
844    /// # let mut results = vec![];
845    /// # for _ in 0..3 {
846    /// #     results.push(stream.next().await.unwrap());
847    /// # }
848    /// # results.sort();
849    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
850    /// # }));
851    /// # }
852    /// ```
853    pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
854        self,
855        lookup: KeyedStream<V, V2, L, Bounded, O, R>,
856    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
857    where
858        B: IsBounded,
859        K: Eq + Hash + Clone,
860        V: Eq + Hash + Clone,
861        V2: Clone,
862    {
863        self.make_bounded()
864            .entries()
865            .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
866            .into_keyed()
867            .lookup_keyed_stream(lookup)
868    }
869}
870
871impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
872    KeyedSingleton<K, V, L, B>
873{
874    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
875    ///
876    /// The value for each key must be bounded, otherwise the resulting stream elements would be
877    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
878    /// into the output.
879    ///
880    /// # Example
881    /// ```rust
882    /// # #[cfg(feature = "deploy")] {
883    /// # use hydro_lang::prelude::*;
884    /// # use futures::StreamExt;
885    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
886    /// let keyed_singleton = // { 1: 2, 2: 4 }
887    /// # process
888    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
889    /// #     .into_keyed()
890    /// #     .first();
891    /// keyed_singleton.entries()
892    /// # }, |mut stream| async move {
893    /// // (1, 2), (2, 4) in any order
894    /// # let mut results = Vec::new();
895    /// # for _ in 0..2 {
896    /// #     results.push(stream.next().await.unwrap());
897    /// # }
898    /// # results.sort();
899    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
900    /// # }));
901    /// # }
902    /// ```
903    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
904        self.into_keyed_stream().entries()
905    }
906
907    /// Flattens the keyed singleton into an unordered stream of just the values.
908    ///
909    /// The value for each key must be bounded, otherwise the resulting stream elements would be
910    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
911    /// into the output.
912    ///
913    /// # Example
914    /// ```rust
915    /// # #[cfg(feature = "deploy")] {
916    /// # use hydro_lang::prelude::*;
917    /// # use futures::StreamExt;
918    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
919    /// let keyed_singleton = // { 1: 2, 2: 4 }
920    /// # process
921    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
922    /// #     .into_keyed()
923    /// #     .first();
924    /// keyed_singleton.values()
925    /// # }, |mut stream| async move {
926    /// // 2, 4 in any order
927    /// # let mut results = Vec::new();
928    /// # for _ in 0..2 {
929    /// #     results.push(stream.next().await.unwrap());
930    /// # }
931    /// # results.sort();
932    /// # assert_eq!(results, vec![2, 4]);
933    /// # }));
934    /// # }
935    /// ```
936    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
937        let map_f = q!(|(_, v)| v)
938            .splice_fn1_ctx::<(K, V), V>(&self.location)
939            .into();
940
941        Stream::new(
942            self.location.clone(),
943            HydroNode::Map {
944                f: map_f,
945                singleton_refs: Vec::new(),
946                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
947                metadata: self.location.new_node_metadata(Stream::<
948                    V,
949                    L,
950                    B::UnderlyingBound,
951                    NoOrder,
952                    ExactlyOnce,
953                >::collection_kind()),
954            },
955        )
956    }
957
958    /// Flattens the keyed singleton into an unordered stream of just the keys.
959    ///
960    /// The value for each key must be bounded, otherwise the removal of keys would result in
961    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
962    /// into the output.
963    ///
964    /// # Example
965    /// ```rust
966    /// # #[cfg(feature = "deploy")] {
967    /// # use hydro_lang::prelude::*;
968    /// # use futures::StreamExt;
969    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
970    /// let keyed_singleton = // { 1: 2, 2: 4 }
971    /// # process
972    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
973    /// #     .into_keyed()
974    /// #     .first();
975    /// keyed_singleton.keys()
976    /// # }, |mut stream| async move {
977    /// // 1, 2 in any order
978    /// # let mut results = Vec::new();
979    /// # for _ in 0..2 {
980    /// #     results.push(stream.next().await.unwrap());
981    /// # }
982    /// # results.sort();
983    /// # assert_eq!(results, vec![1, 2]);
984    /// # }));
985    /// # }
986    /// ```
987    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
988        self.entries().map(q!(|(k, _)| k))
989    }
990
991    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
992    /// entries whose keys are not in the provided stream.
993    ///
994    /// # Example
995    /// ```rust
996    /// # #[cfg(feature = "deploy")] {
997    /// # use hydro_lang::prelude::*;
998    /// # use futures::StreamExt;
999    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1000    /// let tick = process.tick();
1001    /// let keyed_singleton = // { 1: 2, 2: 4 }
1002    /// # process
1003    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1004    /// #     .into_keyed()
1005    /// #     .first()
1006    /// #     .batch(&tick, nondet!(/** test */));
1007    /// let keys_to_remove = process
1008    ///     .source_iter(q!(vec![1]))
1009    ///     .batch(&tick, nondet!(/** test */));
1010    /// keyed_singleton.filter_key_not_in(keys_to_remove)
1011    /// #   .entries().all_ticks()
1012    /// # }, |mut stream| async move {
1013    /// // { 2: 4 }
1014    /// # for w in vec![(2, 4)] {
1015    /// #     assert_eq!(stream.next().await.unwrap(), w);
1016    /// # }
1017    /// # }));
1018    /// # }
1019    /// ```
1020    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1021        self,
1022        other: Stream<K, L, Bounded, O2, R2>,
1023    ) -> Self
1024    where
1025        K: Hash + Eq,
1026    {
1027        check_matching_location(&self.location, &other.location);
1028
1029        KeyedSingleton::new(
1030            self.location.clone(),
1031            HydroNode::AntiJoin {
1032                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1033                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1034                metadata: self.location.new_node_metadata(Self::collection_kind()),
1035            },
1036        )
1037    }
1038
1039    /// An operator which allows you to "inspect" each value of a keyed singleton without
1040    /// modifying it. The closure `f` is called on a reference to each value. This is
1041    /// mainly useful for debugging, and should not be used to generate side-effects.
1042    ///
1043    /// # Example
1044    /// ```rust
1045    /// # #[cfg(feature = "deploy")] {
1046    /// # use hydro_lang::prelude::*;
1047    /// # use futures::StreamExt;
1048    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1049    /// let keyed_singleton = // { 1: 2, 2: 4 }
1050    /// # process
1051    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1052    /// #     .into_keyed()
1053    /// #     .first();
1054    /// keyed_singleton
1055    ///     .inspect(q!(|v| println!("{}", v)))
1056    /// #   .entries()
1057    /// # }, |mut stream| async move {
1058    /// // { 1: 2, 2: 4 }
1059    /// # for w in vec![(1, 2), (2, 4)] {
1060    /// #     assert_eq!(stream.next().await.unwrap(), w);
1061    /// # }
1062    /// # }));
1063    /// # }
1064    /// ```
1065    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1066    where
1067        F: Fn(&V) + 'a,
1068    {
1069        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1070        let inspect_f = q!({
1071            let orig = f;
1072            move |t: &(_, _)| orig(&t.1)
1073        })
1074        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1075        .into();
1076
1077        KeyedSingleton::new(
1078            self.location.clone(),
1079            HydroNode::Inspect {
1080                f: inspect_f,
1081                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1082                metadata: self.location.new_node_metadata(Self::collection_kind()),
1083            },
1084        )
1085    }
1086
1087    /// An operator which allows you to "inspect" each entry of a keyed singleton without
1088    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1089    /// mainly useful for debugging, and should not be used to generate side-effects.
1090    ///
1091    /// # Example
1092    /// ```rust
1093    /// # #[cfg(feature = "deploy")] {
1094    /// # use hydro_lang::prelude::*;
1095    /// # use futures::StreamExt;
1096    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1097    /// let keyed_singleton = // { 1: 2, 2: 4 }
1098    /// # process
1099    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1100    /// #     .into_keyed()
1101    /// #     .first();
1102    /// keyed_singleton
1103    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1104    /// #   .entries()
1105    /// # }, |mut stream| async move {
1106    /// // { 1: 2, 2: 4 }
1107    /// # for w in vec![(1, 2), (2, 4)] {
1108    /// #     assert_eq!(stream.next().await.unwrap(), w);
1109    /// # }
1110    /// # }));
1111    /// # }
1112    /// ```
1113    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1114    where
1115        F: Fn(&(K, V)) + 'a,
1116    {
1117        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1118
1119        KeyedSingleton::new(
1120            self.location.clone(),
1121            HydroNode::Inspect {
1122                f: inspect_f,
1123                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1124                metadata: self.location.new_node_metadata(Self::collection_kind()),
1125            },
1126        )
1127    }
1128
1129    /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1130    ///
1131    /// Because this method requires values to be bounded, the output [`Optional`] will only be
1132    /// asynchronously updated if a new key is added that is higher than the previous max key.
1133    ///
1134    /// # Example
1135    /// ```rust
1136    /// # #[cfg(feature = "deploy")] {
1137    /// # use hydro_lang::prelude::*;
1138    /// # use futures::StreamExt;
1139    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1140    /// let tick = process.tick();
1141    /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1142    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1143    /// #     .into_keyed()
1144    /// #     .first();
1145    /// keyed_singleton.get_max_key()
1146    /// # .sample_eager(nondet!(/** test */))
1147    /// # }, |mut stream| async move {
1148    /// // (2, 456)
1149    /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1150    /// # }));
1151    /// # }
1152    /// ```
1153    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1154    where
1155        K: Ord,
1156    {
1157        self.entries()
1158            .assume_ordering_trusted(nondet!(
1159                /// There is only one element associated with each key, and the keys are totallly
1160                /// ordered so we will produce a deterministic value. The closure technically
1161                /// isn't commutative in the case where both passed entries have the same key
1162                /// but different values.
1163                ///
1164                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1165                /// the two inputs do not have the same key.
1166            ))
1167            .reduce(q!(
1168                move |curr, new| {
1169                    if new.0 > curr.0 {
1170                        *curr = new;
1171                    }
1172                },
1173                idempotent = manual_proof!(/** repeated elements are ignored */)
1174            ))
1175    }
1176
1177    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1178    /// element, the value.
1179    ///
1180    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1181    ///
1182    /// # Example
1183    /// ```rust
1184    /// # #[cfg(feature = "deploy")] {
1185    /// # use hydro_lang::prelude::*;
1186    /// # use futures::StreamExt;
1187    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1188    /// let keyed_singleton = // { 1: 2, 2: 4 }
1189    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1190    /// #     .into_keyed()
1191    /// #     .first();
1192    /// keyed_singleton
1193    ///     .clone()
1194    ///     .into_keyed_stream()
1195    ///     .merge_unordered(
1196    ///         keyed_singleton.into_keyed_stream()
1197    ///     )
1198    /// #   .entries()
1199    /// # }, |mut stream| async move {
1200    /// // { 1: [2, 2], 2: [4, 4] }
1201    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1202    /// #     assert_eq!(stream.next().await.unwrap(), w);
1203    /// # }
1204    /// # }));
1205    /// # }
1206    /// ```
1207    pub fn into_keyed_stream(
1208        self,
1209    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1210        KeyedStream::new(
1211            self.location.clone(),
1212            HydroNode::Cast {
1213                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1214                metadata: self.location.new_node_metadata(KeyedStream::<
1215                    K,
1216                    V,
1217                    L,
1218                    B::UnderlyingBound,
1219                    TotalOrder,
1220                    ExactlyOnce,
1221                >::collection_kind()),
1222            },
1223        )
1224    }
1225}
1226
1227impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1228where
1229    L: Location<'a>,
1230{
1231    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1232    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1233    ///
1234    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1235    /// processed before an acknowledgement is emitted.
1236    pub fn atomic(self) -> KeyedSingleton<K, V, Atomic<L>, B> {
1237        let id = self.location.flow_state().borrow_mut().next_clock_id();
1238        let out_location = Atomic {
1239            tick: Tick {
1240                id,
1241                l: self.location.clone(),
1242            },
1243        };
1244        KeyedSingleton::new(
1245            out_location.clone(),
1246            HydroNode::BeginAtomic {
1247                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1248                metadata: out_location
1249                    .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1250            },
1251        )
1252    }
1253}
1254
1255impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1256where
1257    L: Location<'a> + NoTick,
1258{
1259    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1260    /// See [`KeyedSingleton::atomic`] for more details.
1261    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1262        KeyedSingleton::new(
1263            self.location.tick.l.clone(),
1264            HydroNode::EndAtomic {
1265                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1266                metadata: self
1267                    .location
1268                    .tick
1269                    .l
1270                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1271            },
1272        )
1273    }
1274}
1275
1276impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1277    /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1278    /// tick `T` always has the entries of `self` at tick `T - 1`.
1279    ///
1280    /// At tick `0`, the output has no entries, since there is no previous tick.
1281    ///
1282    /// This operator enables stateful iterative processing with ticks, by sending data from one
1283    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1284    ///
1285    /// # Example
1286    /// ```rust
1287    /// # #[cfg(feature = "deploy")] {
1288    /// # use hydro_lang::prelude::*;
1289    /// # use futures::StreamExt;
1290    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1291    /// let tick = process.tick();
1292    /// # // ticks are lazy by default, forces the second tick to run
1293    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1294    /// # let batch_first_tick = process
1295    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1296    /// #   .batch(&tick, nondet!(/** test */))
1297    /// #   .into_keyed();
1298    /// # let batch_second_tick = process
1299    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1300    /// #   .batch(&tick, nondet!(/** test */))
1301    /// #   .into_keyed()
1302    /// #   .defer_tick(); // appears on the second tick
1303    /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1304    /// # batch_first_tick.chain(batch_second_tick).first();
1305    /// input_batch.clone().filter_key_not_in(
1306    ///     input_batch.defer_tick().keys() // keys present in the previous tick
1307    /// )
1308    /// # .entries().all_ticks()
1309    /// # }, |mut stream| async move {
1310    /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1311    /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1312    /// #     assert_eq!(stream.next().await.unwrap(), w);
1313    /// # }
1314    /// # }));
1315    /// # }
1316    /// ```
1317    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1318        KeyedSingleton::new(
1319            self.location.clone(),
1320            HydroNode::DeferTick {
1321                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1322                metadata: self
1323                    .location
1324                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1325            },
1326        )
1327    }
1328}
1329
1330impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1331where
1332    L: Location<'a>,
1333{
1334    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1335    /// point in time.
1336    ///
1337    /// # Non-Determinism
1338    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1339    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1340    pub fn snapshot(
1341        self,
1342        tick: &Tick<L>,
1343        _nondet: NonDet,
1344    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1345        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1346        KeyedSingleton::new(
1347            tick.clone(),
1348            HydroNode::Batch {
1349                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1350                metadata: tick
1351                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1352            },
1353        )
1354    }
1355}
1356
1357impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1358where
1359    L: Location<'a> + NoTick,
1360{
1361    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1362    /// state of the keyed singleton being atomically processed.
1363    ///
1364    /// # Non-Determinism
1365    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1366    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1367    pub fn snapshot_atomic(
1368        self,
1369        tick: &Tick<L>,
1370        _nondet: NonDet,
1371    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1372        KeyedSingleton::new(
1373            tick.clone(),
1374            HydroNode::Batch {
1375                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1376                metadata: tick
1377                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1378            },
1379        )
1380    }
1381}
1382
1383impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1384where
1385    L: Location<'a>,
1386{
1387    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1388    ///
1389    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1390    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1391    /// is filtered out.
1392    ///
1393    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1394    /// not modify or take ownership of the values. If you need to modify the values while filtering
1395    /// use [`KeyedSingleton::filter_map`] instead.
1396    ///
1397    /// # Example
1398    /// ```rust
1399    /// # #[cfg(feature = "deploy")] {
1400    /// # use hydro_lang::prelude::*;
1401    /// # use futures::StreamExt;
1402    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1403    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1404    /// # process
1405    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1406    /// #     .into_keyed()
1407    /// #     .first();
1408    /// keyed_singleton.filter(q!(|&v| v > 1))
1409    /// #   .entries()
1410    /// # }, |mut stream| async move {
1411    /// // { 1: 2, 2: 4 }
1412    /// # let mut results = Vec::new();
1413    /// # for _ in 0..2 {
1414    /// #     results.push(stream.next().await.unwrap());
1415    /// # }
1416    /// # results.sort();
1417    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1418    /// # }));
1419    /// # }
1420    /// ```
1421    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1422    where
1423        F: Fn(&V) -> bool + 'a,
1424    {
1425        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1426        let filter_f = q!({
1427            let orig = f;
1428            move |t: &(_, _)| orig(&t.1)
1429        })
1430        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1431        .into();
1432
1433        KeyedSingleton::new(
1434            self.location.clone(),
1435            HydroNode::Filter {
1436                f: filter_f,
1437                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1438                metadata: self
1439                    .location
1440                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1441            },
1442        )
1443    }
1444
1445    /// An operator that both filters and maps values. It yields only the key-value pairs where
1446    /// the supplied closure `f` returns `Some(value)`.
1447    ///
1448    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1449    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1450    /// If it returns `None`, the key-value pair is filtered out.
1451    ///
1452    /// # Example
1453    /// ```rust
1454    /// # #[cfg(feature = "deploy")] {
1455    /// # use hydro_lang::prelude::*;
1456    /// # use futures::StreamExt;
1457    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1458    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1459    /// # process
1460    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1461    /// #     .into_keyed()
1462    /// #     .first();
1463    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1464    /// #   .entries()
1465    /// # }, |mut stream| async move {
1466    /// // { 1: 42, 3: 100 }
1467    /// # let mut results = Vec::new();
1468    /// # for _ in 0..2 {
1469    /// #     results.push(stream.next().await.unwrap());
1470    /// # }
1471    /// # results.sort();
1472    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1473    /// # }));
1474    /// # }
1475    /// ```
1476    pub fn filter_map<F, U>(
1477        self,
1478        f: impl IntoQuotedMut<'a, F, L> + Copy,
1479    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
1480    where
1481        F: Fn(V) -> Option<U> + 'a,
1482    {
1483        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1484        let filter_map_f = q!({
1485            let orig = f;
1486            move |(k, v)| orig(v).map(|o| (k, o))
1487        })
1488        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1489        .into();
1490
1491        KeyedSingleton::new(
1492            self.location.clone(),
1493            HydroNode::FilterMap {
1494                f: filter_map_f,
1495                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1496                metadata: self.location.new_node_metadata(KeyedSingleton::<
1497                    K,
1498                    U,
1499                    L,
1500                    B::EraseMonotonic,
1501                >::collection_kind()),
1502            },
1503        )
1504    }
1505
1506    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1507    /// arrived since the previous batch was released.
1508    ///
1509    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1510    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1511    ///
1512    /// # Non-Determinism
1513    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1514    /// has a non-deterministic set of key-value pairs.
1515    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1516    where
1517        L: NoTick,
1518    {
1519        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1520        KeyedSingleton::new(
1521            tick.clone(),
1522            HydroNode::Batch {
1523                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1524                metadata: tick
1525                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1526            },
1527        )
1528    }
1529}
1530
1531impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1532where
1533    L: Location<'a> + NoTick,
1534{
1535    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1536    /// atomically processed.
1537    ///
1538    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1539    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1540    ///
1541    /// # Non-Determinism
1542    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1543    /// has a non-deterministic set of key-value pairs.
1544    pub fn batch_atomic(
1545        self,
1546        tick: &Tick<L>,
1547        nondet: NonDet,
1548    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1549        let _ = nondet;
1550        KeyedSingleton::new(
1551            tick.clone(),
1552            HydroNode::Batch {
1553                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1554                metadata: tick
1555                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1556            },
1557        )
1558    }
1559}
1560
1561#[cfg(test)]
1562mod tests {
1563    #[cfg(feature = "deploy")]
1564    use futures::{SinkExt, StreamExt};
1565    #[cfg(feature = "deploy")]
1566    use hydro_deploy::Deployment;
1567    #[cfg(any(feature = "deploy", feature = "sim"))]
1568    use stageleft::q;
1569
1570    #[cfg(any(feature = "deploy", feature = "sim"))]
1571    use crate::compile::builder::FlowBuilder;
1572    #[cfg(any(feature = "deploy", feature = "sim"))]
1573    use crate::location::Location;
1574    #[cfg(any(feature = "deploy", feature = "sim"))]
1575    use crate::nondet::nondet;
1576
1577    #[cfg(feature = "deploy")]
1578    #[tokio::test]
1579    async fn key_count_bounded_value() {
1580        let mut deployment = Deployment::new();
1581
1582        let mut flow = FlowBuilder::new();
1583        let node = flow.process::<()>();
1584        let external = flow.external::<()>();
1585
1586        let (input_port, input) = node.source_external_bincode(&external);
1587        let out = input
1588            .into_keyed()
1589            .first()
1590            .key_count()
1591            .sample_eager(nondet!(/** test */))
1592            .send_bincode_external(&external);
1593
1594        let nodes = flow
1595            .with_process(&node, deployment.Localhost())
1596            .with_external(&external, deployment.Localhost())
1597            .deploy(&mut deployment);
1598
1599        deployment.deploy().await.unwrap();
1600
1601        let mut external_in = nodes.connect(input_port).await;
1602        let mut external_out = nodes.connect(out).await;
1603
1604        deployment.start().await.unwrap();
1605
1606        assert_eq!(external_out.next().await.unwrap(), 0);
1607
1608        external_in.send((1, 1)).await.unwrap();
1609        assert_eq!(external_out.next().await.unwrap(), 1);
1610
1611        external_in.send((2, 2)).await.unwrap();
1612        assert_eq!(external_out.next().await.unwrap(), 2);
1613    }
1614
1615    #[cfg(feature = "deploy")]
1616    #[tokio::test]
1617    async fn key_count_unbounded_value() {
1618        let mut deployment = Deployment::new();
1619
1620        let mut flow = FlowBuilder::new();
1621        let node = flow.process::<()>();
1622        let external = flow.external::<()>();
1623
1624        let (input_port, input) = node.source_external_bincode(&external);
1625        let out = input
1626            .into_keyed()
1627            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1628            .key_count()
1629            .sample_eager(nondet!(/** test */))
1630            .send_bincode_external(&external);
1631
1632        let nodes = flow
1633            .with_process(&node, deployment.Localhost())
1634            .with_external(&external, deployment.Localhost())
1635            .deploy(&mut deployment);
1636
1637        deployment.deploy().await.unwrap();
1638
1639        let mut external_in = nodes.connect(input_port).await;
1640        let mut external_out = nodes.connect(out).await;
1641
1642        deployment.start().await.unwrap();
1643
1644        assert_eq!(external_out.next().await.unwrap(), 0);
1645
1646        external_in.send((1, 1)).await.unwrap();
1647        assert_eq!(external_out.next().await.unwrap(), 1);
1648
1649        external_in.send((1, 2)).await.unwrap();
1650        assert_eq!(external_out.next().await.unwrap(), 1);
1651
1652        external_in.send((2, 2)).await.unwrap();
1653        assert_eq!(external_out.next().await.unwrap(), 2);
1654
1655        external_in.send((1, 1)).await.unwrap();
1656        assert_eq!(external_out.next().await.unwrap(), 2);
1657
1658        external_in.send((3, 1)).await.unwrap();
1659        assert_eq!(external_out.next().await.unwrap(), 3);
1660    }
1661
1662    #[cfg(feature = "deploy")]
1663    #[tokio::test]
1664    async fn into_singleton_bounded_value() {
1665        let mut deployment = Deployment::new();
1666
1667        let mut flow = FlowBuilder::new();
1668        let node = flow.process::<()>();
1669        let external = flow.external::<()>();
1670
1671        let (input_port, input) = node.source_external_bincode(&external);
1672        let out = input
1673            .into_keyed()
1674            .first()
1675            .into_singleton()
1676            .sample_eager(nondet!(/** test */))
1677            .send_bincode_external(&external);
1678
1679        let nodes = flow
1680            .with_process(&node, deployment.Localhost())
1681            .with_external(&external, deployment.Localhost())
1682            .deploy(&mut deployment);
1683
1684        deployment.deploy().await.unwrap();
1685
1686        let mut external_in = nodes.connect(input_port).await;
1687        let mut external_out = nodes.connect(out).await;
1688
1689        deployment.start().await.unwrap();
1690
1691        assert_eq!(
1692            external_out.next().await.unwrap(),
1693            std::collections::HashMap::new()
1694        );
1695
1696        external_in.send((1, 1)).await.unwrap();
1697        assert_eq!(
1698            external_out.next().await.unwrap(),
1699            vec![(1, 1)].into_iter().collect()
1700        );
1701
1702        external_in.send((2, 2)).await.unwrap();
1703        assert_eq!(
1704            external_out.next().await.unwrap(),
1705            vec![(1, 1), (2, 2)].into_iter().collect()
1706        );
1707    }
1708
1709    #[cfg(feature = "deploy")]
1710    #[tokio::test]
1711    async fn into_singleton_unbounded_value() {
1712        let mut deployment = Deployment::new();
1713
1714        let mut flow = FlowBuilder::new();
1715        let node = flow.process::<()>();
1716        let external = flow.external::<()>();
1717
1718        let (input_port, input) = node.source_external_bincode(&external);
1719        let out = input
1720            .into_keyed()
1721            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1722            .into_singleton()
1723            .sample_eager(nondet!(/** test */))
1724            .send_bincode_external(&external);
1725
1726        let nodes = flow
1727            .with_process(&node, deployment.Localhost())
1728            .with_external(&external, deployment.Localhost())
1729            .deploy(&mut deployment);
1730
1731        deployment.deploy().await.unwrap();
1732
1733        let mut external_in = nodes.connect(input_port).await;
1734        let mut external_out = nodes.connect(out).await;
1735
1736        deployment.start().await.unwrap();
1737
1738        assert_eq!(
1739            external_out.next().await.unwrap(),
1740            std::collections::HashMap::new()
1741        );
1742
1743        external_in.send((1, 1)).await.unwrap();
1744        assert_eq!(
1745            external_out.next().await.unwrap(),
1746            vec![(1, 1)].into_iter().collect()
1747        );
1748
1749        external_in.send((1, 2)).await.unwrap();
1750        assert_eq!(
1751            external_out.next().await.unwrap(),
1752            vec![(1, 2)].into_iter().collect()
1753        );
1754
1755        external_in.send((2, 2)).await.unwrap();
1756        assert_eq!(
1757            external_out.next().await.unwrap(),
1758            vec![(1, 2), (2, 1)].into_iter().collect()
1759        );
1760
1761        external_in.send((1, 1)).await.unwrap();
1762        assert_eq!(
1763            external_out.next().await.unwrap(),
1764            vec![(1, 3), (2, 1)].into_iter().collect()
1765        );
1766
1767        external_in.send((3, 1)).await.unwrap();
1768        assert_eq!(
1769            external_out.next().await.unwrap(),
1770            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
1771        );
1772    }
1773
1774    #[cfg(feature = "sim")]
1775    #[test]
1776    fn sim_unbounded_singleton_snapshot() {
1777        let mut flow = FlowBuilder::new();
1778        let node = flow.process::<()>();
1779
1780        let (input_port, input) = node.sim_input();
1781        let output = input
1782            .into_keyed()
1783            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1784            .snapshot(&node.tick(), nondet!(/** test */))
1785            .entries()
1786            .all_ticks()
1787            .sim_output();
1788
1789        let count = flow.sim().exhaustive(async || {
1790            input_port.send((1, 123));
1791            input_port.send((1, 456));
1792            input_port.send((2, 123));
1793
1794            let all = output.collect_sorted::<Vec<_>>().await;
1795            assert_eq!(all.last().unwrap(), &(2, 1));
1796        });
1797
1798        assert_eq!(count, 8);
1799    }
1800
1801    #[cfg(feature = "deploy")]
1802    #[tokio::test]
1803    async fn join_keyed_stream() {
1804        let mut deployment = Deployment::new();
1805
1806        let mut flow = FlowBuilder::new();
1807        let node = flow.process::<()>();
1808        let external = flow.external::<()>();
1809
1810        let tick = node.tick();
1811        let keyed_data = node
1812            .source_iter(q!(vec![(1, 10), (2, 20)]))
1813            .into_keyed()
1814            .batch(&tick, nondet!(/** test */))
1815            .first();
1816        let requests = node
1817            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
1818            .into_keyed()
1819            .batch(&tick, nondet!(/** test */));
1820
1821        let out = keyed_data
1822            .join_keyed_stream(requests)
1823            .entries()
1824            .all_ticks()
1825            .send_bincode_external(&external);
1826
1827        let nodes = flow
1828            .with_process(&node, deployment.Localhost())
1829            .with_external(&external, deployment.Localhost())
1830            .deploy(&mut deployment);
1831
1832        deployment.deploy().await.unwrap();
1833
1834        let mut external_out = nodes.connect(out).await;
1835
1836        deployment.start().await.unwrap();
1837
1838        let mut results = vec![];
1839        for _ in 0..2 {
1840            results.push(external_out.next().await.unwrap());
1841        }
1842        results.sort();
1843
1844        assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
1845    }
1846}