From bc55aada1cf596fa92081f84f5c63d4a17f7c2cd Mon Sep 17 00:00:00 2001 From: ChengyuZhu6 Date: Wed, 5 Nov 2025 22:41:13 +0800 Subject: [PATCH 1/2] transfer service Signed-off-by: ChengyuZhu6 --- cmd/nerdctl/container/container_run_test.go | 2 +- cmd/nerdctl/image/image_load_test.go | 5 +- mod/tigron/test/data.go | 24 +- mod/tigron/test/data_test.go | 2 +- pkg/cmd/image/ensure.go | 5 +- pkg/cmd/image/import.go | 318 ++++++++++++------ pkg/cmd/image/push.go | 56 +-- pkg/cmd/image/save.go | 105 ++++-- pkg/cmd/image/tag.go | 62 +--- pkg/cmd/login/login.go | 2 +- pkg/errutil/errors_check.go | 31 +- .../dockerconfigresolver.go | 25 ++ pkg/imgutil/imgutil.go | 35 +- pkg/imgutil/load/load.go | 118 +++---- pkg/imgutil/transfer.go | 189 +++++++++++ pkg/transferutil/progress.go | 223 ++++++++++++ 16 files changed, 829 insertions(+), 373 deletions(-) create mode 100644 pkg/imgutil/transfer.go create mode 100644 pkg/transferutil/progress.go diff --git a/cmd/nerdctl/container/container_run_test.go b/cmd/nerdctl/container/container_run_test.go index e3d50940f8b..ea424a2c889 100644 --- a/cmd/nerdctl/container/container_run_test.go +++ b/cmd/nerdctl/container/container_run_test.go @@ -786,7 +786,7 @@ func TestRunFromOCIArchive(t *testing.T) { tarPath := fmt.Sprintf("%s/%s.tar", buildCtx, imageName) base.Cmd("build", "--tag", tag, fmt.Sprintf("--output=type=oci,dest=%s", tarPath), buildCtx).AssertOK() - base.Cmd("run", "--rm", fmt.Sprintf("oci-archive://%s", tarPath)).AssertOutContainsAll(fmt.Sprintf("Loaded image: %s", tag), sentinel) + base.Cmd("run", "--rm", fmt.Sprintf("oci-archive://%s", tarPath)).AssertOutContainsAll(tag, sentinel) } func TestRunDomainname(t *testing.T) { diff --git a/cmd/nerdctl/image/image_load_test.go b/cmd/nerdctl/image/image_load_test.go index 2618b81c64f..90e8c970d1a 100644 --- a/cmd/nerdctl/image/image_load_test.go +++ b/cmd/nerdctl/image/image_load_test.go @@ -17,7 +17,6 @@ package image import ( - "fmt" "os" "path/filepath" "strings" @@ -61,7 +60,7 @@ func TestLoadStdinFromPipe(t *testing.T) { identifier := data.Identifier() return &test.Expected{ Output: expect.All( - expect.Contains(fmt.Sprintf("Loaded image: %s:latest", identifier)), + expect.Contains(identifier), func(stdout string, t tig.T) { assert.Assert(t, strings.Contains(helpers.Capture("images"), identifier)) }, @@ -107,7 +106,7 @@ func TestLoadQuiet(t *testing.T) { Expected: func(data test.Data, helpers test.Helpers) *test.Expected { return &test.Expected{ Output: expect.All( - expect.Contains(fmt.Sprintf("Loaded image: %s:latest", data.Identifier())), + expect.Contains(data.Identifier()), expect.DoesNotContain("Loading layer"), ), } diff --git a/mod/tigron/test/data.go b/mod/tigron/test/data.go index eabbe81c379..5fd80e90b29 100644 --- a/mod/tigron/test/data.go +++ b/mod/tigron/test/data.go @@ -31,7 +31,6 @@ import ( const ( identifierMaxLength = 76 - identifierSeparator = "-" identifierSignatureLength = 8 ) @@ -250,27 +249,18 @@ func newData(t tig.T, seed, parent Data) Data { func defaultIdentifierHashing(names ...string) string { // Notes: identifier MAY be used for namespaces, image names, etc. // So, the rules are stringent on what it can contain. - replaceWith := []byte(identifierSeparator) - name := strings.ToLower(strings.Join(names, string(replaceWith))) + // Join names without separator to avoid '-' which causes display issues in progress output + name := strings.ToLower(strings.Join(names, "")) // Ensure we have a unique identifier despite characters replacements // (well, as unique as the names collection being passed) signature := fmt.Sprintf("%x", sha256.Sum256([]byte(name)))[0:identifierSignatureLength] - // Make sure we do not use any unsafe characters - safeName := regexp.MustCompile(`[^a-z0-9-]+`) - // And we avoid repeats of the separator - noRepeat := regexp.MustCompile(fmt.Sprintf(`[%s]{2,}`, replaceWith)) - escapedName := safeName.ReplaceAll([]byte(name), replaceWith) - escapedName = noRepeat.ReplaceAll(escapedName, replaceWith) - // Do not allow trailing or leading dash (as that may stutter) - name = strings.Trim(string(escapedName), string(replaceWith)) + safeName := regexp.MustCompile(`[^a-z0-9]+`) + escapedName := safeName.ReplaceAll([]byte(name), []byte("")) + name = string(escapedName) // Ensure we will never go above 76 characters in length (with signature) - if len(name) > (identifierMaxLength - len(signature)) { - name = name[0 : identifierMaxLength-identifierSignatureLength-len(identifierSeparator)] - } - - if name[len(name)-1:] != identifierSeparator { - signature = identifierSeparator + signature + if len(name) > (identifierMaxLength - identifierSignatureLength) { + name = name[0 : identifierMaxLength-identifierSignatureLength] } return name + signature diff --git a/mod/tigron/test/data_test.go b/mod/tigron/test/data_test.go index 37e0c1c010b..53e3d952fab 100644 --- a/mod/tigron/test/data_test.go +++ b/mod/tigron/test/data_test.go @@ -71,7 +71,7 @@ func TestDataIdentifier(t *testing.T) { assertive.HasPrefix(t, one, "testdataidentifier") three := dataObj.Identifier("Some Add ∞ Funky∞Prefix") - assertive.HasPrefix(t, three, "testdataidentifier-some-add-funky-prefix") + assertive.HasPrefix(t, three, "testdataidentifiersomeaddfunkyprefix") } func TestDataIdentifierThatIsReallyReallyReallyReallyReallyReallyReallyReallyReallyReallyReallyLong( diff --git a/pkg/cmd/image/ensure.go b/pkg/cmd/image/ensure.go index c3315e58c29..f7c64284df0 100644 --- a/pkg/cmd/image/ensure.go +++ b/pkg/cmd/image/ensure.go @@ -18,8 +18,6 @@ package image import ( "context" - "errors" - "net/http" "os" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -80,7 +78,6 @@ func ensureOne(ctx context.Context, client *containerd.Client, rawRef string, ta // Get a resolver var dOpts []dockerconfigresolver.Opt if options.InsecureRegistry { - log.G(ctx).Warnf("skipping verifying HTTPS certs for %q", parsedReference.Domain) dOpts = append(dOpts, dockerconfigresolver.WithSkipVerifyCerts(true)) } dOpts = append(dOpts, dockerconfigresolver.WithHostsDirs(options.HostsDir)) @@ -99,7 +96,7 @@ func ensureOne(ctx context.Context, client *containerd.Client, rawRef string, ta if err != nil { // In some circumstance (e.g. people just use 80 port to support pure http), the error will contain message like "dial tcp : connection refused". - if !errors.Is(err, http.ErrSchemeMismatch) && !errutil.IsErrConnectionRefused(err) { + if !errutil.IsErrHTTPSFallbackNeeded(err) { return err } if options.InsecureRegistry { diff --git a/pkg/cmd/image/import.go b/pkg/cmd/image/import.go index a8e61eb6944..432d5665a90 100644 --- a/pkg/cmd/image/import.go +++ b/pkg/cmd/image/import.go @@ -17,6 +17,7 @@ package image import ( + "archive/tar" "bytes" "compress/gzip" "context" @@ -25,73 +26,190 @@ import ( "encoding/json" "fmt" "io" + "os" + pathpkg "path" "time" "github.com/opencontainers/go-digest" - "github.com/opencontainers/image-spec/identity" - "github.com/opencontainers/image-spec/specs-go" ocispec "github.com/opencontainers/image-spec/specs-go/v1" containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/content" - "github.com/containerd/containerd/v2/core/images" "github.com/containerd/containerd/v2/core/leases" + "github.com/containerd/containerd/v2/core/transfer" + tarchive "github.com/containerd/containerd/v2/core/transfer/archive" + transferimage "github.com/containerd/containerd/v2/core/transfer/image" "github.com/containerd/containerd/v2/pkg/archive/compression" "github.com/containerd/errdefs" "github.com/containerd/platforms" "github.com/containerd/nerdctl/v2/pkg/api/types" "github.com/containerd/nerdctl/v2/pkg/referenceutil" + "github.com/containerd/nerdctl/v2/pkg/transferutil" ) func Import(ctx context.Context, client *containerd.Client, options types.ImageImportOptions) (string, error) { - img, err := importRootfs(ctx, client, options.GOptions.Snapshotter, options) + prefix := options.Reference + if prefix == "" { + prefix = fmt.Sprintf("import-%s", time.Now().Format("2006-01-02")) + } + + parsed, err := referenceutil.Parse(prefix) if err != nil { return "", err } - return img.Name, nil + imageName := parsed.String() + + platUnpack := platforms.DefaultSpec() + var opts []transferimage.StoreOpt + if options.Platform != "" { + p, err := platforms.Parse(options.Platform) + if err != nil { + return "", err + } + platUnpack = p + opts = append(opts, transferimage.WithPlatforms(platUnpack)) + } + + opts = append(opts, transferimage.WithUnpack(platUnpack, options.GOptions.Snapshotter)) + opts = append(opts, transferimage.WithDigestRef(imageName, true, true)) + + var r io.ReadCloser + if rc, ok := options.Stdin.(io.ReadCloser); ok { + r = rc + } else { + r = io.NopCloser(options.Stdin) + } + + converted, cleanup, err := ensureOCIArchive(ctx, client, r, options, prefix) + if err != nil { + return "", err + } + defer cleanup() + + iis := tarchive.NewImageImportStream(converted, "") + is := transferimage.NewStore("", opts...) + + pf, done := transferutil.ProgressHandler(ctx, os.Stderr) + defer done() + + if err := client.Transfer(ctx, iis, is, transfer.WithProgress(pf)); err != nil { + return "", err + } + + return imageName, nil +} + +func ensureOCIArchive(ctx context.Context, client *containerd.Client, r io.ReadCloser, options types.ImageImportOptions, prefix string) (io.ReadCloser, func(), error) { + buf := &bytes.Buffer{} + tee := io.TeeReader(r, buf) + + isStandardArchive, err := detectStandardImageArchive(tee) + if err != nil { + return nil, func() {}, err + } + + combined := io.NopCloser(io.MultiReader(buf, r)) + if isStandardArchive { + return combined, func() { r.Close() }, nil + } + + converted, err := convertRootfsToOCIArchive(ctx, client, combined, options, prefix) + if err != nil { + r.Close() + return nil, func() {}, err + } + + cleanup := func() { + r.Close() + if converted != nil { + converted.Close() + } + } + + return converted, cleanup, nil } -func importRootfs(ctx context.Context, client *containerd.Client, snapshotter string, options types.ImageImportOptions) (images.Image, error) { - var zero images.Image +func detectStandardImageArchive(r io.Reader) (bool, error) { + tr := tar.NewReader(r) + const maxHeadersToCheck = 10 + + for i := 0; i < maxHeadersToCheck; i++ { + hdr, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + return false, err + } + + name := pathpkg.Clean(hdr.Name) + if name == "manifest.json" || name == ocispec.ImageLayoutFile { + return true, nil + } + } + return false, nil +} + +func convertRootfsToOCIArchive(ctx context.Context, client *containerd.Client, r io.ReadCloser, options types.ImageImportOptions, prefix string) (io.ReadCloser, error) { + defer r.Close() ctx, done, err := client.WithLease(ctx, leases.WithRandomID(), leases.WithExpiration(1*time.Hour)) if err != nil { - return zero, err + return nil, err } defer done(ctx) - if options.Stdin == nil { - return zero, fmt.Errorf("no input stream provided") - } - decomp, err := compression.DecompressStream(options.Stdin) + decomp, err := compression.DecompressStream(r) if err != nil { - return zero, err + return nil, err } defer decomp.Close() cs := client.ContentStore() - - ref := randomRef("import-rootfs-") + ref := randomRef("import-layer-") w, err := content.OpenWriter(ctx, cs, content.WithRef(ref)) if err != nil { - return zero, err + return nil, err } defer w.Close() + if err := w.Truncate(0); err != nil { - return zero, err + return nil, err + } + + layerDigest, diffID, layerSize, err := compressAndWriteLayer(ctx, w, decomp) + if err != nil { + return nil, err } + imgConfig, configDigest, err := buildImageConfig(diffID, options) + if err != nil { + return nil, err + } + + layerContent, err := readLayerContent(ctx, cs, layerDigest, layerSize) + if err != nil { + return nil, err + } + + return buildDockerArchive(imgConfig, configDigest, layerContent, layerDigest, prefix) +} + +func compressAndWriteLayer(ctx context.Context, w content.Writer, r io.Reader) (digest.Digest, digest.Digest, int64, error) { digester := digest.Canonical.Digester() - tee := io.TeeReader(decomp, digester.Hash()) + tee := io.TeeReader(r, digester.Hash()) pr, pw := io.Pipe() gz := gzip.NewWriter(pw) + doneCh := make(chan error, 1) go func() { - _, err := io.Copy(gz, tee) - if err != nil { - doneCh <- err + defer func() { _ = gz.Close() + }() + + if _, err := io.Copy(gz, tee); err != nil { + doneCh <- err _ = pw.CloseWithError(err) return } @@ -105,10 +223,10 @@ func importRootfs(ctx context.Context, client *containerd.Client, snapshotter st n, err := io.Copy(w, pr) if err != nil { - return zero, err + return "", "", 0, err } if err := <-doneCh; err != nil { - return zero, err + return "", "", 0, err } diffID := digester.Digest() @@ -116,21 +234,18 @@ func importRootfs(ctx context.Context, client *containerd.Client, snapshotter st "containerd.io/uncompressed": diffID.String(), } if err := w.Commit(ctx, n, "", content.WithLabels(labels)); err != nil && !errdefs.IsAlreadyExists(err) { - return zero, err - } - layerDesc := ocispec.Descriptor{ - MediaType: images.MediaTypeDockerSchema2LayerGzip, - Digest: w.Digest(), - Size: n, + return "", "", 0, err } + return w.Digest(), diffID, n, nil +} + +func buildImageConfig(diffID digest.Digest, options types.ImageImportOptions) ([]byte, digest.Digest, error) { ociplat := platforms.DefaultSpec() if options.Platform != "" { - p, err := platforms.Parse(options.Platform) - if err != nil { - return zero, err + if p, err := platforms.Parse(options.Platform); err == nil { + ociplat = p } - ociplat = p } created := time.Now().UTC() @@ -153,100 +268,85 @@ func importRootfs(ctx context.Context, client *containerd.Client, snapshotter st }}, } - manifestDesc, _, err := writeConfigAndManifest(ctx, cs, snapshotter, imgConfig, []ocispec.Descriptor{layerDesc}) + configJSON, err := json.Marshal(imgConfig) if err != nil { - return zero, err + return nil, "", err } + return configJSON, digest.FromBytes(configJSON), nil +} - storedName := options.Reference - if storedName == "" { - storedName = manifestDesc.Digest.String() - } else if refParsed, err := referenceutil.Parse(storedName); err == nil { - if refParsed.ExplicitTag == "" { - storedName = refParsed.FamiliarName() + ":latest" - } - if p2, err := referenceutil.Parse(storedName); err == nil { - storedName = p2.String() - } +func readLayerContent(ctx context.Context, cs content.Store, layerDigest digest.Digest, size int64) ([]byte, error) { + ra, err := cs.ReaderAt(ctx, ocispec.Descriptor{Digest: layerDigest, Size: size}) + if err != nil { + return nil, err } - name := storedName + defer ra.Close() - img := images.Image{ - Name: name, - Target: manifestDesc, - CreatedAt: time.Now(), - } - if _, err := client.ImageService().Update(ctx, img); err != nil { - if !errdefs.IsNotFound(err) { - return zero, err - } - if _, err := client.ImageService().Create(ctx, img); err != nil { - return zero, err - } + layerContent := make([]byte, size) + if _, err := ra.ReadAt(layerContent, 0); err != nil { + return nil, err } + return layerContent, nil +} + +func buildDockerArchive(configJSON []byte, configDigest digest.Digest, layerContent []byte, layerDigest digest.Digest, prefix string) (io.ReadCloser, error) { + layerFileName := layerDigest.Encoded() + ".tar.gz" + configFileName := configDigest.Encoded() + ".json" - cimg := containerd.NewImage(client, img) - if err := cimg.Unpack(ctx, snapshotter); err != nil { - return zero, err + var repoTags []string + if parsed, err := referenceutil.Parse(prefix); err == nil && parsed.String() != "" { + repoTags = []string{parsed.String()} } - return img, nil -} -func randomRef(prefix string) string { - var b [6]byte - _, _ = rand.Read(b[:]) - return prefix + base64.RawURLEncoding.EncodeToString(b[:]) -} + dockerManifest := []struct { + Config string `json:"Config"` + RepoTags []string `json:"RepoTags,omitempty"` + Layers []string `json:"Layers"` + }{{ + Config: configFileName, + RepoTags: repoTags, + Layers: []string{layerFileName}, + }} -func writeConfigAndManifest(ctx context.Context, cs content.Store, snapshotter string, config ocispec.Image, layers []ocispec.Descriptor) (ocispec.Descriptor, digest.Digest, error) { - configJSON, err := json.Marshal(config) + dockerManifestJSON, err := json.Marshal(dockerManifest) if err != nil { - return ocispec.Descriptor{}, "", err - } - configDesc := ocispec.Descriptor{ - MediaType: images.MediaTypeDockerSchema2Config, - Digest: digest.FromBytes(configJSON), - Size: int64(len(configJSON)), + return nil, err } - gcLabel := map[string]string{} - if len(config.RootFS.DiffIDs) > 0 && snapshotter != "" { - gcLabel[fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", snapshotter)] = identity.ChainID(config.RootFS.DiffIDs).String() - } - if err := content.WriteBlob(ctx, cs, configDesc.Digest.String(), bytes.NewReader(configJSON), configDesc, content.WithLabels(gcLabel)); err != nil && !errdefs.IsAlreadyExists(err) { - return ocispec.Descriptor{}, "", err - } + buf := &bytes.Buffer{} + tw := tar.NewWriter(buf) - manifest := struct { - MediaType string `json:"mediaType,omitempty"` - ocispec.Manifest + files := []struct { + name string + content []byte }{ - MediaType: images.MediaTypeDockerSchema2Manifest, - Manifest: ocispec.Manifest{ - Versioned: specs.Versioned{SchemaVersion: 2}, - Config: configDesc, - Layers: layers, - }, - } - manifestJSON, err := json.Marshal(manifest) - if err != nil { - return ocispec.Descriptor{}, "", err - } - manifestDesc := ocispec.Descriptor{ - MediaType: images.MediaTypeDockerSchema2Manifest, - Digest: digest.FromBytes(manifestJSON), - Size: int64(len(manifestJSON)), + {"manifest.json", dockerManifestJSON}, + {configFileName, configJSON}, + {layerFileName, layerContent}, } - refLabels := map[string]string{ - "containerd.io/gc.ref.content.0": configDesc.Digest.String(), - } - for i, l := range layers { - refLabels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = l.Digest.String() + for _, f := range files { + if err := tw.WriteHeader(&tar.Header{ + Name: f.name, + Mode: 0644, + Size: int64(len(f.content)), + }); err != nil { + return nil, err + } + if _, err := tw.Write(f.content); err != nil { + return nil, err + } } - if err := content.WriteBlob(ctx, cs, manifestDesc.Digest.String(), bytes.NewReader(manifestJSON), manifestDesc, content.WithLabels(refLabels)); err != nil && !errdefs.IsAlreadyExists(err) { - return ocispec.Descriptor{}, "", err + + if err := tw.Close(); err != nil { + return nil, err } - return manifestDesc, configDesc.Digest, nil + return io.NopCloser(buf), nil +} + +func randomRef(prefix string) string { + var b [6]byte + _, _ = rand.Read(b[:]) + return prefix + base64.RawURLEncoding.EncodeToString(b[:]) } diff --git a/pkg/cmd/image/push.go b/pkg/cmd/image/push.go index 8731b0cfc94..7f961c43d6c 100644 --- a/pkg/cmd/image/push.go +++ b/pkg/cmd/image/push.go @@ -18,10 +18,8 @@ package image import ( "context" - "errors" "fmt" "io" - "net/http" "os" "path/filepath" @@ -32,9 +30,6 @@ import ( "github.com/containerd/containerd/v2/core/content" "github.com/containerd/containerd/v2/core/images" "github.com/containerd/containerd/v2/core/images/converter" - "github.com/containerd/containerd/v2/core/remotes" - "github.com/containerd/containerd/v2/core/remotes/docker" - dockerconfig "github.com/containerd/containerd/v2/core/remotes/docker/config" "github.com/containerd/containerd/v2/pkg/reference" "github.com/containerd/log" "github.com/containerd/stargz-snapshotter/estargz" @@ -42,10 +37,8 @@ import ( estargzconvert "github.com/containerd/stargz-snapshotter/nativeconverter/estargz" "github.com/containerd/nerdctl/v2/pkg/api/types" - "github.com/containerd/nerdctl/v2/pkg/errutil" + "github.com/containerd/nerdctl/v2/pkg/imgutil" nerdconverter "github.com/containerd/nerdctl/v2/pkg/imgutil/converter" - "github.com/containerd/nerdctl/v2/pkg/imgutil/dockerconfigresolver" - "github.com/containerd/nerdctl/v2/pkg/imgutil/push" "github.com/containerd/nerdctl/v2/pkg/internal/filesystem" "github.com/containerd/nerdctl/v2/pkg/ipfs" "github.com/containerd/nerdctl/v2/pkg/platformutil" @@ -110,7 +103,6 @@ func Push(ctx context.Context, client *containerd.Client, rawRef string, options return err } ref := parsedReference.String() - refDomain := parsedReference.Domain platMC, err := platformutil.NewMatchComparer(options.AllPlatforms, options.Platforms) if err != nil { @@ -147,51 +139,7 @@ func Push(ctx context.Context, client *containerd.Client, rawRef string, options log.G(ctx).Infof("pushing as an eStargz image (%s, %s)", esgzImg.Target.MediaType, esgzImg.Target.Digest) } - // In order to push images where most layers are the same but the - // repository name is different, it is necessary to refresh the - // PushTracker. Otherwise, the MANIFEST_BLOB_UNKNOWN error will occur due - // to the registry not creating the corresponding layer link file, - // resulting in the failure of the entire image push. - pushTracker := docker.NewInMemoryTracker() - - pushFunc := func(r remotes.Resolver) error { - return push.Push(ctx, client, r, pushTracker, options.Stdout, pushRef, ref, platMC, options.AllowNondistributableArtifacts, options.Quiet) - } - - var dOpts []dockerconfigresolver.Opt - if options.GOptions.InsecureRegistry { - log.G(ctx).Warnf("skipping verifying HTTPS certs for %q", refDomain) - dOpts = append(dOpts, dockerconfigresolver.WithSkipVerifyCerts(true)) - } - dOpts = append(dOpts, dockerconfigresolver.WithHostsDirs(options.GOptions.HostsDir)) - - ho, err := dockerconfigresolver.NewHostOptions(ctx, refDomain, dOpts...) - if err != nil { - return err - } - - resolverOpts := docker.ResolverOptions{ - Tracker: pushTracker, - Hosts: dockerconfig.ConfigureHosts(ctx, *ho), - } - - resolver := docker.NewResolver(resolverOpts) - if err = pushFunc(resolver); err != nil { - // In some circumstance (e.g. people just use 80 port to support pure http), the error will contain message like "dial tcp : connection refused" - if !errors.Is(err, http.ErrSchemeMismatch) && !errutil.IsErrConnectionRefused(err) { - return err - } - if options.GOptions.InsecureRegistry { - log.G(ctx).WithError(err).Warnf("server %q does not seem to support HTTPS, falling back to plain HTTP", refDomain) - dOpts = append(dOpts, dockerconfigresolver.WithPlainHTTP(true)) - resolver, err = dockerconfigresolver.New(ctx, refDomain, dOpts...) - if err != nil { - return err - } - return pushFunc(resolver) - } - log.G(ctx).WithError(err).Errorf("server %q does not seem to support HTTPS", refDomain) - log.G(ctx).Info("Hint: you may want to try --insecure-registry to allow plain HTTP (if you are in a trusted network)") + if err := imgutil.PushImageWithTransfer(ctx, client, parsedReference, pushRef, ref, options); err != nil { return err } diff --git a/pkg/cmd/image/save.go b/pkg/cmd/image/save.go index 0a499b3f135..a35a83a97f5 100644 --- a/pkg/cmd/image/save.go +++ b/pkg/cmd/image/save.go @@ -19,55 +19,104 @@ package image import ( "context" "fmt" + "io" + "os" + + "github.com/distribution/reference" + "github.com/opencontainers/go-digest" containerd "github.com/containerd/containerd/v2/client" - "github.com/containerd/containerd/v2/core/images/archive" + "github.com/containerd/containerd/v2/core/transfer" + tarchive "github.com/containerd/containerd/v2/core/transfer/archive" + transferimage "github.com/containerd/containerd/v2/core/transfer/image" + "github.com/containerd/platforms" "github.com/containerd/nerdctl/v2/pkg/api/types" - "github.com/containerd/nerdctl/v2/pkg/idutil/imagewalker" "github.com/containerd/nerdctl/v2/pkg/platformutil" "github.com/containerd/nerdctl/v2/pkg/strutil" + "github.com/containerd/nerdctl/v2/pkg/transferutil" ) // Save exports `images` to a `io.Writer` (e.g., a file writer, or os.Stdout) specified by `options.Stdout`. -func Save(ctx context.Context, client *containerd.Client, images []string, options types.ImageSaveOptions, exportOpts ...archive.ExportOpt) error { +func Save(ctx context.Context, client *containerd.Client, images []string, options types.ImageSaveOptions) error { images = strutil.DedupeStrSlice(images) + var exportOpts []tarchive.ExportOpt + + if len(options.Platform) > 0 { + for _, ps := range options.Platform { + p, err := platforms.Parse(ps) + if err != nil { + return fmt.Errorf("invalid platform %q: %w", ps, err) + } + exportOpts = append(exportOpts, tarchive.WithPlatform(p)) + } + } + if options.AllPlatforms { + exportOpts = append(exportOpts, tarchive.WithAllPlatforms) + } + platMC, err := platformutil.NewMatchComparer(options.AllPlatforms, options.Platform) if err != nil { return err } - exportOpts = append(exportOpts, archive.WithPlatform(platMC)) - imageStore := client.ImageService() + imageService := client.ImageService() + var storeOpts []transferimage.StoreOpt + for _, img := range images { + var imageRef string - savedImages := make(map[string]struct{}) - walker := &imagewalker.ImageWalker{ - Client: client, - OnFound: func(ctx context.Context, found imagewalker.Found) error { - if found.UniqueImages > 1 { - return fmt.Errorf("ambiguous digest ID: multiple IDs found with provided prefix %s", found.Req) + var dgst digest.Digest + var err error + if dgst, err = digest.Parse(img); err != nil { + if dgst, err = digest.Parse("sha256:" + img); err != nil { + named, err := reference.ParseNormalizedNamed(img) + if err != nil { + return fmt.Errorf("invalid image name %q: %w", img, err) + } + imageRef = reference.TagNameOnly(named).String() + err = EnsureAllContent(ctx, client, imageRef, platMC, options.GOptions) + if err != nil { + return err + } + storeOpts = append(storeOpts, transferimage.WithExtraReference(imageRef)) + continue } + } - // Ensure all the layers are here: https://github.com/containerd/nerdctl/issues/3425 - err = EnsureAllContent(ctx, client, found.Image.Name, platMC, options.GOptions) - if err != nil { - return err - } + filters := []string{fmt.Sprintf("target.digest~=^%s$", dgst.String())} + imageList, err := imageService.List(ctx, filters...) + if err != nil { + return fmt.Errorf("failed to list images: %w", err) + } + if len(imageList) == 0 { + return fmt.Errorf("image %q: not found", img) + } - imgName := found.Image.Name - if _, ok := savedImages[imgName]; !ok { - savedImages[imgName] = struct{}{} - exportOpts = append(exportOpts, archive.WithImage(imageStore, imgName)) - } - return nil - }, + imageRef = imageList[0].Name + err = EnsureAllContent(ctx, client, imageRef, platMC, options.GOptions) + if err != nil { + return err + } + storeOpts = append(storeOpts, transferimage.WithExtraReference(imageRef)) } - // check if all images exist - if err := walker.WalkAll(ctx, images, false); err != nil { - return err - } + w := nopWriteCloser{options.Stdout} + + pf, done := transferutil.ProgressHandler(ctx, os.Stderr) + defer done() + + return client.Transfer(ctx, + transferimage.NewStore("", storeOpts...), + tarchive.NewImageExportStream(w, "", exportOpts...), + transfer.WithProgress(pf), + ) +} + +type nopWriteCloser struct { + io.Writer +} - return client.Export(ctx, options.Stdout, exportOpts...) +func (nopWriteCloser) Close() error { + return nil } diff --git a/pkg/cmd/image/tag.go b/pkg/cmd/image/tag.go index 60ab191d4f7..ed50bb063e8 100644 --- a/pkg/cmd/image/tag.go +++ b/pkg/cmd/image/tag.go @@ -18,79 +18,41 @@ package image import ( "context" - "fmt" containerd "github.com/containerd/containerd/v2/client" - "github.com/containerd/containerd/v2/core/images" - "github.com/containerd/errdefs" - "github.com/containerd/log" + transferimage "github.com/containerd/containerd/v2/core/transfer/image" "github.com/containerd/nerdctl/v2/pkg/api/types" - "github.com/containerd/nerdctl/v2/pkg/idutil/imagewalker" "github.com/containerd/nerdctl/v2/pkg/platformutil" "github.com/containerd/nerdctl/v2/pkg/referenceutil" ) func Tag(ctx context.Context, client *containerd.Client, options types.ImageTagOptions) error { - imageService := client.ImageService() - var srcName string - walker := &imagewalker.ImageWalker{ - Client: client, - OnFound: func(ctx context.Context, found imagewalker.Found) error { - if srcName == "" { - srcName = found.Image.Name - } - return nil - }, - } - matchCount, err := walker.Walk(ctx, options.Source) - if err != nil { - return err - } - if matchCount < 1 { - return fmt.Errorf("%s: not found", options.Source) - } + return tagWithTransfer(ctx, client, options) +} - parsedReference, err := referenceutil.Parse(options.Target) +func tagWithTransfer(ctx context.Context, client *containerd.Client, options types.ImageTagOptions) error { + parsedSource, err := referenceutil.Parse(options.Source) if err != nil { return err } - ctx, done, err := client.WithLease(ctx) + parsedTarget, err := referenceutil.Parse(options.Target) if err != nil { return err } - defer done(ctx) - // Ensure all the layers are here: https://github.com/containerd/nerdctl/issues/3425 - platMC, err := platformutil.NewMatchComparer(true, nil) + platMC, err := platformutil.NewMatchComparer(false, nil) if err != nil { return err } - - err = EnsureAllContent(ctx, client, srcName, platMC, options.GOptions) - if err != nil { - log.G(ctx).Warn("Unable to fetch missing layers before committing. " + - "If you try to save or push this image, it might fail. See https://github.com/containerd/nerdctl/issues/3439.") - } - - img, err := imageService.Get(ctx, srcName) + err = EnsureAllContent(ctx, client, parsedSource.String(), platMC, options.GOptions) if err != nil { return err } - img.Name = parsedReference.String() - if _, err = imageService.Create(ctx, img); err != nil { - if errdefs.IsAlreadyExists(err) { - if err = imageService.Delete(ctx, img.Name, images.SynchronousDelete()); err != nil { - return err - } - if _, err = imageService.Create(ctx, img); err != nil { - return err - } - } else { - return err - } - } - return nil + sourceStore := transferimage.NewStore(parsedSource.String()) + targetStore := transferimage.NewStore(parsedTarget.String()) + + return client.Transfer(ctx, sourceStore, targetStore) } diff --git a/pkg/cmd/login/login.go b/pkg/cmd/login/login.go index 773bf8edc76..56f7f2583ee 100644 --- a/pkg/cmd/login/login.go +++ b/pkg/cmd/login/login.go @@ -150,7 +150,7 @@ func loginClientSide(ctx context.Context, globalOptions types.GlobalCommandOptio } for i, rh := range regHosts { err = tryLoginWithRegHost(ctx, rh) - if err != nil && globalOptions.InsecureRegistry && (errors.Is(err, http.ErrSchemeMismatch) || errutil.IsErrConnectionRefused(err)) { + if err != nil && globalOptions.InsecureRegistry && errutil.IsErrHTTPSFallbackNeeded(err) { rh.Scheme = "http" err = tryLoginWithRegHost(ctx, rh) } diff --git a/pkg/errutil/errors_check.go b/pkg/errutil/errors_check.go index 202c4fe8518..26b63ccfc9d 100644 --- a/pkg/errutil/errors_check.go +++ b/pkg/errutil/errors_check.go @@ -16,7 +16,11 @@ package errutil -import "strings" +import ( + "errors" + "net/http" + "strings" +) // IsErrConnectionRefused return whether err is // "connect: connection refused" @@ -24,3 +28,28 @@ func IsErrConnectionRefused(err error) bool { const errMessage = "connect: connection refused" return strings.Contains(err.Error(), errMessage) } + +// IsErrHTTPResponseToHTTPSClient returns whether err is +// "server gave HTTP response to HTTPS client" +func IsErrHTTPResponseToHTTPSClient(err error) bool { + if err == nil { + return false + } + errMsg := err.Error() + return strings.Contains(errMsg, "server gave HTTP response to HTTPS client") +} + +// IsErrHTTPSFallbackNeeded returns whether the error indicates that +// HTTPS connection failed and should fallback to plain HTTP. +// This includes: +// - http.ErrSchemeMismatch +// - connection refused errors +// - "server gave HTTP response to HTTPS client" errors +func IsErrHTTPSFallbackNeeded(err error) bool { + if err == nil { + return false + } + return errors.Is(err, http.ErrSchemeMismatch) || + IsErrConnectionRefused(err) || + IsErrHTTPResponseToHTTPSClient(err) +} diff --git a/pkg/imgutil/dockerconfigresolver/dockerconfigresolver.go b/pkg/imgutil/dockerconfigresolver/dockerconfigresolver.go index 8577b8e2bc6..6dcdcd59a45 100644 --- a/pkg/imgutil/dockerconfigresolver/dockerconfigresolver.go +++ b/pkg/imgutil/dockerconfigresolver/dockerconfigresolver.go @@ -24,6 +24,7 @@ import ( "github.com/containerd/containerd/v2/core/remotes" "github.com/containerd/containerd/v2/core/remotes/docker" dockerconfig "github.com/containerd/containerd/v2/core/remotes/docker/config" + "github.com/containerd/containerd/v2/core/transfer/registry" "github.com/containerd/errdefs" "github.com/containerd/log" ) @@ -193,3 +194,27 @@ func NewAuthCreds(refHostname string) (AuthCreds, error) { return credFunc, nil } + +func NewCredentialHelper(refHostname string) (registry.CredentialHelper, error) { + authCreds, err := NewAuthCreds(refHostname) + if err != nil { + return nil, err + } + return &credentialHelper{authCreds: authCreds}, nil +} + +type credentialHelper struct { + authCreds AuthCreds +} + +func (ch *credentialHelper) GetCredentials(ctx context.Context, ref, host string) (registry.Credentials, error) { + username, secret, err := ch.authCreds(host) + if err != nil { + return registry.Credentials{}, err + } + return registry.Credentials{ + Host: host, + Username: username, + Secret: secret, + }, nil +} diff --git a/pkg/imgutil/imgutil.go b/pkg/imgutil/imgutil.go index 08d10be6437..e7ba4c3c22e 100644 --- a/pkg/imgutil/imgutil.go +++ b/pkg/imgutil/imgutil.go @@ -21,7 +21,6 @@ import ( "encoding/json" "errors" "fmt" - "net/http" "reflect" "github.com/opencontainers/image-spec/identity" @@ -39,7 +38,6 @@ import ( "github.com/containerd/platforms" "github.com/containerd/nerdctl/v2/pkg/api/types" - "github.com/containerd/nerdctl/v2/pkg/errutil" "github.com/containerd/nerdctl/v2/pkg/healthcheck" "github.com/containerd/nerdctl/v2/pkg/idutil/imagewalker" "github.com/containerd/nerdctl/v2/pkg/imgutil/dockerconfigresolver" @@ -133,38 +131,7 @@ func EnsureImage(ctx context.Context, client *containerd.Client, rawRef string, return nil, err } - var dOpts []dockerconfigresolver.Opt - if options.GOptions.InsecureRegistry { - log.G(ctx).Warnf("skipping verifying HTTPS certs for %q", parsedReference.Domain) - dOpts = append(dOpts, dockerconfigresolver.WithSkipVerifyCerts(true)) - } - dOpts = append(dOpts, dockerconfigresolver.WithHostsDirs(options.GOptions.HostsDir)) - resolver, err := dockerconfigresolver.New(ctx, parsedReference.Domain, dOpts...) - if err != nil { - return nil, err - } - - img, err := PullImage(ctx, client, resolver, parsedReference.String(), options) - if err != nil { - // In some circumstance (e.g. people just use 80 port to support pure http), the error will contain message like "dial tcp : connection refused". - if !errors.Is(err, http.ErrSchemeMismatch) && !errutil.IsErrConnectionRefused(err) { - return nil, err - } - if options.GOptions.InsecureRegistry { - log.G(ctx).WithError(err).Warnf("server %q does not seem to support HTTPS, falling back to plain HTTP", parsedReference.Domain) - dOpts = append(dOpts, dockerconfigresolver.WithPlainHTTP(true)) - resolver, err = dockerconfigresolver.New(ctx, parsedReference.Domain, dOpts...) - if err != nil { - return nil, err - } - return PullImage(ctx, client, resolver, parsedReference.String(), options) - } - log.G(ctx).WithError(err).Errorf("server %q does not seem to support HTTPS", parsedReference.Domain) - log.G(ctx).Info("Hint: you may want to try --insecure-registry to allow plain HTTP (if you are in a trusted network)") - return nil, err - - } - return img, nil + return PullImageWithTransfer(ctx, client, parsedReference, rawRef, options) } // ResolveDigest resolves `rawRef` and returns its descriptor digest. diff --git a/pkg/imgutil/load/load.go b/pkg/imgutil/load/load.go index 0afb322f4e4..5e80096d5db 100644 --- a/pkg/imgutil/load/load.go +++ b/pkg/imgutil/load/load.go @@ -20,19 +20,19 @@ import ( "context" "errors" "fmt" - "io" "os" "strings" containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/images" - "github.com/containerd/containerd/v2/core/images/archive" - "github.com/containerd/containerd/v2/pkg/archive/compression" + "github.com/containerd/containerd/v2/core/transfer" + tarchive "github.com/containerd/containerd/v2/core/transfer/archive" + transferimage "github.com/containerd/containerd/v2/core/transfer/image" "github.com/containerd/platforms" "github.com/containerd/nerdctl/v2/pkg/api/types" - "github.com/containerd/nerdctl/v2/pkg/imgutil" "github.com/containerd/nerdctl/v2/pkg/platformutil" + "github.com/containerd/nerdctl/v2/pkg/transferutil" ) // FromArchive loads and unpacks the images from the tar archive specified in image load options. @@ -54,27 +54,59 @@ func FromArchive(ctx context.Context, client *containerd.Client, options types.I return nil, errors.New("stdin is empty and input flag is not specified") } } - decompressor, err := compression.DecompressStream(options.Stdin) - if err != nil { + + if _, err := platformutil.NewMatchComparer(options.AllPlatforms, options.Platform); err != nil { return nil, err } - platMC, err := platformutil.NewMatchComparer(options.AllPlatforms, options.Platform) + + imageService := client.ImageService() + beforeImages, err := imageService.List(ctx) if err != nil { return nil, err } - imgs, err := importImages(ctx, client, decompressor, options.GOptions.Snapshotter, platMC) - if err != nil { - return nil, err + beforeSet := make(map[string]bool) + for _, img := range beforeImages { + beforeSet[img.Name] = true } - unpackedImages := make([]images.Image, 0, len(imgs)) - for _, img := range imgs { - err := unpackImage(ctx, client, img, platMC, options) + + var storeOpts []transferimage.StoreOpt + platUnpack := platforms.DefaultSpec() + if len(options.Platform) > 0 { + p, err := platforms.Parse(options.Platform[0]) if err != nil { - return unpackedImages, fmt.Errorf("error unpacking image (%s): %w", img.Name, err) + return nil, fmt.Errorf("invalid platform %q: %w", options.Platform[0], err) } - unpackedImages = append(unpackedImages, img) + platUnpack = p + storeOpts = append(storeOpts, transferimage.WithPlatforms(p)) + } else if !options.AllPlatforms { + storeOpts = append(storeOpts, transferimage.WithPlatforms(platUnpack)) } - return unpackedImages, nil + storeOpts = append(storeOpts, transferimage.WithUnpack(platUnpack, options.GOptions.Snapshotter)) + storeOpts = append(storeOpts, transferimage.WithDigestRef("import", true, true)) + + var loadedImages []images.Image + pf, done := transferutil.ProgressHandler(ctx, options.Stdout) + defer done() + + err = client.Transfer(ctx, + tarchive.NewImageImportStream(options.Stdin, ""), + transferimage.NewStore("", storeOpts...), + transfer.WithProgress(func(p transfer.Progress) { + if p.Event == "saved" { + if img, err := imageService.Get(ctx, p.Name); err == nil { + if !beforeSet[img.Name] { + loadedImages = append(loadedImages, img) + if !options.Quiet { + fmt.Fprintf(options.Stdout, "Loaded image: %s\n", img.Name) + } + } + } + } + pf(p) + }), + ) + + return loadedImages, err } // FromOCIArchive loads and unpacks the images from the OCI formatted archive at the provided file system path. @@ -95,57 +127,3 @@ func FromOCIArchive(ctx context.Context, client *containerd.Client, pathToOCIArc return FromArchive(ctx, client, options) } - -type readCounter struct { - io.Reader - N int -} - -func (r *readCounter) Read(p []byte) (int, error) { - n, err := r.Reader.Read(p) - if n > 0 { - r.N += n - } - return n, err -} - -func importImages(ctx context.Context, client *containerd.Client, in io.Reader, snapshotter string, platformMC platforms.MatchComparer) ([]images.Image, error) { - // In addition to passing WithImagePlatform() to client.Import(), we also need to pass WithDefaultPlatform() to NewClient(). - // Otherwise unpacking may fail. - r := &readCounter{Reader: in} - imgs, err := client.Import(ctx, r, - containerd.WithDigestRef(archive.DigestTranslator(snapshotter)), - containerd.WithSkipDigestRef(func(name string) bool { return name != "" }), - containerd.WithImportPlatform(platformMC), - ) - if err != nil { - if r.N == 0 { - // Avoid confusing "unrecognized image format" - return nil, errors.New("no image was built") - } - if errors.Is(err, images.ErrEmptyWalk) { - err = fmt.Errorf("%w (Hint: set `--platform=PLATFORM` or `--all-platforms`)", err) - } - return nil, err - } - return imgs, nil -} - -func unpackImage(ctx context.Context, client *containerd.Client, model images.Image, platform platforms.MatchComparer, options types.ImageLoadOptions) error { - image := containerd.NewImageWithPlatform(client, model, platform) - - if !options.Quiet { - fmt.Fprintf(options.Stdout, "unpacking %s (%s)...\n", model.Name, model.Target.Digest) - } - - err := image.Unpack(ctx, options.GOptions.Snapshotter) - if err != nil { - return err - } - - // Loaded message is shown even when quiet. - repo, tag := imgutil.ParseRepoTag(model.Name) - fmt.Fprintf(options.Stdout, "Loaded image: %s:%s\n", repo, tag) - - return nil -} diff --git a/pkg/imgutil/transfer.go b/pkg/imgutil/transfer.go new file mode 100644 index 00000000000..e32ff1b98de --- /dev/null +++ b/pkg/imgutil/transfer.go @@ -0,0 +1,189 @@ +package imgutil + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + + containerd "github.com/containerd/containerd/v2/client" + "github.com/containerd/containerd/v2/core/remotes/docker" + "github.com/containerd/containerd/v2/core/transfer" + transferimage "github.com/containerd/containerd/v2/core/transfer/image" + "github.com/containerd/containerd/v2/core/transfer/registry" + "github.com/containerd/log" + + "github.com/containerd/nerdctl/v2/pkg/api/types" + "github.com/containerd/nerdctl/v2/pkg/errutil" + "github.com/containerd/nerdctl/v2/pkg/imgutil/dockerconfigresolver" + "github.com/containerd/nerdctl/v2/pkg/platformutil" + "github.com/containerd/nerdctl/v2/pkg/referenceutil" + "github.com/containerd/nerdctl/v2/pkg/transferutil" +) + +func prepareImageStore(ctx context.Context, parsedReference *referenceutil.ImageReference, options types.ImagePullOptions) (*transferimage.Store, error) { + var storeOpts []transferimage.StoreOpt + if len(options.OCISpecPlatform) > 0 { + storeOpts = append(storeOpts, transferimage.WithPlatforms(options.OCISpecPlatform...)) + } + + unpackEnabled := len(options.OCISpecPlatform) == 1 + if options.Unpack != nil { + unpackEnabled = *options.Unpack + if unpackEnabled && len(options.OCISpecPlatform) != 1 { + return nil, fmt.Errorf("unpacking requires a single platform to be specified (e.g., --platform=amd64)") + } + } + + if unpackEnabled { + platform := options.OCISpecPlatform[0] + snapshotter := options.GOptions.Snapshotter + storeOpts = append(storeOpts, transferimage.WithUnpack(platform, snapshotter)) + } + + return transferimage.NewStore(parsedReference.String(), storeOpts...), nil +} + +func createOCIRegistry(ctx context.Context, parsedReference *referenceutil.ImageReference, gOptions types.GlobalCommandOptions, plainHTTP bool) (*registry.OCIRegistry, error) { + ch, err := dockerconfigresolver.NewCredentialHelper(parsedReference.Domain) + if err != nil { + return nil, err + } + + opts := []registry.Opt{ + registry.WithCredentials(ch), + } + + if len(gOptions.HostsDir) > 0 { + opts = append(opts, registry.WithHostDir(gOptions.HostsDir[0])) + } + + if isLocalHost, err := docker.MatchLocalhost(parsedReference.Domain); err != nil { + return nil, err + } else if isLocalHost || plainHTTP { + opts = append(opts, registry.WithDefaultScheme("http")) + } + + return registry.NewOCIRegistry(ctx, parsedReference.String(), opts...) +} + +func PullImageWithTransfer(ctx context.Context, client *containerd.Client, parsedReference *referenceutil.ImageReference, rawRef string, options types.ImagePullOptions) (*EnsuredImage, error) { + store, err := prepareImageStore(ctx, parsedReference, options) + if err != nil { + return nil, err + } + + progressWriter := options.Stderr + if options.ProgressOutputToStdout { + progressWriter = options.Stdout + } + + fetcher, err := createOCIRegistry(ctx, parsedReference, options.GOptions, false) + if err != nil { + return nil, err + } + + transferErr := doTransfer(ctx, client, fetcher, store, options.Quiet, progressWriter) + + if transferErr != nil && errutil.IsErrHTTPSFallbackNeeded(transferErr) { + if options.GOptions.InsecureRegistry { + log.G(ctx).WithError(transferErr).Warnf("server %q does not seem to support HTTPS, falling back to plain HTTP", parsedReference.Domain) + fetcher, err = createOCIRegistry(ctx, parsedReference, options.GOptions, true) + if err != nil { + return nil, err + } + transferErr = doTransfer(ctx, client, fetcher, store, options.Quiet, progressWriter) + } + } + + if transferErr != nil { + return nil, transferErr + } + + imageStore := client.ImageService() + stored, err := store.Get(ctx, imageStore) + if err != nil { + return nil, err + } + + plMatch := platformutil.NewMatchComparerFromOCISpecPlatformSlice(options.OCISpecPlatform) + containerdImage := containerd.NewImageWithPlatform(client, stored, plMatch) + imgConfig, err := getImageConfig(ctx, containerdImage) + if err != nil { + return nil, err + } + + snapshotter := options.GOptions.Snapshotter + snOpt := getSnapshotterOpts(snapshotter) + + return &EnsuredImage{ + Ref: rawRef, + Image: containerdImage, + ImageConfig: *imgConfig, + Snapshotter: snapshotter, + Remote: snOpt.isRemote(), + }, nil +} + +func preparePushStore(pushRef string, options types.ImagePushOptions) (*transferimage.Store, error) { + platformsSlice, err := platformutil.NewOCISpecPlatformSlice(options.AllPlatforms, options.Platforms) + if err != nil { + return nil, err + } + + storeOpts := []transferimage.StoreOpt{} + if len(platformsSlice) > 0 { + storeOpts = append(storeOpts, transferimage.WithPlatforms(platformsSlice...)) + } + + return transferimage.NewStore(pushRef, storeOpts...), nil +} + +func PushImageWithTransfer(ctx context.Context, client *containerd.Client, parsedReference *referenceutil.ImageReference, pushRef, rawRef string, options types.ImagePushOptions) error { + source, err := preparePushStore(pushRef, options) + if err != nil { + return err + } + + progressWriter := io.Discard + if options.Stdout != nil { + progressWriter = options.Stdout + } + + pusher, err := createOCIRegistry(ctx, parsedReference, options.GOptions, false) + if err != nil { + return err + } + + transferErr := doTransfer(ctx, client, source, pusher, options.Quiet, progressWriter) + + if transferErr != nil && (errors.Is(transferErr, http.ErrSchemeMismatch) || errutil.IsErrConnectionRefused(transferErr)) { + if options.GOptions.InsecureRegistry { + log.G(ctx).WithError(err).Warnf("server %q does not seem to support HTTPS, falling back to plain HTTP", parsedReference.Domain) + pusher, err = createOCIRegistry(ctx, parsedReference, options.GOptions, true) + if err != nil { + return err + } + transferErr = doTransfer(ctx, client, source, pusher, options.Quiet, progressWriter) + } + } + + if transferErr != nil { + log.G(ctx).WithError(transferErr).Errorf("server %q does not seem to support HTTPS", parsedReference.Domain) + log.G(ctx).Info("Hint: you may want to try --insecure-registry to allow plain HTTP (if you are in a trusted network)") + return transferErr + } + + return nil +} + +func doTransfer(ctx context.Context, client *containerd.Client, src, dst interface{}, quiet bool, progressWriter io.Writer) error { + opts := make([]transfer.Opt, 0, 1) + if !quiet { + pf, done := transferutil.ProgressHandler(ctx, progressWriter) + defer done() + opts = append(opts, transfer.WithProgress(pf)) + } + return client.Transfer(ctx, src, dst, opts...) +} diff --git a/pkg/transferutil/progress.go b/pkg/transferutil/progress.go new file mode 100644 index 00000000000..bad351432c9 --- /dev/null +++ b/pkg/transferutil/progress.go @@ -0,0 +1,223 @@ +package transferutil + +import ( + "context" + "fmt" + "io" + "strings" + "time" + + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + + "github.com/containerd/containerd/v2/core/transfer" + "github.com/containerd/containerd/v2/pkg/progress" +) + +// From https://github.com/containerd/containerd/blob/v2.2.0-rc.0/cmd/ctr/commands/image/pull.go#L240-L473 +type progressNode struct { + transfer.Progress + children []*progressNode + root bool +} + +func (n *progressNode) mainDesc() *ocispec.Descriptor { + if n.Desc != nil { + return n.Desc + } + for _, c := range n.children { + if desc := c.mainDesc(); desc != nil { + return desc + } + } + return nil +} + +// ProgressHandler returns a progress callback and a cleanup function to render transfer progress. +// This implementation is based on containerd's ctr command progress handler. +func ProgressHandler(ctx context.Context, out io.Writer) (transfer.ProgressFunc, func()) { + ctx, cancel := context.WithCancel(ctx) + var ( + fw = progress.NewWriter(out) + start = time.Now() + statuses = map[string]*progressNode{} + roots = []*progressNode{} + pc = make(chan transfer.Progress, 5) + status string + closeC = make(chan struct{}) + ) + + progressFn := func(p transfer.Progress) { + select { + case pc <- p: + case <-ctx.Done(): + } + } + + done := func() { + cancel() + <-closeC + } + + go func() { + defer close(closeC) + for { + select { + case p := <-pc: + if p.Name == "" { + status = p.Event + continue + } + if node, ok := statuses[p.Name]; !ok { + node = &progressNode{ + Progress: p, + root: true, + } + if len(p.Parents) == 0 { + roots = append(roots, node) + } else { + var parents []string + for _, parent := range p.Parents { + pStatus, ok := statuses[parent] + if ok { + parents = append(parents, parent) + pStatus.children = append(pStatus.children, node) + node.root = false + } + } + node.Progress.Parents = parents + if node.root { + roots = append(roots, node) + } + } + statuses[p.Name] = node + } else { + if len(node.Progress.Parents) != len(p.Parents) { + var parents []string + var removeRoot bool + for _, parent := range p.Parents { + pStatus, ok := statuses[parent] + if ok { + parents = append(parents, parent) + var found bool + for _, child := range pStatus.children { + if child.Progress.Name == p.Name { + found = true + break + } + } + if !found { + pStatus.children = append(pStatus.children, node) + } + if node.root { + removeRoot = true + } + node.root = false + } + } + p.Parents = parents + // Check if needs to remove from root + if removeRoot { + for i := range roots { + if roots[i] == node { + roots = append(roots[:i], roots[i+1:]...) + break + } + } + } + } + node.Progress = p + } + + displayHierarchy(fw, status, roots, start) + fw.Flush() + + case <-ctx.Done(): + return + } + } + }() + + return progressFn, done +} + +func displayHierarchy(w io.Writer, status string, roots []*progressNode, start time.Time) { + total := displayNode(w, "", roots) + for _, r := range roots { + if desc := r.mainDesc(); desc != nil { + fmt.Fprintf(w, "%s %s\n", desc.MediaType, desc.Digest) + } + } + // Print the Status line + fmt.Fprintf(w, "%s\telapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n", + status, + time.Since(start).Seconds(), + progress.Bytes(total), + progress.NewBytesPerSecond(total, time.Since(start))) +} + +func displayNode(w io.Writer, prefix string, nodes []*progressNode) int64 { + var total int64 + for i, node := range nodes { + status := node.Progress + total += status.Progress + pf, cpf := prefixes(i, len(nodes)) + if node.root { + pf, cpf = "", "" + } + + name := prefix + pf + displayName(status.Name) + + switch status.Event { + case "downloading", "uploading", "extracting": + var bar progress.Bar + if status.Total > 0.0 { + bar = progress.Bar(float64(status.Progress) / float64(status.Total)) + } + fmt.Fprintf(w, "%-40.40s\t%-11s\t%40r\t%8.8s/%s\t\n", + name, + status.Event, + bar, + progress.Bytes(status.Progress), progress.Bytes(status.Total)) + case "resolving", "waiting": + bar := progress.Bar(0.0) + fmt.Fprintf(w, "%-40.40s\t%-11s\t%40r\t\n", + name, + status.Event, + bar) + case "complete", "extracted": + bar := progress.Bar(1.0) + fmt.Fprintf(w, "%-40.40s\t%-11s\t%40r\t\n", + name, + status.Event, + bar) + default: + fmt.Fprintf(w, "%-40.40s\t%s\t\n", + name, + status.Event) + } + total += displayNode(w, prefix+cpf, node.children) + } + return total +} + +func prefixes(index, length int) (string, string) { + if index+1 == length { + return "└──", " " + } + return "├──", "│ " +} + +func displayName(name string) string { + parts := strings.Split(name, "-") + for i := range parts { + parts[i] = shortenName(parts[i]) + } + return strings.Join(parts, " ") +} + +func shortenName(name string) string { + if strings.HasPrefix(name, "sha256:") && len(name) == 71 { + return "(" + name[7:19] + ")" + } + return name +} From 18b044843a32443ace2dc57d5d3bac798f603685 Mon Sep 17 00:00:00 2001 From: ChengyuZhu6 Date: Tue, 11 Nov 2025 23:09:46 +0800 Subject: [PATCH 2/2] fix remote snapshotter Signed-off-by: ChengyuZhu6 --- Dockerfile.d/etc_containerd_config.toml | 9 +++++++++ ...est-integration-etc_containerd_config.toml | 18 +++++++++++++++++ Dockerfile.d/test-integration-rootless.sh | 11 +++++++++- .../rootless/containerd-rootless-setuptool.sh | 20 ++++++++++++++++++- 4 files changed, 56 insertions(+), 2 deletions(-) diff --git a/Dockerfile.d/etc_containerd_config.toml b/Dockerfile.d/etc_containerd_config.toml index dccac081af4..583ebcc3d46 100644 --- a/Dockerfile.d/etc_containerd_config.toml +++ b/Dockerfile.d/etc_containerd_config.toml @@ -5,3 +5,12 @@ version = 2 [proxy_plugins.stargz] type = "snapshot" address = "/run/containerd-stargz-grpc/containerd-stargz-grpc.sock" + [proxy_plugins.stargz.exports] + root = "/var/lib/containerd-stargz-grpc/" + enable_remote_snapshot_annotations = "true" +[[plugins."io.containerd.transfer.v1.local".unpack_config]] + platform = "linux" + snapshotter = "overlayfs" +[[plugins."io.containerd.transfer.v1.local".unpack_config]] + platform = "linux" + snapshotter = "stargz" diff --git a/Dockerfile.d/test-integration-etc_containerd_config.toml b/Dockerfile.d/test-integration-etc_containerd_config.toml index d37df58da75..0a6cc862e77 100644 --- a/Dockerfile.d/test-integration-etc_containerd_config.toml +++ b/Dockerfile.d/test-integration-etc_containerd_config.toml @@ -5,8 +5,26 @@ version = 2 [proxy_plugins.stargz] type = "snapshot" address = "/run/containerd-stargz-grpc/containerd-stargz-grpc.sock" + [proxy_plugins.stargz.exports] + root = "/var/lib/containerd-stargz-grpc/" + enable_remote_snapshot_annotations = "true" # Enable soci snapshotter [proxy_plugins.soci] type = "snapshot" address = "/run/soci-snapshotter-grpc/soci-snapshotter-grpc.sock" + [proxy_plugins.soci.exports] + root = "/var/lib/soci-snapshotter-grpc" + enable_remote_snapshot_annotations = "true" + +[[plugins."io.containerd.transfer.v1.local".unpack_config]] + platform = "linux" + snapshotter = "overlayfs" + +[[plugins."io.containerd.transfer.v1.local".unpack_config]] + platform = "linux" + snapshotter = "soci" + +[[plugins."io.containerd.transfer.v1.local".unpack_config]] + platform = "linux" + snapshotter = "stargz" diff --git a/Dockerfile.d/test-integration-rootless.sh b/Dockerfile.d/test-integration-rootless.sh index f6e243f32b5..14d84d463be 100755 --- a/Dockerfile.d/test-integration-rootless.sh +++ b/Dockerfile.d/test-integration-rootless.sh @@ -53,6 +53,15 @@ else [proxy_plugins."stargz"] type = "snapshot" address = "/run/user/$(id -u)/containerd-stargz-grpc/containerd-stargz-grpc.sock" + [proxy_plugins.stargz.exports] + root = "/home/rootless/.local/share/containerd-stargz-grpc/" + enable_remote_snapshot_annotations = "true" +[[plugins."io.containerd.transfer.v1.local".unpack_config]] + platform = "linux" + snapshotter = "overlayfs" +[[plugins."io.containerd.transfer.v1.local".unpack_config]] + platform = "linux" + snapshotter = "stargz" EOF systemctl --user restart containerd.service containerd-rootless-setuptool.sh -- install-ipfs --init --offline # offline ipfs daemon for testing @@ -64,4 +73,4 @@ EOF cd /go/src/github.com/containerd/nerdctl # We also lose the PATH (and SendEnv=PATH would require sshd config changes) exec env PATH="/usr/local/go/bin:$PATH" "$@" -fi +fi \ No newline at end of file diff --git a/extras/rootless/containerd-rootless-setuptool.sh b/extras/rootless/containerd-rootless-setuptool.sh index 27627640d51..ddbccf115a7 100755 --- a/extras/rootless/containerd-rootless-setuptool.sh +++ b/extras/rootless/containerd-rootless-setuptool.sh @@ -404,6 +404,15 @@ cmd_entrypoint_install_fuse_overlayfs() { [proxy_plugins."fuse-overlayfs"] type = "snapshot" address = "${XDG_RUNTIME_DIR}/containerd-fuse-overlayfs.sock" + [proxy_plugins."fuse-overlayfs".exports] + root = "${XDG_DATA_HOME}/containerd-fuse-overlayfs/" + enable_remote_snapshot_annotations = "true" + [[plugins."io.containerd.transfer.v1.local".unpack_config]] + platform = "linux" + snapshotter = "fuse-overlayfs" + [[plugins."io.containerd.transfer.v1.local".unpack_config]] + platform = "linux" + snapshotter = "overlayfs" ### END ### EOT INFO "Set \`export CONTAINERD_SNAPSHOTTER=\"fuse-overlayfs\"\` to use the fuse-overlayfs snapshotter." @@ -449,6 +458,15 @@ cmd_entrypoint_install_stargz() { [proxy_plugins."stargz"] type = "snapshot" address = "${XDG_RUNTIME_DIR}/containerd-stargz-grpc/containerd-stargz-grpc.sock" + [proxy_plugins.stargz.exports] + root = "${XDG_DATA_HOME}/containerd-stargz-grpc/" + enable_remote_snapshot_annotations = "true" + [[plugins."io.containerd.transfer.v1.local".unpack_config]] + platform = "linux" + snapshotter = "stargz" + [[plugins."io.containerd.transfer.v1.local".unpack_config]] + platform = "linux" + snapshotter = "overlayfs" ### END ### EOT INFO "Set \`export CONTAINERD_SNAPSHOTTER=\"stargz\"\` to use the stargz snapshotter." @@ -652,4 +670,4 @@ fi # main shift -"cmd_entrypoint_${command}" "$@" +"cmd_entrypoint_${command}" "$@" \ No newline at end of file