Skip to content

Commit 0edfa0e

Browse files
committed
introduce RwLock to control access to local archive index files
1 parent 7dfb7f4 commit 0edfa0e

File tree

3 files changed

+83
-58
lines changed

3 files changed

+83
-58
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ sysinfo = { version = "0.37.2", default-features = false, features = ["system"]
6464
derive_builder = "0.20.2"
6565

6666
# Async
67-
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process"] }
67+
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process", "sync"] }
6868
tokio-util = { version = "0.7.15", default-features = false, features = ["io"] }
6969
tracing-futures= { version = "0.2.5", features = ["std-future", "futures-03"] }
7070
futures-util = "0.3.5"

src/config.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{cdn::CdnKind, storage::StorageKind};
22
use anyhow::{Context, Result, anyhow, bail};
3-
use std::{env::VarError, error::Error, path::PathBuf, str::FromStr, time::Duration};
3+
use std::{env::VarError, error::Error, io, path, path::PathBuf, str::FromStr, time::Duration};
44
use tracing::trace;
55
use url::Url;
66

@@ -209,10 +209,10 @@ impl Config {
209209
.cdn_max_queued_age(Duration::from_secs(env("DOCSRS_CDN_MAX_QUEUED_AGE", 3600)?))
210210
.cloudfront_distribution_id_web(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_WEB")?)
211211
.cloudfront_distribution_id_static(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_STATIC")?)
212-
.local_archive_cache_path(env(
212+
.local_archive_cache_path(ensure_absolute_path(env(
213213
"DOCSRS_ARCHIVE_INDEX_CACHE_PATH",
214214
prefix.join("archive_cache"),
215-
)?)
215+
)?)?)
216216
.compiler_metrics_collection_path(maybe_env("DOCSRS_COMPILER_METRICS_PATH")?)
217217
.temp_dir(temp_dir)
218218
.rustwide_workspace(env(
@@ -235,6 +235,14 @@ impl Config {
235235
}
236236
}
237237

238+
fn ensure_absolute_path(path: PathBuf) -> io::Result<PathBuf> {
239+
if path.is_absolute() {
240+
Ok(path)
241+
} else {
242+
Ok(path::absolute(&path)?)
243+
}
244+
}
245+
238246
fn env<T>(var: &str, default: T) -> Result<T>
239247
where
240248
T: FromStr,

src/storage/mod.rs

Lines changed: 71 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -22,27 +22,33 @@ use crate::{
2222
};
2323
use anyhow::anyhow;
2424
use chrono::{DateTime, Utc};
25+
use dashmap::DashMap;
2526
use fn_error_context::context;
26-
use futures_util::stream::BoxStream;
27+
use futures_util::{StreamExt as _, stream::BoxStream};
2728
use mime::Mime;
2829
use path_slash::PathExt;
2930
use std::{
3031
fmt,
3132
fs::{self, File},
3233
io::{self, BufReader},
34+
iter,
3335
num::ParseIntError,
3436
ops::RangeInclusive,
3537
path::{Path, PathBuf},
38+
str::FromStr,
3639
sync::Arc,
40+
time::{Instant, SystemTime},
3741
};
38-
use std::{iter, str::FromStr};
3942
use tokio::{
4043
io::{AsyncRead, AsyncWriteExt},
4144
runtime,
45+
sync::RwLock,
4246
};
43-
use tracing::{error, info_span, instrument, trace};
47+
use tracing::{debug, error, info_span, instrument, trace};
4448
use walkdir::WalkDir;
4549

50+
static ARCHIVE_INDEX_FILE_EXTENSION: &str = "index";
51+
4652
type FileRange = RangeInclusive<u64>;
4753

4854
#[derive(Debug, thiserror::Error)]
@@ -181,6 +187,8 @@ enum StorageBackend {
181187
pub struct AsyncStorage {
182188
backend: StorageBackend,
183189
config: Arc<Config>,
190+
/// Locks to synchronize access to the locally cached archive index files.
191+
locks: DashMap<PathBuf, Arc<RwLock<()>>>,
184192
}
185193

186194
impl AsyncStorage {
@@ -199,6 +207,7 @@ impl AsyncStorage {
199207
}
200208
},
201209
config,
210+
locks: DashMap::new(),
202211
})
203212
}
204213

@@ -313,12 +322,10 @@ impl AsyncStorage {
313322
path: &str,
314323
) -> Result<bool> {
315324
match self
316-
.download_archive_index(archive_path, latest_build_id)
325+
.find_in_archive_index(archive_path, latest_build_id, path)
317326
.await
318327
{
319-
Ok(index_filename) => Ok(archive_index::find_in_file(index_filename, path)
320-
.await?
321-
.is_some()),
328+
Ok(file_info) => Ok(file_info.is_some()),
322329
Err(err) => {
323330
if err.downcast_ref::<PathNotFoundError>().is_some() {
324331
Ok(false)
@@ -375,41 +382,67 @@ impl AsyncStorage {
375382
Ok(blob.decompress())
376383
}
377384

385+
fn local_index_cache_lock(&self, local_index_path: impl AsRef<Path>) -> Arc<RwLock<()>> {
386+
let local_index_path = local_index_path.as_ref().to_path_buf();
387+
388+
self.locks
389+
.entry(local_index_path)
390+
.or_insert_with(|| Arc::new(RwLock::new(())))
391+
.downgrade()
392+
.clone()
393+
}
394+
378395
#[instrument]
379-
pub(super) async fn download_archive_index(
396+
async fn find_in_archive_index(
380397
&self,
381398
archive_path: &str,
382399
latest_build_id: Option<BuildId>,
383-
) -> Result<PathBuf> {
384-
// remote/folder/and/x.zip.index
385-
let remote_index_path = format!("{archive_path}.index");
400+
path_in_archive: &str,
401+
) -> Result<Option<archive_index::FileInfo>> {
402+
// we know that config.local_archive_cache_path is an absolute path, not relative.
403+
// So it will be usable as key in the DashMap.
386404
let local_index_path = self.config.local_archive_cache_path.join(format!(
387-
"{archive_path}.{}.index",
405+
"{archive_path}.{}.{ARCHIVE_INDEX_FILE_EXTENSION}",
388406
latest_build_id.map(|id| id.0).unwrap_or(0)
389407
));
390408

391-
if !local_index_path.exists() {
392-
let index_content = self.get(&remote_index_path, usize::MAX).await?.content;
409+
let rwlock = self.local_index_cache_lock(&local_index_path);
393410

394-
tokio::fs::create_dir_all(
395-
local_index_path
396-
.parent()
397-
.ok_or_else(|| anyhow!("index path without parent"))?,
398-
)
399-
.await?;
411+
// directly acquire the read-lock, so the syscall (`path.exists()`) below is already
412+
// protected.
413+
let mut _read_guard = rwlock.read().await;
414+
415+
if !tokio::fs::try_exists(&local_index_path).await? {
416+
// upgrade the lock to a write-lock for downloading & storing the index.
417+
drop(_read_guard);
418+
let _write_guard = rwlock.write().await;
419+
420+
// check existance again in case of Race Condition (TOCTOU)
421+
if !tokio::fs::try_exists(&local_index_path).await? {
422+
// remote/folder/and/x.zip.index
423+
let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}");
424+
425+
tokio::fs::create_dir_all(
426+
local_index_path
427+
.parent()
428+
.ok_or_else(|| anyhow!("index path without parent"))?,
429+
)
430+
.await?;
431+
432+
{
433+
let mut file = tokio::fs::File::create(&local_index_path).await?;
434+
let mut stream = self.get_stream(&remote_index_path).await?.content;
435+
436+
tokio::io::copy(&mut stream, &mut file).await?;
437+
438+
file.flush().await?;
439+
}
440+
}
400441

401-
// when we don't have a locally cached index and many parallel request
402-
// we might download the same archive index multiple times here.
403-
// So we're storing the content into a temporary file before renaming it
404-
// into the final location.
405-
let temp_path = tempfile::NamedTempFile::new_in(&self.config.local_archive_cache_path)?
406-
.into_temp_path();
407-
let mut file = tokio::fs::File::create(&temp_path).await?;
408-
file.write_all(&index_content).await?;
409-
tokio::fs::rename(temp_path, &local_index_path).await?;
442+
_read_guard = _write_guard.downgrade();
410443
}
411444

412-
Ok(local_index_path)
445+
archive_index::find_in_file(local_index_path, path_in_archive).await
413446
}
414447

415448
#[instrument]
@@ -420,11 +453,8 @@ impl AsyncStorage {
420453
path: &str,
421454
max_size: usize,
422455
) -> Result<Blob> {
423-
let index_filename = self
424-
.download_archive_index(archive_path, latest_build_id)
425-
.await?;
426-
427-
let info = archive_index::find_in_file(index_filename, path)
456+
let info = self
457+
.find_in_archive_index(archive_path, latest_build_id, path)
428458
.await?
429459
.ok_or(PathNotFoundError)?;
430460

@@ -454,11 +484,8 @@ impl AsyncStorage {
454484
latest_build_id: Option<BuildId>,
455485
path: &str,
456486
) -> Result<StreamingBlob> {
457-
let index_filename = self
458-
.download_archive_index(archive_path, latest_build_id)
459-
.await?;
460-
461-
let info = archive_index::find_in_file(index_filename, path)
487+
let info = self
488+
.find_in_archive_index(archive_path, latest_build_id, path)
462489
.await?
463490
.ok_or(PathNotFoundError)?;
464491

@@ -531,7 +558,7 @@ impl AsyncStorage {
531558
.await?;
532559

533560
let alg = CompressionAlgorithm::default();
534-
let remote_index_path = format!("{}.index", &archive_path);
561+
let remote_index_path = format!("{}.{ARCHIVE_INDEX_FILE_EXTENSION}", &archive_path);
535562
let compressed_index_content = {
536563
let _span = info_span!("create_archive_index", %remote_index_path).entered();
537564

@@ -843,17 +870,6 @@ impl Storage {
843870
.block_on(self.inner.get_range(path, max_size, range, compression))
844871
}
845872

846-
pub(super) fn download_index(
847-
&self,
848-
archive_path: &str,
849-
latest_build_id: Option<BuildId>,
850-
) -> Result<PathBuf> {
851-
self.runtime.block_on(
852-
self.inner
853-
.download_archive_index(archive_path, latest_build_id),
854-
)
855-
}
856-
857873
pub(crate) fn get_from_archive(
858874
&self,
859875
archive_path: &str,
@@ -1387,16 +1403,17 @@ mod backend_tests {
13871403
fs::write(path, "data")?;
13881404
}
13891405

1406+
let remote_index_filename = format!("folder/test.zip.0.{ARCHIVE_INDEX_FILE_EXTENSION}");
13901407
let local_index_location = storage
13911408
.inner
13921409
.config
13931410
.local_archive_cache_path
1394-
.join("folder/test.zip.0.index");
1411+
.join(&remote_index_filename);
13951412

13961413
let (stored_files, compression_alg) =
13971414
storage.store_all_in_archive("folder/test.zip", dir.path())?;
13981415

1399-
assert!(storage.exists("folder/test.zip.index")?);
1416+
assert!(storage.exists(&remote_index_filename)?);
14001417

14011418
assert_eq!(compression_alg, CompressionAlgorithm::Bzip2);
14021419
assert_eq!(stored_files.len(), files.len());

0 commit comments

Comments
 (0)