1use std::cell::RefCell;
5use std::collections::BTreeMap;
6use std::pin::Pin;
7use std::rc::Rc;
8
9use bytes::Bytes;
10use dfir_lang::graph::DfirGraph;
11use futures::{Sink, Stream};
12use proc_macro2::Span;
13use serde::{Deserialize, Serialize};
14use stageleft::QuotedWithContext;
15use syn::parse_quote;
16use tracing::{instrument, trace};
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct HydroManifest {
21 pub processes: BTreeMap<String, ProcessManifest>,
23 pub clusters: BTreeMap<String, ClusterManifest>,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct BuildConfig {
30 pub project_dir: String,
32 pub target_dir: String,
34 pub bin_name: String,
36 pub package_name: String,
38 pub features: Vec<String>,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44#[serde(tag = "protocol", rename_all = "lowercase")]
45pub enum PortInfo {
46 Tcp {
48 port: u16,
50 },
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct ProcessManifest {
56 pub build: BuildConfig,
58 pub ports: BTreeMap<String, PortInfo>,
60 pub task_family: String,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ClusterManifest {
67 pub build: BuildConfig,
69 pub ports: BTreeMap<String, PortInfo>,
71 pub default_count: usize,
73 pub task_family_prefix: String,
75}
76
77use super::deploy_runtime_containerized_ecs::*;
78use crate::compile::builder::ExternalPortId;
79use crate::compile::deploy::DeployResult;
80use crate::compile::deploy_provider::{
81 ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
82};
83use crate::compile::trybuild::generate::create_graph_trybuild;
84use crate::location::dynamic::LocationId;
85use crate::location::member_id::TaglessMemberId;
86use crate::location::{LocationKey, MembershipEvent, NetworkHint};
87
88#[derive(Clone)]
90pub struct EcsDeployProcess {
91 id: LocationKey,
92 name: String,
93 next_port: Rc<RefCell<u16>>,
94
95 exposed_ports: Rc<RefCell<BTreeMap<String, PortInfo>>>,
96
97 trybuild_config:
98 Rc<RefCell<Option<(String, crate::compile::trybuild::generate::TrybuildConfig)>>>,
99}
100
101impl Node for EcsDeployProcess {
102 type Port = u16;
103 type Meta = ();
104 type InstantiateEnv = EcsDeploy;
105
106 #[instrument(level = "trace", skip_all, ret, fields(id = %self.id, name = self.name))]
107 fn next_port(&self) -> Self::Port {
108 let port = {
109 let mut borrow = self.next_port.borrow_mut();
110 let port = *borrow;
111 *borrow += 1;
112 port
113 };
114
115 port
116 }
117
118 #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name))]
119 fn update_meta(&self, _meta: &Self::Meta) {}
120
121 #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
122 fn instantiate(
123 &self,
124 _env: &mut Self::InstantiateEnv,
125 meta: &mut Self::Meta,
126 graph: DfirGraph,
127 extra_stmts: &[syn::Stmt],
128 sidecars: &[syn::Expr],
129 ) {
130 let (bin_name, config) = create_graph_trybuild(
131 graph,
132 extra_stmts,
133 sidecars,
134 Some(&self.name),
135 crate::compile::trybuild::generate::DeployMode::Containerized,
136 crate::compile::trybuild::generate::LinkingMode::Static,
137 );
138
139 *self.trybuild_config.borrow_mut() = Some((bin_name, config));
141 }
142}
143
144impl EcsDeployProcess {
145 pub fn expose_port(&self, port: u16) {
151 let port_name = format!("exposed-{}", port);
152 self.exposed_ports
153 .borrow_mut()
154 .insert(port_name, PortInfo::Tcp { port });
155 }
156}
157
158#[derive(Clone)]
160pub struct EcsDeployCluster {
161 id: LocationKey,
162 name: String,
163 next_port: Rc<RefCell<u16>>,
164
165 exposed_ports: Rc<RefCell<BTreeMap<String, PortInfo>>>,
166
167 count: usize,
168
169 trybuild_config:
171 Rc<RefCell<Option<(String, crate::compile::trybuild::generate::TrybuildConfig)>>>,
172}
173
174impl Node for EcsDeployCluster {
175 type Port = u16;
176 type Meta = ();
177 type InstantiateEnv = EcsDeploy;
178
179 #[instrument(level = "trace", skip_all, ret, fields(id = %self.id, name = self.name))]
180 fn next_port(&self) -> Self::Port {
181 let port = {
182 let mut borrow = self.next_port.borrow_mut();
183 let port = *borrow;
184 *borrow += 1;
185 port
186 };
187
188 port
189 }
190
191 #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name))]
192 fn update_meta(&self, _meta: &Self::Meta) {}
193
194 #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name, extra_stmts = extra_stmts.len()))]
195 fn instantiate(
196 &self,
197 _env: &mut Self::InstantiateEnv,
198 _meta: &mut Self::Meta,
199 graph: DfirGraph,
200 extra_stmts: &[syn::Stmt],
201 sidecars: &[syn::Expr],
202 ) {
203 let (bin_name, config) = create_graph_trybuild(
204 graph,
205 extra_stmts,
206 sidecars,
207 Some(&self.name),
208 crate::compile::trybuild::generate::DeployMode::Containerized,
209 crate::compile::trybuild::generate::LinkingMode::Static,
210 );
211
212 *self.trybuild_config.borrow_mut() = Some((bin_name, config));
214 }
215}
216
217#[derive(Clone, Debug)]
219pub struct EcsDeployExternal {
220 name: String,
221 next_port: Rc<RefCell<u16>>,
222}
223
224impl Node for EcsDeployExternal {
225 type Port = u16;
226 type Meta = ();
227 type InstantiateEnv = EcsDeploy;
228
229 #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
230 fn next_port(&self) -> Self::Port {
231 let port = {
232 let mut borrow = self.next_port.borrow_mut();
233 let port = *borrow;
234 *borrow += 1;
235 port
236 };
237
238 port
239 }
240
241 #[instrument(level = "trace", skip_all, fields(name = self.name))]
242 fn update_meta(&self, _meta: &Self::Meta) {}
243
244 #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
245 fn instantiate(
246 &self,
247 _env: &mut Self::InstantiateEnv,
248 meta: &mut Self::Meta,
249 graph: DfirGraph,
250 extra_stmts: &[syn::Stmt],
251 sidecars: &[syn::Expr],
252 ) {
253 trace!(name: "surface", surface = graph.surface_syntax_string());
254 }
255}
256
257impl EcsDeployCluster {
258 pub fn expose_port(&self, port: u16) {
265 let port_name = format!("exposed-{}", port);
266 self.exposed_ports
267 .borrow_mut()
268 .insert(port_name, PortInfo::Tcp { port });
269 }
270}
271
272type DynSourceSink<Out, In, InErr> = (
273 Pin<Box<dyn Stream<Item = Out>>>,
274 Pin<Box<dyn Sink<In, Error = InErr>>>,
275);
276
277impl<'a> RegisterPort<'a, EcsDeploy> for EcsDeployExternal {
278 #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
279 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {}
280
281 #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
282 fn as_bytes_bidi(
283 &self,
284 _external_port_id: ExternalPortId,
285 ) -> impl Future<
286 Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
287 > + 'a {
288 async { unimplemented!() }
289 }
290
291 #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
292 fn as_bincode_bidi<InT, OutT>(
293 &self,
294 _external_port_id: ExternalPortId,
295 ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
296 where
297 InT: Serialize + 'static,
298 OutT: serde::de::DeserializeOwned + 'static,
299 {
300 async { unimplemented!() }
301 }
302
303 #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
304 fn as_bincode_sink<T>(
305 &self,
306 _external_port_id: ExternalPortId,
307 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
308 where
309 T: Serialize + 'static,
310 {
311 async { unimplemented!() }
312 }
313
314 #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
315 fn as_bincode_source<T>(
316 &self,
317 _external_port_id: ExternalPortId,
318 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
319 where
320 T: serde::de::DeserializeOwned + 'static,
321 {
322 async { unimplemented!() }
323 }
324}
325
326pub struct EcsDeploy;
328
329impl Default for EcsDeploy {
330 fn default() -> Self {
331 Self::new()
332 }
333}
334
335impl EcsDeploy {
336 pub fn new() -> Self {
338 Self
339 }
340
341 pub fn add_ecs_process(&mut self) -> EcsDeployProcessSpec {
343 EcsDeployProcessSpec
344 }
345
346 pub fn add_ecs_cluster(&mut self, count: usize) -> EcsDeployClusterSpec {
348 EcsDeployClusterSpec { count }
349 }
350
351 pub fn add_external(&self, name: String) -> EcsDeployExternalSpec {
353 EcsDeployExternalSpec { name }
354 }
355
356 #[instrument(level = "trace", skip_all)]
364 pub fn export(&self, nodes: &DeployResult<'_, Self>) -> HydroManifest {
365 let mut manifest = HydroManifest {
366 processes: BTreeMap::new(),
367 clusters: BTreeMap::new(),
368 };
369
370 for (location_id, name_hint, process) in nodes.get_all_processes() {
372 let LocationId::Process(_) = location_id else {
373 unreachable!()
374 };
375
376 let (bin_name, trybuild_config) = process
377 .trybuild_config
378 .borrow()
379 .clone()
380 .expect("trybuild_config should be set after instantiate");
381
382 manifest.processes.insert(
383 name_hint.to_owned(),
384 ProcessManifest {
385 build: build_info_from_config(&bin_name, &trybuild_config),
386 ports: process.exposed_ports.borrow().clone(),
387 task_family: process.name.clone(),
388 },
389 );
390 }
391
392 for (location_id, name_hint, cluster) in nodes.get_all_clusters() {
394 let LocationId::Cluster(_) = location_id else {
395 unreachable!()
396 };
397
398 let (bin_name, trybuild_config) = cluster
399 .trybuild_config
400 .borrow()
401 .clone()
402 .expect("trybuild_config should be set after instantiate");
403
404 manifest.clusters.insert(
405 name_hint.to_owned(),
406 ClusterManifest {
407 build: build_info_from_config(&bin_name, &trybuild_config),
408 ports: cluster.exposed_ports.borrow().clone(),
409 default_count: cluster.count,
410 task_family_prefix: cluster.name.clone(),
411 },
412 );
413 }
414
415 manifest
416 }
417}
418
419fn build_info_from_config(
420 bin_name: &str,
421 config: &crate::compile::trybuild::generate::TrybuildConfig,
422) -> BuildConfig {
423 let mut features = vec!["hydro___feature_ecs_runtime".to_owned()];
424 if let Some(extra) = &config.features {
425 features.extend(extra.clone());
426 }
427 let crate_name = config
428 .project_dir
429 .file_name()
430 .and_then(|n| n.to_str())
431 .unwrap_or("unknown")
432 .replace("_", "-");
433
434 let package_name = format!("{}-hydro-trybuild", crate_name);
435
436 BuildConfig {
437 project_dir: config.project_dir.to_string_lossy().into_owned(),
438 target_dir: config.target_dir.to_string_lossy().into_owned(),
439 bin_name: bin_name.to_owned(),
440 package_name,
441 features,
442 }
443}
444
445impl<'a> Deploy<'a> for EcsDeploy {
446 type InstantiateEnv = Self;
447 type Process = EcsDeployProcess;
448 type Cluster = EcsDeployCluster;
449 type External = EcsDeployExternal;
450 type Meta = ();
451
452 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
453 fn o2o_sink_source(
454 _env: &mut Self::InstantiateEnv,
455 p1: &Self::Process,
456 p1_port: &<Self::Process as Node>::Port,
457 p2: &Self::Process,
458 p2_port: &<Self::Process as Node>::Port,
459 name: Option<&str>,
460 networking_info: &crate::networking::NetworkingInfo,
461 ) -> (syn::Expr, syn::Expr) {
462 match networking_info {
463 crate::networking::NetworkingInfo::Tcp {
464 fault: crate::networking::TcpFault::FailStop,
465 } => {}
466 _ => panic!("Unsupported networking info: {:?}", networking_info),
467 }
468
469 deploy_containerized_o2o(
470 &p2.name,
471 name.expect("channel name is required for containerized deployment"),
472 )
473 }
474
475 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
476 fn o2o_connect(
477 p1: &Self::Process,
478 p1_port: &<Self::Process as Node>::Port,
479 p2: &Self::Process,
480 p2_port: &<Self::Process as Node>::Port,
481 ) -> Box<dyn FnOnce()> {
482 let serialized = format!(
483 "o2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
484 p1.name, p2.name
485 );
486
487 Box::new(move || {
488 trace!(name: "o2o_connect thunk", %serialized);
489 })
490 }
491
492 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
493 fn o2m_sink_source(
494 _env: &mut Self::InstantiateEnv,
495 p1: &Self::Process,
496 p1_port: &<Self::Process as Node>::Port,
497 c2: &Self::Cluster,
498 c2_port: &<Self::Cluster as Node>::Port,
499 name: Option<&str>,
500 networking_info: &crate::networking::NetworkingInfo,
501 ) -> (syn::Expr, syn::Expr) {
502 match networking_info {
503 crate::networking::NetworkingInfo::Tcp {
504 fault: crate::networking::TcpFault::FailStop,
505 } => {}
506 _ => panic!("Unsupported networking info: {:?}", networking_info),
507 }
508
509 deploy_containerized_o2m(
510 name.expect("channel name is required for containerized deployment"),
511 )
512 }
513
514 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
515 fn o2m_connect(
516 p1: &Self::Process,
517 p1_port: &<Self::Process as Node>::Port,
518 c2: &Self::Cluster,
519 c2_port: &<Self::Cluster as Node>::Port,
520 ) -> Box<dyn FnOnce()> {
521 let serialized = format!(
522 "o2m_connect {}:{p1_port:?} -> {}:{c2_port:?}",
523 p1.name, c2.name
524 );
525
526 Box::new(move || {
527 trace!(name: "o2m_connect thunk", %serialized);
528 })
529 }
530
531 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
532 fn m2o_sink_source(
533 _env: &mut Self::InstantiateEnv,
534 c1: &Self::Cluster,
535 c1_port: &<Self::Cluster as Node>::Port,
536 p2: &Self::Process,
537 p2_port: &<Self::Process as Node>::Port,
538 name: Option<&str>,
539 networking_info: &crate::networking::NetworkingInfo,
540 ) -> (syn::Expr, syn::Expr) {
541 match networking_info {
542 crate::networking::NetworkingInfo::Tcp {
543 fault: crate::networking::TcpFault::FailStop,
544 } => {}
545 _ => panic!("Unsupported networking info: {:?}", networking_info),
546 }
547
548 deploy_containerized_m2o(
549 &p2.name,
550 name.expect("channel name is required for containerized deployment"),
551 )
552 }
553
554 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
555 fn m2o_connect(
556 c1: &Self::Cluster,
557 c1_port: &<Self::Cluster as Node>::Port,
558 p2: &Self::Process,
559 p2_port: &<Self::Process as Node>::Port,
560 ) -> Box<dyn FnOnce()> {
561 let serialized = format!(
562 "o2m_connect {}:{c1_port:?} -> {}:{p2_port:?}",
563 c1.name, p2.name
564 );
565
566 Box::new(move || {
567 trace!(name: "m2o_connect thunk", %serialized);
568 })
569 }
570
571 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
572 fn m2m_sink_source(
573 _env: &mut Self::InstantiateEnv,
574 c1: &Self::Cluster,
575 c1_port: &<Self::Cluster as Node>::Port,
576 c2: &Self::Cluster,
577 c2_port: &<Self::Cluster as Node>::Port,
578 name: Option<&str>,
579 networking_info: &crate::networking::NetworkingInfo,
580 ) -> (syn::Expr, syn::Expr) {
581 match networking_info {
582 crate::networking::NetworkingInfo::Tcp {
583 fault: crate::networking::TcpFault::FailStop,
584 } => {}
585 _ => panic!("Unsupported networking info: {:?}", networking_info),
586 }
587
588 deploy_containerized_m2m(
589 name.expect("channel name is required for containerized deployment"),
590 )
591 }
592
593 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
594 fn m2m_connect(
595 c1: &Self::Cluster,
596 c1_port: &<Self::Cluster as Node>::Port,
597 c2: &Self::Cluster,
598 c2_port: &<Self::Cluster as Node>::Port,
599 ) -> Box<dyn FnOnce()> {
600 let serialized = format!(
601 "m2m_connect {}:{c1_port:?} -> {}:{c2_port:?}",
602 c1.name, c2.name
603 );
604
605 Box::new(move || {
606 trace!(name: "m2m_connect thunk", %serialized);
607 })
608 }
609
610 #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
611 fn e2o_many_source(
612 extra_stmts: &mut Vec<syn::Stmt>,
613 p2: &Self::Process,
614 p2_port: &<Self::Process as Node>::Port,
615 codec_type: &syn::Type,
616 shared_handle: String,
617 ) -> syn::Expr {
618 p2.exposed_ports
619 .borrow_mut()
620 .insert(shared_handle.clone(), PortInfo::Tcp { port: *p2_port });
621
622 let socket_ident = syn::Ident::new(
623 &format!("__hydro_deploy_many_{}_socket", &shared_handle),
624 Span::call_site(),
625 );
626
627 let source_ident = syn::Ident::new(
628 &format!("__hydro_deploy_many_{}_source", &shared_handle),
629 Span::call_site(),
630 );
631
632 let sink_ident = syn::Ident::new(
633 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
634 Span::call_site(),
635 );
636
637 let membership_ident = syn::Ident::new(
638 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
639 Span::call_site(),
640 );
641
642 let bind_addr = format!("0.0.0.0:{}", p2_port);
643
644 extra_stmts.push(syn::parse_quote! {
645 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
646 });
647
648 let root = crate::staging_util::get_this_crate();
649
650 extra_stmts.push(syn::parse_quote! {
651 let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
652 });
653
654 parse_quote!(#source_ident)
655 }
656
657 #[instrument(level = "trace", skip_all, fields(%shared_handle))]
658 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
659 let sink_ident = syn::Ident::new(
660 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
661 Span::call_site(),
662 );
663 parse_quote!(#sink_ident)
664 }
665
666 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?codec_type, %shared_handle))]
667 fn e2o_source(
668 extra_stmts: &mut Vec<syn::Stmt>,
669 p1: &Self::External,
670 p1_port: &<Self::External as Node>::Port,
671 p2: &Self::Process,
672 p2_port: &<Self::Process as Node>::Port,
673 codec_type: &syn::Type,
674 shared_handle: String,
675 ) -> syn::Expr {
676 p2.exposed_ports
678 .borrow_mut()
679 .insert(shared_handle.clone(), PortInfo::Tcp { port: *p2_port });
680
681 let source_ident = syn::Ident::new(
682 &format!("__hydro_deploy_{}_source", &shared_handle),
683 Span::call_site(),
684 );
685
686 let bind_addr = format!("0.0.0.0:{}", p2_port);
687
688 let socket_ident = syn::Ident::new(
691 &format!("__hydro_deploy_{}_socket", &shared_handle),
692 Span::call_site(),
693 );
694
695 let sink_ident = syn::Ident::new(
696 &format!("__hydro_deploy_{}_sink", &shared_handle),
697 Span::call_site(),
698 );
699
700 extra_stmts.push(syn::parse_quote! {
701 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
702 });
703
704 let create_expr = deploy_containerized_external_sink_source_ident(bind_addr, socket_ident);
705
706 extra_stmts.push(syn::parse_quote! {
707 let (#sink_ident, #source_ident) = (#create_expr).split();
708 });
709
710 parse_quote!(#source_ident)
711 }
712
713 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
714 fn e2o_connect(
715 p1: &Self::External,
716 p1_port: &<Self::External as Node>::Port,
717 p2: &Self::Process,
718 p2_port: &<Self::Process as Node>::Port,
719 many: bool,
720 server_hint: NetworkHint,
721 ) -> Box<dyn FnOnce()> {
722 let serialized = format!(
723 "e2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
724 p1.name, p2.name
725 );
726
727 Box::new(move || {
728 trace!(name: "e2o_connect thunk", %serialized);
729 })
730 }
731
732 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
733 fn o2e_sink(
734 p1: &Self::Process,
735 p1_port: &<Self::Process as Node>::Port,
736 p2: &Self::External,
737 p2_port: &<Self::External as Node>::Port,
738 shared_handle: String,
739 ) -> syn::Expr {
740 let sink_ident = syn::Ident::new(
741 &format!("__hydro_deploy_{}_sink", &shared_handle),
742 Span::call_site(),
743 );
744 parse_quote!(#sink_ident)
745 }
746
747 #[instrument(level = "trace", skip_all, fields(%of_cluster))]
748 fn cluster_ids(
749 of_cluster: LocationKey,
750 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
751 cluster_ids()
752 }
753
754 #[instrument(level = "trace", skip_all)]
755 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
756 cluster_self_id()
757 }
758
759 #[instrument(level = "trace", skip_all, fields(?location_id))]
760 fn cluster_membership_stream(
761 _env: &mut Self::InstantiateEnv,
762 _at_location: &LocationId,
763 location_id: &LocationId,
764 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
765 {
766 cluster_membership_stream(location_id)
767 }
768}
769
770#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location))]
771fn get_ecs_image_name(name_hint: &str, location: LocationKey) -> String {
772 let name_hint = name_hint
773 .split("::")
774 .last()
775 .unwrap()
776 .to_ascii_lowercase()
777 .replace(".", "-")
778 .replace("_", "-")
779 .replace("::", "-");
780
781 format!("hy-{name_hint}-{location}")
782}
783
784#[derive(Clone)]
786pub struct EcsDeployProcessSpec;
787
788impl<'a> ProcessSpec<'a, EcsDeploy> for EcsDeployProcessSpec {
789 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
790 fn build(self, id: LocationKey, name_hint: &'_ str) -> <EcsDeploy as Deploy<'a>>::Process {
791 EcsDeployProcess {
792 id,
793 name: get_ecs_image_name(name_hint, id),
794 next_port: Rc::new(RefCell::new(10001)),
795 exposed_ports: Rc::new(RefCell::new(BTreeMap::new())),
796 trybuild_config: Rc::new(RefCell::new(None)),
797 }
798 }
799}
800
801#[derive(Clone)]
803pub struct EcsDeployClusterSpec {
804 count: usize,
805}
806
807impl<'a> ClusterSpec<'a, EcsDeploy> for EcsDeployClusterSpec {
808 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
809 fn build(self, id: LocationKey, name_hint: &str) -> <EcsDeploy as Deploy<'a>>::Cluster {
810 EcsDeployCluster {
811 id,
812 name: get_ecs_image_name(name_hint, id),
813 next_port: Rc::new(RefCell::new(10001)),
814 exposed_ports: Rc::new(RefCell::new(BTreeMap::new())),
815 count: self.count,
816 trybuild_config: Rc::new(RefCell::new(None)),
817 }
818 }
819}
820
821pub struct EcsDeployExternalSpec {
823 name: String,
824}
825
826impl<'a> ExternalSpec<'a, EcsDeploy> for EcsDeployExternalSpec {
827 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
828 fn build(self, id: LocationKey, name_hint: &str) -> <EcsDeploy as Deploy<'a>>::External {
829 EcsDeployExternal {
830 name: self.name,
831 next_port: Rc::new(RefCell::new(10000)),
832 }
833 }
834}