diff --git a/Cargo.toml b/Cargo.toml
index ce46138e1..25a464d28 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -64,7 +64,7 @@ sysinfo = { version = "0.37.2", default-features = false, features = ["system"]
 derive_builder = "0.20.2"
 
 # Async
-tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process"] }
+tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process", "sync"] }
 tokio-util = { version = "0.7.15", default-features = false, features = ["io"] }
 tracing-futures= { version = "0.2.5", features = ["std-future", "futures-03"] }
 futures-util = "0.3.5"
diff --git a/src/config.rs b/src/config.rs
index b99eb77db..8a46714ac 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1,6 +1,6 @@
 use crate::{cdn::CdnKind, storage::StorageKind};
 use anyhow::{Context, Result, anyhow, bail};
-use std::{env::VarError, error::Error, path::PathBuf, str::FromStr, time::Duration};
+use std::{env::VarError, error::Error, io, path, path::PathBuf, str::FromStr, time::Duration};
 use tracing::trace;
 use url::Url;
 
@@ -209,10 +209,10 @@
             .cdn_max_queued_age(Duration::from_secs(env("DOCSRS_CDN_MAX_QUEUED_AGE", 3600)?))
             .cloudfront_distribution_id_web(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_WEB")?)
             .cloudfront_distribution_id_static(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_STATIC")?)
-            .local_archive_cache_path(env(
+            .local_archive_cache_path(ensure_absolute_path(env(
                 "DOCSRS_ARCHIVE_INDEX_CACHE_PATH",
                 prefix.join("archive_cache"),
-            )?)
+            )?)?)
             .compiler_metrics_collection_path(maybe_env("DOCSRS_COMPILER_METRICS_PATH")?)
             .temp_dir(temp_dir)
             .rustwide_workspace(env(
@@ -235,6 +235,14 @@ impl Config {
     }
 }
 
+fn ensure_absolute_path(path: PathBuf) -> io::Result<PathBuf> {
+    if path.is_absolute() {
+        Ok(path)
+    } else {
+        Ok(path::absolute(&path)?)
+    }
+}
+
 fn env<T>(var: &str, default: T) -> Result<T>
 where
     T: FromStr,
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 76655714d..37ed63ba3 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -22,6 +22,7 @@ use crate::{
 };
 use anyhow::{anyhow, bail};
 use chrono::{DateTime, Utc};
+use dashmap::DashMap;
 use fn_error_context::context;
 use futures_util::{TryStreamExt as _, stream::BoxStream};
 use mime::Mime;
@@ -30,24 +31,27 @@ use std::{
     fmt,
     fs::{self, File},
     io::{self, BufReader},
+    iter,
     num::ParseIntError,
     ops::RangeInclusive,
     path::{Path, PathBuf},
+    str::FromStr,
     sync::{
         Arc,
         atomic::{AtomicU64, Ordering},
     },
 };
-use std::{iter, str::FromStr};
 use tokio::{
     io::{AsyncRead, AsyncWriteExt},
     runtime,
+    sync::RwLock,
 };
 use tracing::{error, info, info_span, instrument, trace, warn};
 use tracing_futures::Instrument as _;
 use walkdir::WalkDir;
 
 const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00];
+const ARCHIVE_INDEX_FILE_EXTENSION: &str = "index";
 
 type FileRange = RangeInclusive<u64>;
 
@@ -186,6 +190,8 @@ enum StorageBackend {
 pub struct AsyncStorage {
     backend: StorageBackend,
     config: Arc<Config>,
+    /// Locks to synchronize access to the locally cached archive index files.
+    locks: DashMap<PathBuf, Arc<RwLock<()>>>,
 }
 
 impl AsyncStorage {
@@ -204,6 +210,7 @@
                 }
             },
             config,
+            locks: DashMap::new(),
         })
     }
 
