diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a95d9c5..9d6a291 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -13,7 +13,7 @@ env: CARGO_TERM_COLOR: always jobs: - e2e-tests: + e2e-test: name: End-to-End Tests runs-on: ubuntu-latest @@ -24,5 +24,18 @@ jobs: run: docker build -t distributed-topic-tracker . - name: Run end-to-end test - run: ./test-e2e.sh + run: COMPOSE_FILE=./docker-compose.yml ./test-e2e.sh + + e2e-test-experimental: + name: End-to-End Tests (Experimental) + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Build Docker image + run: docker build -t distributed-topic-tracker --file Dockerfile.experimental . + + - name: Run end-to-end test + run: COMPOSE_FILE=./docker-compose-experimental.yml ./test-e2e.sh diff --git a/Cargo.lock b/Cargo.lock index 5b75701..57d92e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -748,6 +748,27 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "dht" +version = "6.1.0" +source = "git+https://github.com/Nuhvi/mainline?branch=main#95f192b48f875e9070452a32350b0312ad691954" +dependencies = [ + "crc", + "document-features", + "dyn-clone", + "ed25519-dalek", + "flume", + "futures-lite", + "getrandom 0.3.3", + "lru 0.13.0", + "serde", + "serde_bencode", + "serde_bytes", + "sha1_smol", + "thiserror 2.0.12", + "tracing", +] + [[package]] name = "diatomic-waker" version = "0.2.3" @@ -794,9 +815,11 @@ dependencies = [ "actor-helper", "anyhow", "chrono", + "dht", "ed25519-dalek", "ed25519-dalek-hpke", "futures-lite", + "hex", "iroh", "iroh-gossip", "mainline", diff --git a/Cargo.toml b/Cargo.toml index fb503d3..66f8695 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ edition = "2024" [features] default = ["iroh-gossip"] iroh-gossip = ["dep:iroh", "dep:iroh-gossip"] +experimental = ["dep:mainline_exp"] [dependencies] @@ -32,7 +33,6 @@ futures-lite = "2" chrono = { version = "0.4", default-features = false, features = ["clock"] } -mainline = { version = "6", default-features = false, features = ["async"]} rand = { version = "0.9", default-features = false, features = ["std", "std_rng"] } actor-helper = { version = "0.2", features = ["tokio", "anyhow"] } postcard = "1" @@ -41,6 +41,11 @@ serde = { version = "1", default-features = false, features = ["std"] } tracing = { version = "0.1", default-features = false, features = ["std"] } tracing-subscriber = { version = "0.3", default-features = false, features = ["std", "env-filter", "ansi"] } +mainline = { version = "6.0.0", default-features = false, features = ["async"] } +mainline_exp = { git = "https://github.com/Nuhvi/mainline", branch = "main", default-features = false, features = ["async"], optional = true, package = "dht" } + +hex = "0.4" + [lib] crate-type = ["cdylib", "rlib"] @@ -52,6 +57,7 @@ required-features = ["iroh-gossip"] name = "tests" # only build examples if the "iroh-gossip" feature is enabled + [[example]] name = "chat" required-features = ["iroh-gossip"] @@ -71,3 +77,11 @@ required-features = ["iroh-gossip"] [[example]] name = "simple" required-features = ["iroh-gossip"] + +[[example]] +name = "chat_experimental" +required-features = ["iroh-gossip", "experimental"] + +[[example]] +name = "e2e_test_experimental" +required-features = ["iroh-gossip", "experimental"] \ No newline at end of file diff --git a/Dockerfile.experimental b/Dockerfile.experimental new file mode 100644 index 0000000..e387c94 --- /dev/null +++ b/Dockerfile.experimental @@ -0,0 +1,26 @@ +FROM rust:1.89.0 as builder + +WORKDIR 
/app + +COPY Cargo.toml Cargo.lock ./ +COPY README.md ./ +COPY src ./src +COPY tests ./tests +COPY examples ./examples + +# test then build +RUN cargo build --release --example e2e_test_experimental --features="iroh-gossip experimental" + +# create a minimal runtime image +FROM debian:bookworm-slim +RUN apt-get update && apt-get install -y \ + ca-certificates \ + && rm -rf /var/lib/apt/lists/* + + + +# Copy the built binary from the builder stage +COPY --from=builder /app/target/release/examples/e2e_test_experimental /usr/local/bin/e2e_test_experimental + +# Set the default command +CMD ["e2e_test_experimental"] diff --git a/docker-compose-experimental.yml b/docker-compose-experimental.yml new file mode 100644 index 0000000..8bd1fd1 --- /dev/null +++ b/docker-compose-experimental.yml @@ -0,0 +1,34 @@ +services: + node1: + build: + context: . + dockerfile: Dockerfile.experimental + container_name: dtt-node1 + environment: + - RUST_LOG=info + - TOPIC_ID=${TOPIC_ID} + command: sh -c "e2e_test_experimental 2 $${TOPIC_ID}" + + node2: + build: + context: . + dockerfile: Dockerfile.experimental + container_name: dtt-node2 + environment: + - RUST_LOG=info + - TOPIC_ID=${TOPIC_ID} + command: sh -c "sleep 10 && e2e_test_experimental 2 $${TOPIC_ID}" + depends_on: + - node1 + + node3: + build: + context: . + dockerfile: Dockerfile.experimental + container_name: dtt-node3 + environment: + - RUST_LOG=info + - TOPIC_ID=${TOPIC_ID} + command: sh -c "sleep 20 && e2e_test_experimental 2 $${TOPIC_ID}" + depends_on: + - node2 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 04a59a8..b4e9aea 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,27 +4,23 @@ services: container_name: dtt-node1 environment: - RUST_LOG=info - command: > - sh -c "echo 'Starting node1...' && - sleep 2 && - e2e_test" + command: sh -c "e2e_test" node2: build: . container_name: dtt-node2 environment: - RUST_LOG=info - command: > - sh -c "echo 'Starting node2...' && - sleep 5 && - e2e_test" + command: sh -c "sleep 10 && e2e_test" + depends_on: + - node1 + node3: build: . container_name: dtt-node3 environment: - RUST_LOG=info - command: > - sh -c "echo 'Starting node3...' 
&& - sleep 8 && - e2e_test" + command: sh -c "sleep 20 && e2e_test" + depends_on: + - node1 diff --git a/examples/chat_experimental.rs b/examples/chat_experimental.rs new file mode 100644 index 0000000..27a2ce9 --- /dev/null +++ b/examples/chat_experimental.rs @@ -0,0 +1,81 @@ +use anyhow::Result; +use distributed_topic_tracker::AutoDiscoveryGossip; +use iroh::{Endpoint, SecretKey}; +use iroh_gossip::{api::Event, net::Gossip}; + +use ed25519_dalek::SigningKey; + +// Imports from distrubuted-topic-tracker + +#[tokio::main] +async fn main() -> Result<()> { + // tracing init - only show distributed_topic_tracker logs + use tracing_subscriber::filter::EnvFilter; + + tracing_subscriber::fmt() + .with_thread_ids(true) + .with_ansi(true) + .with_env_filter( + EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug")), + ) + .init(); + + // Generate a new random secret key + let secret_key = SecretKey::generate(&mut rand::rng()); + let signing_key = SigningKey::from_bytes(&secret_key.to_bytes()); + + // Set up endpoint with discovery enabled + let endpoint = Endpoint::builder() + .secret_key(secret_key.clone()) + .bind() + .await?; + + // Initialize gossip with auto-discovery + let gossip = Gossip::builder().spawn(endpoint.clone()); + + // Set up protocol router + let _router = iroh::protocol::Router::builder(endpoint.clone()) + .accept(iroh_gossip::ALPN, gossip.clone()) + .spawn(); + + let topic_id = "my-iroh-gossip-topic-experimental".as_bytes().to_vec(); + + // Split into sink (sending) and stream (receiving) + let (gossip_sender, gossip_receiver) = gossip + .subscribe_and_join_with_auto_discovery(topic_id, signing_key) + .await? + .split() + .await; + + println!("Joined topic"); + + // Spawn listener for incoming messages + tokio::spawn(async move { + while let Some(Ok(event)) = gossip_receiver.next().await { + if let Event::Received(msg) = event { + println!( + "\nMessage from {}: {}", + &msg.delivered_from.to_string()[0..8], + String::from_utf8(msg.content.to_vec()).unwrap() + ); + } else if let Event::NeighborUp(peer) = event { + println!("\nJoined by {}", &peer.to_string()[0..8]); + } + } + }); + + // Main input loop for sending messages + let mut buffer = String::new(); + let stdin = std::io::stdin(); + loop { + print!("\n> "); + stdin.read_line(&mut buffer).unwrap(); + gossip_sender + .broadcast(buffer.clone().replace("\n", "").into()) + .await + .unwrap(); + println!(" - (sent)"); + buffer.clear(); + } +} diff --git a/examples/chat_no_wait.rs b/examples/chat_no_wait.rs index 1d42dd3..ea0c4cc 100644 --- a/examples/chat_no_wait.rs +++ b/examples/chat_no_wait.rs @@ -4,7 +4,7 @@ use iroh_gossip::{api::Event, net::Gossip}; // Imports from distrubuted-topic-tracker use distributed_topic_tracker::{AutoDiscoveryGossip, RecordPublisher, TopicId}; -use mainline::SigningKey; +use ed25519_dalek::SigningKey; #[tokio::main] async fn main() -> Result<()> { diff --git a/examples/e2e_test.rs b/examples/e2e_test.rs index e3fafd2..d451823 100644 --- a/examples/e2e_test.rs +++ b/examples/e2e_test.rs @@ -2,6 +2,8 @@ use anyhow::Result; use iroh::{Endpoint, SecretKey}; use iroh_gossip::net::Gossip; +use ed25519_dalek::SigningKey; + // Imports from distrubuted-topic-tracker use distributed_topic_tracker::{AutoDiscoveryGossip, RecordPublisher, TopicId}; @@ -9,7 +11,7 @@ use distributed_topic_tracker::{AutoDiscoveryGossip, RecordPublisher, TopicId}; async fn main() -> Result<()> { // Generate a new random secret key let secret_key = SecretKey::generate(&mut 
rand::rng());
-    let signing_key = mainline::SigningKey::from_bytes(&secret_key.to_bytes());
+    let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
 
     // Set up endpoint with discovery enabled
     let endpoint = Endpoint::builder()
diff --git a/examples/e2e_test_experimental.rs b/examples/e2e_test_experimental.rs
new file mode 100644
index 0000000..1dd687b
--- /dev/null
+++ b/examples/e2e_test_experimental.rs
@@ -0,0 +1,88 @@
+use std::sync::{Arc, atomic::AtomicUsize};
+
+use anyhow::Result;
+use iroh::{Endpoint, SecretKey};
+use iroh_gossip::net::Gossip;
+
+// Imports from distributed-topic-tracker
+use distributed_topic_tracker::AutoDiscoveryGossip;
+use tokio::time::Instant;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+
+    use tracing_subscriber::filter::EnvFilter;
+    tracing_subscriber::fmt()
+        .with_thread_ids(true)
+        .with_ansi(true)
+        .with_env_filter(
+            EnvFilter::try_from_default_env()
+                .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug,dht=debug")),
+        )
+        .init();
+
+
+
+    // from input first param
+    let expected_neighbours = std::env::args().nth(1).unwrap_or("1".to_string()).parse::<usize>()?;
+    let topic_salt = std::env::args().nth(2).unwrap_or("default-salt".to_string());
+
+    // Generate a new random secret key
+    let secret_key = SecretKey::generate(&mut rand::rng());
+    let signing_key = ed25519_dalek::SigningKey::from_bytes(&secret_key.to_bytes());
+
+    // Set up endpoint with discovery enabled
+    let endpoint = Endpoint::builder()
+        .secret_key(secret_key.clone())
+        .bind()
+        .await?;
+
+    // Initialize gossip with auto-discovery
+    let gossip = Gossip::builder().spawn(endpoint.clone());
+
+    // Set up protocol router
+    let _router = iroh::protocol::Router::builder(endpoint.clone())
+        .accept(iroh_gossip::ALPN, gossip.clone())
+        .spawn();
+
+    let cold_start_timer = Instant::now();
+    let topic_id = format!("my-iroh-gossip-topic-experimental-{topic_salt}").as_bytes().to_vec();
+    let (gossip_sender, gossip_receiver) = gossip
+        .subscribe_and_join_with_auto_discovery(topic_id, signing_key)
+        .await?
+        .split()
+        .await;
+
+    println!("Cold start time: {:.0}ms", cold_start_timer.elapsed().as_millis());
+
+    let total_messages_recv = Arc::new(AtomicUsize::new(0));
+    tokio::spawn({
+
+        let total_messages_recv = total_messages_recv.clone();
+        async move {
+            while let Some(Ok(event)) = gossip_receiver.next().await {
+                println!("event: {event:?}");
+                if matches!(event, iroh_gossip::api::Event::NeighborUp(_)) {
+                    total_messages_recv.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+                }
+            }
+        }});
+
+    gossip_sender
+        .broadcast(format!("hi from {}", endpoint.id()).into())
+        .await?;
+
+    println!("[joined topic]");
+
+    while total_messages_recv.load(std::sync::atomic::Ordering::Relaxed) < expected_neighbours {
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+    }
+
+    println!("[finished]");
+
+    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+
+    // successfully joined
+    // exit with code 0
+    Ok(())
+}
diff --git a/src/core/mod.rs b/src/core/mod.rs
new file mode 100644
index 0000000..42cde01
--- /dev/null
+++ b/src/core/mod.rs
@@ -0,0 +1,2 @@
+pub mod receiver;
+pub mod sender;
diff --git a/src/gossip/receiver.rs b/src/core/receiver.rs
similarity index 100%
rename from src/gossip/receiver.rs
rename to src/core/receiver.rs
diff --git a/src/gossip/sender.rs b/src/core/sender.rs
similarity index 100%
rename from src/gossip/sender.rs
rename to src/core/sender.rs
diff --git a/src/dht.rs b/src/dht.rs
index ec8f90b..d247804 100644
--- a/src/dht.rs
+++ b/src/dht.rs
@@ -2,14 +2,14 @@
 //!
 //! Provides async interface for DHT get/put operations with automatic
 //! retry logic and connection management.
-
 use std::time::Duration;
 
 use actor_helper::{Action, Actor, Handle, Receiver, act};
 use anyhow::{Context, Result, bail};
 use ed25519_dalek::VerifyingKey;
 use futures_lite::StreamExt;
-use mainline::{MutableItem, SigningKey};
+
+use mainline::{Dht as MainlineDht, MutableItem, SigningKey, async_dht::AsyncDht};
 
 const RETRY_DEFAULT: usize = 3;
 
@@ -25,7 +25,7 @@ pub struct Dht {
 #[derive(Debug)]
 struct DhtActor {
     rx: Receiver<Action<DhtActor>>,
-    dht: Option<mainline::async_dht::AsyncDht>,
+    dht: Option<AsyncDht>,
 }
 
 impl Dht {
@@ -186,7 +186,7 @@ impl DhtActor {
     }
 
     async fn reset(&mut self) -> Result<()> {
-        self.dht = Some(mainline::Dht::builder().build()?.as_async());
+        self.dht = Some(MainlineDht::builder().build()?.as_async());
         Ok(())
     }
 }
diff --git a/src/experimental/dht.rs b/src/experimental/dht.rs
new file mode 100644
index 0000000..82d7ec5
--- /dev/null
+++ b/src/experimental/dht.rs
@@ -0,0 +1,103 @@
+use std::{collections::HashSet, time::Duration};
+
+use anyhow::{Context, Result};
+use ed25519_dalek::VerifyingKey;
+
+use futures_lite::StreamExt;
+use mainline_exp::{Dht as MainlineDht, Id, SigningKey, async_dht::AsyncDht};
+use sha2::Digest;
+
+#[derive(Debug, Clone)]
+pub struct Dht {
+    dht: Option<AsyncDht>,
+    signing_key: SigningKey,
+}
+
+impl Dht {
+    pub fn new(signing_key: &SigningKey) -> Self {
+        Self {
+            dht: None,
+            signing_key: signing_key.clone(),
+        }
+    }
+
+    pub async fn reset(&mut self) -> Result<()> {
+        if self.dht.is_some() {
+            return Ok(());
+        }
+        let dht = MainlineDht::builder()
+            .extra_bootstrap(&["pkarr.rustonbsd.com:6881"])
+            .build()?
+            .as_async();
+        if !dht.bootstrapped().await {
+            anyhow::bail!("DHT bootstrap failed");
+        }
+        self.dht = Some(dht);
+
+        Ok(())
+    }
+
+    pub async fn get_peers(
+        &mut self,
+        topic_bytes: &Vec<u8>,
+        timeout: Option<Duration>,
+    ) -> Result<HashSet<VerifyingKey>> {
+        if self.dht.is_none() {
+            self.reset().await?;
+        }
+
+        let dht = self.dht.as_mut().context("DHT not initialized")?;
+        let id = Id::from_bytes(topic_hash_20(topic_bytes))?;
+
+        let mut stream = dht.get_signed_peers(id).await;
+        let mut results = vec![];
+
+        if let Some(timeout) = timeout {
+            let deadline = tokio::time::Instant::now() + timeout;
+            while let Ok(Some(item)) = tokio::time::timeout_at(deadline, stream.next()).await {
+                results.push(item);
+            }
+        } else {
+            results = stream.collect::<Vec<Vec<_>>>().await;
+        }
+
+        Ok(results
+            .iter()
+            .flatten()
+            .filter_map(|item| VerifyingKey::from_bytes(item.key()).ok())
+            .collect::<HashSet<_>>())
+    }
+
+    pub async fn announce_self(
+        &mut self,
+        topic_bytes: &Vec<u8>,
+        timeout: Option<Duration>,
+    ) -> Result<Id> {
+        if self.dht.is_none() {
+            self.reset().await?;
+        }
+
+        let dht = self.dht.as_mut().context("DHT not initialized")?;
+        let id = Id::from_bytes(topic_hash_20(topic_bytes))?;
+
+        let announce_future = dht.announce_signed_peer(id, &self.signing_key);
+
+        if let Some(timeout) = timeout {
+            tokio::time::timeout(timeout, announce_future)
+                .await
+                .context("Announce timed out")?
+                .context("Failed to announce signed peer")
+        } else {
+            announce_future
+                .await
+                .context("Failed to announce signed peer")
+        }
+    }
+}
+
+fn topic_hash_20(topic_bytes: &Vec<u8>) -> [u8; 20] {
+    let mut hasher = sha2::Sha512::new();
+    hasher.update("/iroh/distributed-topic-tracker");
+    hasher.update(topic_bytes);
+    hasher.finalize()[..20].try_into().expect("hashing failed")
+}
diff --git a/src/experimental/gossip.rs b/src/experimental/gossip.rs
new file mode 100644
index 0000000..ee9391e
--- /dev/null
+++ b/src/experimental/gossip.rs
@@ -0,0 +1,230 @@
+use std::{
+    collections::HashSet,
+    sync::{Arc, atomic::AtomicBool},
+    time::Duration,
+};
+
+use ed25519_dalek::{SigningKey, VerifyingKey};
+use iroh::PublicKey;
+use iroh_gossip::Gossip;
+use sha2::Digest;
+use tokio::{sync::Mutex, task::JoinHandle};
+
+use crate::{experimental::Dht, receiver::GossipReceiver, sender::GossipSender};
+
+pub trait AutoDiscoveryGossip {
+    #[allow(async_fn_in_trait)]
+    async fn subscribe_and_join_with_auto_discovery(
+        &self,
+        topic_id: Vec<u8>,
+        signing_key: SigningKey,
+    ) -> anyhow::Result<Topic>;
+
+    #[allow(async_fn_in_trait)]
+    async fn subscribe_and_join_with_auto_discovery_no_wait(
+        &self,
+        topic_id: Vec<u8>,
+        signing_key: SigningKey,
+    ) -> anyhow::Result<Topic>;
+}
+
+impl AutoDiscoveryGossip for iroh_gossip::net::Gossip {
+    async fn subscribe_and_join_with_auto_discovery(
+        &self,
+        topic_id: Vec<u8>,
+        signing_key: SigningKey,
+    ) -> anyhow::Result<Topic> {
+        let topic = self
+            .subscribe_and_join_with_auto_discovery_no_wait(topic_id, signing_key)
+            .await?;
+        topic.wait_for_join().await;
+        Ok(topic)
+    }
+
+    async fn subscribe_and_join_with_auto_discovery_no_wait(
+        &self,
+        topic_id: Vec<u8>,
+        signing_key: SigningKey,
+    ) -> anyhow::Result<Topic> {
+        let gossip_topic: iroh_gossip::api::GossipTopic = self
+            .subscribe(
+                iroh_gossip::proto::TopicId::from_bytes(topic_hash_32(&topic_id)),
+                vec![],
+            )
+            .await?;
+        let (gossip_sender, gossip_receiver) = gossip_topic.split();
+        let (gossip_sender, gossip_receiver) = (
+            GossipSender::new(gossip_sender, self.clone()),
+            GossipReceiver::new(gossip_receiver, self.clone()),
+        );
+
+        let topic = Topic::new(
+            topic_id,
+            &signing_key,
+            self.clone(),
+            gossip_sender,
+            gossip_receiver,
+        );
+
+        topic.start_self_publish()?;
+        topic.start_infinite_bootstrap()?;
+
+        Ok(topic)
+    }
+}
+
+pub struct Topic {
+    dht: Arc<Mutex<Dht>>,
+    _gossip: Gossip,
+    sender: GossipSender,
+    receiver: GossipReceiver,
+    topic_bytes: Vec<u8>,
+
+    // never used an atomic bool before but why not ^^
+    running: Arc<AtomicBool>,
+    added_nodes: Arc<Mutex<HashSet<VerifyingKey>>>,
+}
+
+impl Topic {
+    pub async fn split(&self) -> (GossipSender, crate::core::receiver::GossipReceiver) {
+        (self.sender.clone(), self.receiver.clone())
+    }
+
+    pub async fn gossip_sender(&self) -> GossipSender {
+        self.sender.clone()
+    }
+
+    pub async fn gossip_receiver(&self) -> GossipReceiver {
+        self.receiver.clone()
+    }
+
+    pub async fn stop_background_loops(&self) {
+        self.running
+            .store(false, std::sync::atomic::Ordering::Relaxed);
+    }
+}
+
+impl Topic {
+    pub(self) fn new(
+        topic_bytes: Vec<u8>,
+        signing_key: &SigningKey,
+        gossip: Gossip,
+        sender: GossipSender,
+        receiver: GossipReceiver,
+    ) -> Self {
+        let dht = Dht::new(signing_key);
+        Self {
+            dht: Arc::new(Mutex::new(dht)),
+            _gossip: gossip,
+            sender,
+            receiver,
+            running: Arc::new(AtomicBool::new(true)),
+            topic_bytes,
+            added_nodes: Arc::new(Mutex::new(HashSet::from([signing_key.verifying_key()]))),
+        }
+    }
+
+    pub(self) fn start_self_publish(&self) -> anyhow::Result<JoinHandle<()>> {
+        let is_running = self.running.clone();
+        let dht = self.dht.clone();
+        let topic_bytes = self.topic_bytes.clone();
+        Ok(tokio::spawn(async move {
+            let mut backoff = 5;
+            while is_running.load(std::sync::atomic::Ordering::Relaxed) {
+                let wait_dur = {
+                    let mut lock = dht.lock().await;
+
+                    if let Err(e) = lock.announce_self(&topic_bytes, None).await {
+                        tracing::warn!("self_announce: {e}");
+                        backoff += 5;
+                        Duration::from_secs(backoff.min(300))
+                    } else {
+                        tracing::info!("self_announce: success");
+                        backoff = 5;
+                        Duration::from_secs(300)
+                    }
+                };
+                tokio::time::sleep(wait_dur).await;
+            }
+        }))
+    }
+
+    pub(self) fn start_infinite_bootstrap(&self) -> anyhow::Result<JoinHandle<()>> {
+        let is_running = self.running.clone();
+        let dht = self.dht.clone();
+        let topic_bytes = self.topic_bytes.clone();
+        let sender = self.sender.clone();
+        let receiver = self.receiver.clone();
+        let added_nodes = self.added_nodes.clone();
+        Ok(tokio::spawn(async move {
+            let mut backoff = 5;
+            while is_running.load(std::sync::atomic::Ordering::Relaxed) {
+                let peers = {
+                    let mut lock = dht.lock().await;
+                    let res = lock.get_peers(&topic_bytes, None).await;
+
+                    if let Err(e) = res {
+                        tracing::error!("bootstrap get_peers error: {e:?}");
+                        HashSet::new()
+                    } else {
+                        res.unwrap_or_default().clone()
+                    }
+                };
+
+                println!("peer-hashes: {}", peers.iter().map(|p| hex::encode(p.as_bytes())).collect::<Vec<_>>().join(", "));
+                tracing::debug!("bootstrap found {} peers", peers.len());
+                if !peers.is_empty() {
+                    let mut added_nodes_lock = added_nodes.lock().await;
+                    let unknown_peers = peers
+                        .iter()
+                        .filter_map(|peer| {
+                            if !added_nodes_lock.contains(peer) {
+                                Some(*peer)
+                            } else {
+                                None
+                            }
+                        })
+                        .collect::<Vec<_>>();
+
+                    for p in unknown_peers {
+                        if let Err(e) = sender
+                            .join_peers(vec![PublicKey::from_verifying_key(p)],
+                                None,
+                            )
+                            .await
+                        {
+                            tracing::debug!("bootstrap join_peers error: {:?}", e);
+                        } else {
+                            tracing::debug!("bootstrap joined new peers");
+                        }
+                        added_nodes_lock.insert(p);
+                    }
+
+                }
+
+                if !receiver.is_joined().await || added_nodes.lock().await.len() < 2 {
+                    tracing::debug!("not enough peers yet, waiting before next bootstrap attempt");
+                    backoff += 5;
+                    tokio::time::sleep(Duration::from_secs(backoff.min(60))).await;
+                } else {
+                    backoff = 5;
+                    tracing::info!("bootstrap successful");
+                    tokio::time::sleep(Duration::from_secs(60)).await;
+                }
+            }
+        }))
+    }
+
+    pub(self) async fn wait_for_join(&self) {
+        while !self.receiver.is_joined().await {
+            tokio::time::sleep(Duration::from_millis(100)).await;
+        }
+    }
+}
+
+fn topic_hash_32(topic_bytes: &Vec<u8>) -> [u8; 32] {
+    let mut hasher = sha2::Sha512::new();
+    hasher.update("/iroh/distributed-topic-tracker");
+    hasher.update(topic_bytes);
+    hasher.finalize()[..32].try_into().expect("hashing failed")
+}
diff --git a/src/experimental/mod.rs b/src/experimental/mod.rs
new file mode 100644
index 0000000..7b3ea69
--- /dev/null
+++ b/src/experimental/mod.rs
@@ -0,0 +1,5 @@
+mod dht;
+mod gossip;
+
+pub use dht::Dht;
+pub use gossip::{AutoDiscoveryGossip, Topic};
diff --git a/src/gossip/mod.rs b/src/gossip/mod.rs
index fb79200..e76e769 100644
--- a/src/gossip/mod.rs
+++ b/src/gossip/mod.rs
@@ -5,13 +5,11 @@
 //! and message overlap merging for cluster topology optimization.
 
 mod merge;
-mod receiver;
-mod sender;
 mod topic;
 
+pub use crate::core::receiver::GossipReceiver;
+pub use crate::core::sender::GossipSender;
 pub use merge::{BubbleMerge, MessageOverlapMerge};
-pub use receiver::GossipReceiver;
-pub use sender::GossipSender;
 use serde::{Deserialize, Serialize};
 
 pub use topic::{Bootstrap, Publisher, Topic, TopicId};
diff --git a/src/gossip/topic/bootstrap.rs b/src/gossip/topic/bootstrap.rs
index 94ba99c..5669a20 100644
--- a/src/gossip/topic/bootstrap.rs
+++ b/src/gossip/topic/bootstrap.rs
@@ -8,9 +8,7 @@ use iroh::EndpointId;
 use tokio::time::sleep;
 
 use crate::{
-    GossipSender,
-    crypto::Record,
-    gossip::{GossipRecordContent, receiver::GossipReceiver},
+    GossipSender, core::receiver::GossipReceiver, crypto::Record, gossip::GossipRecordContent,
 };
 
 /// Manages the peer discovery and joining process.
diff --git a/src/gossip/topic/topic.rs b/src/gossip/topic/topic.rs
index df213f3..a96291c 100644
--- a/src/gossip/topic/topic.rs
+++ b/src/gossip/topic/topic.rs
@@ -144,7 +144,7 @@ impl Topic {
     }
 
     /// Split into sender and receiver for message exchange.
-    pub async fn split(&self) -> Result<(GossipSender, crate::gossip::receiver::GossipReceiver)> {
+    pub async fn split(&self) -> Result<(GossipSender, crate::core::receiver::GossipReceiver)> {
         Ok((self.gossip_sender().await?, self.gossip_receiver().await?))
     }
 
@@ -156,7 +156,7 @@ impl Topic {
     }
 
     /// Get the gossip receiver for this topic.
-    pub async fn gossip_receiver(&self) -> Result<crate::gossip::receiver::GossipReceiver> {
+    pub async fn gossip_receiver(&self) -> Result<crate::core::receiver::GossipReceiver> {
         self.api
             .call(act!(actor => actor.bootstrap.gossip_receiver()))
             .await
diff --git a/src/lib.rs b/src/lib.rs
index 1001740..91137ab 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,26 +1,35 @@
 #![doc = include_str!("../README.md")]
 
+mod core;
+
+#[cfg(not(feature = "experimental"))]
 mod crypto;
+#[cfg(not(feature = "experimental"))]
 mod dht;
 
+pub use core::*;
+#[cfg(not(feature = "experimental"))]
 #[cfg(feature = "iroh-gossip")]
 mod gossip;
 
+#[cfg(not(feature = "experimental"))]
+pub use crypto::{
+    DefaultSecretRotation, EncryptedRecord, Record, RecordPublisher, RecordTopic, RotationHandle,
+    SecretRotation, encryption_keypair, salt, signing_keypair,
+};
+#[cfg(not(feature = "experimental"))]
 #[cfg(feature = "iroh-gossip")]
 pub use gossip::{
     AutoDiscoveryGossip, Bootstrap, BubbleMerge, GossipReceiver, GossipRecordContent, GossipSender,
     MessageOverlapMerge, Publisher, Topic, TopicId,
 };
-
-pub use crypto::{
-    DefaultSecretRotation, EncryptedRecord, Record, RecordPublisher, RecordTopic, RotationHandle,
-    SecretRotation, encryption_keypair, salt, signing_keypair,
-};
-pub use dht::Dht;
+#[cfg(not(feature = "experimental"))]
+pub use mainline::Dht;
 /// Maximum number of bootstrap records allowed per topic per time slot (minute).
 ///
 /// When publishing to the DHT, records are not published if this threshold
 /// has already been reached for the current minute slot.
+#[cfg(not(feature = "experimental"))]
 pub const MAX_BOOTSTRAP_RECORDS: usize = 100;
 
 /// Get the current Unix minute timestamp, optionally offset.
@@ -35,6 +44,16 @@ pub const MAX_BOOTSTRAP_RECORDS: usize = 100;
 /// let now = unix_minute(0);
 /// let prev_minute = unix_minute(-1);
 /// ```
+#[cfg(not(feature = "experimental"))]
 pub fn unix_minute(minute_offset: i64) -> u64 {
     ((chrono::Utc::now().timestamp() as f64 / 60.0f64).floor() as i64 + minute_offset) as u64
 }
+
+// Experimental announce_signed_peer and get_signed_peers DHT implementation
+#[cfg(feature = "experimental")]
+mod experimental;
+#[cfg(feature = "experimental")]
+pub use experimental::{AutoDiscoveryGossip, Topic};
+
+#[cfg(feature = "experimental")]
+pub use mainline_exp::Dht;
diff --git a/test-e2e.sh b/test-e2e.sh
index b7021e7..013e160 100755
--- a/test-e2e.sh
+++ b/test-e2e.sh
@@ -7,12 +7,14 @@ set -e
 
 echo "Starting end-to-end test..."
 
+export TOPIC_ID=$RANDOM
+
 # Clean up any existing containers
-docker compose down --remove-orphans || true
+docker compose --file $COMPOSE_FILE down --remove-orphans || true
 
 # Build and start the containers
 echo "Building and starting containers..."
-docker compose up --build -d
+docker compose --file $COMPOSE_FILE up --build -d
 
 # Function to check if a container has printed "Joined topic"
 check_joined_topic() {
@@ -91,7 +93,7 @@ echo "========================================="
 
 # Clean up
 echo "Cleaning up containers..."
-docker compose down
+docker compose --file $COMPOSE_FILE down
 
 if [ "$success" = true ]; then
     echo "End-to-end test PASSED: All nodes successfully joined the topic and completed execution"
diff --git a/tests/tests.rs b/tests/tests.rs
index 5d5cd17..849a004 100644
--- a/tests/tests.rs
+++ b/tests/tests.rs
@@ -4,7 +4,7 @@ use distributed_topic_tracker::{
     DefaultSecretRotation, EncryptedRecord, GossipRecordContent, Record, RecordTopic,
     RotationHandle, encryption_keypair, salt, signing_keypair, unix_minute,
 };
-use mainline::SigningKey;
+use ed25519_dalek::SigningKey;
 
 #[test]
 fn test_record_serialization_roundtrip() {
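
For local verification, the experimental end-to-end flow can be driven the same way the new e2e-test-experimental CI job drives it; a minimal sketch, assuming Docker and Docker Compose v2 are installed locally:

    docker build -t distributed-topic-tracker --file Dockerfile.experimental .
    COMPOSE_FILE=./docker-compose-experimental.yml ./test-e2e.sh

test-e2e.sh exports a random TOPIC_ID for the run, brings the three experimental nodes up with --build, and tears the containers down once every node has reported joining the topic.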