Skip to main content

hydro_lang/deploy/
deploy_graph_containerized.rs

1//! Deployment backend for Hydro that uses Docker to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11    BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12    RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19// Re-export LinuxCompileType so users can configure compile type without depending on hydro_deploy directly.
20pub use hydro_deploy::LinuxCompileType;
21use hydro_deploy::RustCrate;
22use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
23use nanoid::nanoid;
24use proc_macro2::Span;
25use sinktools::lazy::LazySink;
26use stageleft::QuotedWithContext;
27use syn::parse_quote;
28use tar::{Builder, Header};
29use tokio::net::TcpStream;
30use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
31use tracing::{Instrument, instrument, trace, warn};
32
33use super::deploy_runtime_containerized::*;
34use crate::compile::builder::ExternalPortId;
35use crate::compile::deploy::DeployResult;
36use crate::compile::deploy_provider::{
37    ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
38};
39use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
40use crate::location::dynamic::LocationId;
41use crate::location::member_id::TaglessMemberId;
42use crate::location::{LocationKey, MembershipEvent, NetworkHint};
43
44/// represents a docker network
45#[derive(Clone, Debug)]
46pub struct DockerNetwork {
47    name: String,
48}
49
50impl DockerNetwork {
51    /// creates a new docker network (will actually be created when deployment.start() is called).
52    pub fn new(name: String) -> Self {
53        Self {
54            name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
55        }
56    }
57}
58
59/// Represents a process running in a docker container
60#[derive(Clone)]
61pub struct DockerDeployProcess {
62    key: LocationKey,
63    name: String,
64    next_port: Rc<RefCell<u16>>,
65    rust_crate: Rc<RefCell<Option<RustCrate>>>,
66
67    exposed_ports: Rc<RefCell<Vec<u16>>>,
68
69    docker_container_name: Rc<RefCell<Option<String>>>,
70
71    compilation_options: Option<String>,
72
73    config: Vec<String>,
74
75    network: DockerNetwork,
76
77    base_image: Option<String>,
78
79    linux_compile_type: LinuxCompileType,
80}
81
82impl Node for DockerDeployProcess {
83    type Port = u16;
84    type Meta = ();
85    type InstantiateEnv = DockerDeploy;
86
87    #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
88    fn next_port(&self) -> Self::Port {
89        let port = {
90            let mut borrow = self.next_port.borrow_mut();
91            let port = *borrow;
92            *borrow += 1;
93            port
94        };
95
96        port
97    }
98
99    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
100    fn update_meta(&self, _meta: &Self::Meta) {}
101
102    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
103    fn instantiate(
104        &self,
105        _env: &mut Self::InstantiateEnv,
106        meta: &mut Self::Meta,
107        graph: DfirGraph,
108        extra_stmts: &[syn::Stmt],
109        sidecars: &[syn::Expr],
110    ) {
111        let (bin_name, config) = create_graph_trybuild(
112            graph,
113            extra_stmts,
114            sidecars,
115            Some(&self.name),
116            crate::compile::trybuild::generate::DeployMode::Containerized,
117            LinkingMode::Static,
118        );
119
120        let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
121            .target_dir(config.target_dir)
122            .example(bin_name)
123            .no_default_features();
124
125        ret = ret.display_name("test_display_name");
126
127        ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
128
129        if let Some(features) = config.features {
130            ret = ret.features(features);
131        }
132
133        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
134        ret = ret.config("build.incremental = false");
135
136        *self.rust_crate.borrow_mut() = Some(ret);
137    }
138}
139
140/// Represents a logical cluster, which can be a variable amount of individual containers.
141#[derive(Clone)]
142pub struct DockerDeployCluster {
143    key: LocationKey,
144    name: String,
145    next_port: Rc<RefCell<u16>>,
146    rust_crate: Rc<RefCell<Option<RustCrate>>>,
147
148    exposed_ports: Rc<RefCell<Vec<u16>>>,
149
150    docker_container_name: Rc<RefCell<Vec<String>>>,
151
152    compilation_options: Option<String>,
153
154    config: Vec<String>,
155
156    count: usize,
157
158    base_image: Option<String>,
159
160    linux_compile_type: LinuxCompileType,
161}
162
163impl Node for DockerDeployCluster {
164    type Port = u16;
165    type Meta = ();
166    type InstantiateEnv = DockerDeploy;
167
168    #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
169    fn next_port(&self) -> Self::Port {
170        let port = {
171            let mut borrow = self.next_port.borrow_mut();
172            let port = *borrow;
173            *borrow += 1;
174            port
175        };
176
177        port
178    }
179
180    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
181    fn update_meta(&self, _meta: &Self::Meta) {}
182
183    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, extra_stmts = extra_stmts.len()))]
184    fn instantiate(
185        &self,
186        _env: &mut Self::InstantiateEnv,
187        _meta: &mut Self::Meta,
188        graph: DfirGraph,
189        extra_stmts: &[syn::Stmt],
190        sidecars: &[syn::Expr],
191    ) {
192        let (bin_name, config) = create_graph_trybuild(
193            graph,
194            extra_stmts,
195            sidecars,
196            Some(&self.name),
197            crate::compile::trybuild::generate::DeployMode::Containerized,
198            LinkingMode::Static,
199        );
200
201        let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
202            .target_dir(config.target_dir)
203            .example(bin_name)
204            .no_default_features();
205
206        ret = ret.display_name("test_display_name");
207
208        ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
209
210        if let Some(features) = config.features {
211            ret = ret.features(features);
212        }
213
214        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
215        ret = ret.config("build.incremental = false");
216
217        *self.rust_crate.borrow_mut() = Some(ret);
218    }
219}
220
221/// Represents an external process, outside the control of this deployment but still with some communication into this deployment.
222#[derive(Clone, Debug)]
223#[expect(
224    dead_code,
225    reason = "fields used via Rc<RefCell> in RegisterPort impl and ExternalBytesPort construction"
226)]
227pub struct DockerDeployExternal {
228    /// The location key for this external, used for port handle construction.
229    pub(crate) key: LocationKey,
230    name: String,
231    next_port: Rc<RefCell<u16>>,
232
233    /// Counter for generating ExternalPortId values at deploy time.
234    next_external_port_id: Rc<RefCell<ExternalPortId>>,
235
236    ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,
237
238    #[expect(clippy::type_complexity, reason = "internal code")]
239    connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
240}
241
242impl Node for DockerDeployExternal {
243    type Port = u16;
244    type Meta = ();
245    type InstantiateEnv = DockerDeploy;
246
247    #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
248    fn next_port(&self) -> Self::Port {
249        let port = {
250            let mut borrow = self.next_port.borrow_mut();
251            let port = *borrow;
252            *borrow += 1;
253            port
254        };
255
256        port
257    }
258
259    #[instrument(level = "trace", skip_all, fields(name = self.name))]
260    fn update_meta(&self, _meta: &Self::Meta) {}
261
262    #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
263    fn instantiate(
264        &self,
265        _env: &mut Self::InstantiateEnv,
266        meta: &mut Self::Meta,
267        graph: DfirGraph,
268        extra_stmts: &[syn::Stmt],
269        sidecars: &[syn::Expr],
270    ) {
271        trace!(name: "surface", surface = graph.surface_syntax_string());
272    }
273}
274
275impl DockerDeployProcess {
276    /// Expose a TCP port on this process for external access.
277    ///
278    /// The binary running on this process must bind a `TcpListener` on this port.
279    /// This method ensures the port appears in the Docker image's `EXPOSE` directives
280    /// and is available for endpoint discovery via [`Self::get_tcp_endpoint`].
281    pub fn expose_port(&self, port: u16) {
282        self.exposed_ports.borrow_mut().push(port);
283    }
284
285    /// Returns the TCP endpoint `(host, port)` for this process exposing
286    /// the given container port. Queries Docker for the dynamically allocated
287    /// host port mapping.
288    pub async fn get_tcp_endpoint(&self, container_port: u16) -> (String, u16) {
289        let name = self
290            .docker_container_name
291            .borrow()
292            .as_ref()
293            .expect("container not yet started")
294            .clone();
295        let host_port = find_dynamically_allocated_docker_port(&name, container_port).await;
296        ("localhost".to_owned(), host_port)
297    }
298}
299
300impl DockerDeployCluster {
301    /// Expose a TCP port on every member of this cluster for external access.
302    ///
303    /// The binary running on this cluster must bind a `TcpListener` on this port.
304    /// This method ensures the port appears in the Docker image's `EXPOSE` directives
305    /// and is available for endpoint discovery via [`Self::get_all_tcp_endpoints`].
306    pub fn expose_port(&self, port: u16) {
307        self.exposed_ports.borrow_mut().push(port);
308    }
309
310    /// Returns TCP endpoints `(host, port)` for all cluster members exposing
311    /// the given container port. Queries Docker for the dynamically allocated
312    /// host port mapping.
313    pub async fn get_all_tcp_endpoints(&self, container_port: u16) -> Vec<(String, u16)> {
314        let names = self.docker_container_name.borrow().clone();
315        let mut endpoints = Vec::with_capacity(names.len());
316        for name in names {
317            let host_port = find_dynamically_allocated_docker_port(&name, container_port).await;
318            endpoints.push(("localhost".to_owned(), host_port));
319        }
320        endpoints
321    }
322}
323
324type DynSourceSink<Out, In, InErr> = (
325    Pin<Box<dyn Stream<Item = Out>>>,
326    Pin<Box<dyn Sink<In, Error = InErr>>>,
327);
328
329impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
330    #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
331    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
332        self.ports.borrow_mut().insert(external_port_id, port);
333    }
334
335    fn as_bytes_bidi(
336        &self,
337        external_port_id: ExternalPortId,
338    ) -> impl Future<
339        Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
340    > + 'a {
341        let guard =
342            tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered();
343
344        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
345        let (docker_container_name, remote_port, _) = self
346            .connection_info
347            .borrow()
348            .get(&local_port)
349            .unwrap()
350            .clone();
351
352        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
353
354        async move {
355            let local_port =
356                find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
357            let remote_ip_address = "localhost";
358
359            trace!(name: "as_bytes_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
360
361            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
362                .await
363                .unwrap();
364
365            trace!(name: "as_bytes_bidi_connected", to = %remote_ip_address, to_port = %local_port);
366
367            let (rx, tx) = stream.into_split();
368
369            let source = Box::pin(
370                FramedRead::new(rx, LengthDelimitedCodec::new()),
371            ) as Pin<Box<dyn Stream<Item = Result<bytes::BytesMut, std::io::Error>>>>;
372
373            let sink = Box::pin(FramedWrite::new(tx, LengthDelimitedCodec::new()))
374                as Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>;
375
376            (source, sink)
377        }
378        .instrument(guard.exit())
379    }
380
381    fn as_bincode_bidi<InT, OutT>(
382        &self,
383        external_port_id: ExternalPortId,
384    ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
385    where
386        InT: serde::Serialize + 'static,
387        OutT: serde::de::DeserializeOwned + 'static,
388    {
389        let guard =
390            tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered();
391
392        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
393        let (docker_container_name, remote_port, _) = self
394            .connection_info
395            .borrow()
396            .get(&local_port)
397            .unwrap()
398            .clone();
399
400        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
401
402        async move {
403            let local_port =
404                find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
405            let remote_ip_address = "localhost";
406
407            trace!(name: "as_bincode_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
408
409            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
410                .await
411                .unwrap();
412
413            trace!(name: "as_bincode_bidi_connected", to = %remote_ip_address, to_port = %local_port);
414
415            let (rx, tx) = stream.into_split();
416
417            let source = Box::pin(
418                FramedRead::new(rx, LengthDelimitedCodec::new())
419                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
420            ) as Pin<Box<dyn Stream<Item = OutT>>>;
421
422            let sink = Box::pin(
423                FramedWrite::new(tx, LengthDelimitedCodec::new()).with(move |v: InT| async move {
424                    Ok::<_, std::io::Error>(Bytes::from(bincode::serialize(&v).unwrap()))
425                }),
426            ) as Pin<Box<dyn Sink<InT, Error = std::io::Error>>>;
427
428            (source, sink)
429        }
430        .instrument(guard.exit())
431    }
432
433    fn as_bincode_sink<T>(
434        &self,
435        external_port_id: ExternalPortId,
436    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
437    where
438        T: serde::Serialize + 'static,
439    {
440        let guard =
441            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
442
443        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
444        let (docker_container_name, remote_port, _) = self
445            .connection_info
446            .borrow()
447            .get(&local_port)
448            .unwrap()
449            .clone();
450
451        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
452
453        async move {
454            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
455            let remote_ip_address = "localhost";
456
457            Box::pin(
458                LazySink::new(move || {
459                    Box::pin(async move {
460                        trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
461
462                        let stream =
463                            TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
464                                .await?;
465
466                        trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
467
468                        Result::<_, std::io::Error>::Ok(FramedWrite::new(
469                            stream,
470                            LengthDelimitedCodec::new(),
471                        ))
472                    })
473                })
474                .with(move |v| async move {
475                    Ok(Bytes::from(bincode::serialize(&v).unwrap()))
476                }),
477            ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
478        }
479        .instrument(guard.exit())
480    }
481
482    fn as_bincode_source<T>(
483        &self,
484        external_port_id: ExternalPortId,
485    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
486    where
487        T: serde::de::DeserializeOwned + 'static,
488    {
489        let guard =
490            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
491
492        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
493        let (docker_container_name, remote_port, _) = self
494            .connection_info
495            .borrow()
496            .get(&local_port)
497            .unwrap()
498            .clone();
499
500        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
501
502        async move {
503
504            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
505            let remote_ip_address = "localhost";
506
507            trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
508
509            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
510                .await
511                .unwrap();
512
513            trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
514
515            Box::pin(
516                FramedRead::new(stream, LengthDelimitedCodec::new())
517                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
518            ) as Pin<Box<dyn Stream<Item = T>>>
519        }
520        .instrument(guard.exit())
521    }
522}
523
524#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
525async fn find_dynamically_allocated_docker_port(
526    docker_container_name: &str,
527    destination_port: u16,
528) -> u16 {
529    let docker = Docker::connect_with_local_defaults().unwrap();
530
531    let container_info = docker
532        .inspect_container(docker_container_name, None::<InspectContainerOptions>)
533        .await
534        .unwrap();
535
536    trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
537
538    // container_info={"1001/tcp": Some([PortBinding { host_ip: Some("0.0.0.0"), host_port: Some("32771") }, PortBinding { host_ip: Some("::"), host_port: Some("32771") }])} destination_port=1001
539    let remote_port = container_info
540        .network_settings
541        .as_ref()
542        .unwrap()
543        .ports
544        .as_ref()
545        .unwrap()
546        .get(&format!("{destination_port}/tcp"))
547        .unwrap()
548        .as_ref()
549        .unwrap()
550        .iter()
551        .find(|v| v.host_ip == Some("0.0.0.0".to_owned()))
552        .unwrap()
553        .host_port
554        .as_ref()
555        .unwrap()
556        .parse()
557        .unwrap();
558
559    remote_port
560}
561
562/// For deploying to a local docker instance
563pub struct DockerDeploy {
564    docker_processes: Vec<DockerDeployProcessSpec>,
565    docker_clusters: Vec<DockerDeployClusterSpec>,
566    network: DockerNetwork,
567    deployment_instance: String,
568}
569
570#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
571async fn create_and_start_container(
572    docker: &Docker,
573    container_name: &str,
574    image_name: &str,
575    network_name: &str,
576    deployment_instance: &str,
577) -> Result<(), anyhow::Error> {
578    let config = ContainerCreateBody {
579        image: Some(image_name.to_owned()),
580        hostname: Some(container_name.to_owned()),
581        host_config: Some(HostConfig {
582            binds: Some(vec!["/var/run/docker.sock:/var/run/docker.sock".to_owned()]),
583            publish_all_ports: Some(true),
584            port_bindings: Some(HashMap::new()), /* Due to a bug in docker, if you don't send empty port bindings with publish_all_ports set to true and with a docker image that has EXPOSE directives in it, docker will crash because it will try to write to a map in memory that it has not initialized yet. Setting port_bindings explicitly to an empty map will initialize it first so that it does not break. */
585            ..Default::default()
586        }),
587        env: Some(vec![
588            format!("CONTAINER_NAME={container_name}"),
589            format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
590            format!("RUST_LOG=trace"),
591        ]),
592        networking_config: Some(NetworkingConfig {
593            endpoints_config: Some(HashMap::from([(
594                network_name.to_owned(),
595                EndpointSettings {
596                    ..Default::default()
597                },
598            )])),
599        }),
600        tty: Some(true),
601        ..Default::default()
602    };
603
604    let options = CreateContainerOptions {
605        name: Some(container_name.to_owned()),
606        ..Default::default()
607    };
608
609    tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
610    docker.create_container(Some(options), config).await?;
611    docker
612        .start_container(container_name, None::<StartContainerOptions>)
613        .await?;
614
615    Ok(())
616}
617
618#[instrument(level = "trace", skip_all, fields(%image_name))]
619async fn build_and_create_image(
620    rust_crate: &Rc<RefCell<Option<RustCrate>>>,
621    compilation_options: Option<&str>,
622    config: &[String],
623    exposed_ports: &[u16],
624    image_name: &str,
625    base_image: Option<&str>,
626    linux_compile_type: LinuxCompileType,
627) -> Result<(), anyhow::Error> {
628    let mut rust_crate = rust_crate
629        .borrow_mut()
630        .take()
631        .unwrap()
632        .rustflags(compilation_options.unwrap_or_default());
633
634    for cfg in config {
635        rust_crate = rust_crate.config(cfg);
636    }
637
638    let build_output = match build_crate_memoized(
639        rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(linux_compile_type)),
640    )
641    .await
642    {
643        Ok(build_output) => build_output,
644        Err(BuildError::FailedToBuildCrate {
645            exit_status,
646            diagnostics,
647            text_lines,
648            stderr_lines,
649        }) => {
650            let diagnostics = diagnostics
651                .into_iter()
652                .map(|d| d.rendered.unwrap())
653                .collect::<Vec<_>>()
654                .join("\n");
655            let text_lines = text_lines.join("\n");
656            let stderr_lines = stderr_lines.join("\n");
657
658            anyhow::bail!(
659                r#"
660Failed to build crate {exit_status:?}
661--- diagnostics
662---
663{diagnostics}
664---
665---
666---
667
668--- text_lines
669---
670---
671{text_lines}
672---
673---
674---
675
676--- stderr_lines
677---
678---
679{stderr_lines}
680---
681---
682---"#
683            );
684        }
685        Err(err) => {
686            anyhow::bail!("Failed to build crate {err:?}");
687        }
688    };
689
690    let docker = Docker::connect_with_local_defaults()?;
691
692    let mut tar_data = Vec::new();
693    {
694        let mut tar = Builder::new(&mut tar_data);
695
696        let exposed_ports = exposed_ports
697            .iter()
698            .map(|port| format!("EXPOSE {port}/tcp"))
699            .collect::<Vec<_>>()
700            .join("\n");
701
702        let from_image = base_image.unwrap_or("scratch");
703        let dockerfile_content = format!(
704            r#"
705                FROM {from_image}
706                {exposed_ports}
707                COPY app /app
708                CMD ["/app"]
709            "#,
710        );
711
712        trace!(name: "dockerfile", %dockerfile_content);
713
714        let mut header = Header::new_gnu();
715        header.set_path("Dockerfile")?;
716        header.set_size(dockerfile_content.len() as u64);
717        header.set_cksum();
718        tar.append(&header, dockerfile_content.as_bytes())?;
719
720        let mut header = Header::new_gnu();
721        header.set_path("app")?;
722        header.set_size(build_output.bin_data.len() as u64);
723        header.set_mode(0o755);
724        header.set_cksum();
725        tar.append(&header, &build_output.bin_data[..])?;
726
727        tar.finish()?;
728    }
729
730    let build_options = BuildImageOptions {
731        dockerfile: "Dockerfile".to_owned(),
732        t: Some(image_name.to_owned()),
733        rm: true,
734        ..Default::default()
735    };
736
737    use bollard::errors::Error;
738
739    let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
740    let mut build_stream = docker.build_image(build_options, None, Some(body));
741    while let Some(msg) = build_stream.next().await {
742        match msg {
743            Ok(_) => {}
744            Err(e) => match e {
745                Error::DockerStreamError { error } => {
746                    return Err(anyhow::anyhow!(
747                        "Docker build failed: DockerStreamError: {{ error: {error} }}"
748                    ));
749                }
750                _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
751            },
752        }
753    }
754
755    Ok(())
756}
757
758impl DockerDeploy {
759    /// Create a new deployment
760    pub fn new(network: DockerNetwork) -> Self {
761        Self {
762            docker_processes: Vec::new(),
763            docker_clusters: Vec::new(),
764            network,
765            deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
766        }
767    }
768
769    /// Add an internal docker service to the deployment.
770    pub fn add_localhost_docker(
771        &mut self,
772        compilation_options: Option<String>,
773        config: Vec<String>,
774    ) -> DockerDeployProcessSpec {
775        let process = DockerDeployProcessSpec {
776            compilation_options,
777            config,
778            network: self.network.clone(),
779            deployment_instance: self.deployment_instance.clone(),
780            base_image: None,
781            linux_compile_type: LinuxCompileType::Musl,
782        };
783
784        self.docker_processes.push(process.clone());
785
786        process
787    }
788
789    /// Add an internal docker cluster to the deployment.
790    pub fn add_localhost_docker_cluster(
791        &mut self,
792        compilation_options: Option<String>,
793        config: Vec<String>,
794        count: usize,
795    ) -> DockerDeployClusterSpec {
796        let cluster = DockerDeployClusterSpec {
797            compilation_options,
798            config,
799            count,
800            deployment_instance: self.deployment_instance.clone(),
801            base_image: None,
802            linux_compile_type: LinuxCompileType::Musl,
803        };
804
805        self.docker_clusters.push(cluster.clone());
806
807        cluster
808    }
809
810    /// Add an external process to the deployment.
811    pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
812        DockerDeployExternalSpec { name }
813    }
814
815    /// Get the deployment instance from this deployment.
816    pub fn get_deployment_instance(&self) -> String {
817        self.deployment_instance.clone()
818    }
819
820    /// Create docker images.
821    #[instrument(level = "trace", skip_all)]
822    pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
823        for (_, _, process) in nodes.get_all_processes() {
824            let exposed_ports = process.exposed_ports.borrow().clone();
825
826            build_and_create_image(
827                &process.rust_crate,
828                process.compilation_options.as_deref(),
829                &process.config,
830                &exposed_ports,
831                &process.name,
832                process.base_image.as_deref(),
833                process.linux_compile_type,
834            )
835            .await?;
836        }
837
838        for (_, _, cluster) in nodes.get_all_clusters() {
839            let exposed_ports = cluster.exposed_ports.borrow().clone();
840            build_and_create_image(
841                &cluster.rust_crate,
842                cluster.compilation_options.as_deref(),
843                &cluster.config,
844                &exposed_ports,
845                &cluster.name,
846                cluster.base_image.as_deref(),
847                cluster.linux_compile_type,
848            )
849            .await?;
850        }
851
852        Ok(())
853    }
854
855    /// Start the deployment, tell docker to create containers from the existing provisioned images.
856    #[instrument(level = "trace", skip_all)]
857    pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
858        let docker = Docker::connect_with_local_defaults()?;
859
860        match docker
861            .create_network(NetworkCreateRequest {
862                name: self.network.name.clone(),
863                driver: Some("bridge".to_owned()),
864                ..Default::default()
865            })
866            .await
867        {
868            Ok(v) => v.id,
869            Err(e) => {
870                panic!("Failed to create docker network: {e:?}");
871            }
872        };
873
874        for (_, _, process) in nodes.get_all_processes() {
875            let docker_container_name: String = get_docker_container_name(&process.name, None);
876            *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
877
878            create_and_start_container(
879                &docker,
880                &docker_container_name,
881                &process.name,
882                &self.network.name,
883                &self.deployment_instance,
884            )
885            .await?;
886        }
887
888        for (_, _, cluster) in nodes.get_all_clusters() {
889            for num in 0..cluster.count {
890                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
891                cluster
892                    .docker_container_name
893                    .borrow_mut()
894                    .push(docker_container_name.clone());
895
896                create_and_start_container(
897                    &docker,
898                    &docker_container_name,
899                    &cluster.name,
900                    &self.network.name,
901                    &self.deployment_instance,
902                )
903                .await?;
904            }
905        }
906
907        Ok(())
908    }
909
910    /// Stop the deployment, destroy all containers
911    #[instrument(level = "trace", skip_all)]
912    pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
913        let docker = Docker::connect_with_local_defaults()?;
914
915        for (_, _, process) in nodes.get_all_processes() {
916            let docker_container_name: String = get_docker_container_name(&process.name, None);
917
918            docker
919                .kill_container(&docker_container_name, None::<KillContainerOptions>)
920                .await?;
921        }
922
923        for (_, _, cluster) in nodes.get_all_clusters() {
924            for num in 0..cluster.count {
925                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
926
927                docker
928                    .kill_container(&docker_container_name, None::<KillContainerOptions>)
929                    .await?;
930            }
931        }
932
933        Ok(())
934    }
935
936    /// remove containers, images, and networks.
937    #[instrument(level = "trace", skip_all)]
938    pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
939        let docker = Docker::connect_with_local_defaults()?;
940
941        for (_, _, process) in nodes.get_all_processes() {
942            let docker_container_name: String = get_docker_container_name(&process.name, None);
943
944            docker
945                .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
946                .await?;
947        }
948
949        for (_, _, cluster) in nodes.get_all_clusters() {
950            for num in 0..cluster.count {
951                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
952
953                docker
954                    .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
955                    .await?;
956            }
957        }
958
959        docker
960            .remove_network(&self.network.name)
961            .await
962            .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
963
964        use bollard::query_parameters::RemoveImageOptions;
965
966        for (_, _, process) in nodes.get_all_processes() {
967            docker
968                .remove_image(&process.name, None::<RemoveImageOptions>, None)
969                .await?;
970        }
971
972        for (_, _, cluster) in nodes.get_all_clusters() {
973            docker
974                .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
975                .await?;
976        }
977
978        Ok(())
979    }
980}
981
982impl<'a> Deploy<'a> for DockerDeploy {
983    type Meta = ();
984    type InstantiateEnv = Self;
985
986    type Process = DockerDeployProcess;
987    type Cluster = DockerDeployCluster;
988    type External = DockerDeployExternal;
989
990    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
991    fn o2o_sink_source(
992        _env: &mut Self::InstantiateEnv,
993        p1: &Self::Process,
994        p1_port: &<Self::Process as Node>::Port,
995        p2: &Self::Process,
996        p2_port: &<Self::Process as Node>::Port,
997        name: Option<&str>,
998        networking_info: &crate::networking::NetworkingInfo,
999    ) -> (syn::Expr, syn::Expr) {
1000        match networking_info {
1001            crate::networking::NetworkingInfo::Tcp {
1002                fault: crate::networking::TcpFault::FailStop,
1003            } => {}
1004            _ => panic!("Unsupported networking info: {:?}", networking_info),
1005        }
1006
1007        deploy_containerized_o2o(
1008            &p2.name,
1009            name.expect("channel name is required for containerized deployment"),
1010        )
1011    }
1012
1013    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
1014    fn o2o_connect(
1015        p1: &Self::Process,
1016        p1_port: &<Self::Process as Node>::Port,
1017        p2: &Self::Process,
1018        p2_port: &<Self::Process as Node>::Port,
1019    ) -> Box<dyn FnOnce()> {
1020        let serialized = format!("o2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1021
1022        Box::new(move || {
1023            trace!(name: "o2o_connect thunk", %serialized);
1024        })
1025    }
1026
1027    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
1028    fn o2m_sink_source(
1029        _env: &mut Self::InstantiateEnv,
1030        p1: &Self::Process,
1031        p1_port: &<Self::Process as Node>::Port,
1032        c2: &Self::Cluster,
1033        c2_port: &<Self::Cluster as Node>::Port,
1034        name: Option<&str>,
1035        networking_info: &crate::networking::NetworkingInfo,
1036    ) -> (syn::Expr, syn::Expr) {
1037        match networking_info {
1038            crate::networking::NetworkingInfo::Tcp {
1039                fault: crate::networking::TcpFault::FailStop,
1040            } => {}
1041            _ => panic!("Unsupported networking info: {:?}", networking_info),
1042        }
1043
1044        deploy_containerized_o2m(
1045            name.expect("channel name is required for containerized deployment"),
1046        )
1047    }
1048
1049    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
1050    fn o2m_connect(
1051        p1: &Self::Process,
1052        p1_port: &<Self::Process as Node>::Port,
1053        c2: &Self::Cluster,
1054        c2_port: &<Self::Cluster as Node>::Port,
1055    ) -> Box<dyn FnOnce()> {
1056        let serialized = format!("o2m_connect {}:{p1_port} -> {}:{c2_port}", p1.name, c2.name);
1057
1058        Box::new(move || {
1059            trace!(name: "o2m_connect thunk", %serialized);
1060        })
1061    }
1062
1063    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
1064    fn m2o_sink_source(
1065        _env: &mut Self::InstantiateEnv,
1066        c1: &Self::Cluster,
1067        c1_port: &<Self::Cluster as Node>::Port,
1068        p2: &Self::Process,
1069        p2_port: &<Self::Process as Node>::Port,
1070        name: Option<&str>,
1071        networking_info: &crate::networking::NetworkingInfo,
1072    ) -> (syn::Expr, syn::Expr) {
1073        match networking_info {
1074            crate::networking::NetworkingInfo::Tcp {
1075                fault: crate::networking::TcpFault::FailStop,
1076            } => {}
1077            _ => panic!("Unsupported networking info: {:?}", networking_info),
1078        }
1079
1080        deploy_containerized_m2o(
1081            &p2.name,
1082            name.expect("channel name is required for containerized deployment"),
1083        )
1084    }
1085
1086    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
1087    fn m2o_connect(
1088        c1: &Self::Cluster,
1089        c1_port: &<Self::Cluster as Node>::Port,
1090        p2: &Self::Process,
1091        p2_port: &<Self::Process as Node>::Port,
1092    ) -> Box<dyn FnOnce()> {
1093        let serialized = format!("o2m_connect {}:{c1_port} -> {}:{p2_port}", c1.name, p2.name);
1094
1095        Box::new(move || {
1096            trace!(name: "m2o_connect thunk", %serialized);
1097        })
1098    }
1099
1100    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1101    fn m2m_sink_source(
1102        _env: &mut Self::InstantiateEnv,
1103        c1: &Self::Cluster,
1104        c1_port: &<Self::Cluster as Node>::Port,
1105        c2: &Self::Cluster,
1106        c2_port: &<Self::Cluster as Node>::Port,
1107        name: Option<&str>,
1108        networking_info: &crate::networking::NetworkingInfo,
1109    ) -> (syn::Expr, syn::Expr) {
1110        match networking_info {
1111            crate::networking::NetworkingInfo::Tcp {
1112                fault: crate::networking::TcpFault::FailStop,
1113            } => {}
1114            _ => panic!("Unsupported networking info: {:?}", networking_info),
1115        }
1116
1117        deploy_containerized_m2m(
1118            name.expect("channel name is required for containerized deployment"),
1119        )
1120    }
1121
1122    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1123    fn m2m_connect(
1124        c1: &Self::Cluster,
1125        c1_port: &<Self::Cluster as Node>::Port,
1126        c2: &Self::Cluster,
1127        c2_port: &<Self::Cluster as Node>::Port,
1128    ) -> Box<dyn FnOnce()> {
1129        let serialized = format!("m2m_connect {}:{c1_port} -> {}:{c2_port}", c1.name, c2.name);
1130
1131        Box::new(move || {
1132            trace!(name: "m2m_connect thunk", %serialized);
1133        })
1134    }
1135
1136    #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
1137    fn e2o_many_source(
1138        extra_stmts: &mut Vec<syn::Stmt>,
1139        p2: &Self::Process,
1140        p2_port: &<Self::Process as Node>::Port,
1141        codec_type: &syn::Type,
1142        shared_handle: String,
1143    ) -> syn::Expr {
1144        p2.exposed_ports.borrow_mut().push(*p2_port);
1145
1146        let socket_ident = syn::Ident::new(
1147            &format!("__hydro_deploy_many_{}_socket", &shared_handle),
1148            Span::call_site(),
1149        );
1150
1151        let source_ident = syn::Ident::new(
1152            &format!("__hydro_deploy_many_{}_source", &shared_handle),
1153            Span::call_site(),
1154        );
1155
1156        let sink_ident = syn::Ident::new(
1157            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1158            Span::call_site(),
1159        );
1160
1161        let membership_ident = syn::Ident::new(
1162            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
1163            Span::call_site(),
1164        );
1165
1166        let bind_addr = format!("0.0.0.0:{}", p2_port);
1167
1168        extra_stmts.push(syn::parse_quote! {
1169            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1170        });
1171
1172        let root = crate::staging_util::get_this_crate();
1173
1174        extra_stmts.push(syn::parse_quote! {
1175            let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
1176        });
1177
1178        parse_quote!(#source_ident)
1179    }
1180
1181    #[instrument(level = "trace", skip_all, fields(%shared_handle))]
1182    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
1183        let sink_ident = syn::Ident::new(
1184            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1185            Span::call_site(),
1186        );
1187        parse_quote!(#sink_ident)
1188    }
1189
1190    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1191    fn e2o_source(
1192        extra_stmts: &mut Vec<syn::Stmt>,
1193        p1: &Self::External,
1194        p1_port: &<Self::External as Node>::Port,
1195        p2: &Self::Process,
1196        p2_port: &<Self::Process as Node>::Port,
1197        _codec_type: &syn::Type,
1198        shared_handle: String,
1199    ) -> syn::Expr {
1200        p1.connection_info.borrow_mut().insert(
1201            *p1_port,
1202            (
1203                p2.docker_container_name.clone(),
1204                *p2_port,
1205                p2.network.clone(),
1206            ),
1207        );
1208
1209        p2.exposed_ports.borrow_mut().push(*p2_port);
1210
1211        let socket_ident = syn::Ident::new(
1212            &format!("__hydro_deploy_{}_socket", &shared_handle),
1213            Span::call_site(),
1214        );
1215
1216        let source_ident = syn::Ident::new(
1217            &format!("__hydro_deploy_{}_source", &shared_handle),
1218            Span::call_site(),
1219        );
1220
1221        let sink_ident = syn::Ident::new(
1222            &format!("__hydro_deploy_{}_sink", &shared_handle),
1223            Span::call_site(),
1224        );
1225
1226        let bind_addr = format!("0.0.0.0:{}", p2_port);
1227
1228        extra_stmts.push(syn::parse_quote! {
1229            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1230        });
1231
1232        let create_expr = deploy_containerized_external_sink_source_ident(socket_ident);
1233
1234        extra_stmts.push(syn::parse_quote! {
1235            let (#sink_ident, #source_ident) = (#create_expr).split();
1236        });
1237
1238        parse_quote!(#source_ident)
1239    }
1240
1241    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1242    fn e2o_connect(
1243        p1: &Self::External,
1244        p1_port: &<Self::External as Node>::Port,
1245        p2: &Self::Process,
1246        p2_port: &<Self::Process as Node>::Port,
1247        many: bool,
1248        server_hint: NetworkHint,
1249    ) -> Box<dyn FnOnce()> {
1250        if server_hint != NetworkHint::Auto {
1251            panic!(
1252                "Docker deployment only supports NetworkHint::Auto, got {:?}",
1253                server_hint
1254            );
1255        }
1256
1257        // For many connections, we need to populate connection_info so as_bincode_bidi can find it
1258        if many {
1259            p1.connection_info.borrow_mut().insert(
1260                *p1_port,
1261                (
1262                    p2.docker_container_name.clone(),
1263                    *p2_port,
1264                    p2.network.clone(),
1265                ),
1266            );
1267        }
1268
1269        let serialized = format!("e2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1270
1271        Box::new(move || {
1272            trace!(name: "e2o_connect thunk", %serialized);
1273        })
1274    }
1275
1276    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1277    fn o2e_sink(
1278        p1: &Self::Process,
1279        p1_port: &<Self::Process as Node>::Port,
1280        p2: &Self::External,
1281        p2_port: &<Self::External as Node>::Port,
1282        shared_handle: String,
1283    ) -> syn::Expr {
1284        let sink_ident = syn::Ident::new(
1285            &format!("__hydro_deploy_{}_sink", &shared_handle),
1286            Span::call_site(),
1287        );
1288        parse_quote!(#sink_ident)
1289    }
1290
1291    #[instrument(level = "trace", skip_all, fields(%of_cluster))]
1292    fn cluster_ids(
1293        of_cluster: LocationKey,
1294    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
1295        cluster_ids()
1296    }
1297
1298    #[instrument(level = "trace", skip_all)]
1299    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
1300        cluster_self_id()
1301    }
1302
1303    #[instrument(level = "trace", skip_all, fields(?location_id))]
1304    fn cluster_membership_stream(
1305        _env: &mut Self::InstantiateEnv,
1306        _at_location: &LocationId,
1307        location_id: &LocationId,
1308    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
1309    {
1310        cluster_membership_stream(location_id)
1311    }
1312}
1313
1314const CONTAINER_ALPHABET: [char; 36] = [
1315    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
1316    'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
1317];
1318
1319fn is_valid_docker_image_name(name: &str) -> bool {
1320    regex::Regex::new(r"^[a-z0-9]+([._-][a-z0-9]+)*$")
1321        .unwrap()
1322        .is_match(name)
1323}
1324
1325#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location_key, %deployment_instance))]
1326fn get_docker_image_name(
1327    name_hint: &str,
1328    location_key: LocationKey,
1329    deployment_instance: &str,
1330) -> String {
1331    let name_hint: String = name_hint
1332        .split("::")
1333        .last()
1334        .unwrap()
1335        .to_ascii_lowercase()
1336        .split(['.', '_', '-'])
1337        .filter(|s| !s.is_empty())
1338        .collect::<Vec<_>>()
1339        .join("-");
1340
1341    let image_name = format!("hy-{name_hint}-{deployment_instance}-{location_key}");
1342
1343    if !is_valid_docker_image_name(&image_name) {
1344        panic!(
1345            "Generated Docker image name '{image_name}' is not a valid Docker image name. \
1346             Docker image names may only contain lowercase alphanumeric characters \
1347             separated by single '.', '_', or '-' characters, and must start and end \
1348             with an alphanumeric character. The most likely cause is your location \
1349             struct name '{name_hint}'"
1350        );
1351    }
1352
1353    image_name
1354}
1355
1356#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1357fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1358    if let Some(instance) = instance {
1359        format!("{image_name}-{instance}")
1360    } else {
1361        image_name.to_owned()
1362    }
1363}
1364/// Represents a Process running in a docker container
1365#[derive(Clone)]
1366pub struct DockerDeployProcessSpec {
1367    compilation_options: Option<String>,
1368    config: Vec<String>,
1369    network: DockerNetwork,
1370    deployment_instance: String,
1371    base_image: Option<String>,
1372    linux_compile_type: LinuxCompileType,
1373}
1374
1375impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1376    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1377    fn build(self, key: LocationKey, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1378        DockerDeployProcess {
1379            key,
1380            name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1381
1382            next_port: Rc::new(RefCell::new(1000)),
1383            rust_crate: Rc::new(RefCell::new(None)),
1384
1385            exposed_ports: Rc::new(RefCell::new(Vec::new())),
1386
1387            docker_container_name: Rc::new(RefCell::new(None)),
1388
1389            compilation_options: self.compilation_options,
1390            config: self.config,
1391
1392            network: self.network.clone(),
1393
1394            base_image: self.base_image,
1395            linux_compile_type: self.linux_compile_type,
1396        }
1397    }
1398}
1399
1400/// Represents a Cluster running across `count` docker containers.
1401#[derive(Clone)]
1402pub struct DockerDeployClusterSpec {
1403    compilation_options: Option<String>,
1404    config: Vec<String>,
1405    count: usize,
1406    deployment_instance: String,
1407    base_image: Option<String>,
1408    linux_compile_type: LinuxCompileType,
1409}
1410
1411impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1412    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1413    fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1414        DockerDeployCluster {
1415            key,
1416            name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1417
1418            next_port: Rc::new(RefCell::new(1000)),
1419            rust_crate: Rc::new(RefCell::new(None)),
1420
1421            exposed_ports: Rc::new(RefCell::new(Vec::new())),
1422
1423            docker_container_name: Rc::new(RefCell::new(Vec::new())),
1424
1425            compilation_options: self.compilation_options,
1426            config: self.config,
1427
1428            count: self.count,
1429
1430            base_image: self.base_image,
1431            linux_compile_type: self.linux_compile_type,
1432        }
1433    }
1434}
1435
1436impl DockerDeployProcessSpec {
1437    /// Set the base Docker image for this process.
1438    /// Defaults to `scratch` if not specified.
1439    pub fn base_image(mut self, image: impl Into<String>) -> Self {
1440        self.base_image = Some(image.into());
1441        self
1442    }
1443
1444    /// Set the Linux compile type (glibc or musl) for this process.
1445    /// Defaults to `Musl` if not specified.
1446    pub fn linux_compile_type(mut self, compile_type: LinuxCompileType) -> Self {
1447        self.linux_compile_type = compile_type;
1448        self
1449    }
1450}
1451
1452impl DockerDeployClusterSpec {
1453    /// Set the base Docker image for this cluster.
1454    /// Defaults to `scratch` if not specified.
1455    pub fn base_image(mut self, image: impl Into<String>) -> Self {
1456        self.base_image = Some(image.into());
1457        self
1458    }
1459
1460    /// Set the Linux compile type (glibc or musl) for this cluster.
1461    /// Defaults to `Musl` if not specified.
1462    pub fn linux_compile_type(mut self, compile_type: LinuxCompileType) -> Self {
1463        self.linux_compile_type = compile_type;
1464        self
1465    }
1466}
1467
1468/// Represents an external process outside of the management of hydro deploy.
1469pub struct DockerDeployExternalSpec {
1470    name: String,
1471}
1472
1473impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1474    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1475    fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1476        DockerDeployExternal {
1477            key,
1478            name: self.name,
1479            next_port: Rc::new(RefCell::new(10000)),
1480            next_external_port_id: Rc::new(RefCell::new(ExternalPortId::default())),
1481            ports: Rc::new(RefCell::new(HashMap::new())),
1482            connection_info: Rc::new(RefCell::new(HashMap::new())),
1483        }
1484    }
1485}