1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11 BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12 RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19pub use hydro_deploy::LinuxCompileType;
21use hydro_deploy::RustCrate;
22use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
23use nanoid::nanoid;
24use proc_macro2::Span;
25use sinktools::lazy::LazySink;
26use stageleft::QuotedWithContext;
27use syn::parse_quote;
28use tar::{Builder, Header};
29use tokio::net::TcpStream;
30use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
31use tracing::{Instrument, instrument, trace, warn};
32
33use super::deploy_runtime_containerized::*;
34use crate::compile::builder::ExternalPortId;
35use crate::compile::deploy::DeployResult;
36use crate::compile::deploy_provider::{
37 ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
38};
39use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
40use crate::location::dynamic::LocationId;
41use crate::location::member_id::TaglessMemberId;
42use crate::location::{LocationKey, MembershipEvent, NetworkHint};
43
44#[derive(Clone, Debug)]
46pub struct DockerNetwork {
47 name: String,
48}
49
50impl DockerNetwork {
51 pub fn new(name: String) -> Self {
53 Self {
54 name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
55 }
56 }
57}
58
59#[derive(Clone)]
61pub struct DockerDeployProcess {
62 key: LocationKey,
63 name: String,
64 next_port: Rc<RefCell<u16>>,
65 rust_crate: Rc<RefCell<Option<RustCrate>>>,
66
67 exposed_ports: Rc<RefCell<Vec<u16>>>,
68
69 docker_container_name: Rc<RefCell<Option<String>>>,
70
71 compilation_options: Option<String>,
72
73 config: Vec<String>,
74
75 network: DockerNetwork,
76
77 base_image: Option<String>,
78
79 linux_compile_type: LinuxCompileType,
80}
81
82impl Node for DockerDeployProcess {
83 type Port = u16;
84 type Meta = ();
85 type InstantiateEnv = DockerDeploy;
86
87 #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
88 fn next_port(&self) -> Self::Port {
89 let port = {
90 let mut borrow = self.next_port.borrow_mut();
91 let port = *borrow;
92 *borrow += 1;
93 port
94 };
95
96 port
97 }
98
99 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
100 fn update_meta(&self, _meta: &Self::Meta) {}
101
102 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
103 fn instantiate(
104 &self,
105 _env: &mut Self::InstantiateEnv,
106 meta: &mut Self::Meta,
107 graph: DfirGraph,
108 extra_stmts: &[syn::Stmt],
109 sidecars: &[syn::Expr],
110 ) {
111 let (bin_name, config) = create_graph_trybuild(
112 graph,
113 extra_stmts,
114 sidecars,
115 Some(&self.name),
116 crate::compile::trybuild::generate::DeployMode::Containerized,
117 LinkingMode::Static,
118 );
119
120 let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
121 .target_dir(config.target_dir)
122 .example(bin_name)
123 .no_default_features();
124
125 ret = ret.display_name("test_display_name");
126
127 ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
128
129 if let Some(features) = config.features {
130 ret = ret.features(features);
131 }
132
133 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
134 ret = ret.config("build.incremental = false");
135
136 *self.rust_crate.borrow_mut() = Some(ret);
137 }
138}
139
140#[derive(Clone)]
142pub struct DockerDeployCluster {
143 key: LocationKey,
144 name: String,
145 next_port: Rc<RefCell<u16>>,
146 rust_crate: Rc<RefCell<Option<RustCrate>>>,
147
148 exposed_ports: Rc<RefCell<Vec<u16>>>,
149
150 docker_container_name: Rc<RefCell<Vec<String>>>,
151
152 compilation_options: Option<String>,
153
154 config: Vec<String>,
155
156 count: usize,
157
158 base_image: Option<String>,
159
160 linux_compile_type: LinuxCompileType,
161}
162
163impl Node for DockerDeployCluster {
164 type Port = u16;
165 type Meta = ();
166 type InstantiateEnv = DockerDeploy;
167
168 #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
169 fn next_port(&self) -> Self::Port {
170 let port = {
171 let mut borrow = self.next_port.borrow_mut();
172 let port = *borrow;
173 *borrow += 1;
174 port
175 };
176
177 port
178 }
179
180 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
181 fn update_meta(&self, _meta: &Self::Meta) {}
182
183 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, extra_stmts = extra_stmts.len()))]
184 fn instantiate(
185 &self,
186 _env: &mut Self::InstantiateEnv,
187 _meta: &mut Self::Meta,
188 graph: DfirGraph,
189 extra_stmts: &[syn::Stmt],
190 sidecars: &[syn::Expr],
191 ) {
192 let (bin_name, config) = create_graph_trybuild(
193 graph,
194 extra_stmts,
195 sidecars,
196 Some(&self.name),
197 crate::compile::trybuild::generate::DeployMode::Containerized,
198 LinkingMode::Static,
199 );
200
201 let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
202 .target_dir(config.target_dir)
203 .example(bin_name)
204 .no_default_features();
205
206 ret = ret.display_name("test_display_name");
207
208 ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
209
210 if let Some(features) = config.features {
211 ret = ret.features(features);
212 }
213
214 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
215 ret = ret.config("build.incremental = false");
216
217 *self.rust_crate.borrow_mut() = Some(ret);
218 }
219}
220
221#[derive(Clone, Debug)]
223#[expect(
224 dead_code,
225 reason = "fields used via Rc<RefCell> in RegisterPort impl and ExternalBytesPort construction"
226)]
227pub struct DockerDeployExternal {
228 pub(crate) key: LocationKey,
230 name: String,
231 next_port: Rc<RefCell<u16>>,
232
233 next_external_port_id: Rc<RefCell<ExternalPortId>>,
235
236 ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,
237
238 #[expect(clippy::type_complexity, reason = "internal code")]
239 connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
240}
241
242impl Node for DockerDeployExternal {
243 type Port = u16;
244 type Meta = ();
245 type InstantiateEnv = DockerDeploy;
246
247 #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
248 fn next_port(&self) -> Self::Port {
249 let port = {
250 let mut borrow = self.next_port.borrow_mut();
251 let port = *borrow;
252 *borrow += 1;
253 port
254 };
255
256 port
257 }
258
259 #[instrument(level = "trace", skip_all, fields(name = self.name))]
260 fn update_meta(&self, _meta: &Self::Meta) {}
261
262 #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
263 fn instantiate(
264 &self,
265 _env: &mut Self::InstantiateEnv,
266 meta: &mut Self::Meta,
267 graph: DfirGraph,
268 extra_stmts: &[syn::Stmt],
269 sidecars: &[syn::Expr],
270 ) {
271 trace!(name: "surface", surface = graph.surface_syntax_string());
272 }
273}
274
275impl DockerDeployProcess {
276 pub fn expose_port(&self, port: u16) {
282 self.exposed_ports.borrow_mut().push(port);
283 }
284
285 pub async fn get_tcp_endpoint(&self, container_port: u16) -> (String, u16) {
289 let name = self
290 .docker_container_name
291 .borrow()
292 .as_ref()
293 .expect("container not yet started")
294 .clone();
295 let host_port = find_dynamically_allocated_docker_port(&name, container_port).await;
296 ("localhost".to_owned(), host_port)
297 }
298}
299
300impl DockerDeployCluster {
301 pub fn expose_port(&self, port: u16) {
307 self.exposed_ports.borrow_mut().push(port);
308 }
309
310 pub async fn get_all_tcp_endpoints(&self, container_port: u16) -> Vec<(String, u16)> {
314 let names = self.docker_container_name.borrow().clone();
315 let mut endpoints = Vec::with_capacity(names.len());
316 for name in names {
317 let host_port = find_dynamically_allocated_docker_port(&name, container_port).await;
318 endpoints.push(("localhost".to_owned(), host_port));
319 }
320 endpoints
321 }
322}
323
324type DynSourceSink<Out, In, InErr> = (
325 Pin<Box<dyn Stream<Item = Out>>>,
326 Pin<Box<dyn Sink<In, Error = InErr>>>,
327);
328
329impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
330 #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
331 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
332 self.ports.borrow_mut().insert(external_port_id, port);
333 }
334
335 fn as_bytes_bidi(
336 &self,
337 external_port_id: ExternalPortId,
338 ) -> impl Future<
339 Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
340 > + 'a {
341 let guard =
342 tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered();
343
344 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
345 let (docker_container_name, remote_port, _) = self
346 .connection_info
347 .borrow()
348 .get(&local_port)
349 .unwrap()
350 .clone();
351
352 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
353
354 async move {
355 let local_port =
356 find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
357 let remote_ip_address = "localhost";
358
359 trace!(name: "as_bytes_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
360
361 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
362 .await
363 .unwrap();
364
365 trace!(name: "as_bytes_bidi_connected", to = %remote_ip_address, to_port = %local_port);
366
367 let (rx, tx) = stream.into_split();
368
369 let source = Box::pin(
370 FramedRead::new(rx, LengthDelimitedCodec::new()),
371 ) as Pin<Box<dyn Stream<Item = Result<bytes::BytesMut, std::io::Error>>>>;
372
373 let sink = Box::pin(FramedWrite::new(tx, LengthDelimitedCodec::new()))
374 as Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>;
375
376 (source, sink)
377 }
378 .instrument(guard.exit())
379 }
380
381 fn as_bincode_bidi<InT, OutT>(
382 &self,
383 external_port_id: ExternalPortId,
384 ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
385 where
386 InT: serde::Serialize + 'static,
387 OutT: serde::de::DeserializeOwned + 'static,
388 {
389 let guard =
390 tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered();
391
392 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
393 let (docker_container_name, remote_port, _) = self
394 .connection_info
395 .borrow()
396 .get(&local_port)
397 .unwrap()
398 .clone();
399
400 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
401
402 async move {
403 let local_port =
404 find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
405 let remote_ip_address = "localhost";
406
407 trace!(name: "as_bincode_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
408
409 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
410 .await
411 .unwrap();
412
413 trace!(name: "as_bincode_bidi_connected", to = %remote_ip_address, to_port = %local_port);
414
415 let (rx, tx) = stream.into_split();
416
417 let source = Box::pin(
418 FramedRead::new(rx, LengthDelimitedCodec::new())
419 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
420 ) as Pin<Box<dyn Stream<Item = OutT>>>;
421
422 let sink = Box::pin(
423 FramedWrite::new(tx, LengthDelimitedCodec::new()).with(move |v: InT| async move {
424 Ok::<_, std::io::Error>(Bytes::from(bincode::serialize(&v).unwrap()))
425 }),
426 ) as Pin<Box<dyn Sink<InT, Error = std::io::Error>>>;
427
428 (source, sink)
429 }
430 .instrument(guard.exit())
431 }
432
433 fn as_bincode_sink<T>(
434 &self,
435 external_port_id: ExternalPortId,
436 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
437 where
438 T: serde::Serialize + 'static,
439 {
440 let guard =
441 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
442
443 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
444 let (docker_container_name, remote_port, _) = self
445 .connection_info
446 .borrow()
447 .get(&local_port)
448 .unwrap()
449 .clone();
450
451 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
452
453 async move {
454 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
455 let remote_ip_address = "localhost";
456
457 Box::pin(
458 LazySink::new(move || {
459 Box::pin(async move {
460 trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
461
462 let stream =
463 TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
464 .await?;
465
466 trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
467
468 Result::<_, std::io::Error>::Ok(FramedWrite::new(
469 stream,
470 LengthDelimitedCodec::new(),
471 ))
472 })
473 })
474 .with(move |v| async move {
475 Ok(Bytes::from(bincode::serialize(&v).unwrap()))
476 }),
477 ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
478 }
479 .instrument(guard.exit())
480 }
481
482 fn as_bincode_source<T>(
483 &self,
484 external_port_id: ExternalPortId,
485 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
486 where
487 T: serde::de::DeserializeOwned + 'static,
488 {
489 let guard =
490 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
491
492 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
493 let (docker_container_name, remote_port, _) = self
494 .connection_info
495 .borrow()
496 .get(&local_port)
497 .unwrap()
498 .clone();
499
500 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
501
502 async move {
503
504 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
505 let remote_ip_address = "localhost";
506
507 trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
508
509 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
510 .await
511 .unwrap();
512
513 trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
514
515 Box::pin(
516 FramedRead::new(stream, LengthDelimitedCodec::new())
517 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
518 ) as Pin<Box<dyn Stream<Item = T>>>
519 }
520 .instrument(guard.exit())
521 }
522}
523
524#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
525async fn find_dynamically_allocated_docker_port(
526 docker_container_name: &str,
527 destination_port: u16,
528) -> u16 {
529 let docker = Docker::connect_with_local_defaults().unwrap();
530
531 let container_info = docker
532 .inspect_container(docker_container_name, None::<InspectContainerOptions>)
533 .await
534 .unwrap();
535
536 trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
537
538 let remote_port = container_info
540 .network_settings
541 .as_ref()
542 .unwrap()
543 .ports
544 .as_ref()
545 .unwrap()
546 .get(&format!("{destination_port}/tcp"))
547 .unwrap()
548 .as_ref()
549 .unwrap()
550 .iter()
551 .find(|v| v.host_ip == Some("0.0.0.0".to_owned()))
552 .unwrap()
553 .host_port
554 .as_ref()
555 .unwrap()
556 .parse()
557 .unwrap();
558
559 remote_port
560}
561
562pub struct DockerDeploy {
564 docker_processes: Vec<DockerDeployProcessSpec>,
565 docker_clusters: Vec<DockerDeployClusterSpec>,
566 network: DockerNetwork,
567 deployment_instance: String,
568}
569
570#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
571async fn create_and_start_container(
572 docker: &Docker,
573 container_name: &str,
574 image_name: &str,
575 network_name: &str,
576 deployment_instance: &str,
577) -> Result<(), anyhow::Error> {
578 let config = ContainerCreateBody {
579 image: Some(image_name.to_owned()),
580 hostname: Some(container_name.to_owned()),
581 host_config: Some(HostConfig {
582 binds: Some(vec!["/var/run/docker.sock:/var/run/docker.sock".to_owned()]),
583 publish_all_ports: Some(true),
584 port_bindings: Some(HashMap::new()), ..Default::default()
586 }),
587 env: Some(vec![
588 format!("CONTAINER_NAME={container_name}"),
589 format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
590 format!("RUST_LOG=trace"),
591 ]),
592 networking_config: Some(NetworkingConfig {
593 endpoints_config: Some(HashMap::from([(
594 network_name.to_owned(),
595 EndpointSettings {
596 ..Default::default()
597 },
598 )])),
599 }),
600 tty: Some(true),
601 ..Default::default()
602 };
603
604 let options = CreateContainerOptions {
605 name: Some(container_name.to_owned()),
606 ..Default::default()
607 };
608
609 tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
610 docker.create_container(Some(options), config).await?;
611 docker
612 .start_container(container_name, None::<StartContainerOptions>)
613 .await?;
614
615 Ok(())
616}
617
618#[instrument(level = "trace", skip_all, fields(%image_name))]
619async fn build_and_create_image(
620 rust_crate: &Rc<RefCell<Option<RustCrate>>>,
621 compilation_options: Option<&str>,
622 config: &[String],
623 exposed_ports: &[u16],
624 image_name: &str,
625 base_image: Option<&str>,
626 linux_compile_type: LinuxCompileType,
627) -> Result<(), anyhow::Error> {
628 let mut rust_crate = rust_crate
629 .borrow_mut()
630 .take()
631 .unwrap()
632 .rustflags(compilation_options.unwrap_or_default());
633
634 for cfg in config {
635 rust_crate = rust_crate.config(cfg);
636 }
637
638 let build_output = match build_crate_memoized(
639 rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(linux_compile_type)),
640 )
641 .await
642 {
643 Ok(build_output) => build_output,
644 Err(BuildError::FailedToBuildCrate {
645 exit_status,
646 diagnostics,
647 text_lines,
648 stderr_lines,
649 }) => {
650 let diagnostics = diagnostics
651 .into_iter()
652 .map(|d| d.rendered.unwrap())
653 .collect::<Vec<_>>()
654 .join("\n");
655 let text_lines = text_lines.join("\n");
656 let stderr_lines = stderr_lines.join("\n");
657
658 anyhow::bail!(
659 r#"
660Failed to build crate {exit_status:?}
661--- diagnostics
662---
663{diagnostics}
664---
665---
666---
667
668--- text_lines
669---
670---
671{text_lines}
672---
673---
674---
675
676--- stderr_lines
677---
678---
679{stderr_lines}
680---
681---
682---"#
683 );
684 }
685 Err(err) => {
686 anyhow::bail!("Failed to build crate {err:?}");
687 }
688 };
689
690 let docker = Docker::connect_with_local_defaults()?;
691
692 let mut tar_data = Vec::new();
693 {
694 let mut tar = Builder::new(&mut tar_data);
695
696 let exposed_ports = exposed_ports
697 .iter()
698 .map(|port| format!("EXPOSE {port}/tcp"))
699 .collect::<Vec<_>>()
700 .join("\n");
701
702 let from_image = base_image.unwrap_or("scratch");
703 let dockerfile_content = format!(
704 r#"
705 FROM {from_image}
706 {exposed_ports}
707 COPY app /app
708 CMD ["/app"]
709 "#,
710 );
711
712 trace!(name: "dockerfile", %dockerfile_content);
713
714 let mut header = Header::new_gnu();
715 header.set_path("Dockerfile")?;
716 header.set_size(dockerfile_content.len() as u64);
717 header.set_cksum();
718 tar.append(&header, dockerfile_content.as_bytes())?;
719
720 let mut header = Header::new_gnu();
721 header.set_path("app")?;
722 header.set_size(build_output.bin_data.len() as u64);
723 header.set_mode(0o755);
724 header.set_cksum();
725 tar.append(&header, &build_output.bin_data[..])?;
726
727 tar.finish()?;
728 }
729
730 let build_options = BuildImageOptions {
731 dockerfile: "Dockerfile".to_owned(),
732 t: Some(image_name.to_owned()),
733 rm: true,
734 ..Default::default()
735 };
736
737 use bollard::errors::Error;
738
739 let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
740 let mut build_stream = docker.build_image(build_options, None, Some(body));
741 while let Some(msg) = build_stream.next().await {
742 match msg {
743 Ok(_) => {}
744 Err(e) => match e {
745 Error::DockerStreamError { error } => {
746 return Err(anyhow::anyhow!(
747 "Docker build failed: DockerStreamError: {{ error: {error} }}"
748 ));
749 }
750 _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
751 },
752 }
753 }
754
755 Ok(())
756}
757
758impl DockerDeploy {
759 pub fn new(network: DockerNetwork) -> Self {
761 Self {
762 docker_processes: Vec::new(),
763 docker_clusters: Vec::new(),
764 network,
765 deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
766 }
767 }
768
769 pub fn add_localhost_docker(
771 &mut self,
772 compilation_options: Option<String>,
773 config: Vec<String>,
774 ) -> DockerDeployProcessSpec {
775 let process = DockerDeployProcessSpec {
776 compilation_options,
777 config,
778 network: self.network.clone(),
779 deployment_instance: self.deployment_instance.clone(),
780 base_image: None,
781 linux_compile_type: LinuxCompileType::Musl,
782 };
783
784 self.docker_processes.push(process.clone());
785
786 process
787 }
788
789 pub fn add_localhost_docker_cluster(
791 &mut self,
792 compilation_options: Option<String>,
793 config: Vec<String>,
794 count: usize,
795 ) -> DockerDeployClusterSpec {
796 let cluster = DockerDeployClusterSpec {
797 compilation_options,
798 config,
799 count,
800 deployment_instance: self.deployment_instance.clone(),
801 base_image: None,
802 linux_compile_type: LinuxCompileType::Musl,
803 };
804
805 self.docker_clusters.push(cluster.clone());
806
807 cluster
808 }
809
810 pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
812 DockerDeployExternalSpec { name }
813 }
814
815 pub fn get_deployment_instance(&self) -> String {
817 self.deployment_instance.clone()
818 }
819
820 #[instrument(level = "trace", skip_all)]
822 pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
823 for (_, _, process) in nodes.get_all_processes() {
824 let exposed_ports = process.exposed_ports.borrow().clone();
825
826 build_and_create_image(
827 &process.rust_crate,
828 process.compilation_options.as_deref(),
829 &process.config,
830 &exposed_ports,
831 &process.name,
832 process.base_image.as_deref(),
833 process.linux_compile_type,
834 )
835 .await?;
836 }
837
838 for (_, _, cluster) in nodes.get_all_clusters() {
839 let exposed_ports = cluster.exposed_ports.borrow().clone();
840 build_and_create_image(
841 &cluster.rust_crate,
842 cluster.compilation_options.as_deref(),
843 &cluster.config,
844 &exposed_ports,
845 &cluster.name,
846 cluster.base_image.as_deref(),
847 cluster.linux_compile_type,
848 )
849 .await?;
850 }
851
852 Ok(())
853 }
854
855 #[instrument(level = "trace", skip_all)]
857 pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
858 let docker = Docker::connect_with_local_defaults()?;
859
860 match docker
861 .create_network(NetworkCreateRequest {
862 name: self.network.name.clone(),
863 driver: Some("bridge".to_owned()),
864 ..Default::default()
865 })
866 .await
867 {
868 Ok(v) => v.id,
869 Err(e) => {
870 panic!("Failed to create docker network: {e:?}");
871 }
872 };
873
874 for (_, _, process) in nodes.get_all_processes() {
875 let docker_container_name: String = get_docker_container_name(&process.name, None);
876 *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
877
878 create_and_start_container(
879 &docker,
880 &docker_container_name,
881 &process.name,
882 &self.network.name,
883 &self.deployment_instance,
884 )
885 .await?;
886 }
887
888 for (_, _, cluster) in nodes.get_all_clusters() {
889 for num in 0..cluster.count {
890 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
891 cluster
892 .docker_container_name
893 .borrow_mut()
894 .push(docker_container_name.clone());
895
896 create_and_start_container(
897 &docker,
898 &docker_container_name,
899 &cluster.name,
900 &self.network.name,
901 &self.deployment_instance,
902 )
903 .await?;
904 }
905 }
906
907 Ok(())
908 }
909
910 #[instrument(level = "trace", skip_all)]
912 pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
913 let docker = Docker::connect_with_local_defaults()?;
914
915 for (_, _, process) in nodes.get_all_processes() {
916 let docker_container_name: String = get_docker_container_name(&process.name, None);
917
918 docker
919 .kill_container(&docker_container_name, None::<KillContainerOptions>)
920 .await?;
921 }
922
923 for (_, _, cluster) in nodes.get_all_clusters() {
924 for num in 0..cluster.count {
925 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
926
927 docker
928 .kill_container(&docker_container_name, None::<KillContainerOptions>)
929 .await?;
930 }
931 }
932
933 Ok(())
934 }
935
936 #[instrument(level = "trace", skip_all)]
938 pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
939 let docker = Docker::connect_with_local_defaults()?;
940
941 for (_, _, process) in nodes.get_all_processes() {
942 let docker_container_name: String = get_docker_container_name(&process.name, None);
943
944 docker
945 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
946 .await?;
947 }
948
949 for (_, _, cluster) in nodes.get_all_clusters() {
950 for num in 0..cluster.count {
951 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
952
953 docker
954 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
955 .await?;
956 }
957 }
958
959 docker
960 .remove_network(&self.network.name)
961 .await
962 .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
963
964 use bollard::query_parameters::RemoveImageOptions;
965
966 for (_, _, process) in nodes.get_all_processes() {
967 docker
968 .remove_image(&process.name, None::<RemoveImageOptions>, None)
969 .await?;
970 }
971
972 for (_, _, cluster) in nodes.get_all_clusters() {
973 docker
974 .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
975 .await?;
976 }
977
978 Ok(())
979 }
980}
981
982impl<'a> Deploy<'a> for DockerDeploy {
983 type Meta = ();
984 type InstantiateEnv = Self;
985
986 type Process = DockerDeployProcess;
987 type Cluster = DockerDeployCluster;
988 type External = DockerDeployExternal;
989
990 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
991 fn o2o_sink_source(
992 _env: &mut Self::InstantiateEnv,
993 p1: &Self::Process,
994 p1_port: &<Self::Process as Node>::Port,
995 p2: &Self::Process,
996 p2_port: &<Self::Process as Node>::Port,
997 name: Option<&str>,
998 networking_info: &crate::networking::NetworkingInfo,
999 ) -> (syn::Expr, syn::Expr) {
1000 match networking_info {
1001 crate::networking::NetworkingInfo::Tcp {
1002 fault: crate::networking::TcpFault::FailStop,
1003 } => {}
1004 _ => panic!("Unsupported networking info: {:?}", networking_info),
1005 }
1006
1007 deploy_containerized_o2o(
1008 &p2.name,
1009 name.expect("channel name is required for containerized deployment"),
1010 )
1011 }
1012
1013 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
1014 fn o2o_connect(
1015 p1: &Self::Process,
1016 p1_port: &<Self::Process as Node>::Port,
1017 p2: &Self::Process,
1018 p2_port: &<Self::Process as Node>::Port,
1019 ) -> Box<dyn FnOnce()> {
1020 let serialized = format!("o2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1021
1022 Box::new(move || {
1023 trace!(name: "o2o_connect thunk", %serialized);
1024 })
1025 }
1026
1027 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
1028 fn o2m_sink_source(
1029 _env: &mut Self::InstantiateEnv,
1030 p1: &Self::Process,
1031 p1_port: &<Self::Process as Node>::Port,
1032 c2: &Self::Cluster,
1033 c2_port: &<Self::Cluster as Node>::Port,
1034 name: Option<&str>,
1035 networking_info: &crate::networking::NetworkingInfo,
1036 ) -> (syn::Expr, syn::Expr) {
1037 match networking_info {
1038 crate::networking::NetworkingInfo::Tcp {
1039 fault: crate::networking::TcpFault::FailStop,
1040 } => {}
1041 _ => panic!("Unsupported networking info: {:?}", networking_info),
1042 }
1043
1044 deploy_containerized_o2m(
1045 name.expect("channel name is required for containerized deployment"),
1046 )
1047 }
1048
1049 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
1050 fn o2m_connect(
1051 p1: &Self::Process,
1052 p1_port: &<Self::Process as Node>::Port,
1053 c2: &Self::Cluster,
1054 c2_port: &<Self::Cluster as Node>::Port,
1055 ) -> Box<dyn FnOnce()> {
1056 let serialized = format!("o2m_connect {}:{p1_port} -> {}:{c2_port}", p1.name, c2.name);
1057
1058 Box::new(move || {
1059 trace!(name: "o2m_connect thunk", %serialized);
1060 })
1061 }
1062
1063 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
1064 fn m2o_sink_source(
1065 _env: &mut Self::InstantiateEnv,
1066 c1: &Self::Cluster,
1067 c1_port: &<Self::Cluster as Node>::Port,
1068 p2: &Self::Process,
1069 p2_port: &<Self::Process as Node>::Port,
1070 name: Option<&str>,
1071 networking_info: &crate::networking::NetworkingInfo,
1072 ) -> (syn::Expr, syn::Expr) {
1073 match networking_info {
1074 crate::networking::NetworkingInfo::Tcp {
1075 fault: crate::networking::TcpFault::FailStop,
1076 } => {}
1077 _ => panic!("Unsupported networking info: {:?}", networking_info),
1078 }
1079
1080 deploy_containerized_m2o(
1081 &p2.name,
1082 name.expect("channel name is required for containerized deployment"),
1083 )
1084 }
1085
1086 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
1087 fn m2o_connect(
1088 c1: &Self::Cluster,
1089 c1_port: &<Self::Cluster as Node>::Port,
1090 p2: &Self::Process,
1091 p2_port: &<Self::Process as Node>::Port,
1092 ) -> Box<dyn FnOnce()> {
1093 let serialized = format!("o2m_connect {}:{c1_port} -> {}:{p2_port}", c1.name, p2.name);
1094
1095 Box::new(move || {
1096 trace!(name: "m2o_connect thunk", %serialized);
1097 })
1098 }
1099
1100 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1101 fn m2m_sink_source(
1102 _env: &mut Self::InstantiateEnv,
1103 c1: &Self::Cluster,
1104 c1_port: &<Self::Cluster as Node>::Port,
1105 c2: &Self::Cluster,
1106 c2_port: &<Self::Cluster as Node>::Port,
1107 name: Option<&str>,
1108 networking_info: &crate::networking::NetworkingInfo,
1109 ) -> (syn::Expr, syn::Expr) {
1110 match networking_info {
1111 crate::networking::NetworkingInfo::Tcp {
1112 fault: crate::networking::TcpFault::FailStop,
1113 } => {}
1114 _ => panic!("Unsupported networking info: {:?}", networking_info),
1115 }
1116
1117 deploy_containerized_m2m(
1118 name.expect("channel name is required for containerized deployment"),
1119 )
1120 }
1121
1122 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1123 fn m2m_connect(
1124 c1: &Self::Cluster,
1125 c1_port: &<Self::Cluster as Node>::Port,
1126 c2: &Self::Cluster,
1127 c2_port: &<Self::Cluster as Node>::Port,
1128 ) -> Box<dyn FnOnce()> {
1129 let serialized = format!("m2m_connect {}:{c1_port} -> {}:{c2_port}", c1.name, c2.name);
1130
1131 Box::new(move || {
1132 trace!(name: "m2m_connect thunk", %serialized);
1133 })
1134 }
1135
1136 #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
1137 fn e2o_many_source(
1138 extra_stmts: &mut Vec<syn::Stmt>,
1139 p2: &Self::Process,
1140 p2_port: &<Self::Process as Node>::Port,
1141 codec_type: &syn::Type,
1142 shared_handle: String,
1143 ) -> syn::Expr {
1144 p2.exposed_ports.borrow_mut().push(*p2_port);
1145
1146 let socket_ident = syn::Ident::new(
1147 &format!("__hydro_deploy_many_{}_socket", &shared_handle),
1148 Span::call_site(),
1149 );
1150
1151 let source_ident = syn::Ident::new(
1152 &format!("__hydro_deploy_many_{}_source", &shared_handle),
1153 Span::call_site(),
1154 );
1155
1156 let sink_ident = syn::Ident::new(
1157 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1158 Span::call_site(),
1159 );
1160
1161 let membership_ident = syn::Ident::new(
1162 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
1163 Span::call_site(),
1164 );
1165
1166 let bind_addr = format!("0.0.0.0:{}", p2_port);
1167
1168 extra_stmts.push(syn::parse_quote! {
1169 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1170 });
1171
1172 let root = crate::staging_util::get_this_crate();
1173
1174 extra_stmts.push(syn::parse_quote! {
1175 let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
1176 });
1177
1178 parse_quote!(#source_ident)
1179 }
1180
1181 #[instrument(level = "trace", skip_all, fields(%shared_handle))]
1182 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
1183 let sink_ident = syn::Ident::new(
1184 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1185 Span::call_site(),
1186 );
1187 parse_quote!(#sink_ident)
1188 }
1189
1190 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1191 fn e2o_source(
1192 extra_stmts: &mut Vec<syn::Stmt>,
1193 p1: &Self::External,
1194 p1_port: &<Self::External as Node>::Port,
1195 p2: &Self::Process,
1196 p2_port: &<Self::Process as Node>::Port,
1197 _codec_type: &syn::Type,
1198 shared_handle: String,
1199 ) -> syn::Expr {
1200 p1.connection_info.borrow_mut().insert(
1201 *p1_port,
1202 (
1203 p2.docker_container_name.clone(),
1204 *p2_port,
1205 p2.network.clone(),
1206 ),
1207 );
1208
1209 p2.exposed_ports.borrow_mut().push(*p2_port);
1210
1211 let socket_ident = syn::Ident::new(
1212 &format!("__hydro_deploy_{}_socket", &shared_handle),
1213 Span::call_site(),
1214 );
1215
1216 let source_ident = syn::Ident::new(
1217 &format!("__hydro_deploy_{}_source", &shared_handle),
1218 Span::call_site(),
1219 );
1220
1221 let sink_ident = syn::Ident::new(
1222 &format!("__hydro_deploy_{}_sink", &shared_handle),
1223 Span::call_site(),
1224 );
1225
1226 let bind_addr = format!("0.0.0.0:{}", p2_port);
1227
1228 extra_stmts.push(syn::parse_quote! {
1229 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1230 });
1231
1232 let create_expr = deploy_containerized_external_sink_source_ident(socket_ident);
1233
1234 extra_stmts.push(syn::parse_quote! {
1235 let (#sink_ident, #source_ident) = (#create_expr).split();
1236 });
1237
1238 parse_quote!(#source_ident)
1239 }
1240
1241 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1242 fn e2o_connect(
1243 p1: &Self::External,
1244 p1_port: &<Self::External as Node>::Port,
1245 p2: &Self::Process,
1246 p2_port: &<Self::Process as Node>::Port,
1247 many: bool,
1248 server_hint: NetworkHint,
1249 ) -> Box<dyn FnOnce()> {
1250 if server_hint != NetworkHint::Auto {
1251 panic!(
1252 "Docker deployment only supports NetworkHint::Auto, got {:?}",
1253 server_hint
1254 );
1255 }
1256
1257 if many {
1259 p1.connection_info.borrow_mut().insert(
1260 *p1_port,
1261 (
1262 p2.docker_container_name.clone(),
1263 *p2_port,
1264 p2.network.clone(),
1265 ),
1266 );
1267 }
1268
1269 let serialized = format!("e2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1270
1271 Box::new(move || {
1272 trace!(name: "e2o_connect thunk", %serialized);
1273 })
1274 }
1275
1276 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1277 fn o2e_sink(
1278 p1: &Self::Process,
1279 p1_port: &<Self::Process as Node>::Port,
1280 p2: &Self::External,
1281 p2_port: &<Self::External as Node>::Port,
1282 shared_handle: String,
1283 ) -> syn::Expr {
1284 let sink_ident = syn::Ident::new(
1285 &format!("__hydro_deploy_{}_sink", &shared_handle),
1286 Span::call_site(),
1287 );
1288 parse_quote!(#sink_ident)
1289 }
1290
1291 #[instrument(level = "trace", skip_all, fields(%of_cluster))]
1292 fn cluster_ids(
1293 of_cluster: LocationKey,
1294 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
1295 cluster_ids()
1296 }
1297
1298 #[instrument(level = "trace", skip_all)]
1299 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
1300 cluster_self_id()
1301 }
1302
1303 #[instrument(level = "trace", skip_all, fields(?location_id))]
1304 fn cluster_membership_stream(
1305 _env: &mut Self::InstantiateEnv,
1306 _at_location: &LocationId,
1307 location_id: &LocationId,
1308 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
1309 {
1310 cluster_membership_stream(location_id)
1311 }
1312}
1313
1314const CONTAINER_ALPHABET: [char; 36] = [
1315 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
1316 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
1317];
1318
1319fn is_valid_docker_image_name(name: &str) -> bool {
1320 regex::Regex::new(r"^[a-z0-9]+([._-][a-z0-9]+)*$")
1321 .unwrap()
1322 .is_match(name)
1323}
1324
1325#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location_key, %deployment_instance))]
1326fn get_docker_image_name(
1327 name_hint: &str,
1328 location_key: LocationKey,
1329 deployment_instance: &str,
1330) -> String {
1331 let name_hint: String = name_hint
1332 .split("::")
1333 .last()
1334 .unwrap()
1335 .to_ascii_lowercase()
1336 .split(['.', '_', '-'])
1337 .filter(|s| !s.is_empty())
1338 .collect::<Vec<_>>()
1339 .join("-");
1340
1341 let image_name = format!("hy-{name_hint}-{deployment_instance}-{location_key}");
1342
1343 if !is_valid_docker_image_name(&image_name) {
1344 panic!(
1345 "Generated Docker image name '{image_name}' is not a valid Docker image name. \
1346 Docker image names may only contain lowercase alphanumeric characters \
1347 separated by single '.', '_', or '-' characters, and must start and end \
1348 with an alphanumeric character. The most likely cause is your location \
1349 struct name '{name_hint}'"
1350 );
1351 }
1352
1353 image_name
1354}
1355
1356#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1357fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1358 if let Some(instance) = instance {
1359 format!("{image_name}-{instance}")
1360 } else {
1361 image_name.to_owned()
1362 }
1363}
1364#[derive(Clone)]
1366pub struct DockerDeployProcessSpec {
1367 compilation_options: Option<String>,
1368 config: Vec<String>,
1369 network: DockerNetwork,
1370 deployment_instance: String,
1371 base_image: Option<String>,
1372 linux_compile_type: LinuxCompileType,
1373}
1374
1375impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1376 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1377 fn build(self, key: LocationKey, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1378 DockerDeployProcess {
1379 key,
1380 name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1381
1382 next_port: Rc::new(RefCell::new(1000)),
1383 rust_crate: Rc::new(RefCell::new(None)),
1384
1385 exposed_ports: Rc::new(RefCell::new(Vec::new())),
1386
1387 docker_container_name: Rc::new(RefCell::new(None)),
1388
1389 compilation_options: self.compilation_options,
1390 config: self.config,
1391
1392 network: self.network.clone(),
1393
1394 base_image: self.base_image,
1395 linux_compile_type: self.linux_compile_type,
1396 }
1397 }
1398}
1399
1400#[derive(Clone)]
1402pub struct DockerDeployClusterSpec {
1403 compilation_options: Option<String>,
1404 config: Vec<String>,
1405 count: usize,
1406 deployment_instance: String,
1407 base_image: Option<String>,
1408 linux_compile_type: LinuxCompileType,
1409}
1410
1411impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1412 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1413 fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1414 DockerDeployCluster {
1415 key,
1416 name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1417
1418 next_port: Rc::new(RefCell::new(1000)),
1419 rust_crate: Rc::new(RefCell::new(None)),
1420
1421 exposed_ports: Rc::new(RefCell::new(Vec::new())),
1422
1423 docker_container_name: Rc::new(RefCell::new(Vec::new())),
1424
1425 compilation_options: self.compilation_options,
1426 config: self.config,
1427
1428 count: self.count,
1429
1430 base_image: self.base_image,
1431 linux_compile_type: self.linux_compile_type,
1432 }
1433 }
1434}
1435
1436impl DockerDeployProcessSpec {
1437 pub fn base_image(mut self, image: impl Into<String>) -> Self {
1440 self.base_image = Some(image.into());
1441 self
1442 }
1443
1444 pub fn linux_compile_type(mut self, compile_type: LinuxCompileType) -> Self {
1447 self.linux_compile_type = compile_type;
1448 self
1449 }
1450}
1451
1452impl DockerDeployClusterSpec {
1453 pub fn base_image(mut self, image: impl Into<String>) -> Self {
1456 self.base_image = Some(image.into());
1457 self
1458 }
1459
1460 pub fn linux_compile_type(mut self, compile_type: LinuxCompileType) -> Self {
1463 self.linux_compile_type = compile_type;
1464 self
1465 }
1466}
1467
1468pub struct DockerDeployExternalSpec {
1470 name: String,
1471}
1472
1473impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1474 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1475 fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1476 DockerDeployExternal {
1477 key,
1478 name: self.name,
1479 next_port: Rc::new(RefCell::new(10000)),
1480 next_external_port_id: Rc::new(RefCell::new(ExternalPortId::default())),
1481 ports: Rc::new(RefCell::new(HashMap::new())),
1482 connection_info: Rc::new(RefCell::new(HashMap::new())),
1483 }
1484 }
1485}