26 changes: 12 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default.

11 changes: 1 addition & 10 deletions Cargo.toml
@@ -64,6 +64,7 @@ sysinfo = { version = "0.37.2", default-features = false, features = ["system"]
derive_builder = "0.20.2"

# Async
async-compression = { version = "0.4.32", features = ["tokio", "bzip2", "zstd", "gzip"] }
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process", "sync"] }
tokio-util = { version = "0.7.15", default-features = false, features = ["io"] }
tracing-futures = { version = "0.2.5", features = ["std-future", "futures-03"] }
@@ -76,16 +77,6 @@ aws-smithy-types-convert = { version = "0.60.0", features = ["convert-chrono"] }
http = "1.0.0"
uuid = { version = "1.1.2", features = ["v4"]}

# Freeze async-compression and its dependencies.
# Currently some of the archive indexes on S3 are broken, and this old version
# can read the broken indexes.
# Will unfreeze once we have recompressed the indexes.
# see:
# * https://github.com/rust-lang/docs.rs/pull/2988
# * https://github.com/Nullus157/async-compression/issues/420
async-compression = { version = "=0.4.32", features = ["tokio", "bzip2", "zstd", "gzip"] }
compression-codecs = "=0.4.31"
compression-core = "=0.4.29"

# Data serialization and deserialization
serde = { version = "1.0", features = ["derive"] }
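For background, and not part of this diff: the broken archive indexes mentioned above were produced by writing zstd streams without finalizing the encoder, so the 3-byte end-of-frame marker was never emitted. The old, pinned async-compression could still read such truncated frames, which is why the pin existed; with the indexes recompressed, the pin can go. A minimal sketch of the difference, assuming async-compression's tokio `ZstdEncoder` writing into a `Vec<u8>` (the `compress` helper name is illustrative, not code from this PR):

```rust
use async_compression::tokio::write::ZstdEncoder;
use tokio::io::AsyncWriteExt;

// Illustrative helper: compress `data` into a zstd frame.
async fn compress(data: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut enc = ZstdEncoder::new(Vec::new());
    enc.write_all(data).await?;
    // Skipping this call is the bug that produced the broken indexes: the
    // encoder never writes the final frame footer / EOF marker. The pinned
    // old version tolerated such frames when decoding; newer versions do not.
    enc.shutdown().await?;
    Ok(enc.into_inner())
}
```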
188 changes: 1 addition & 187 deletions src/storage/mod.rs
@@ -1241,7 +1241,7 @@ pub(crate) fn source_archive_path(name: &str, version: &Version) -> String {
#[cfg(test)]
mod test {
    use super::*;
    use crate::test::{TestEnvironment, V0_1};
    use crate::test::TestEnvironment;
    use std::env;
    use test_case::test_case;

@@ -1456,192 +1456,6 @@ mod test {
        assert_eq!(detected_mime, expected_mime);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_recompress_just_check() -> Result<()> {
        let env = TestEnvironment::with_config(
            TestEnvironment::base_config()
                .storage_backend(StorageKind::S3)
                .build()?,
        )
        .await?;

        let storage = env.async_storage();

        const KRATE: &str = "test_crate";
        let rid = env
            .fake_release()
            .await
            .name(KRATE)
            .version(V0_1)
            .archive_storage(true)
            .keywords(vec!["kw 1".into(), "kw 2".into()])
            .create()
            .await?;

        // run the recompression logic
        let mut conn = env.async_db().async_conn().await;
        let (checked, recompressed) = storage
            .recompress_index_files_in_bucket(&mut conn, None, None, None)
            .await?;
        assert_eq!(checked, 2);
        assert_eq!(recompressed, 0);

        assert!(
            storage
                .get(&rustdoc_archive_path(KRATE, &V0_1), usize::MAX)
                .await
                .is_ok()
        );
        assert!(
            storage
                .get(&source_archive_path(KRATE, &V0_1), usize::MAX)
                .await
                .is_ok()
        );

        // release-id-min is the target release id for the iterator
        // (we start at the latest release and go down).
        // So setting that "target" to rid.0 + 1 means we stop before we hit our only release.
        let (checked, recompressed) = storage
            .recompress_index_files_in_bucket(&mut conn, Some(ReleaseId(rid.0 + 1)), None, None)
            .await?;
        assert_eq!(checked, 0);
        assert_eq!(recompressed, 0);

        // release-id-max is where we start iterating the releases
        // (we start at the max and go down).
        // So setting that "start" to rid.0 - 1 means we start below our only release.
        let (checked, recompressed) = storage
            .recompress_index_files_in_bucket(&mut conn, None, Some(ReleaseId(rid.0 - 1)), None)
            .await?;
        assert_eq!(checked, 0);
        assert_eq!(recompressed, 0);

        // setting min & max to the same value, which is also our only release,
        // tests that we filter as an inclusive range.
        let (checked, recompressed) = storage
            .recompress_index_files_in_bucket(&mut conn, Some(rid), Some(rid), None)
            .await?;
        assert_eq!(checked, 2);
        assert_eq!(recompressed, 0);

        Ok(())
    }
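The last assertion in the removed test above exercises the min/max release-id bounds as an inclusive range, with either bound optional. A tiny hypothetical sketch of that predicate, for illustration only and not the actual docs.rs implementation:

```rust
// Hypothetical helper: a release id is visited when min <= id <= max,
// with each bound optional (None means unbounded on that side).
fn in_range(id: i32, min: Option<i32>, max: Option<i32>) -> bool {
    min.map_or(true, |lo| id >= lo) && max.map_or(true, |hi| id <= hi)
}

#[test]
fn inclusive_bounds() {
    assert!(in_range(5, Some(5), Some(5))); // min == max == id still matches
    assert!(!in_range(5, Some(6), None)); // min above the only id: skipped
    assert!(!in_range(5, None, Some(4))); // max below the only id: skipped
}
```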

    #[tokio::test(flavor = "multi_thread")]
    async fn test_recompress_index_files_in_bucket() -> Result<()> {
        use std::io::Cursor;
        use tokio::io;

        let env = TestEnvironment::with_config(
            TestEnvironment::base_config()
                .storage_backend(StorageKind::S3)
                .build()?,
        )
        .await?;

        const CONTENT: &[u8] = b"Hello, world! Hello, world! Hello, world! Hello, world!";
        let alg = Some(CompressionAlgorithm::Zstd);

        use async_compression::tokio::write;

        let broken_archive = {
            // broken compression implementation: the `.shutdown` call is missing.
            let mut buf = Vec::new();
            let mut enc = write::ZstdEncoder::new(&mut buf);
            io::copy(&mut Cursor::new(CONTENT), &mut enc).await?;
            // check that it's really broken, i.e. the EOF marker is missing
            assert_ne!(buf.last_chunk::<3>().unwrap(), &ZSTD_EOF_BYTES);
            buf
        };

        const KRATE: &str = "test_crate";
        env.fake_release()
            .await
            .name(KRATE)
            .version(V0_1)
            .archive_storage(true)
            .keywords(vec!["kw 1".into(), "kw 2".into()])
            .create()
            .await?;

        let storage = env.async_storage();
        // delete everything in storage created by the fake_release above
        for p in &["rustdoc/", "sources/"] {
            storage.delete_prefix(p).await?;
        }

        // use the raw inner storage backend so we can fetch the compressed file
        // without automatic decompression
        let StorageBackend::S3(raw_storage) = &storage.backend else {
            panic!("S3 backend set above");
        };

        let index_path = format!("{}.index", rustdoc_archive_path(KRATE, &V0_1));

        // upload as-is to the storage, into the place of an archive index.
        // `.store_inner` doesn't compress
        storage
            .store_inner(vec![Blob {
                path: index_path.clone(),
                mime: mime::APPLICATION_OCTET_STREAM,
                date_updated: Utc::now(),
                content: broken_archive.clone(),
                compression: alg,
            }])
            .await?;

        // validate what the old compressed blob looks like, even though we just uploaded it
        let old_compressed_blob = raw_storage
            .get_stream(&index_path, None)
            .await?
            .materialize(usize::MAX)
            .await?;
        assert_eq!(old_compressed_blob.compression, alg);

        // try getting the decompressed broken blob via the normal storage API.
        // old async-compression can do this without choking.
        assert_eq!(
            CONTENT,
            &storage.get(&index_path, usize::MAX).await?.content
        );

        // run the recompression logic
        let mut conn = env.async_db().async_conn().await;
        let (checked, recompressed) = storage
            .recompress_index_files_in_bucket(&mut conn, None, None, None)
            .await?;
        assert_eq!(checked, 1);
        assert_eq!(recompressed, 1);

        let new_compressed_blob = raw_storage
            .get_stream(&index_path, None)
            .await?
            .materialize(usize::MAX)
            .await?;
        assert_eq!(new_compressed_blob.compression, alg);

        // after fixing, getting the decompressed blob via the normal storage API still works
        assert_eq!(
            CONTENT,
            &storage.get(&index_path, usize::MAX).await?.content
        );

        // after recompression the content length should be different:
        // 3 bytes more for the zstd EOF marker
        assert_eq!(
            new_compressed_blob.content.len(),
            old_compressed_blob.content.len() + ZSTD_EOF_BYTES.len()
        );

        assert_eq!(
            [&old_compressed_blob.content[..], &ZSTD_EOF_BYTES].concat(),
            new_compressed_blob.content
        );

        Ok(())
    }
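As an extra, out-of-band check (not something this PR adds), one could verify that a repaired index is a well-terminated zstd frame by running it through a strict synchronous decoder. The sketch below assumes the `zstd` crate is available, which is an assumption rather than a dependency introduced here:

```rust
use std::io::Read;

// Returns true when `compressed` decodes to completion. A frame missing its
// EOF marker should fail partway through, while a recompressed frame with the
// marker appended should decode cleanly.
fn decodes_cleanly(compressed: &[u8]) -> bool {
    match zstd::stream::read::Decoder::new(compressed) {
        Ok(mut dec) => {
            let mut out = Vec::new();
            dec.read_to_end(&mut out).is_ok()
        }
        Err(_) => false,
    }
}
```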

    #[tokio::test(flavor = "multi_thread")]
    async fn test_outdated_local_archive_index_gets_redownloaded() -> Result<()> {
        use tokio::fs;