@@ -318,12 +325,10 @@
         path: &str,
     ) -> Result<bool> {
         match self
-            .download_archive_index(archive_path, latest_build_id)
+            .find_in_archive_index(archive_path, latest_build_id, path)
             .await
         {
-            Ok(index_filename) => Ok(archive_index::find_in_file(index_filename, path)
-                .await?
-                .is_some()),
+            Ok(file_info) => Ok(file_info.is_some()),
             Err(err) => {
                 if err.downcast_ref::<PathNotFoundError>().is_some() {
                     Ok(false)
@@ -384,41 +389,67 @@
         Ok(blob.decompress())
     }
 
+    fn local_index_cache_lock(&self, local_index_path: impl AsRef<Path>) -> Arc<RwLock<()>> {
+        let local_index_path = local_index_path.as_ref().to_path_buf();
+
+        self.locks
+            .entry(local_index_path)
+            .or_insert_with(|| Arc::new(RwLock::new(())))
+            .downgrade()
+            .clone()
+    }
+
     #[instrument]
-    pub(super) async fn download_archive_index(
+    async fn find_in_archive_index(
         &self,
         archive_path: &str,
         latest_build_id: Option<BuildId>,
-    ) -> Result<PathBuf> {
-        // remote/folder/and/x.zip.index
-        let remote_index_path = format!("{archive_path}.index");
+        path_in_archive: &str,
+    ) -> Result<Option<FileInfo>> {
+        // we know that config.local_archive_cache_path is an absolute path, not relative.
+        // So it will be usable as key in the DashMap.
        let local_index_path = self.config.local_archive_cache_path.join(format!(
-            "{archive_path}.{}.index",
+            "{archive_path}.{}.{ARCHIVE_INDEX_FILE_EXTENSION}",
             latest_build_id.map(|id| id.0).unwrap_or(0)
         ));
 
-        if !local_index_path.exists() {
-            let index_content = self.get(&remote_index_path, usize::MAX).await?.content;
+        let rwlock = self.local_index_cache_lock(&local_index_path);
 
-            tokio::fs::create_dir_all(
-                local_index_path
-                    .parent()
-                    .ok_or_else(|| anyhow!("index path without parent"))?,
-            )
-            .await?;
+        // directly acquire the read-lock, so the syscall (`path.exists()`) below is already
+        // protected.
+        let mut _read_guard = rwlock.read().await;
+
+        if !tokio::fs::try_exists(&local_index_path).await? {
+            // upgrade the lock to a write-lock for downloading & storing the index.
+            drop(_read_guard);
+            let _write_guard = rwlock.write().await;
 
-            // when we don't have a locally cached index and many parallel request
-            // we might download the same archive index multiple times here.
-            // So we're storing the content into a temporary file before renaming it
-            // into the final location.
-            let temp_path = tempfile::NamedTempFile::new_in(&self.config.local_archive_cache_path)?
-                .into_temp_path();
-            let mut file = tokio::fs::File::create(&temp_path).await?;
-            file.write_all(&index_content).await?;
-            tokio::fs::rename(temp_path, &local_index_path).await?;
+            // check existence again in case of Race Condition (TOCTOU)
+            if !tokio::fs::try_exists(&local_index_path).await? {
+                // remote/folder/and/x.zip.index
+                let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}");
+
+                tokio::fs::create_dir_all(
+                    local_index_path
+                        .parent()
+                        .ok_or_else(|| anyhow!("index path without parent"))?,
+                )
+                .await?;
+
+                {
+                    let mut file = tokio::fs::File::create(&local_index_path).await?;
+                    let mut stream = self.get_stream(&remote_index_path).await?.content;
+
+                    tokio::io::copy(&mut stream, &mut file).await?;
+
+                    file.flush().await?;
+                }
+            }
+
+            _read_guard = _write_guard.downgrade();
         }
 
-        Ok(local_index_path)
+        archive_index::find_in_file(local_index_path, path_in_archive).await
     }
 
     #[instrument]
@@ -429,11 +460,8 @@
         path: &str,
         max_size: usize,
     ) -> Result<Blob> {
-        let index_filename = self
-            .download_archive_index(archive_path, latest_build_id)
-            .await?;
-
-        let info = archive_index::find_in_file(index_filename, path)
+        let info = self
+            .find_in_archive_index(archive_path, latest_build_id, path)
             .await?
             .ok_or(PathNotFoundError)?;
 
@@ -463,11 +491,8 @@
         latest_build_id: Option<BuildId>,
         path: &str,
     ) -> Result<StreamingBlob> {
-        let index_filename = self
-            .download_archive_index(archive_path, latest_build_id)
-            .await?;
-
-        let info = archive_index::find_in_file(index_filename, path)
+        let info = self
+            .find_in_archive_index(archive_path, latest_build_id, path)
            .await?
             .ok_or(PathNotFoundError)?;
 
@@ -540,7 +565,7 @@
         .await?;
 
         let alg = CompressionAlgorithm::default();
-        let remote_index_path = format!("{}.index", &archive_path);
+        let remote_index_path = format!("{}.{ARCHIVE_INDEX_FILE_EXTENSION}", &archive_path);
         let compressed_index_content = {
             let _span = info_span!("create_archive_index", %remote_index_path).entered();
@@ -994,17 +1019,6 @@
             .block_on(self.inner.get_range(path, max_size, range, compression))
     }
 
-    pub(super) fn download_index(
-        &self,
-        archive_path: &str,
-        latest_build_id: Option<BuildId>,
-    ) -> Result<PathBuf> {
-        self.runtime.block_on(
-            self.inner
-                .download_archive_index(archive_path, latest_build_id),
-        )
-    }
-
     pub(crate) fn get_from_archive(
         &self,
         archive_path: &str,
@@ -1801,12 +1815,12 @@
             .inner
             .config
             .local_archive_cache_path
-            .join("folder/test.zip.0.index");
+            .join(format!("folder/test.zip.0.{ARCHIVE_INDEX_FILE_EXTENSION}"));
 
         let (stored_files, compression_alg) =
             storage.store_all_in_archive("folder/test.zip", dir.path())?;
 
-        assert!(storage.exists("folder/test.zip.index")?);
+        assert!(storage.exists(&format!("folder/test.zip.{ARCHIVE_INDEX_FILE_EXTENSION}"))?);
 
         assert_eq!(compression_alg, CompressionAlgorithm::Bzip2);
 
         assert_eq!(stored_files.len(), files.len());