From 2ec8cc44794d5bc1a0649c56550a6e011f74cc2e Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Thu, 30 Oct 2025 15:41:46 +0100 Subject: [PATCH 01/20] clean build --- Cargo.lock | 45 ++++++++++++++++++++++----------------------- Cargo.toml | 2 +- src/dht.rs | 10 +++++----- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5b75701..19b1485 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -748,6 +748,27 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "dht" +version = "6.1.0" +source = "git+https://github.com/Nuhvi/mainline?branch=main#e20393e5d92f6c7b7b84f12ff01bc507c15ebc62" +dependencies = [ + "crc", + "document-features", + "dyn-clone", + "ed25519-dalek", + "flume", + "futures-lite", + "getrandom 0.3.3", + "lru 0.13.0", + "serde", + "serde_bencode", + "serde_bytes", + "sha1_smol", + "thiserror 2.0.12", + "tracing", +] + [[package]] name = "diatomic-waker" version = "0.2.3" @@ -794,12 +815,12 @@ dependencies = [ "actor-helper", "anyhow", "chrono", + "dht", "ed25519-dalek", "ed25519-dalek-hpke", "futures-lite", "iroh", "iroh-gossip", - "mainline", "postcard", "rand 0.9.2", "serde", @@ -2105,28 +2126,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" -[[package]] -name = "mainline" -version = "6.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5be6c12ff79bfbf65bcbec84882a4bf700177df6d83a7b866c6a01cda7db4777" -dependencies = [ - "crc", - "document-features", - "dyn-clone", - "ed25519-dalek", - "flume", - "futures-lite", - "getrandom 0.3.3", - "lru 0.16.2", - "serde", - "serde_bencode", - "serde_bytes", - "sha1_smol", - "thiserror 2.0.12", - "tracing", -] - [[package]] name = "matchers" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index fb503d3..5bb88ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ futures-lite = "2" chrono = { version = "0.4", default-features = false, features = ["clock"] } -mainline = { version = "6", default-features = false, features = ["async"]} +dht = { git="https://github.com/Nuhvi/mainline", branch="main", default-features = false, features = ["async"]} rand = { version = "0.9", default-features = false, features = ["std", "std_rng"] } actor-helper = { version = "0.2", features = ["tokio", "anyhow"] } postcard = "1" diff --git a/src/dht.rs b/src/dht.rs index ec8f90b..f074d39 100644 --- a/src/dht.rs +++ b/src/dht.rs @@ -9,7 +9,7 @@ use actor_helper::{Action, Actor, Handle, Receiver, act}; use anyhow::{Context, Result, bail}; use ed25519_dalek::VerifyingKey; use futures_lite::StreamExt; -use mainline::{MutableItem, SigningKey}; +use dht::{MutableItem, SigningKey}; const RETRY_DEFAULT: usize = 3; @@ -25,7 +25,7 @@ pub struct Dht { #[derive(Debug)] struct DhtActor { rx: Receiver>, - dht: Option, + dht: Option, } impl Dht { @@ -153,13 +153,13 @@ impl DhtActor { let item = if let Some(mut_item) = most_recent_result { MutableItem::new( - signing_key.clone(), + &signing_key, &data, mut_item.seq() + 1, salt.as_deref(), ) } else { - MutableItem::new(signing_key.clone(), &data, 0, salt.as_deref()) + MutableItem::new(&signing_key, &data, 0, salt.as_deref()) }; let put_result = match tokio::time::timeout( @@ -186,7 +186,7 @@ impl DhtActor { } async fn reset(&mut self) -> Result<()> { - self.dht = Some(mainline::Dht::builder().build()?.as_async()); + self.dht = Some(dht::Dht::builder().build()?.as_async()); Ok(()) } } From b15f00c4b5c7a4d0932dccd82e078067c59f29cf Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Thu, 30 Oct 2025 16:01:24 +0100 Subject: [PATCH 02/20] fixed examples --- examples/chat_no_wait.rs | 2 +- examples/e2e_test.rs | 2 +- tests/tests.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/chat_no_wait.rs b/examples/chat_no_wait.rs index 1d42dd3..7af924f 100644 --- a/examples/chat_no_wait.rs +++ b/examples/chat_no_wait.rs @@ -4,7 +4,7 @@ use iroh_gossip::{api::Event, net::Gossip}; // Imports from distrubuted-topic-tracker use distributed_topic_tracker::{AutoDiscoveryGossip, RecordPublisher, TopicId}; -use mainline::SigningKey; +use dht::SigningKey; #[tokio::main] async fn main() -> Result<()> { diff --git a/examples/e2e_test.rs b/examples/e2e_test.rs index e3fafd2..6e016bb 100644 --- a/examples/e2e_test.rs +++ b/examples/e2e_test.rs @@ -9,7 +9,7 @@ use distributed_topic_tracker::{AutoDiscoveryGossip, RecordPublisher, TopicId}; async fn main() -> Result<()> { // Generate a new random secret key let secret_key = SecretKey::generate(&mut rand::rng()); - let signing_key = mainline::SigningKey::from_bytes(&secret_key.to_bytes()); + let signing_key = dht::SigningKey::from_bytes(&secret_key.to_bytes()); // Set up endpoint with discovery enabled let endpoint = Endpoint::builder() diff --git a/tests/tests.rs b/tests/tests.rs index 5d5cd17..6889c11 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -4,7 +4,7 @@ use distributed_topic_tracker::{ DefaultSecretRotation, EncryptedRecord, GossipRecordContent, Record, RecordTopic, RotationHandle, encryption_keypair, salt, signing_keypair, unix_minute, }; -use mainline::SigningKey; +use dht::SigningKey; #[test] fn test_record_serialization_roundtrip() { From 1c0273eb57d650c8651bea8d12a8b90ad8932ac9 Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Thu, 30 Oct 2025 21:58:46 +0100 Subject: [PATCH 03/20] added experimental feature + e2e test --- .github/workflows/test.yml | 15 ++- Cargo.lock | 23 ++++ Cargo.toml | 14 +- Dockerfile.experimental | 26 ++++ examples/chat_experimental.rs | 78 ++++++++++++ examples/chat_no_wait.rs | 2 +- examples/e2e_test.rs | 4 +- examples/e2e_test_experimental.rs | 53 ++++++++ src/core/mod.rs | 2 + src/{gossip => core}/receiver.rs | 0 src/{gossip => core}/sender.rs | 0 src/dht.rs | 17 ++- src/dht_experimental.rs | 67 ++++++++++ src/gossip/mod.rs | 11 +- src/gossip/topic/bootstrap.rs | 3 +- src/gossip/topic/topic.rs | 4 +- src/gossip_experimental.rs | 205 ++++++++++++++++++++++++++++++ src/lib.rs | 32 ++++- tests/tests.rs | 2 +- 19 files changed, 537 insertions(+), 21 deletions(-) create mode 100644 Dockerfile.experimental create mode 100644 examples/chat_experimental.rs create mode 100644 examples/e2e_test_experimental.rs create mode 100644 src/core/mod.rs rename src/{gossip => core}/receiver.rs (100%) rename src/{gossip => core}/sender.rs (100%) create mode 100644 src/dht_experimental.rs create mode 100644 src/gossip_experimental.rs diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a95d9c5..c8c1fb4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -13,7 +13,7 @@ env: CARGO_TERM_COLOR: always jobs: - e2e-tests: + e2e-test: name: End-to-End Tests runs-on: ubuntu-latest @@ -25,4 +25,17 @@ jobs: - name: Run end-to-end test run: ./test-e2e.sh + + e2e-test-experimental: + name: End-to-End Tests (Experimental) + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Build Docker image + run: docker build -t distributed-topic-tracker --file Dockerfile.experimental . + + - name: Run end-to-end test + run: ./test-e2e.sh diff --git a/Cargo.lock b/Cargo.lock index 19b1485..62d8cbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -821,6 +821,7 @@ dependencies = [ "futures-lite", "iroh", "iroh-gossip", + "mainline", "postcard", "rand 0.9.2", "serde", @@ -2126,6 +2127,28 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "mainline" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5be6c12ff79bfbf65bcbec84882a4bf700177df6d83a7b866c6a01cda7db4777" +dependencies = [ + "crc", + "document-features", + "dyn-clone", + "ed25519-dalek", + "flume", + "futures-lite", + "getrandom 0.3.3", + "lru 0.16.2", + "serde", + "serde_bencode", + "serde_bytes", + "sha1_smol", + "thiserror 2.0.12", + "tracing", +] + [[package]] name = "matchers" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 5bb88ef..a6eb39b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ edition = "2024" [features] default = ["iroh-gossip"] iroh-gossip = ["dep:iroh", "dep:iroh-gossip"] +experimental = ["dep:mainline_exp"] [dependencies] @@ -32,7 +33,6 @@ futures-lite = "2" chrono = { version = "0.4", default-features = false, features = ["clock"] } -dht = { git="https://github.com/Nuhvi/mainline", branch="main", default-features = false, features = ["async"]} rand = { version = "0.9", default-features = false, features = ["std", "std_rng"] } actor-helper = { version = "0.2", features = ["tokio", "anyhow"] } postcard = "1" @@ -41,6 +41,9 @@ serde = { version = "1", default-features = false, features = ["std"] } tracing = { version = "0.1", default-features = false, features = ["std"] } tracing-subscriber = { version = "0.3", default-features = false, features = ["std", "env-filter", "ansi"] } +mainline = { version = "6.0.0", default-features = false, features = ["async"] } +mainline_exp = { git = "https://github.com/Nuhvi/mainline", branch = "main", default-features = false, features = ["async"], optional = true, package = "dht" } + [lib] crate-type = ["cdylib", "rlib"] @@ -52,6 +55,7 @@ required-features = ["iroh-gossip"] name = "tests" # only build examples if the "iroh-gossip" feature is enabled + [[example]] name = "chat" required-features = ["iroh-gossip"] @@ -71,3 +75,11 @@ required-features = ["iroh-gossip"] [[example]] name = "simple" required-features = ["iroh-gossip"] + +[[example]] +name = "chat_experimental" +required-features = ["iroh-gossip", "experimental"] + +[[example]] +name = "e2e_test_experimental" +required-features = ["iroh-gossip", "experimental"] \ No newline at end of file diff --git a/Dockerfile.experimental b/Dockerfile.experimental new file mode 100644 index 0000000..e387c94 --- /dev/null +++ b/Dockerfile.experimental @@ -0,0 +1,26 @@ +FROM rust:1.89.0 as builder + +WORKDIR /app + +COPY Cargo.toml Cargo.lock ./ +COPY README.md ./ +COPY src ./src +COPY tests ./tests +COPY examples ./examples + +# test then build +RUN cargo build --release --example e2e_test_experimental --features="iroh-gossip experimental" + +# create a minimal runtime image +FROM debian:bookworm-slim +RUN apt-get update && apt-get install -y \ + ca-certificates \ + && rm -rf /var/lib/apt/lists/* + + + +# Copy the built binary from the builder stage +COPY --from=builder /app/target/release/examples/e2e_test_experimental /usr/local/bin/e2e_test_experimental + +# Set the default command +CMD ["e2e_test_experimental"] diff --git a/examples/chat_experimental.rs b/examples/chat_experimental.rs new file mode 100644 index 0000000..efdf840 --- /dev/null +++ b/examples/chat_experimental.rs @@ -0,0 +1,78 @@ +use anyhow::Result; +use distributed_topic_tracker::AutoDiscoveryGossip; +use iroh::{Endpoint, SecretKey}; +use iroh_gossip::{api::Event, net::Gossip}; + +use ed25519_dalek::SigningKey; + +// Imports from distrubuted-topic-tracker + +#[tokio::main] +async fn main() -> Result<()> { + // tracing init - only show distributed_topic_tracker logs + use tracing_subscriber::filter::EnvFilter; + + tracing_subscriber::fmt() + .with_thread_ids(true) + .with_ansi(true) + .with_env_filter( + EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug")), + ) + .init(); + + // Generate a new random secret key + let secret_key = SecretKey::generate(&mut rand::rng()); + let signing_key = SigningKey::from_bytes(&secret_key.to_bytes()); + + // Set up endpoint with discovery enabled + let endpoint = Endpoint::builder() + .secret_key(secret_key.clone()) + .bind() + .await?; + + // Initialize gossip with auto-discovery + let gossip = Gossip::builder().spawn(endpoint.clone()); + + // Set up protocol router + let _router = iroh::protocol::Router::builder(endpoint.clone()) + .accept(iroh_gossip::ALPN, gossip.clone()) + .spawn(); + + let topic_id = "my-iroh-gossip-topic-experimental".as_bytes().to_vec(); + + // Split into sink (sending) and stream (receiving) + let (gossip_sender, gossip_receiver) = gossip + .subscribe_and_join_with_auto_discovery(topic_id, signing_key).await?.split().await; + + println!("Joined topic"); + + // Spawn listener for incoming messages + tokio::spawn(async move { + while let Some(Ok(event)) = gossip_receiver.next().await { + if let Event::Received(msg) = event { + println!( + "\nMessage from {}: {}", + &msg.delivered_from.to_string()[0..8], + String::from_utf8(msg.content.to_vec()).unwrap() + ); + } else if let Event::NeighborUp(peer) = event { + println!("\nJoined by {}", &peer.to_string()[0..8]); + } + } + }); + + // Main input loop for sending messages + let mut buffer = String::new(); + let stdin = std::io::stdin(); + loop { + print!("\n> "); + stdin.read_line(&mut buffer).unwrap(); + gossip_sender + .broadcast(buffer.clone().replace("\n", "").into()) + .await + .unwrap(); + println!(" - (sent)"); + buffer.clear(); + } +} diff --git a/examples/chat_no_wait.rs b/examples/chat_no_wait.rs index 7af924f..ea0c4cc 100644 --- a/examples/chat_no_wait.rs +++ b/examples/chat_no_wait.rs @@ -4,7 +4,7 @@ use iroh_gossip::{api::Event, net::Gossip}; // Imports from distrubuted-topic-tracker use distributed_topic_tracker::{AutoDiscoveryGossip, RecordPublisher, TopicId}; -use dht::SigningKey; +use ed25519_dalek::SigningKey; #[tokio::main] async fn main() -> Result<()> { diff --git a/examples/e2e_test.rs b/examples/e2e_test.rs index 6e016bb..d451823 100644 --- a/examples/e2e_test.rs +++ b/examples/e2e_test.rs @@ -2,6 +2,8 @@ use anyhow::Result; use iroh::{Endpoint, SecretKey}; use iroh_gossip::net::Gossip; +use ed25519_dalek::SigningKey; + // Imports from distrubuted-topic-tracker use distributed_topic_tracker::{AutoDiscoveryGossip, RecordPublisher, TopicId}; @@ -9,7 +11,7 @@ use distributed_topic_tracker::{AutoDiscoveryGossip, RecordPublisher, TopicId}; async fn main() -> Result<()> { // Generate a new random secret key let secret_key = SecretKey::generate(&mut rand::rng()); - let signing_key = dht::SigningKey::from_bytes(&secret_key.to_bytes()); + let signing_key = SigningKey::from_bytes(&secret_key.to_bytes()); // Set up endpoint with discovery enabled let endpoint = Endpoint::builder() diff --git a/examples/e2e_test_experimental.rs b/examples/e2e_test_experimental.rs new file mode 100644 index 0000000..8d363b0 --- /dev/null +++ b/examples/e2e_test_experimental.rs @@ -0,0 +1,53 @@ +use anyhow::Result; +use iroh::{Endpoint, SecretKey}; +use iroh_gossip::net::Gossip; + +// Imports from distrubuted-topic-tracker +use distributed_topic_tracker::AutoDiscoveryGossip; + +#[tokio::main] +async fn main() -> Result<()> { + // Generate a new random secret key + let secret_key = SecretKey::generate(&mut rand::rng()); + let signing_key = ed25519_dalek::SigningKey::from_bytes(&secret_key.to_bytes()); + + // Set up endpoint with discovery enabled + let endpoint = Endpoint::builder() + .secret_key(secret_key.clone()) + .bind() + .await?; + + // Initialize gossip with auto-discovery + let gossip = Gossip::builder().spawn(endpoint.clone()); + + // Set up protocol router + let _router = iroh::protocol::Router::builder(endpoint.clone()) + .accept(iroh_gossip::ALPN, gossip.clone()) + .spawn(); + + + let topic_id = "my-iroh-gossip-topic-experimental".as_bytes().to_vec(); + let (gossip_sender, gossip_receiver) = gossip + .subscribe_and_join_with_auto_discovery(topic_id, signing_key).await?.split().await; + + tokio::spawn(async move { + while let Some(Ok(event)) = gossip_receiver.next().await { + println!("event: {event:?}"); + } + }); + + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + gossip_sender + .broadcast(format!("hi from {}", endpoint.id()).into()) + .await?; + + println!("[joined topic]"); + + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + + println!("[finished]"); + + // successfully joined + // exit with code 0 + Ok(()) +} diff --git a/src/core/mod.rs b/src/core/mod.rs new file mode 100644 index 0000000..f732700 --- /dev/null +++ b/src/core/mod.rs @@ -0,0 +1,2 @@ +pub mod sender; +pub mod receiver; \ No newline at end of file diff --git a/src/gossip/receiver.rs b/src/core/receiver.rs similarity index 100% rename from src/gossip/receiver.rs rename to src/core/receiver.rs diff --git a/src/gossip/sender.rs b/src/core/sender.rs similarity index 100% rename from src/gossip/sender.rs rename to src/core/sender.rs diff --git a/src/dht.rs b/src/dht.rs index f074d39..277c048 100644 --- a/src/dht.rs +++ b/src/dht.rs @@ -2,14 +2,14 @@ //! //! Provides async interface for DHT get/put operations with automatic //! retry logic and connection management. - use std::time::Duration; use actor_helper::{Action, Actor, Handle, Receiver, act}; use anyhow::{Context, Result, bail}; use ed25519_dalek::VerifyingKey; use futures_lite::StreamExt; -use dht::{MutableItem, SigningKey}; + +use mainline::{Dht as MainlineDht, MutableItem, SigningKey, async_dht::AsyncDht}; const RETRY_DEFAULT: usize = 3; @@ -25,7 +25,7 @@ pub struct Dht { #[derive(Debug)] struct DhtActor { rx: Receiver>, - dht: Option, + dht: Option, } impl Dht { @@ -153,13 +153,18 @@ impl DhtActor { let item = if let Some(mut_item) = most_recent_result { MutableItem::new( - &signing_key, + signing_key.clone(), &data, mut_item.seq() + 1, salt.as_deref(), ) } else { - MutableItem::new(&signing_key, &data, 0, salt.as_deref()) + MutableItem::new( + signing_key.clone(), + &data, + 0, + salt.as_deref(), + ) }; let put_result = match tokio::time::timeout( @@ -186,7 +191,7 @@ impl DhtActor { } async fn reset(&mut self) -> Result<()> { - self.dht = Some(dht::Dht::builder().build()?.as_async()); + self.dht = Some(MainlineDht::builder().build()?.as_async()); Ok(()) } } diff --git a/src/dht_experimental.rs b/src/dht_experimental.rs new file mode 100644 index 0000000..4c791a8 --- /dev/null +++ b/src/dht_experimental.rs @@ -0,0 +1,67 @@ +use anyhow::{Context, Result}; +use ed25519_dalek::VerifyingKey; + +use futures_lite::StreamExt; +use mainline_exp::{Dht as MainlineDht, Id, SigningKey, async_dht::AsyncDht}; +use sha2::Digest; + +#[derive(Debug, Clone)] +pub struct Dht { + dht: Option, + signing_key: SigningKey, +} + +impl Dht { + pub fn new(signing_key: &SigningKey) -> Self { + Self { + dht: None, + signing_key: signing_key.clone(), + } + } + + pub fn reset(&mut self) -> Result<()> { + self.dht = Some(MainlineDht::builder().build()?.as_async()); + Ok(()) + } + + pub async fn get_peers(&mut self, topic_bytes: &Vec) -> Result> { + if self.dht.is_none() { + self.reset()?; + } + let mut hasher = sha2::Sha512::new(); + hasher.update(topic_bytes); + let hash: [u8; 20] = hasher.finalize()[..20] + .try_into() + .context("Failed to convert hash")?; + + let dht = self.dht.as_mut().context("DHT not initialized")?; + let id = Id::from_bytes(hash)?; + + let topic_stream = dht.get_signed_peers(id).await.collect::>(); + Ok(topic_stream + .await + .iter() + .flatten() + .filter_map(|item| VerifyingKey::from_bytes(item.key()).ok()) + .collect::>()) + } + + pub async fn announce_self(&mut self, topic_bytes: &Vec) -> Result<()> { + if self.dht.is_none() { + self.reset()?; + } + let mut hasher = sha2::Sha512::new(); + hasher.update(topic_bytes); + let hash: [u8; 20] = hasher.finalize()[..20] + .try_into() + .context("Failed to convert hash")?; + + let dht = self.dht.as_mut().context("DHT not initialized")?; + let id = Id::from_bytes(hash)?; + + dht.announce_signed_peer(id, &self.signing_key) + .await + .map(|_| ()) + .map_err(|e| anyhow::anyhow!("Failed to announce signed peer: {}", e)) + } +} diff --git a/src/gossip/mod.rs b/src/gossip/mod.rs index fb79200..26b36f3 100644 --- a/src/gossip/mod.rs +++ b/src/gossip/mod.rs @@ -4,16 +4,15 @@ //! and topic joining without prior knowledge of peers. Includes bubble detection //! and message overlap merging for cluster topology optimization. + mod merge; -mod receiver; -mod sender; mod topic; pub use merge::{BubbleMerge, MessageOverlapMerge}; -pub use receiver::GossipReceiver; -pub use sender::GossipSender; -use serde::{Deserialize, Serialize}; pub use topic::{Bootstrap, Publisher, Topic, TopicId}; +pub use crate::core::receiver::GossipReceiver; +pub use crate::core::sender::GossipSender; +use serde::{Deserialize, Serialize}; use crate::RecordPublisher; @@ -65,4 +64,4 @@ impl AutoDiscoveryGossip for iroh_gossip::net::Gossip { ) -> anyhow::Result { Topic::new(record_publisher, self.clone(), true).await } -} +} \ No newline at end of file diff --git a/src/gossip/topic/bootstrap.rs b/src/gossip/topic/bootstrap.rs index 94ba99c..3f5b8a5 100644 --- a/src/gossip/topic/bootstrap.rs +++ b/src/gossip/topic/bootstrap.rs @@ -10,7 +10,8 @@ use tokio::time::sleep; use crate::{ GossipSender, crypto::Record, - gossip::{GossipRecordContent, receiver::GossipReceiver}, + gossip::GossipRecordContent, + core::receiver::GossipReceiver }; /// Manages the peer discovery and joining process. diff --git a/src/gossip/topic/topic.rs b/src/gossip/topic/topic.rs index df213f3..a96291c 100644 --- a/src/gossip/topic/topic.rs +++ b/src/gossip/topic/topic.rs @@ -144,7 +144,7 @@ impl Topic { } /// Split into sender and receiver for message exchange. - pub async fn split(&self) -> Result<(GossipSender, crate::gossip::receiver::GossipReceiver)> { + pub async fn split(&self) -> Result<(GossipSender, crate::core::receiver::GossipReceiver)> { Ok((self.gossip_sender().await?, self.gossip_receiver().await?)) } @@ -156,7 +156,7 @@ impl Topic { } /// Get the gossip receiver for this topic. - pub async fn gossip_receiver(&self) -> Result { + pub async fn gossip_receiver(&self) -> Result { self.api .call(act!(actor => actor.bootstrap.gossip_receiver())) .await diff --git a/src/gossip_experimental.rs b/src/gossip_experimental.rs new file mode 100644 index 0000000..c44ef1f --- /dev/null +++ b/src/gossip_experimental.rs @@ -0,0 +1,205 @@ +use std::{ + collections::HashSet, + sync::{Arc, atomic::AtomicBool}, + time::Duration, +}; + +use ed25519_dalek::{SigningKey, VerifyingKey}; +use iroh::PublicKey; +use iroh_gossip::Gossip; +use sha2::Digest; +use tokio::{sync::Mutex, task::JoinHandle}; + +use crate::{dht_experimental::Dht, receiver::GossipReceiver, sender::GossipSender}; + +pub trait AutoDiscoveryGossip { + #[allow(async_fn_in_trait)] + async fn subscribe_and_join_with_auto_discovery( + &self, + topic_id: Vec, + signing_key: SigningKey, + ) -> anyhow::Result; + + #[allow(async_fn_in_trait)] + async fn subscribe_and_join_with_auto_discovery_no_wait( + &self, + topic_id: Vec, + signing_key: SigningKey, + ) -> anyhow::Result; +} + +impl AutoDiscoveryGossip for iroh_gossip::net::Gossip { + async fn subscribe_and_join_with_auto_discovery( + &self, + topic_id: Vec, + signing_key: SigningKey, + ) -> anyhow::Result { + let topic = self + .subscribe_and_join_with_auto_discovery_no_wait(topic_id, signing_key) + .await?; + topic.wait_for_join().await; + Ok(topic) + } + + async fn subscribe_and_join_with_auto_discovery_no_wait( + &self, + topic_id: Vec, + signing_key: SigningKey, + ) -> anyhow::Result { + let gossip_topic: iroh_gossip::api::GossipTopic = self + .subscribe(iroh_gossip::proto::TopicId::from_bytes(topic_hash_32(&topic_id)), vec![]) + .await?; + let (gossip_sender, gossip_receiver) = gossip_topic.split(); + let (gossip_sender, gossip_receiver) = ( + GossipSender::new(gossip_sender, self.clone()), + GossipReceiver::new(gossip_receiver, self.clone()), + ); + + let topic = Topic::new( + topic_id, + &signing_key, + self.clone(), + gossip_sender, + gossip_receiver, + ); + + topic.start_self_publish()?; + topic.start_infinite_bootstrap()?; + + Ok(topic) + } +} + +pub struct Topic { + dht: Arc>, + _gossip: Gossip, + sender: GossipSender, + receiver: GossipReceiver, + topic_bytes: Vec, + + // never used an atomic bool before but why not ^^ + running: Arc, + added_nodes: Arc>>, +} + +impl Topic { + pub async fn split(&self) -> (GossipSender, crate::core::receiver::GossipReceiver) { + (self.sender.clone(), self.receiver.clone()) + } + + pub async fn gossip_sender(&self) -> GossipSender { + self.sender.clone() + } + + pub async fn gossip_receiver(&self) -> GossipReceiver { + self.receiver.clone() + } + + pub async fn stop_background_loops(&self) { + self.running.store(false, std::sync::atomic::Ordering::Relaxed); + } +} + +impl Topic { + pub(self) fn new( + topic_bytes: Vec, + signing_key: &SigningKey, + gossip: Gossip, + sender: GossipSender, + receiver: GossipReceiver, + ) -> Self { + let dht = Dht::new(signing_key); + Self { + dht: Arc::new(Mutex::new(dht)), + _gossip: gossip, + sender, + receiver, + running: Arc::new(AtomicBool::new(true)), + topic_bytes, + added_nodes: Arc::new(Mutex::new(HashSet::from([signing_key.verifying_key()]))), + } + } + + pub(self) fn start_self_publish(&self) -> anyhow::Result> { + let is_running = self.running.clone(); + let dht = self.dht.clone(); + let topic_bytes = self.topic_bytes.clone(); + Ok(tokio::spawn(async move { + while is_running.load(std::sync::atomic::Ordering::Relaxed) { + { + let mut lock = dht.lock().await; + let res = lock.announce_self(&topic_bytes).await; + tracing::debug!("self_announce: {:?}", res); + } + tokio::time::sleep(Duration::from_secs(300)).await; + } + })) + } + + pub(self) fn start_infinite_bootstrap(&self) -> anyhow::Result> { + let is_running = self.running.clone(); + let dht = self.dht.clone(); + let topic_bytes = self.topic_bytes.clone(); + let sender = self.sender.clone(); + let added_nodes = self.added_nodes.clone(); + Ok(tokio::spawn(async move { + while is_running.load(std::sync::atomic::Ordering::Relaxed) { + let peers = { + let mut lock = dht.lock().await; + lock.get_peers(&topic_bytes).await.unwrap_or_default() + }; + if !peers.is_empty() { + let mut added_nodes_lock = added_nodes.lock().await; + let unknown_peers = peers + .iter() + .filter_map(|peer| { + if !added_nodes_lock.contains(peer) { + Some(*peer) + } else { + None + } + }) + .collect::>(); + + if let Err(e) = sender + .join_peers( + unknown_peers + .iter() + .filter_map(|p| PublicKey::from_bytes(p.as_bytes()).ok()) + .collect::>(), + Some(5), + ) + .await + { + tracing::debug!("bootstrap join_peers error: {:?}", e); + } else { + tracing::debug!("bootstrap joined {} new peers", unknown_peers.len()); + added_nodes_lock.extend(unknown_peers); + } + } + + if added_nodes.lock().await.len() < 2 { + tracing::debug!("not enough peers yet, waiting before next bootstrap attempt"); + tokio::time::sleep(Duration::from_secs(1)).await; + } else { + tokio::time::sleep(Duration::from_secs(20)).await; + } + } + })) + } + + pub(self) async fn wait_for_join(&self) { + loop { + if self.added_nodes.lock().await.len() >= 2 { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + } +} + +fn topic_hash_32(topic_bytes: &Vec) -> [u8; 32] { + let mut hasher = sha2::Sha256::new(); + hasher.update(topic_bytes); + hasher.finalize()[..32].try_into().expect("hashing failed") +} diff --git a/src/lib.rs b/src/lib.rs index 1001740..2a549e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,26 +1,55 @@ #![doc = include_str!("../README.md")] +#[cfg(not(feature = "experimental"))] mod crypto; +#[cfg(not(feature = "experimental"))] mod dht; +#[cfg(feature = "experimental")] +mod dht_experimental; +mod core; + +pub use core::*; + +#[cfg(not(feature = "experimental"))] #[cfg(feature = "iroh-gossip")] mod gossip; +#[cfg(not(feature = "experimental"))] #[cfg(feature = "iroh-gossip")] pub use gossip::{ AutoDiscoveryGossip, Bootstrap, BubbleMerge, GossipReceiver, GossipRecordContent, GossipSender, MessageOverlapMerge, Publisher, Topic, TopicId, }; +#[cfg(feature = "experimental")] +#[cfg(feature = "iroh-gossip")] +mod gossip_experimental; +#[cfg(feature = "experimental")] +#[cfg(feature = "iroh-gossip")] +pub use gossip_experimental::{ + AutoDiscoveryGossip, Topic +}; + +#[cfg(not(feature = "experimental"))] pub use crypto::{ DefaultSecretRotation, EncryptedRecord, Record, RecordPublisher, RecordTopic, RotationHandle, SecretRotation, encryption_keypair, salt, signing_keypair, }; -pub use dht::Dht; + +#[cfg(feature = "experimental")] +pub use mainline_exp::Dht; +#[cfg(not(feature = "experimental"))] +pub use mainline::Dht; + + + + /// Maximum number of bootstrap records allowed per topic per time slot (minute). /// /// When publishing to the DHT, records are not published if this threshold /// has already been reached for the current minute slot. +#[cfg(not(feature = "experimental"))] pub const MAX_BOOTSTRAP_RECORDS: usize = 100; /// Get the current Unix minute timestamp, optionally offset. @@ -35,6 +64,7 @@ pub const MAX_BOOTSTRAP_RECORDS: usize = 100; /// let now = unix_minute(0); /// let prev_minute = unix_minute(-1); /// ``` +#[cfg(not(feature = "experimental"))] pub fn unix_minute(minute_offset: i64) -> u64 { ((chrono::Utc::now().timestamp() as f64 / 60.0f64).floor() as i64 + minute_offset) as u64 } diff --git a/tests/tests.rs b/tests/tests.rs index 6889c11..849a004 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -4,7 +4,7 @@ use distributed_topic_tracker::{ DefaultSecretRotation, EncryptedRecord, GossipRecordContent, Record, RecordTopic, RotationHandle, encryption_keypair, salt, signing_keypair, unix_minute, }; -use dht::SigningKey; +use ed25519_dalek::SigningKey; #[test] fn test_record_serialization_roundtrip() { From dcf6503353d8c1257c41aeeb4b0c09fc432abf2c Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Thu, 30 Oct 2025 22:04:53 +0100 Subject: [PATCH 04/20] removed custom docker-compose command field --- docker-compose.yml | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 04a59a8..2fa8c77 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,27 +4,15 @@ services: container_name: dtt-node1 environment: - RUST_LOG=info - command: > - sh -c "echo 'Starting node1...' && - sleep 2 && - e2e_test" node2: build: . container_name: dtt-node2 environment: - RUST_LOG=info - command: > - sh -c "echo 'Starting node2...' && - sleep 5 && - e2e_test" node3: build: . container_name: dtt-node3 environment: - RUST_LOG=info - command: > - sh -c "echo 'Starting node3...' && - sleep 8 && - e2e_test" From 0c6a950ae48890e187c30478264c1dfb6b226fd0 Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Thu, 30 Oct 2025 22:20:59 +0100 Subject: [PATCH 05/20] test fix debug --- .github/workflows/test.yml | 4 ++-- docker-compose-experimental.yml | 24 ++++++++++++++++++++++++ test-e2e.sh | 6 +++--- 3 files changed, 29 insertions(+), 5 deletions(-) create mode 100644 docker-compose-experimental.yml diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c8c1fb4..a2c7be4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,7 +24,7 @@ jobs: run: docker build -t distributed-topic-tracker . - name: Run end-to-end test - run: ./test-e2e.sh + run: COMPOSE_FILE=docker-compose-experimental.yaml ./test-e2e.sh e2e-test-experimental: name: End-to-End Tests (Experimental) @@ -37,5 +37,5 @@ jobs: run: docker build -t distributed-topic-tracker --file Dockerfile.experimental . - name: Run end-to-end test - run: ./test-e2e.sh + run: COMPOSE_FILE=docker-compose-experimental.yaml ./test-e2e.sh diff --git a/docker-compose-experimental.yml b/docker-compose-experimental.yml new file mode 100644 index 0000000..00cd724 --- /dev/null +++ b/docker-compose-experimental.yml @@ -0,0 +1,24 @@ +services: + node1: + build: + context: . + dockerfile: Dockerfile.experimental + container_name: dtt-node1 + environment: + - RUST_LOG=info + + node2: + build: + context: . + dockerfile: Dockerfile.experimental + container_name: dtt-node2 + environment: + - RUST_LOG=info + + node3: + build: + context: . + dockerfile: Dockerfile.experimental + container_name: dtt-node3 + environment: + - RUST_LOG=info diff --git a/test-e2e.sh b/test-e2e.sh index b7021e7..2acd59f 100755 --- a/test-e2e.sh +++ b/test-e2e.sh @@ -8,11 +8,11 @@ set -e echo "Starting end-to-end test..." # Clean up any existing containers -docker compose down --remove-orphans || true +docker compose --file $COMPOSE_FILE down --remove-orphans || true # Build and start the containers echo "Building and starting containers..." -docker compose up --build -d +docker compose --file $COMPOSE_FILE up --build -d # Function to check if a container has printed "Joined topic" check_joined_topic() { @@ -91,7 +91,7 @@ echo "=========================================" # Clean up echo "Cleaning up containers..." -docker compose down +docker compose --file $COMPOSE_FILE down if [ "$success" = true ]; then echo "End-to-end test PASSED: All nodes successfully joined the topic and completed execution" From 0df31dcec6d47459e74144a5cac174c3283a34a7 Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Thu, 30 Oct 2025 22:27:31 +0100 Subject: [PATCH 06/20] test debug --- .github/workflows/test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a2c7be4..e695409 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,7 +24,7 @@ jobs: run: docker build -t distributed-topic-tracker . - name: Run end-to-end test - run: COMPOSE_FILE=docker-compose-experimental.yaml ./test-e2e.sh + run: COMPOSE_FILE=./docker-compose.yaml ./test-e2e.sh e2e-test-experimental: name: End-to-End Tests (Experimental) @@ -37,5 +37,5 @@ jobs: run: docker build -t distributed-topic-tracker --file Dockerfile.experimental . - name: Run end-to-end test - run: COMPOSE_FILE=docker-compose-experimental.yaml ./test-e2e.sh + run: COMPOSE_FILE=./docker-compose-experimental.yaml ./test-e2e.sh From 8f1b5ea6e844665daf0a3a7b4c7941110d14d585 Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Thu, 30 Oct 2025 22:36:13 +0100 Subject: [PATCH 07/20] test debug: i love gh actions (why is ./ not repo root) --- .github/workflows/test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e695409..aa77364 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,7 +24,7 @@ jobs: run: docker build -t distributed-topic-tracker . - name: Run end-to-end test - run: COMPOSE_FILE=./docker-compose.yaml ./test-e2e.sh + run: COMPOSE_FILE=../docker-compose.yaml ./test-e2e.sh e2e-test-experimental: name: End-to-End Tests (Experimental) @@ -37,5 +37,5 @@ jobs: run: docker build -t distributed-topic-tracker --file Dockerfile.experimental . - name: Run end-to-end test - run: COMPOSE_FILE=./docker-compose-experimental.yaml ./test-e2e.sh + run: COMPOSE_FILE=../docker-compose-experimental.yaml ./test-e2e.sh From bee9af1e39476b7c2d973d1ec315ba67a0bfe529 Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Thu, 30 Oct 2025 22:48:20 +0100 Subject: [PATCH 08/20] =?UTF-8?q?.yaml=20->=20.yaml=20|=C2=A0i=20am=20blin?= =?UTF-8?q?d?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index aa77364..9d6a291 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,7 +24,7 @@ jobs: run: docker build -t distributed-topic-tracker . - name: Run end-to-end test - run: COMPOSE_FILE=../docker-compose.yaml ./test-e2e.sh + run: COMPOSE_FILE=./docker-compose.yml ./test-e2e.sh e2e-test-experimental: name: End-to-End Tests (Experimental) @@ -37,5 +37,5 @@ jobs: run: docker build -t distributed-topic-tracker --file Dockerfile.experimental . - name: Run end-to-end test - run: COMPOSE_FILE=../docker-compose-experimental.yaml ./test-e2e.sh + run: COMPOSE_FILE=./docker-compose-experimental.yml ./test-e2e.sh From 292ed21039ea05bb74745472584086e92c73db42 Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Fri, 31 Oct 2025 12:08:47 +0100 Subject: [PATCH 09/20] cleanup + hashing fix --- .../dht.rs} | 23 ++++++------ .../gossip.rs} | 5 +-- src/experimental/mod.rs | 5 +++ src/lib.rs | 36 ++++++++----------- 4 files changed, 32 insertions(+), 37 deletions(-) rename src/{dht_experimental.rs => experimental/dht.rs} (75%) rename src/{gossip_experimental.rs => experimental/gossip.rs} (97%) create mode 100644 src/experimental/mod.rs diff --git a/src/dht_experimental.rs b/src/experimental/dht.rs similarity index 75% rename from src/dht_experimental.rs rename to src/experimental/dht.rs index 4c791a8..ccb3fde 100644 --- a/src/dht_experimental.rs +++ b/src/experimental/dht.rs @@ -28,14 +28,9 @@ impl Dht { if self.dht.is_none() { self.reset()?; } - let mut hasher = sha2::Sha512::new(); - hasher.update(topic_bytes); - let hash: [u8; 20] = hasher.finalize()[..20] - .try_into() - .context("Failed to convert hash")?; let dht = self.dht.as_mut().context("DHT not initialized")?; - let id = Id::from_bytes(hash)?; + let id = Id::from_bytes(topic_hash_20(topic_bytes))?; let topic_stream = dht.get_signed_peers(id).await.collect::>(); Ok(topic_stream @@ -50,14 +45,9 @@ impl Dht { if self.dht.is_none() { self.reset()?; } - let mut hasher = sha2::Sha512::new(); - hasher.update(topic_bytes); - let hash: [u8; 20] = hasher.finalize()[..20] - .try_into() - .context("Failed to convert hash")?; - + let dht = self.dht.as_mut().context("DHT not initialized")?; - let id = Id::from_bytes(hash)?; + let id = Id::from_bytes(topic_hash_20(topic_bytes))?; dht.announce_signed_peer(id, &self.signing_key) .await @@ -65,3 +55,10 @@ impl Dht { .map_err(|e| anyhow::anyhow!("Failed to announce signed peer: {}", e)) } } + +fn topic_hash_20(topic_bytes: &Vec) -> [u8; 20] { + let mut hasher = sha2::Sha512::new(); + hasher.update("/iroh/distributed-topic-tracker"); + hasher.update(topic_bytes); + hasher.finalize()[..20].try_into().expect("hashing failed") +} diff --git a/src/gossip_experimental.rs b/src/experimental/gossip.rs similarity index 97% rename from src/gossip_experimental.rs rename to src/experimental/gossip.rs index c44ef1f..a4bd2c1 100644 --- a/src/gossip_experimental.rs +++ b/src/experimental/gossip.rs @@ -10,7 +10,7 @@ use iroh_gossip::Gossip; use sha2::Digest; use tokio::{sync::Mutex, task::JoinHandle}; -use crate::{dht_experimental::Dht, receiver::GossipReceiver, sender::GossipSender}; +use crate::{experimental::Dht, receiver::GossipReceiver, sender::GossipSender}; pub trait AutoDiscoveryGossip { #[allow(async_fn_in_trait)] @@ -199,7 +199,8 @@ impl Topic { } fn topic_hash_32(topic_bytes: &Vec) -> [u8; 32] { - let mut hasher = sha2::Sha256::new(); + let mut hasher = sha2::Sha512::new(); + hasher.update("/iroh/distributed-topic-tracker"); hasher.update(topic_bytes); hasher.finalize()[..32].try_into().expect("hashing failed") } diff --git a/src/experimental/mod.rs b/src/experimental/mod.rs new file mode 100644 index 0000000..605b9ed --- /dev/null +++ b/src/experimental/mod.rs @@ -0,0 +1,5 @@ +mod gossip; +mod dht; + +pub use gossip::{AutoDiscoveryGossip, Topic}; +pub use dht::Dht; \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 2a549e1..fb98a8a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,16 +1,14 @@ #![doc = include_str!("../README.md")] +mod core; + + #[cfg(not(feature = "experimental"))] mod crypto; #[cfg(not(feature = "experimental"))] mod dht; -#[cfg(feature = "experimental")] -mod dht_experimental; - -mod core; pub use core::*; - #[cfg(not(feature = "experimental"))] #[cfg(feature = "iroh-gossip")] mod gossip; @@ -20,31 +18,14 @@ pub use gossip::{ AutoDiscoveryGossip, Bootstrap, BubbleMerge, GossipReceiver, GossipRecordContent, GossipSender, MessageOverlapMerge, Publisher, Topic, TopicId, }; -#[cfg(feature = "experimental")] -#[cfg(feature = "iroh-gossip")] -mod gossip_experimental; -#[cfg(feature = "experimental")] -#[cfg(feature = "iroh-gossip")] -pub use gossip_experimental::{ - AutoDiscoveryGossip, Topic -}; - - #[cfg(not(feature = "experimental"))] pub use crypto::{ DefaultSecretRotation, EncryptedRecord, Record, RecordPublisher, RecordTopic, RotationHandle, SecretRotation, encryption_keypair, salt, signing_keypair, }; - -#[cfg(feature = "experimental")] -pub use mainline_exp::Dht; #[cfg(not(feature = "experimental"))] pub use mainline::Dht; - - - - /// Maximum number of bootstrap records allowed per topic per time slot (minute). /// /// When publishing to the DHT, records are not published if this threshold @@ -68,3 +49,14 @@ pub const MAX_BOOTSTRAP_RECORDS: usize = 100; pub fn unix_minute(minute_offset: i64) -> u64 { ((chrono::Utc::now().timestamp() as f64 / 60.0f64).floor() as i64 + minute_offset) as u64 } + + +// Experimental announce_signed_peer and get_signed_peers DHT implementation +#[cfg(feature = "experimental")] +mod experimental; +#[cfg(feature = "experimental")] +pub use experimental::{AutoDiscoveryGossip, Topic}; + + +#[cfg(feature = "experimental")] +pub use mainline_exp::Dht; From 39fddaabe72fb5b5c414c15dcae706c5ae8b6fdf Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Fri, 31 Oct 2025 12:17:31 +0100 Subject: [PATCH 10/20] added extra_bootstrap --- Cargo.toml | 2 +- src/experimental/dht.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a6eb39b..957279c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ version = "0.2.4" edition = "2024" [features] -default = ["iroh-gossip"] +default = ["iroh-gossip","experimental"] iroh-gossip = ["dep:iroh", "dep:iroh-gossip"] experimental = ["dep:mainline_exp"] diff --git a/src/experimental/dht.rs b/src/experimental/dht.rs index ccb3fde..e7ed241 100644 --- a/src/experimental/dht.rs +++ b/src/experimental/dht.rs @@ -20,7 +20,7 @@ impl Dht { } pub fn reset(&mut self) -> Result<()> { - self.dht = Some(MainlineDht::builder().build()?.as_async()); + self.dht = Some(MainlineDht::builder().extra_bootstrap(&["pkarr.rustonbsd.com:6881"]).build()?.as_async()); Ok(()) } @@ -45,7 +45,7 @@ impl Dht { if self.dht.is_none() { self.reset()?; } - + let dht = self.dht.as_mut().context("DHT not initialized")?; let id = Id::from_bytes(topic_hash_20(topic_bytes))?; From c11cbf2d911adcad1c6de96d0102e25d7a66373c Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Fri, 31 Oct 2025 12:18:19 +0100 Subject: [PATCH 11/20] fmt --- examples/chat_experimental.rs | 7 +++++-- examples/e2e_test_experimental.rs | 6 ++++-- src/core/mod.rs | 2 +- src/dht.rs | 7 +------ src/experimental/dht.rs | 7 ++++++- src/experimental/gossip.rs | 8 ++++++-- src/experimental/mod.rs | 4 ++-- src/gossip/mod.rs | 7 +++---- src/gossip/topic/bootstrap.rs | 5 +---- src/lib.rs | 13 +++++-------- 10 files changed, 34 insertions(+), 32 deletions(-) diff --git a/examples/chat_experimental.rs b/examples/chat_experimental.rs index efdf840..27a2ce9 100644 --- a/examples/chat_experimental.rs +++ b/examples/chat_experimental.rs @@ -39,11 +39,14 @@ async fn main() -> Result<()> { .accept(iroh_gossip::ALPN, gossip.clone()) .spawn(); - let topic_id = "my-iroh-gossip-topic-experimental".as_bytes().to_vec(); + let topic_id = "my-iroh-gossip-topic-experimental".as_bytes().to_vec(); // Split into sink (sending) and stream (receiving) let (gossip_sender, gossip_receiver) = gossip - .subscribe_and_join_with_auto_discovery(topic_id, signing_key).await?.split().await; + .subscribe_and_join_with_auto_discovery(topic_id, signing_key) + .await? + .split() + .await; println!("Joined topic"); diff --git a/examples/e2e_test_experimental.rs b/examples/e2e_test_experimental.rs index 8d363b0..6ff61e9 100644 --- a/examples/e2e_test_experimental.rs +++ b/examples/e2e_test_experimental.rs @@ -25,10 +25,12 @@ async fn main() -> Result<()> { .accept(iroh_gossip::ALPN, gossip.clone()) .spawn(); - let topic_id = "my-iroh-gossip-topic-experimental".as_bytes().to_vec(); let (gossip_sender, gossip_receiver) = gossip - .subscribe_and_join_with_auto_discovery(topic_id, signing_key).await?.split().await; + .subscribe_and_join_with_auto_discovery(topic_id, signing_key) + .await? + .split() + .await; tokio::spawn(async move { while let Some(Ok(event)) = gossip_receiver.next().await { diff --git a/src/core/mod.rs b/src/core/mod.rs index f732700..42cde01 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -1,2 +1,2 @@ +pub mod receiver; pub mod sender; -pub mod receiver; \ No newline at end of file diff --git a/src/dht.rs b/src/dht.rs index 277c048..d247804 100644 --- a/src/dht.rs +++ b/src/dht.rs @@ -159,12 +159,7 @@ impl DhtActor { salt.as_deref(), ) } else { - MutableItem::new( - signing_key.clone(), - &data, - 0, - salt.as_deref(), - ) + MutableItem::new(signing_key.clone(), &data, 0, salt.as_deref()) }; let put_result = match tokio::time::timeout( diff --git a/src/experimental/dht.rs b/src/experimental/dht.rs index e7ed241..4118676 100644 --- a/src/experimental/dht.rs +++ b/src/experimental/dht.rs @@ -20,7 +20,12 @@ impl Dht { } pub fn reset(&mut self) -> Result<()> { - self.dht = Some(MainlineDht::builder().extra_bootstrap(&["pkarr.rustonbsd.com:6881"]).build()?.as_async()); + self.dht = Some( + MainlineDht::builder() + .extra_bootstrap(&["pkarr.rustonbsd.com:6881"]) + .build()? + .as_async(), + ); Ok(()) } diff --git a/src/experimental/gossip.rs b/src/experimental/gossip.rs index a4bd2c1..b811afd 100644 --- a/src/experimental/gossip.rs +++ b/src/experimental/gossip.rs @@ -47,7 +47,10 @@ impl AutoDiscoveryGossip for iroh_gossip::net::Gossip { signing_key: SigningKey, ) -> anyhow::Result { let gossip_topic: iroh_gossip::api::GossipTopic = self - .subscribe(iroh_gossip::proto::TopicId::from_bytes(topic_hash_32(&topic_id)), vec![]) + .subscribe( + iroh_gossip::proto::TopicId::from_bytes(topic_hash_32(&topic_id)), + vec![], + ) .await?; let (gossip_sender, gossip_receiver) = gossip_topic.split(); let (gossip_sender, gossip_receiver) = ( @@ -96,7 +99,8 @@ impl Topic { } pub async fn stop_background_loops(&self) { - self.running.store(false, std::sync::atomic::Ordering::Relaxed); + self.running + .store(false, std::sync::atomic::Ordering::Relaxed); } } diff --git a/src/experimental/mod.rs b/src/experimental/mod.rs index 605b9ed..7b3ea69 100644 --- a/src/experimental/mod.rs +++ b/src/experimental/mod.rs @@ -1,5 +1,5 @@ -mod gossip; mod dht; +mod gossip; +pub use dht::Dht; pub use gossip::{AutoDiscoveryGossip, Topic}; -pub use dht::Dht; \ No newline at end of file diff --git a/src/gossip/mod.rs b/src/gossip/mod.rs index 26b36f3..e76e769 100644 --- a/src/gossip/mod.rs +++ b/src/gossip/mod.rs @@ -4,15 +4,14 @@ //! and topic joining without prior knowledge of peers. Includes bubble detection //! and message overlap merging for cluster topology optimization. - mod merge; mod topic; -pub use merge::{BubbleMerge, MessageOverlapMerge}; -pub use topic::{Bootstrap, Publisher, Topic, TopicId}; pub use crate::core::receiver::GossipReceiver; pub use crate::core::sender::GossipSender; +pub use merge::{BubbleMerge, MessageOverlapMerge}; use serde::{Deserialize, Serialize}; +pub use topic::{Bootstrap, Publisher, Topic, TopicId}; use crate::RecordPublisher; @@ -64,4 +63,4 @@ impl AutoDiscoveryGossip for iroh_gossip::net::Gossip { ) -> anyhow::Result { Topic::new(record_publisher, self.clone(), true).await } -} \ No newline at end of file +} diff --git a/src/gossip/topic/bootstrap.rs b/src/gossip/topic/bootstrap.rs index 3f5b8a5..5669a20 100644 --- a/src/gossip/topic/bootstrap.rs +++ b/src/gossip/topic/bootstrap.rs @@ -8,10 +8,7 @@ use iroh::EndpointId; use tokio::time::sleep; use crate::{ - GossipSender, - crypto::Record, - gossip::GossipRecordContent, - core::receiver::GossipReceiver + GossipSender, core::receiver::GossipReceiver, crypto::Record, gossip::GossipRecordContent, }; /// Manages the peer discovery and joining process. diff --git a/src/lib.rs b/src/lib.rs index fb98a8a..91137ab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,6 @@ mod core; - #[cfg(not(feature = "experimental"))] mod crypto; #[cfg(not(feature = "experimental"))] @@ -13,17 +12,17 @@ pub use core::*; #[cfg(feature = "iroh-gossip")] mod gossip; #[cfg(not(feature = "experimental"))] +pub use crypto::{ + DefaultSecretRotation, EncryptedRecord, Record, RecordPublisher, RecordTopic, RotationHandle, + SecretRotation, encryption_keypair, salt, signing_keypair, +}; +#[cfg(not(feature = "experimental"))] #[cfg(feature = "iroh-gossip")] pub use gossip::{ AutoDiscoveryGossip, Bootstrap, BubbleMerge, GossipReceiver, GossipRecordContent, GossipSender, MessageOverlapMerge, Publisher, Topic, TopicId, }; #[cfg(not(feature = "experimental"))] -pub use crypto::{ - DefaultSecretRotation, EncryptedRecord, Record, RecordPublisher, RecordTopic, RotationHandle, - SecretRotation, encryption_keypair, salt, signing_keypair, -}; -#[cfg(not(feature = "experimental"))] pub use mainline::Dht; /// Maximum number of bootstrap records allowed per topic per time slot (minute). @@ -50,13 +49,11 @@ pub fn unix_minute(minute_offset: i64) -> u64 { ((chrono::Utc::now().timestamp() as f64 / 60.0f64).floor() as i64 + minute_offset) as u64 } - // Experimental announce_signed_peer and get_signed_peers DHT implementation #[cfg(feature = "experimental")] mod experimental; #[cfg(feature = "experimental")] pub use experimental::{AutoDiscoveryGossip, Topic}; - #[cfg(feature = "experimental")] pub use mainline_exp::Dht; From 78d66be1a772f62a4ed79e20104289627f71e41d Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Fri, 31 Oct 2025 12:22:46 +0100 Subject: [PATCH 12/20] stupid --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 957279c..a6eb39b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ version = "0.2.4" edition = "2024" [features] -default = ["iroh-gossip","experimental"] +default = ["iroh-gossip"] iroh-gossip = ["dep:iroh", "dep:iroh-gossip"] experimental = ["dep:mainline_exp"] From 6fe78b26cda9fb085a8c3cde57dbae19e8606d6f Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Fri, 31 Oct 2025 13:45:10 +0100 Subject: [PATCH 13/20] some refinements --- Dockerfile.experimental | 2 +- examples/e2e_test_experimental.rs | 38 +++++++++++++++++++--- src/experimental/dht.rs | 19 ++++++----- src/experimental/gossip.rs | 53 +++++++++++++++++-------------- 4 files changed, 75 insertions(+), 37 deletions(-) diff --git a/Dockerfile.experimental b/Dockerfile.experimental index e387c94..eb6d339 100644 --- a/Dockerfile.experimental +++ b/Dockerfile.experimental @@ -23,4 +23,4 @@ RUN apt-get update && apt-get install -y \ COPY --from=builder /app/target/release/examples/e2e_test_experimental /usr/local/bin/e2e_test_experimental # Set the default command -CMD ["e2e_test_experimental"] +CMD ["e2e_test_experimental","2"] diff --git a/examples/e2e_test_experimental.rs b/examples/e2e_test_experimental.rs index 6ff61e9..f18942e 100644 --- a/examples/e2e_test_experimental.rs +++ b/examples/e2e_test_experimental.rs @@ -1,12 +1,31 @@ +use std::sync::{Arc, atomic::AtomicUsize}; + use anyhow::Result; use iroh::{Endpoint, SecretKey}; use iroh_gossip::net::Gossip; // Imports from distrubuted-topic-tracker use distributed_topic_tracker::AutoDiscoveryGossip; +use tokio::time::Instant; #[tokio::main] async fn main() -> Result<()> { + /* + use tracing_subscriber::filter::EnvFilter; + + tracing_subscriber::fmt() + .with_thread_ids(true) + .with_ansi(true) + .with_env_filter( + EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug")), + ) + .init(); + */ + + // from input first param + let expected_neighbours = std::env::args().nth(1).unwrap_or("1".to_string()).parse::()?; + // Generate a new random secret key let secret_key = SecretKey::generate(&mut rand::rng()); let signing_key = ed25519_dalek::SigningKey::from_bytes(&secret_key.to_bytes()); @@ -25,6 +44,7 @@ async fn main() -> Result<()> { .accept(iroh_gossip::ALPN, gossip.clone()) .spawn(); + let cold_start_timer = Instant::now(); let topic_id = "my-iroh-gossip-topic-experimental".as_bytes().to_vec(); let (gossip_sender, gossip_receiver) = gossip .subscribe_and_join_with_auto_discovery(topic_id, signing_key) @@ -32,20 +52,30 @@ async fn main() -> Result<()> { .split() .await; - tokio::spawn(async move { + println!("Cold start time: {:.0}ms", cold_start_timer.elapsed().as_millis()); + + let total_messages_recv = Arc::new(AtomicUsize::new(0)); + tokio::spawn({ + + let total_messages_recv = total_messages_recv.clone(); + async move { while let Some(Ok(event)) = gossip_receiver.next().await { println!("event: {event:?}"); + if matches!(event, iroh_gossip::api::Event::NeighborUp(_)) { + total_messages_recv.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } } - }); + }}); - tokio::time::sleep(std::time::Duration::from_secs(3)).await; gossip_sender .broadcast(format!("hi from {}", endpoint.id()).into()) .await?; println!("[joined topic]"); - tokio::time::sleep(std::time::Duration::from_secs(10)).await; + while total_messages_recv.load(std::sync::atomic::Ordering::Relaxed) < expected_neighbours { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } println!("[finished]"); diff --git a/src/experimental/dht.rs b/src/experimental/dht.rs index 4118676..0cda7b5 100644 --- a/src/experimental/dht.rs +++ b/src/experimental/dht.rs @@ -19,19 +19,22 @@ impl Dht { } } - pub fn reset(&mut self) -> Result<()> { - self.dht = Some( - MainlineDht::builder() + pub async fn reset(&mut self) -> Result<()> { + let dht = MainlineDht::builder() .extra_bootstrap(&["pkarr.rustonbsd.com:6881"]) .build()? - .as_async(), - ); + .as_async(); + if !dht.bootstrapped().await { + anyhow::bail!("DHT bootstrap failed"); + } + self.dht = Some(dht); + Ok(()) } pub async fn get_peers(&mut self, topic_bytes: &Vec) -> Result> { if self.dht.is_none() { - self.reset()?; + self.reset().await?; } let dht = self.dht.as_mut().context("DHT not initialized")?; @@ -48,7 +51,7 @@ impl Dht { pub async fn announce_self(&mut self, topic_bytes: &Vec) -> Result<()> { if self.dht.is_none() { - self.reset()?; + self.reset().await?; } let dht = self.dht.as_mut().context("DHT not initialized")?; @@ -57,7 +60,7 @@ impl Dht { dht.announce_signed_peer(id, &self.signing_key) .await .map(|_| ()) - .map_err(|e| anyhow::anyhow!("Failed to announce signed peer: {}", e)) + .map_err(|e| anyhow::anyhow!("Failed to announce signed peer: {e}")) } } diff --git a/src/experimental/gossip.rs b/src/experimental/gossip.rs index b811afd..7aea160 100644 --- a/src/experimental/gossip.rs +++ b/src/experimental/gossip.rs @@ -129,13 +129,20 @@ impl Topic { let dht = self.dht.clone(); let topic_bytes = self.topic_bytes.clone(); Ok(tokio::spawn(async move { + let mut backoff = 0; while is_running.load(std::sync::atomic::Ordering::Relaxed) { - { + let wait_dur = { let mut lock = dht.lock().await; - let res = lock.announce_self(&topic_bytes).await; - tracing::debug!("self_announce: {:?}", res); - } - tokio::time::sleep(Duration::from_secs(300)).await; + if let Err(e) = lock.announce_self(&topic_bytes).await { + tracing::debug!("self_announce: {e}"); + backoff += 1; + Duration::from_secs(backoff.min(300)) + } else { + backoff = 0; + Duration::from_secs(300) + } + }; + tokio::time::sleep(wait_dur).await; } })) } @@ -145,6 +152,7 @@ impl Topic { let dht = self.dht.clone(); let topic_bytes = self.topic_bytes.clone(); let sender = self.sender.clone(); + let receiver = self.receiver.clone(); let added_nodes = self.added_nodes.clone(); Ok(tokio::spawn(async move { while is_running.load(std::sync::atomic::Ordering::Relaxed) { @@ -152,6 +160,7 @@ impl Topic { let mut lock = dht.lock().await; lock.get_peers(&topic_bytes).await.unwrap_or_default() }; + if !peers.is_empty() { let mut added_nodes_lock = added_nodes.lock().await; let unknown_peers = peers @@ -165,24 +174,23 @@ impl Topic { }) .collect::>(); - if let Err(e) = sender - .join_peers( - unknown_peers - .iter() - .filter_map(|p| PublicKey::from_bytes(p.as_bytes()).ok()) - .collect::>(), - Some(5), - ) - .await - { - tracing::debug!("bootstrap join_peers error: {:?}", e); - } else { - tracing::debug!("bootstrap joined {} new peers", unknown_peers.len()); - added_nodes_lock.extend(unknown_peers); + for p in unknown_peers { + if let Err(e) = sender + .join_peers(vec![PublicKey::from_verifying_key(p)], + None, + ) + .await + { + tracing::debug!("bootstrap join_peers error: {:?}", e); + } else { + tracing::debug!("bootstrap joined new peers"); + } + added_nodes_lock.insert(p); } + } - if added_nodes.lock().await.len() < 2 { + if !receiver.is_joined().await || added_nodes.lock().await.len() < 2 { tracing::debug!("not enough peers yet, waiting before next bootstrap attempt"); tokio::time::sleep(Duration::from_secs(1)).await; } else { @@ -193,10 +201,7 @@ impl Topic { } pub(self) async fn wait_for_join(&self) { - loop { - if self.added_nodes.lock().await.len() >= 2 { - break; - } + while !self.receiver.is_joined().await { tokio::time::sleep(Duration::from_millis(100)).await; } } From bd3215e95ef88cb97afdbb3970bb820787082907 Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Fri, 31 Oct 2025 14:33:33 +0100 Subject: [PATCH 14/20] backoff timings --- examples/e2e_test_experimental.rs | 8 +++++--- src/experimental/dht.rs | 5 ++++- src/experimental/gossip.rs | 16 +++++++++++----- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/examples/e2e_test_experimental.rs b/examples/e2e_test_experimental.rs index f18942e..6b76fde 100644 --- a/examples/e2e_test_experimental.rs +++ b/examples/e2e_test_experimental.rs @@ -10,7 +10,7 @@ use tokio::time::Instant; #[tokio::main] async fn main() -> Result<()> { - /* + use tracing_subscriber::filter::EnvFilter; tracing_subscriber::fmt() @@ -21,7 +21,7 @@ async fn main() -> Result<()> { .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug")), ) .init(); - */ + // from input first param let expected_neighbours = std::env::args().nth(1).unwrap_or("1".to_string()).parse::()?; @@ -45,7 +45,7 @@ async fn main() -> Result<()> { .spawn(); let cold_start_timer = Instant::now(); - let topic_id = "my-iroh-gossip-topic-experimental".as_bytes().to_vec(); + let topic_id = "my-iroh-gossip-topic-experimental-1".as_bytes().to_vec(); let (gossip_sender, gossip_receiver) = gossip .subscribe_and_join_with_auto_discovery(topic_id, signing_key) .await? @@ -79,6 +79,8 @@ async fn main() -> Result<()> { println!("[finished]"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // successfully joined // exit with code 0 Ok(()) diff --git a/src/experimental/dht.rs b/src/experimental/dht.rs index 0cda7b5..e186872 100644 --- a/src/experimental/dht.rs +++ b/src/experimental/dht.rs @@ -20,6 +20,9 @@ impl Dht { } pub async fn reset(&mut self) -> Result<()> { + if self.dht.is_some() { + return Ok(()); + } let dht = MainlineDht::builder() .extra_bootstrap(&["pkarr.rustonbsd.com:6881"]) .build()? @@ -57,7 +60,7 @@ impl Dht { let dht = self.dht.as_mut().context("DHT not initialized")?; let id = Id::from_bytes(topic_hash_20(topic_bytes))?; - dht.announce_signed_peer(id, &self.signing_key) + tokio::time::timeout(std::time::Duration::from_secs(5), dht.announce_signed_peer(id, &self.signing_key)) .await .map(|_| ()) .map_err(|e| anyhow::anyhow!("Failed to announce signed peer: {e}")) diff --git a/src/experimental/gossip.rs b/src/experimental/gossip.rs index 7aea160..717d94b 100644 --- a/src/experimental/gossip.rs +++ b/src/experimental/gossip.rs @@ -133,11 +133,13 @@ impl Topic { while is_running.load(std::sync::atomic::Ordering::Relaxed) { let wait_dur = { let mut lock = dht.lock().await; + if let Err(e) = lock.announce_self(&topic_bytes).await { - tracing::debug!("self_announce: {e}"); + tracing::warn!("self_announce: {e}"); backoff += 1; Duration::from_secs(backoff.min(300)) } else { + tracing::info!("self_announce: success"); backoff = 0; Duration::from_secs(300) } @@ -155,12 +157,13 @@ impl Topic { let receiver = self.receiver.clone(); let added_nodes = self.added_nodes.clone(); Ok(tokio::spawn(async move { + let mut backoff = 0; while is_running.load(std::sync::atomic::Ordering::Relaxed) { let peers = { let mut lock = dht.lock().await; - lock.get_peers(&topic_bytes).await.unwrap_or_default() + lock.get_peers(&topic_bytes).await.unwrap_or_default().clone() }; - + tracing::debug!("bootstrap found {} peers", peers.len()); if !peers.is_empty() { let mut added_nodes_lock = added_nodes.lock().await; let unknown_peers = peers @@ -192,9 +195,12 @@ impl Topic { if !receiver.is_joined().await || added_nodes.lock().await.len() < 2 { tracing::debug!("not enough peers yet, waiting before next bootstrap attempt"); - tokio::time::sleep(Duration::from_secs(1)).await; + backoff += 1; + tokio::time::sleep(Duration::from_secs(backoff.min(60))).await; } else { - tokio::time::sleep(Duration::from_secs(20)).await; + backoff = 0; + tracing::info!("bootstrap successful"); + tokio::time::sleep(Duration::from_secs(60)).await; } } })) From ec0859d01752a9854b8d59a49b862099c9b29bdb Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Fri, 31 Oct 2025 21:21:28 +0100 Subject: [PATCH 15/20] playing with backoff --- Cargo.lock | 1 + Cargo.toml | 2 ++ examples/e2e_test_experimental.rs | 7 ++++--- src/experimental/dht.rs | 35 ++++++++++++++++++++----------- src/experimental/gossip.rs | 17 ++++++++++----- 5 files changed, 42 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 62d8cbb..359c302 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -819,6 +819,7 @@ dependencies = [ "ed25519-dalek", "ed25519-dalek-hpke", "futures-lite", + "hex", "iroh", "iroh-gossip", "mainline", diff --git a/Cargo.toml b/Cargo.toml index a6eb39b..66f8695 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,8 @@ tracing-subscriber = { version = "0.3", default-features = false, features = ["s mainline = { version = "6.0.0", default-features = false, features = ["async"] } mainline_exp = { git = "https://github.com/Nuhvi/mainline", branch = "main", default-features = false, features = ["async"], optional = true, package = "dht" } +hex = "0.4" + [lib] crate-type = ["cdylib", "rlib"] diff --git a/examples/e2e_test_experimental.rs b/examples/e2e_test_experimental.rs index 6b76fde..1dd687b 100644 --- a/examples/e2e_test_experimental.rs +++ b/examples/e2e_test_experimental.rs @@ -12,19 +12,20 @@ use tokio::time::Instant; async fn main() -> Result<()> { use tracing_subscriber::filter::EnvFilter; - tracing_subscriber::fmt() .with_thread_ids(true) .with_ansi(true) .with_env_filter( EnvFilter::try_from_default_env() - .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug")), + .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug,dht=debug")), ) .init(); + // from input first param let expected_neighbours = std::env::args().nth(1).unwrap_or("1".to_string()).parse::()?; + let topic_salt = std::env::args().nth(2).unwrap_or("default-salt".to_string()); // Generate a new random secret key let secret_key = SecretKey::generate(&mut rand::rng()); @@ -45,7 +46,7 @@ async fn main() -> Result<()> { .spawn(); let cold_start_timer = Instant::now(); - let topic_id = "my-iroh-gossip-topic-experimental-1".as_bytes().to_vec(); + let topic_id = format!("my-iroh-gossip-topic-experimental-{topic_salt}").as_bytes().to_vec(); let (gossip_sender, gossip_receiver) = gossip .subscribe_and_join_with_auto_discovery(topic_id, signing_key) .await? diff --git a/src/experimental/dht.rs b/src/experimental/dht.rs index e186872..fcbcacf 100644 --- a/src/experimental/dht.rs +++ b/src/experimental/dht.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use anyhow::{Context, Result}; use ed25519_dalek::VerifyingKey; @@ -24,14 +26,14 @@ impl Dht { return Ok(()); } let dht = MainlineDht::builder() - .extra_bootstrap(&["pkarr.rustonbsd.com:6881"]) - .build()? - .as_async(); - if !dht.bootstrapped().await { + .extra_bootstrap(&["pkarr.rustonbsd.com:6881"]) + .build()? + .as_async(); + if !dht.bootstrapped().await { anyhow::bail!("DHT bootstrap failed"); } self.dht = Some(dht); - + Ok(()) } @@ -43,9 +45,15 @@ impl Dht { let dht = self.dht.as_mut().context("DHT not initialized")?; let id = Id::from_bytes(topic_hash_20(topic_bytes))?; - let topic_stream = dht.get_signed_peers(id).await.collect::>(); - Ok(topic_stream - .await + let mut stream = dht.get_signed_peers(id).await; + let mut results = vec![]; + + let deadline = tokio::time::Instant::now() + Duration::from_secs(30); + while let Ok(Some(item)) = tokio::time::timeout_at(deadline, stream.next()).await { + results.push(item); + } + + Ok(results .iter() .flatten() .filter_map(|item| VerifyingKey::from_bytes(item.key()).ok()) @@ -60,10 +68,13 @@ impl Dht { let dht = self.dht.as_mut().context("DHT not initialized")?; let id = Id::from_bytes(topic_hash_20(topic_bytes))?; - tokio::time::timeout(std::time::Duration::from_secs(5), dht.announce_signed_peer(id, &self.signing_key)) - .await - .map(|_| ()) - .map_err(|e| anyhow::anyhow!("Failed to announce signed peer: {e}")) + tokio::time::timeout( + std::time::Duration::from_secs(5), + dht.announce_signed_peer(id, &self.signing_key), + ) + .await + .map(|_| ()) + .map_err(|e| anyhow::anyhow!("Failed to announce signed peer: {e}")) } } diff --git a/src/experimental/gossip.rs b/src/experimental/gossip.rs index 717d94b..7c8ca0b 100644 --- a/src/experimental/gossip.rs +++ b/src/experimental/gossip.rs @@ -129,7 +129,7 @@ impl Topic { let dht = self.dht.clone(); let topic_bytes = self.topic_bytes.clone(); Ok(tokio::spawn(async move { - let mut backoff = 0; + let mut backoff = 5; while is_running.load(std::sync::atomic::Ordering::Relaxed) { let wait_dur = { let mut lock = dht.lock().await; @@ -140,7 +140,7 @@ impl Topic { Duration::from_secs(backoff.min(300)) } else { tracing::info!("self_announce: success"); - backoff = 0; + backoff = 5; Duration::from_secs(300) } }; @@ -157,11 +157,18 @@ impl Topic { let receiver = self.receiver.clone(); let added_nodes = self.added_nodes.clone(); Ok(tokio::spawn(async move { - let mut backoff = 0; + let mut backoff = 5; while is_running.load(std::sync::atomic::Ordering::Relaxed) { let peers = { let mut lock = dht.lock().await; - lock.get_peers(&topic_bytes).await.unwrap_or_default().clone() + let res = lock.get_peers(&topic_bytes).await; + + if let Err(e) = res { + tracing::error!("bootstrap get_peers error: {e:?}"); + Vec::new() + } else { + res.unwrap_or_default().clone() + } }; tracing::debug!("bootstrap found {} peers", peers.len()); if !peers.is_empty() { @@ -198,7 +205,7 @@ impl Topic { backoff += 1; tokio::time::sleep(Duration::from_secs(backoff.min(60))).await; } else { - backoff = 0; + backoff = 5; tracing::info!("bootstrap successful"); tokio::time::sleep(Duration::from_secs(60)).await; } From 863b5a8525dd4b3903b644c02cbf2248082cdb99 Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Fri, 31 Oct 2025 21:26:51 +0100 Subject: [PATCH 16/20] random topic id in e2e test --- Dockerfile.experimental | 2 +- docker-compose-experimental.yml | 6 ++++++ test-e2e.sh | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/Dockerfile.experimental b/Dockerfile.experimental index eb6d339..e387c94 100644 --- a/Dockerfile.experimental +++ b/Dockerfile.experimental @@ -23,4 +23,4 @@ RUN apt-get update && apt-get install -y \ COPY --from=builder /app/target/release/examples/e2e_test_experimental /usr/local/bin/e2e_test_experimental # Set the default command -CMD ["e2e_test_experimental","2"] +CMD ["e2e_test_experimental"] diff --git a/docker-compose-experimental.yml b/docker-compose-experimental.yml index 00cd724..74bb883 100644 --- a/docker-compose-experimental.yml +++ b/docker-compose-experimental.yml @@ -6,6 +6,8 @@ services: container_name: dtt-node1 environment: - RUST_LOG=info + - TOPIC_ID=${TOPIC_ID} + command: sh -c "e2e_test_experimental 2 $${TOPIC_ID}" node2: build: @@ -14,6 +16,8 @@ services: container_name: dtt-node2 environment: - RUST_LOG=info + - TOPIC_ID=${TOPIC_ID} + command: sh -c "e2e_test_experimental 2 $${TOPIC_ID}" node3: build: @@ -22,3 +26,5 @@ services: container_name: dtt-node3 environment: - RUST_LOG=info + - TOPIC_ID=${TOPIC_ID} + command: sh -c "e2e_test_experimental 2 $${TOPIC_ID}" \ No newline at end of file diff --git a/test-e2e.sh b/test-e2e.sh index 2acd59f..6f6d24a 100755 --- a/test-e2e.sh +++ b/test-e2e.sh @@ -12,7 +12,7 @@ docker compose --file $COMPOSE_FILE down --remove-orphans || true # Build and start the containers echo "Building and starting containers..." -docker compose --file $COMPOSE_FILE up --build -d +TOPIC_ID=$RANDOM docker compose --file $COMPOSE_FILE up --build -d # Function to check if a container has printed "Joined topic" check_joined_topic() { From 69fdb40f76247a07c171d6bc572b925e1fd221e2 Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Fri, 31 Oct 2025 21:36:47 +0100 Subject: [PATCH 17/20] export topic_id --- test-e2e.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test-e2e.sh b/test-e2e.sh index 6f6d24a..013e160 100755 --- a/test-e2e.sh +++ b/test-e2e.sh @@ -7,12 +7,14 @@ set -e echo "Starting end-to-end test..." +export TOPIC_ID=$RANDOM + # Clean up any existing containers docker compose --file $COMPOSE_FILE down --remove-orphans || true # Build and start the containers echo "Building and starting containers..." -TOPIC_ID=$RANDOM docker compose --file $COMPOSE_FILE up --build -d +docker compose --file $COMPOSE_FILE up --build -d # Function to check if a container has printed "Joined topic" check_joined_topic() { From e395e7d8b05eb567e4bc02ee7783b4f4b06fd94b Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Mon, 3 Nov 2025 15:23:54 +0100 Subject: [PATCH 18/20] added more_recent_than test branch --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/experimental/dht.rs | 2 +- test-e2e.sh | 3 ++- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 359c302..855af13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -751,7 +751,7 @@ dependencies = [ [[package]] name = "dht" version = "6.1.0" -source = "git+https://github.com/Nuhvi/mainline?branch=main#e20393e5d92f6c7b7b84f12ff01bc507c15ebc62" +source = "git+https://github.com/rustonbsd/mainline?branch=feat-get-signed-peers-ttl#aad2154c83ffb238f588b7ba322e7704b9106431" dependencies = [ "crc", "document-features", diff --git a/Cargo.toml b/Cargo.toml index 66f8695..2f9e1ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,7 @@ tracing = { version = "0.1", default-features = false, features = ["std"] } tracing-subscriber = { version = "0.3", default-features = false, features = ["std", "env-filter", "ansi"] } mainline = { version = "6.0.0", default-features = false, features = ["async"] } -mainline_exp = { git = "https://github.com/Nuhvi/mainline", branch = "main", default-features = false, features = ["async"], optional = true, package = "dht" } +mainline_exp = { git = "https://github.com/rustonbsd/mainline", branch = "feat-get-signed-peers-ttl", default-features = false, features = ["async"], optional = true, package = "dht" } hex = "0.4" diff --git a/src/experimental/dht.rs b/src/experimental/dht.rs index fcbcacf..6907efa 100644 --- a/src/experimental/dht.rs +++ b/src/experimental/dht.rs @@ -45,7 +45,7 @@ impl Dht { let dht = self.dht.as_mut().context("DHT not initialized")?; let id = Id::from_bytes(topic_hash_20(topic_bytes))?; - let mut stream = dht.get_signed_peers(id).await; + let mut stream = dht.get_signed_peers(id, Some(chrono::Utc::now().timestamp()-60)).await; let mut results = vec![]; let deadline = tokio::time::Instant::now() + Duration::from_secs(30); diff --git a/test-e2e.sh b/test-e2e.sh index 013e160..466f4c5 100755 --- a/test-e2e.sh +++ b/test-e2e.sh @@ -7,7 +7,8 @@ set -e echo "Starting end-to-end test..." -export TOPIC_ID=$RANDOM +#export TOPIC_ID=$RANDOM +export TOPIC_ID=NOTRANDOMSOWECANSEEIFTHE_MORE_RECENT_THAN_PARAM_IN_MAINLINE_FOR_GET_SIGNED_PEERS_WORKS # Clean up any existing containers docker compose --file $COMPOSE_FILE down --remove-orphans || true From 94cb750e97c83548be4025fc7297ec476b3e5b03 Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Wed, 5 Nov 2025 15:06:26 +0100 Subject: [PATCH 19/20] optional timeouts (internally) + 10 sec staggers between node starts in e2e test + (reset) --- Cargo.lock | 2 +- Cargo.toml | 4 +-- docker-compose-experimental.yml | 8 ++++-- docker-compose.yml | 8 ++++++ src/experimental/dht.rs | 47 ++++++++++++++++++++++----------- src/experimental/gossip.rs | 12 +++++---- test-e2e.sh | 3 +-- 7 files changed, 57 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 855af13..57d92e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -751,7 +751,7 @@ dependencies = [ [[package]] name = "dht" version = "6.1.0" -source = "git+https://github.com/rustonbsd/mainline?branch=feat-get-signed-peers-ttl#aad2154c83ffb238f588b7ba322e7704b9106431" +source = "git+https://github.com/Nuhvi/mainline?branch=main#95f192b48f875e9070452a32350b0312ad691954" dependencies = [ "crc", "document-features", diff --git a/Cargo.toml b/Cargo.toml index 2f9e1ef..83e84c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ version = "0.2.4" edition = "2024" [features] -default = ["iroh-gossip"] +default = ["iroh-gossip","experimental"] iroh-gossip = ["dep:iroh", "dep:iroh-gossip"] experimental = ["dep:mainline_exp"] @@ -42,7 +42,7 @@ tracing = { version = "0.1", default-features = false, features = ["std"] } tracing-subscriber = { version = "0.3", default-features = false, features = ["std", "env-filter", "ansi"] } mainline = { version = "6.0.0", default-features = false, features = ["async"] } -mainline_exp = { git = "https://github.com/rustonbsd/mainline", branch = "feat-get-signed-peers-ttl", default-features = false, features = ["async"], optional = true, package = "dht" } +mainline_exp = { git = "https://github.com/Nuhvi/mainline", branch = "main", default-features = false, features = ["async"], optional = true, package = "dht" } hex = "0.4" diff --git a/docker-compose-experimental.yml b/docker-compose-experimental.yml index 74bb883..8bd1fd1 100644 --- a/docker-compose-experimental.yml +++ b/docker-compose-experimental.yml @@ -17,7 +17,9 @@ services: environment: - RUST_LOG=info - TOPIC_ID=${TOPIC_ID} - command: sh -c "e2e_test_experimental 2 $${TOPIC_ID}" + command: sh -c "sleep 10 && e2e_test_experimental 2 $${TOPIC_ID}" + depends_on: + - node1 node3: build: @@ -27,4 +29,6 @@ services: environment: - RUST_LOG=info - TOPIC_ID=${TOPIC_ID} - command: sh -c "e2e_test_experimental 2 $${TOPIC_ID}" \ No newline at end of file + command: sh -c "sleep 20 && e2e_test_experimental 2 $${TOPIC_ID}" + depends_on: + - node2 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 2fa8c77..b4e9aea 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,15 +4,23 @@ services: container_name: dtt-node1 environment: - RUST_LOG=info + command: sh -c "e2e_test" node2: build: . container_name: dtt-node2 environment: - RUST_LOG=info + command: sh -c "sleep 10 && e2e_test" + depends_on: + - node1 + node3: build: . container_name: dtt-node3 environment: - RUST_LOG=info + command: sh -c "sleep 20 && e2e_test" + depends_on: + - node1 diff --git a/src/experimental/dht.rs b/src/experimental/dht.rs index 6907efa..82d7ec5 100644 --- a/src/experimental/dht.rs +++ b/src/experimental/dht.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{collections::HashSet, time::Duration}; use anyhow::{Context, Result}; use ed25519_dalek::VerifyingKey; @@ -37,7 +37,11 @@ impl Dht { Ok(()) } - pub async fn get_peers(&mut self, topic_bytes: &Vec) -> Result> { + pub async fn get_peers( + &mut self, + topic_bytes: &Vec, + timeout: Option, + ) -> Result> { if self.dht.is_none() { self.reset().await?; } @@ -45,22 +49,30 @@ impl Dht { let dht = self.dht.as_mut().context("DHT not initialized")?; let id = Id::from_bytes(topic_hash_20(topic_bytes))?; - let mut stream = dht.get_signed_peers(id, Some(chrono::Utc::now().timestamp()-60)).await; + let mut stream = dht.get_signed_peers(id).await; let mut results = vec![]; - let deadline = tokio::time::Instant::now() + Duration::from_secs(30); - while let Ok(Some(item)) = tokio::time::timeout_at(deadline, stream.next()).await { - results.push(item); + if let Some(timeout) = timeout { + let deadline = tokio::time::Instant::now() + timeout; + while let Ok(Some(item)) = tokio::time::timeout_at(deadline, stream.next()).await { + results.push(item); + } + } else { + results = stream.collect::>>().await; } Ok(results .iter() .flatten() .filter_map(|item| VerifyingKey::from_bytes(item.key()).ok()) - .collect::>()) + .collect::>()) } - pub async fn announce_self(&mut self, topic_bytes: &Vec) -> Result<()> { + pub async fn announce_self( + &mut self, + topic_bytes: &Vec, + timeout: Option, + ) -> Result { if self.dht.is_none() { self.reset().await?; } @@ -68,13 +80,18 @@ impl Dht { let dht = self.dht.as_mut().context("DHT not initialized")?; let id = Id::from_bytes(topic_hash_20(topic_bytes))?; - tokio::time::timeout( - std::time::Duration::from_secs(5), - dht.announce_signed_peer(id, &self.signing_key), - ) - .await - .map(|_| ()) - .map_err(|e| anyhow::anyhow!("Failed to announce signed peer: {e}")) + let announce_future = dht.announce_signed_peer(id, &self.signing_key); + + if let Some(timeout) = timeout { + tokio::time::timeout(timeout, announce_future) + .await + .context("Announce timed out")? + .context("Failed to announce signed peer") + } else { + announce_future + .await + .context("Failed to announce signed peer") + } } } diff --git a/src/experimental/gossip.rs b/src/experimental/gossip.rs index 7c8ca0b..ee9391e 100644 --- a/src/experimental/gossip.rs +++ b/src/experimental/gossip.rs @@ -134,9 +134,9 @@ impl Topic { let wait_dur = { let mut lock = dht.lock().await; - if let Err(e) = lock.announce_self(&topic_bytes).await { + if let Err(e) = lock.announce_self(&topic_bytes, None).await { tracing::warn!("self_announce: {e}"); - backoff += 1; + backoff += 5; Duration::from_secs(backoff.min(300)) } else { tracing::info!("self_announce: success"); @@ -161,15 +161,17 @@ impl Topic { while is_running.load(std::sync::atomic::Ordering::Relaxed) { let peers = { let mut lock = dht.lock().await; - let res = lock.get_peers(&topic_bytes).await; + let res = lock.get_peers(&topic_bytes, None).await; if let Err(e) = res { tracing::error!("bootstrap get_peers error: {e:?}"); - Vec::new() + HashSet::new() } else { res.unwrap_or_default().clone() } }; + + println!("peer-hashes: {}", peers.iter().map(|p| hex::encode(p.as_bytes())).collect::>().join(", ")); tracing::debug!("bootstrap found {} peers", peers.len()); if !peers.is_empty() { let mut added_nodes_lock = added_nodes.lock().await; @@ -202,7 +204,7 @@ impl Topic { if !receiver.is_joined().await || added_nodes.lock().await.len() < 2 { tracing::debug!("not enough peers yet, waiting before next bootstrap attempt"); - backoff += 1; + backoff += 5; tokio::time::sleep(Duration::from_secs(backoff.min(60))).await; } else { backoff = 5; diff --git a/test-e2e.sh b/test-e2e.sh index 466f4c5..013e160 100755 --- a/test-e2e.sh +++ b/test-e2e.sh @@ -7,8 +7,7 @@ set -e echo "Starting end-to-end test..." -#export TOPIC_ID=$RANDOM -export TOPIC_ID=NOTRANDOMSOWECANSEEIFTHE_MORE_RECENT_THAN_PARAM_IN_MAINLINE_FOR_GET_SIGNED_PEERS_WORKS +export TOPIC_ID=$RANDOM # Clean up any existing containers docker compose --file $COMPOSE_FILE down --remove-orphans || true From 898f8a7df9325dc2830df1829a3ed715905fcc29 Mon Sep 17 00:00:00 2001 From: rustonbsd Date: Wed, 5 Nov 2025 15:39:17 +0100 Subject: [PATCH 20/20] fixed default feature exp --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 83e84c3..66f8695 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ version = "0.2.4" edition = "2024" [features] -default = ["iroh-gossip","experimental"] +default = ["iroh-gossip"] iroh-gossip = ["dep:iroh", "dep:iroh-gossip"] experimental = ["dep:mainline_exp"]