From 2569a34b648e7b8f0a744a8918e6b84f5984778d Mon Sep 17 00:00:00 2001
From: Denis Cornehl
Date: Fri, 14 Nov 2025 21:03:41 +0100
Subject: [PATCH 1/2] update async-compression again

---
 Cargo.lock | 26 ++++++++++++--------------
 Cargo.toml | 11 +----------
 2 files changed, 13 insertions(+), 24 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 60b3d2a3d..432c9db09 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -364,9 +364,9 @@ dependencies = [
 
 [[package]]
 name = "async-compression"
-version = "0.4.32"
+version = "0.4.33"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5a89bce6054c720275ac2432fbba080a66a2106a44a1b804553930ca6909f4e0"
+checksum = "93c1f86859c1af3d514fa19e8323147ff10ea98684e6c7b307912509f50e67b2"
 dependencies = [
  "compression-codecs",
  "compression-core",
@@ -1181,9 +1181,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
 
 [[package]]
 name = "bytes"
-version = "1.10.1"
+version = "1.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
+checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3"
 
 [[package]]
 name = "bytes-utils"
@@ -1249,9 +1249,9 @@ dependencies = [
 
 [[package]]
 name = "cc"
-version = "1.2.45"
+version = "1.2.46"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "35900b6c8d709fb1d854671ae27aeaa9eec2f8b01b364e1619a40da3e6fe2afe"
+checksum = "b97463e1064cb1b1c1384ad0a0b9c8abd0988e2a91f52606c80ef14aadb63e36"
 dependencies = [
  "find-msvc-tools",
  "jobserver",
@@ -1408,9 +1408,9 @@ dependencies = [
 
 [[package]]
 name = "compression-codecs"
-version = "0.4.31"
+version = "0.4.32"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ef8a506ec4b81c460798f572caead636d57d3d7e940f998160f52bd254bf2d23"
+checksum = "680dc087785c5230f8e8843e2e57ac7c1c90488b6a91b88caa265410568f441b"
 dependencies = [
  "bzip2",
  "compression-core",
@@ -1422,9 +1422,9 @@ dependencies = [
 
 [[package]]
 name = "compression-core"
-version = "0.4.29"
+version = "0.4.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e47641d3deaf41fb1538ac1f54735925e275eaf3bf4d55c81b137fba797e5cbb"
+checksum = "3a9b614a5787ef0c8802a55766480563cb3a93b435898c422ed2a359cf811582"
 
 [[package]]
 name = "comrak"
@@ -2046,8 +2046,6 @@ dependencies = [
  "bzip2",
  "chrono",
  "clap",
- "compression-codecs",
- "compression-core",
  "comrak",
  "constant_time_eq",
  "crates-index",
@@ -2296,9 +2294,9 @@ dependencies = [
 
 [[package]]
 name = "find-msvc-tools"
-version = "0.1.4"
+version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127"
+checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844"
 
 [[package]]
 name = "findshlibs"
diff --git a/Cargo.toml b/Cargo.toml
index 25a464d28..cfe026c3d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -64,6 +64,7 @@ sysinfo = { version = "0.37.2", default-features = false, features = ["system"] }
 derive_builder = "0.20.2"
 
 # Async
+async-compression = { version = "0.4.32", features = ["tokio", "bzip2", "zstd", "gzip"] }
 tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process", "sync"] }
 tokio-util = { version = "0.7.15", default-features = false, features = ["io"] }
 tracing-futures= { version = "0.2.5", features = ["std-future", "futures-03"] }
@@ -76,16 +77,6 @@ aws-smithy-types-convert = { version = "0.60.0", features = ["convert-chrono"] }
 
 http = "1.0.0"
 uuid = { version = "1.1.2", features = ["v4"]}
-# freeze async-compression and it's dependencies.
-# Currently a part of the archive indexes on S3 are broken, and this old version
-# can read the broken indexes.
-# Will unfreeze when we recompressed the indexes.
-# see:
-# * https://github.com/rust-lang/docs.rs/pull/2988
-# * https://github.com/Nullus157/async-compression/issues/420
-async-compression = { version = "=0.4.32", features = ["tokio", "bzip2", "zstd", "gzip"] }
-compression-codecs = "=0.4.31"
-compression-core = "=0.4.29"
 
 # Data serialization and deserialization
 serde = { version = "1.0", features = ["derive"] }

From 9d11b5c4d48f997c0dd5b8b62d3219b3ee467969 Mon Sep 17 00:00:00 2001
From: Denis Cornehl
Date: Mon, 17 Nov 2025 08:52:30 +0100
Subject: [PATCH 2/2] remove breaking test

---
 src/storage/mod.rs | 188 +--------------------------------------------
 1 file changed, 1 insertion(+), 187 deletions(-)

diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 8b2400c1d..ec0f8703e 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -1241,7 +1241,7 @@ pub(crate) fn source_archive_path(name: &str, version: &Version) -> String {
 #[cfg(test)]
 mod test {
     use super::*;
-    use crate::test::{TestEnvironment, V0_1};
+    use crate::test::TestEnvironment;
     use std::env;
     use test_case::test_case;
 
@@ -1456,192 +1456,6 @@ mod test {
         assert_eq!(detected_mime, expected_mime);
     }
 
-    #[tokio::test(flavor = "multi_thread")]
-    async fn test_recompress_just_check() -> Result<()> {
-        let env = TestEnvironment::with_config(
-            TestEnvironment::base_config()
-                .storage_backend(StorageKind::S3)
-                .build()?,
-        )
-        .await?;
-
-        let storage = env.async_storage();
-
-        const KRATE: &str = "test_crate";
-        let rid = env
-            .fake_release()
-            .await
-            .name(KRATE)
-            .version(V0_1)
-            .archive_storage(true)
-            .keywords(vec!["kw 1".into(), "kw 2".into()])
-            .create()
-            .await?;
-
-        // run the recompression logic
-        let mut conn = env.async_db().async_conn().await;
-        let (checked, recompressed) = storage
-            .recompress_index_files_in_bucket(&mut conn, None, None, None)
-            .await?;
-        assert_eq!(checked, 2);
-        assert_eq!(recompressed, 0);
-
-        assert!(
-            storage
-                .get(&rustdoc_archive_path(KRATE, &V0_1), usize::MAX)
-                .await
-                .is_ok()
-        );
-        assert!(
-            storage
-                .get(&source_archive_path(KRATE, &V0_1), usize::MAX)
-                .await
-                .is_ok()
-        );
-
-        // release-id-min = the target release id for the iterator
-        // (we start at the latest, and go down).
-        // So setting that "target" to rid.0 + 1 means we stop before we hit our only release.
-        let (checked, recompressed) = storage
-            .recompress_index_files_in_bucket(&mut conn, Some(ReleaseId(rid.0 + 1)), None, None)
-            .await?;
-        assert_eq!(checked, 0);
-        assert_eq!(recompressed, 0);
-
-        // release-id-max = where we start iterating the releases
-        // (we start at the max, and go down).
-        // So setting that "start" to rid.0 - 1 means we start behind our only release
-        let (checked, recompressed) = storage
-            .recompress_index_files_in_bucket(&mut conn, None, Some(ReleaseId(rid.0 - 1)), None)
-            .await?;
-        assert_eq!(checked, 0);
-        assert_eq!(recompressed, 0);
-
-        // setting min & max to the same value that is also our only release
-        // tests if we filter as inclusive range.
-        let (checked, recompressed) = storage
-            .recompress_index_files_in_bucket(&mut conn, Some(rid), Some(rid), None)
-            .await?;
-        assert_eq!(checked, 2);
-        assert_eq!(recompressed, 0);
-
-        Ok(())
-    }
-
-    #[tokio::test(flavor = "multi_thread")]
-    async fn test_recompress_index_files_in_bucket() -> Result<()> {
-        use std::io::Cursor;
-        use tokio::io;
-
-        let env = TestEnvironment::with_config(
-            TestEnvironment::base_config()
-                .storage_backend(StorageKind::S3)
-                .build()?,
-        )
-        .await?;
-
-        const CONTENT: &[u8] = b"Hello, world! Hello, world! Hello, world! Hello, world!";
-        let alg = Some(CompressionAlgorithm::Zstd);
-
-        use async_compression::tokio::write;
-
-        let broken_archive = {
-            // broken compression implementation, `.shutdown` missing.
-            let mut buf = Vec::new();
-            let mut enc = write::ZstdEncoder::new(&mut buf);
-            io::copy(&mut Cursor::new(CONTENT), &mut enc).await?;
-            // check if it's really broken, EOF missing
-            assert_ne!(buf.last_chunk::<3>().unwrap(), &ZSTD_EOF_BYTES);
-            buf
-        };
-
-        const KRATE: &str = "test_crate";
-        env.fake_release()
-            .await
-            .name(KRATE)
-            .version(V0_1)
-            .archive_storage(true)
-            .keywords(vec!["kw 1".into(), "kw 2".into()])
-            .create()
-            .await?;
-
-        let storage = env.async_storage();
-        // delete everything in storage created by the fake_release above
-        for p in &["rustdoc/", "sources/"] {
-            storage.delete_prefix(p).await?;
-        }
-
-        // use raw inner storage backend so we can fetch the compressed file without automatic
-        // decompression
-        let StorageBackend::S3(raw_storage) = &storage.backend else {
-            panic!("S3 backend set above");
-        };
-
-        let index_path = format!("{}.index", rustdoc_archive_path(KRATE, &V0_1));
-
-        // upload as-is to the storage, into the place of an archive index.
-        // `.store_inner` doesn't compress
-        storage
-            .store_inner(vec![Blob {
-                path: index_path.clone(),
-                mime: mime::APPLICATION_OCTET_STREAM,
-                date_updated: Utc::now(),
-                content: broken_archive.clone(),
-                compression: alg,
-            }])
-            .await?;
-
-        // validate how the old compressed blob looks like, even though we just uploaded it
-        let old_compressed_blob = raw_storage
-            .get_stream(&index_path, None)
-            .await?
-            .materialize(usize::MAX)
-            .await?;
-        assert_eq!(old_compressed_blob.compression, alg);
-
-        // try getting the decompressed broken blob via normal storage API.
-        // old async-compression can do this without choking.
-        assert_eq!(
-            CONTENT,
-            &storage.get(&index_path, usize::MAX).await?.content
-        );
-
-        // run the recompression logic
-        let mut conn = env.async_db().async_conn().await;
-        let (checked, recompressed) = storage
-            .recompress_index_files_in_bucket(&mut conn, None, None, None)
-            .await?;
-        assert_eq!(checked, 1);
-        assert_eq!(recompressed, 1);
-
-        let new_compressed_blob = raw_storage
-            .get_stream(&index_path, None)
-            .await?
-            .materialize(usize::MAX)
-            .await?;
-        assert_eq!(new_compressed_blob.compression, alg);
-
-        // after fixing, getting the decompressed blob via normal storage API still works
-        assert_eq!(
-            CONTENT,
-            &storage.get(&index_path, usize::MAX).await?.content
-        );
-
-        // after recompression the content length should be different, 3 bytes more for
-        // the zstd EOF
-        assert_eq!(
-            new_compressed_blob.content.len(),
-            old_compressed_blob.content.len() + ZSTD_EOF_BYTES.len()
-        );
-
-        assert_eq!(
-            [&old_compressed_blob.content[..], &ZSTD_EOF_BYTES].concat(),
-            new_compressed_blob.content
-        );
-
-        Ok(())
-    }
-
     #[tokio::test(flavor = "multi_thread")]
     async fn test_outdated_local_archive_index_gets_redownloaded() -> Result<()> {
         use tokio::fs;
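
Background on the bug this series closes out: the tests removed in PATCH 2/2
reproduced archive indexes that were written through async-compression's zstd
encoder without a final `.shutdown()` call, leaving off the 3-byte zstd EOF
frame epilogue (`ZSTD_EOF_BYTES`). Old async-compression versions decoded such
truncated streams without choking, which is why the crates were pinned until
the indexes were recompressed (see
https://github.com/rust-lang/docs.rs/pull/2988 and
https://github.com/Nullus157/async-compression/issues/420). Below is a minimal
sketch of the encoder pitfall, using the same tokio/async-compression APIs as
the removed test; it assumes the "tokio" and "zstd" features plus a tokio
runtime, and `broken`/`ok` are illustrative names:

    use async_compression::tokio::write::ZstdEncoder;
    use std::io::Cursor;
    use tokio::io::{self, AsyncWriteExt};

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        const CONTENT: &[u8] = b"Hello, world!";

        // Broken: the encoder is dropped without `.shutdown()`, so the
        // final zstd frame epilogue is never written and the stream is
        // truncated (drop can't run async shutdown for us).
        let mut broken = Vec::new();
        {
            let mut enc = ZstdEncoder::new(&mut broken);
            io::copy(&mut Cursor::new(CONTENT), &mut enc).await?;
        } // `enc` dropped here without shutdown

        // Correct: `.shutdown()` flushes internal state and writes the
        // frame epilogue before the encoder goes away.
        let mut ok = Vec::new();
        let mut enc = ZstdEncoder::new(&mut ok);
        io::copy(&mut Cursor::new(CONTENT), &mut enc).await?;
        enc.shutdown().await?;

        // The truncated stream is shorter than the complete one; the
        // removed test observed exactly ZSTD_EOF_BYTES missing at the end.
        assert!(broken.len() < ok.len());
        Ok(())
    }

This is also why the recompression pass verified in the removed tests only ever
grows an index by the epilogue bytes: the compressed payload itself was intact,
only the end-of-frame marker was missing.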