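This article addresses the following questions, based on the source code of v2.8.3, the latest release at the time of writing (the latest tag is v3.0.0-rc.2):

  • How does distribution implement a mirror (that is, a pull-through cache)? On the first request, how is the data persisted to disk, and on later requests, once the data is local, how is it returned to the client?
  • What does the storage cache hold, and how does it speed things up?
  • How can you see the actual request URLs used during docker pull?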

TL;DR

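  • Question 1

    • When a client request arrives (taking a blob as the example), a chain of calls ends in GetBlob. On the first request, the registry fetches the data from the upstream (proxy) registry, writes it to disk, and then returns it to the client; on later requests, the data is returned straight from disk. If a cache is configured, it is consulted before every lookup to speed up indexing.
  • Question 2

    • For the blob tested in this example, the following entries are stored: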

      127.0.0.1:6379> keys *
      1) "repository::library/alpine::blobs::sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
      2) "repository::library/alpine::blobs"
      3) "blobs::sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
      127.0.0.1:6379> HGETALL "repository::library/alpine::blobs::sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
      1) "mediatype"
      2) "application/octet-stream"
      127.0.0.1:6379> SMEMBERS "repository::library/alpine::blobs"
      1) "sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
      127.0.0.1:6379> HGETALL "blobs::sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
      1) "size"
      2) "3418409"
      3) "mediatype"
      4) "application/octet-stream"
      5) "digest"
      6) "sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
      127.0.0.1:6379>
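    • On the first request for a URL, these values are obtained through a series of lookups and computations. Once they are stored in Redis, the registry can build the HTTP response for the client quickly, which is where the speedup comes from.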

  • Question 3

    • After the registry starts, the request log can be viewed in proxy.log
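To make the Redis layout from Question 2 easier to read, here is a small Go sketch that rebuilds the key names and hash fields seen in the redis-cli output above. The helper names (blobKey, repoBlobsSetKey, repoBlobKey, descriptorFields) are hypothetical: they only reproduce the observed strings and are not distribution's own functions.

```go
package main

import (
	"fmt"
	"strconv"
)

// blobKey names the global hash that stores digest, size and mediatype.
func blobKey(dgst string) string {
	return "blobs::" + dgst
}

// repoBlobsSetKey names the set of blob digests known to one repository.
func repoBlobsSetKey(repo string) string {
	return "repository::" + repo + "::blobs"
}

// repoBlobKey names the per-repository hash that stores the mediatype.
func repoBlobKey(repo, dgst string) string {
	return repoBlobsSetKey(repo) + "::" + dgst
}

// descriptorFields rebuilds the field/value pairs that HGETALL returns
// for the blobKey hash.
func descriptorFields(dgst, mediaType string, size int64) map[string]string {
	return map[string]string{
		"digest":    dgst,
		"mediatype": mediaType,
		"size":      strconv.FormatInt(size, 10),
	}
}

func main() {
	repo := "library/alpine"
	dgst := "sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
	fmt.Println(blobKey(dgst))
	fmt.Println(repoBlobsSetKey(repo))
	fmt.Println(repoBlobKey(repo, dgst))
	fmt.Println(descriptorFields(dgst, "application/octet-stream", 3418409))
}
```

Storing size, mediatype and digest under one key per digest is what lets a later request skip the storage lookups entirely.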

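Before we start

The official documentation describes how a mirror works as follows: the first time content is requested, the mirror fetches it from upstream and caches it in local storage; subsequent requests are served directly from the mirror's local storage.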

How does it work?

The first time you request an image from your local registry mirror, it pulls the image from the public Docker registry and stores it locally before handing it back to you. On subsequent requests, the local registry mirror is able to serve the image from its own storage.

What if the content changes on the Hub?

When a pull is attempted with a tag, the Registry checks the remote to ensure if it has the latest version of the requested content. Otherwise, it fetches and caches the latest content.

What about my disk?

In environments with high churn rates, stale data can build up in the cache. When running as a pull through cache the Registry periodically removes old content to save disk space. Subsequent requests for removed content causes a remote fetch and local re-caching.

To ensure best performance and guarantee correctness the Registry cache should be configured to use the filesystem driver for storage.

On the configuration side, the following must be added to the configuration file for the registry to work as a mirror:

The easiest way to run a registry as a pull through cache is to run the official Registry image. At least, you need to specify proxy.remoteurl within /etc/distribution/config.yml as described in the following subsection.

Multiple registry caches can be deployed over the same back-end. A single registry cache ensures that concurrent requests do not pull duplicate data, but this property does not hold true for a registry cache cluster.

proxy:
  remoteurl: https://registry-1.docker.io
  username: [username]
  password: [password]
  ttl: 168h

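But what is the service entry point for a pull request, and what path does the request take through the code? The documentation does not explain this in detail, so the following sections work it out from the source code. Some basic concepts were already covered in [Distribution] 01 - Overview and are not repeated here.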

Image pull

As explained in How Harbor Image Pull Works, for distribution an image pull calls the following endpoints in order:

  • /v2/*/manifests/*
  • /v2/*/blobs/*

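The corresponding API documentation is in docs/spec/api.md. The analysis below follows this call chain through the source code.

Initialization

Before that, a distribution instance used as a mirror performs some initialization that is worth understanding. The source code is as follows: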

// registry/handlers/app.go
func NewApp(ctx context.Context, config *configuration.Configuration) *App {
	app := &App{
		Config:  config,
		Context: ctx,
		router:  v2.RouterWithPrefix(config.HTTP.Prefix),
		isCache: config.Proxy.RemoteURL != "",
	}
	// ...
	// configure as a pull through cache
	if config.Proxy.RemoteURL != "" {
		app.registry, err = proxy.NewRegistryPullThroughCache(ctx, app.registry, app.driver, config.Proxy)
		if err != nil {
			panic(err.Error())
		}
		app.isCache = true
		dcontext.GetLogger(app).Info("Registry configured as a proxy cache to ", config.Proxy.RemoteURL)
	}
	// ...
}

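NewRegistryPullThroughCache sets up the mirror-specific behavior. Its source is below; its main jobs are to:

  • initialize scheduler-state.json, which records when cached blobs and manifests expire
  • register the OnBlobExpire and OnManifestExpire callbacks that delete expired content, then start the scheduler
  • configure authentication against the remote registry (configureAuth) and return a proxyingRegistry that wraps the original registry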
// registry/proxy/proxyregistry.go
// NewRegistryPullThroughCache creates a registry acting as a pull through cache
func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Namespace, driver driver.StorageDriver, config configuration.Proxy) (distribution.Namespace, error) {
	remoteURL, err := url.Parse(config.RemoteURL)
	if err != nil {
		return nil, err
	}

	v := storage.NewVacuum(ctx, driver)
	s := scheduler.New(ctx, driver, "/scheduler-state.json")
	s.OnBlobExpire(func(ref reference.Reference) error {
		// reference.Canonical is an interface; see its definition at https://github.com/distribution/reference/blob/main/reference.go#L175
		// Canonical reference is an object with a fully unique
		// name including a name with domain and digest
		// type Canonical interface {
		// 	Named
		// 	Digest() digest.Digest
		// }
		var r reference.Canonical
		var ok bool
		if r, ok = ref.(reference.Canonical); !ok {
			return fmt.Errorf("unexpected reference type : %T", ref)
		}

		repo, err := registry.Repository(ctx, r)
		if err != nil {
			return err
		}

		blobs := repo.Blobs(ctx)

		// Clear the repository reference and descriptor caches
		err = blobs.Delete(ctx, r.Digest())
		if err != nil {
			return err
		}

		err = v.RemoveBlob(r.Digest().String())
		if err != nil {
			return err
		}

		return nil
	})

	s.OnManifestExpire(func(ref reference.Reference) error {
		var r reference.Canonical
		var ok bool
		if r, ok = ref.(reference.Canonical); !ok {
			return fmt.Errorf("unexpected reference type : %T", ref)
		}

		repo, err := registry.Repository(ctx, r)
		if err != nil {
			return err
		}

		manifests, err := repo.Manifests(ctx)
		if err != nil {
			return err
		}
		err = manifests.Delete(ctx, r.Digest())
		if err != nil {
			return err
		}
		return nil
	})

	err = s.Start()
	if err != nil {
		return nil, err
	}

	cs, err := configureAuth(config.Username, config.Password, config.RemoteURL)
	if err != nil {
		return nil, err
	}

	return &proxyingRegistry{
		embedded:  registry,
		scheduler: s,
		remoteURL: *remoteURL,
		authChallenger: &remoteAuthChallenger{
			remoteURL: *remoteURL,
			cm:        challenge.NewSimpleManager(),
			cs:        cs,
		},
	}, nil
}

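Request path

As already mentioned in [Distribution] 01 - Overview:

NewApp builds the application that actually handles requests, in the following steps:

  1. Register dispatchers that handle the different requests. The routes are described in registry/api/v2/descriptors.go, and the corresponding handlers live in registry/handlers/{handler}.go
  2. Configure the corresponding components from the configuration file

In app.go, the handler registered for fetching blobs is: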

// registry/handlers/app.go
app.register(v2.RouteNameBlob, blobDispatcher)

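The relevant behavior is first defined in routeDescriptors. For the pull scenario this is RouteNameBlob, which has two methods, GET and DELETE. GET is further divided into Fetch Blob and Fetch Blob Part. As their descriptions show, which one applies depends on how the client issues the request (Fetch Blob Part is for HTTP Range requests), so in the common case the request takes the Fetch Blob path, which is what the rest of this analysis follows.

Name: "Fetch Blob Part",

Description: "This endpoint may also support RFC7233 compliant range requests. Support can be detected by issuing a HEAD request. If the header `Accept-Range: bytes` is returned, range requests can be used to fetch partial content.",

The relevant source code: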

// registry/api/v2/descriptors.go
var routeDescriptors = []RouteDescriptor{
	// ...
	{
		Name:        RouteNameBlob,
		Path:        "/v2/{name:" + reference.NameRegexp.String() + "}/blobs/{digest:" + digest.DigestRegexp.String() + "}",
		Entity:      "Blob",
		Description: "Operations on blobs identified by `name` and `digest`. Used to fetch or delete layers by digest.",
		Methods: []MethodDescriptor{
			{
				Method:      "GET",
				Description: "Retrieve the blob from the registry identified by `digest`. A `HEAD` request can also be issued to this endpoint to obtain resource information without receiving all data.",
				Requests: []RequestDescriptor{
					{
						Name: "Fetch Blob",
						Headers: []ParameterDescriptor{
							hostHeader,
							authHeader,
						},
						PathParameters: []ParameterDescriptor{
							nameParameterDescriptor,
							digestPathParameter,
						},
						Successes: []ResponseDescriptor{
							{
								Description: "The blob identified by `digest` is available. The blob content will be present in the body of the request.",
								StatusCode:  http.StatusOK,
								Headers: []ParameterDescriptor{
									{
										Name:        "Content-Length",
										Type:        "integer",
										Description: "The length of the requested blob content.",
										Format:      "<length>",
									},
									digestHeader,
								},
								Body: BodyDescriptor{
									ContentType: "application/octet-stream",
									Format:      "<blob binary data>",
								},
							},
							{
								Description: "The blob identified by `digest` is available at the provided location.",
								StatusCode:  http.StatusTemporaryRedirect,
								Headers: []ParameterDescriptor{
									{
										Name:        "Location",
										Type:        "url",
										Description: "The location where the layer should be accessible.",
										Format:      "<blob location>",
									},
									digestHeader,
								},
							},
						},
						Failures: []ResponseDescriptor{
							{
								Description: "There was a problem with the request that needs to be addressed by the client, such as an invalid `name` or `tag`.",
								StatusCode:  http.StatusBadRequest,
								ErrorCodes: []errcode.ErrorCode{
									ErrorCodeNameInvalid,
									ErrorCodeDigestInvalid,
								},
								Body: BodyDescriptor{
									ContentType: "application/json; charset=utf-8",
									Format:      errorsBody,
								},
							},
							{
								Description: "The blob, identified by `name` and `digest`, is unknown to the registry.",
								StatusCode:  http.StatusNotFound,
								Body: BodyDescriptor{
									ContentType: "application/json; charset=utf-8",
									Format:      errorsBody,
								},
								ErrorCodes: []errcode.ErrorCode{
									ErrorCodeNameUnknown,
									ErrorCodeBlobUnknown,
								},
							},
							unauthorizedResponseDescriptor,
							repositoryNotFoundResponseDescriptor,
							deniedResponseDescriptor,
							tooManyRequestsDescriptor,
						},
					},
					{
						Name:        "Fetch Blob Part",
						Description: "This endpoint may also support RFC7233 compliant range requests. Support can be detected by issuing a HEAD request. If the header `Accept-Range: bytes` is returned, range requests can be used to fetch partial content.",
						Headers: []ParameterDescriptor{
							hostHeader,
							authHeader,
							{
								Name:        "Range",
								Type:        "string",
								Description: "HTTP Range header specifying blob chunk.",
								Format:      "bytes=<start>-<end>",
							},
						},
						PathParameters: []ParameterDescriptor{
							nameParameterDescriptor,
							digestPathParameter,
						},
						Successes: []ResponseDescriptor{
							{
								Description: "The blob identified by `digest` is available. The specified chunk of blob content will be present in the body of the request.",
								StatusCode:  http.StatusPartialContent,
								Headers: []ParameterDescriptor{
									{
										Name:        "Content-Length",
										Type:        "integer",
										Description: "The length of the requested blob chunk.",
										Format:      "<length>",
									},
									{
										Name:        "Content-Range",
										Type:        "byte range",
										Description: "Content range of blob chunk.",
										Format:      "bytes <start>-<end>/<size>",
									},
								},
								Body: BodyDescriptor{
									ContentType: "application/octet-stream",
									Format:      "<blob binary data>",
								},
							},
						},
						Failures: []ResponseDescriptor{
							{
								Description: "There was a problem with the request that needs to be addressed by the client, such as an invalid `name` or `tag`.",
								StatusCode:  http.StatusBadRequest,
								ErrorCodes: []errcode.ErrorCode{
									ErrorCodeNameInvalid,
									ErrorCodeDigestInvalid,
								},
								Body: BodyDescriptor{
									ContentType: "application/json; charset=utf-8",
									Format:      errorsBody,
								},
							},
							{
								StatusCode: http.StatusNotFound,
								ErrorCodes: []errcode.ErrorCode{
									ErrorCodeNameUnknown,
									ErrorCodeBlobUnknown,
								},
								Body: BodyDescriptor{
									ContentType: "application/json; charset=utf-8",
									Format:      errorsBody,
								},
							},
							{
								Description: "The range specification cannot be satisfied for the requested content. This can happen when the range is not formatted correctly or if the range is outside of the valid size of the content.",
								StatusCode:  http.StatusRequestedRangeNotSatisfiable,
							},
							unauthorizedResponseDescriptor,
							repositoryNotFoundResponseDescriptor,
							deniedResponseDescriptor,
							tooManyRequestsDescriptor,
						},
					},
				},
			},
			{
				Method:      "DELETE",
				Description: "Delete the blob identified by `name` and `digest`",
				Requests: []RequestDescriptor{
					{
						Headers: []ParameterDescriptor{
							hostHeader,
							authHeader,
						},
						PathParameters: []ParameterDescriptor{
							nameParameterDescriptor,
							digestPathParameter,
						},
						Successes: []ResponseDescriptor{
							{
								StatusCode: http.StatusAccepted,
								Headers: []ParameterDescriptor{
									{
										Name:        "Content-Length",
										Type:        "integer",
										Description: "0",
										Format:      "0",
									},
									digestHeader,
								},
							},
						},
						Failures: []ResponseDescriptor{
							{
								Name:       "Invalid Name or Digest",
								StatusCode: http.StatusBadRequest,
								ErrorCodes: []errcode.ErrorCode{
									ErrorCodeDigestInvalid,
									ErrorCodeNameInvalid,
								},
							},
							{
								Description: "The blob, identified by `name` and `digest`, is unknown to the registry.",
								StatusCode:  http.StatusNotFound,
								Body: BodyDescriptor{
									ContentType: "application/json; charset=utf-8",
									Format:      errorsBody,
								},
								ErrorCodes: []errcode.ErrorCode{
									ErrorCodeNameUnknown,
									ErrorCodeBlobUnknown,
								},
							},
							{
								Description: "Blob delete is not allowed because the registry is configured as a pull-through cache or `delete` has been disabled",
								StatusCode:  http.StatusMethodNotAllowed,
								Body: BodyDescriptor{
									ContentType: "application/json; charset=utf-8",
									Format:      errorsBody,
								},
								ErrorCodes: []errcode.ErrorCode{
									errcode.ErrorCodeUnsupported,
								},
							},
							unauthorizedResponseDescriptor,
							repositoryNotFoundResponseDescriptor,
							deniedResponseDescriptor,
							tooManyRequestsDescriptor,
						},
					},
				},
			},
			// TODO(stevvooe): We may want to add a PUT request here to
			// kickoff an upload of a blob, integrated with the blob upload
			// API.
		},
	},
	// ...
}

Next, the blobDispatcher code:

// registry/handlers/blob.go
// blobDispatcher uses the request context to build a blobHandler.
func blobDispatcher(ctx *Context, r *http.Request) http.Handler {
	dgst, err := getDigest(ctx)
	if err != nil {

		if err == errDigestNotAvailable {
			return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				ctx.Errors = append(ctx.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err))
			})
		}

		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			ctx.Errors = append(ctx.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err))
		})
	}

	blobHandler := &blobHandler{
		Context: ctx,
		Digest:  dgst,
	}

	mhandler := handlers.MethodHandler{
		"GET":  http.HandlerFunc(blobHandler.GetBlob),
		"HEAD": http.HandlerFunc(blobHandler.GetBlob),
	}

	if !ctx.readOnly {
		mhandler["DELETE"] = http.HandlerFunc(blobHandler.DeleteBlob)
	}

	return mhandler
}

As you can see, the key call is GetBlob:

// registry/handlers/blob.go
// GetBlob fetches the binary data from backend storage returns it in the
// response.
func (bh *blobHandler) GetBlob(w http.ResponseWriter, r *http.Request) {
	context.GetLogger(bh).Debug("GetBlob")
	blobs := bh.Repository.Blobs(bh)
	// Look up the descriptor by digest (the cache is checked first; on a miss the
	// result is written to the cache). See the delve debugging section for details.
	desc, err := blobs.Stat(bh, bh.Digest)
	if err != nil {
		if err == distribution.ErrBlobUnknown {
			bh.Errors = append(bh.Errors, v2.ErrorCodeBlobUnknown.WithDetail(bh.Digest))
		} else {
			bh.Errors = append(bh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
		}
		return
	}

	if err := blobs.ServeBlob(bh, w, r, desc.Digest); err != nil {
		context.GetLogger(bh).Debugf("unexpected error getting blob HTTP handler: %v", err)
		bh.Errors = append(bh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
		return
	}
}

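This is where it gets interesting: starting from blobs := bh.Repository.Blobs(bh), the code leans heavily on layers of abstraction.

  1. First, bh is a blobHandler, defined below. It embeds *Context, so the fields defined by Context can be used on it directly.

    In Go, embedding a struct means the embedded struct's fields and methods can be accessed directly on the outer struct, without spelling out the embedded struct's name.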

    // registry/handlers/blob.go
    // blobHandler serves http blob requests.
    type blobHandler struct {
    	*Context

    	Digest digest.Digest
    }
  2. Context, in turn, is a struct that contains a Repository field

    // registry/handlers/context.go
    // Context should contain the request specific context for use in across
    // handlers. Resources that don't need to be shared across handlers should not
    // be on this object.
    type Context struct {
    	// App points to the application structure that created this context.
    	*App
    	context.Context

    	// Repository is the repository for the current request. All requests
    	// should be scoped to a single repository. This field may be nil.
    	Repository distribution.Repository

    	// RepositoryRemover provides method to delete a repository
    	RepositoryRemover distribution.RepositoryRemover

    	// Errors is a collection of errors encountered during the request to be
    	// returned to the client API. If errors are added to the collection, the
    	// handler *must not* start the response via http.ResponseWriter.
    	Errors errcode.Errors

    	urlBuilder *v2.URLBuilder

    	// TODO(stevvooe): The goal is too completely factor this context and
    	// dispatching out of the web application. Ideally, we should lean on
    	// context.Context for injection of these resources.
    }
  3. Repository is an interface, defined as follows

    // registry.go
    // Repository is a named collection of manifests and layers.
    type Repository interface {
    	// Named returns the name of the repository.
    	Named() reference.Named

    	// Manifests returns a reference to this repository's manifest service.
    	// with the supplied options applied.
    	Manifests(ctx context.Context, options ...ManifestServiceOption) (ManifestService, error)

    	// Blobs returns a reference to this repository's blob service.
    	Blobs(ctx context.Context) BlobStore

    	// TODO(stevvooe): The above BlobStore return can probably be relaxed to
    	// be a BlobService for use with clients. This will allow such
    	// implementations to avoid implementing ServeBlob.

    	// Tags returns a reference to this repositories tag service
    	Tags(ctx context.Context) TagService
    }

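At this point it is clear that, to support different scenarios, each implementation only has to provide the Blobs method (as part of the Repository interface), and callers go through the interface. A global search turns up the following implementations: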

  • notifications/listener.go
  • registry/client/repository.go
  • registry/proxy/proxyregistry.go
  • registry/storage/registry.go
// notifications/listener.go
func (rl *repositoryListener) Blobs(ctx context.Context) distribution.BlobStore {
	return &blobServiceListener{
		BlobStore: rl.Repository.Blobs(ctx),
		parent:    rl,
	}
}

// registry/client/repository.go
func (r *repository) Blobs(ctx context.Context) distribution.BlobStore {
	statter := &blobStatter{
		name:   r.name,
		ub:     r.ub,
		client: r.client,
	}
	return &blobs{
		name:    r.name,
		ub:      r.ub,
		client:  r.client,
		statter: cache.NewCachedBlobStatter(memory.NewInMemoryBlobDescriptorCacheProvider(), statter),
	}
}

// registry/proxy/proxyregistry.go
func (pr *proxiedRepository) Blobs(ctx context.Context) distribution.BlobStore {
	return pr.blobStore
}

// registry/storage/registry.go
// Blobs returns an instance of the BlobStore. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar
// to a request local.
func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore {
	var statter distribution.BlobDescriptorService = &linkedBlobStatter{
		blobStore:   repo.blobStore,
		repository:  repo,
		linkPathFns: []linkPathFunc{blobLinkPath},
	}

	if repo.descriptorCache != nil {
		statter = cache.NewCachedBlobStatter(repo.descriptorCache, statter)
	}

	if repo.registry.blobDescriptorServiceFactory != nil {
		statter = repo.registry.blobDescriptorServiceFactory.BlobAccessController(statter)
	}

	return &linkedBlobStore{
		registry:             repo.registry,
		blobStore:            repo.blobStore,
		blobServer:           repo.blobServer,
		blobAccessController: statter,
		repository:           repo,
		ctx:                  ctx,

		// TODO(stevvooe): linkPath limits this blob store to only layers.
		// This instance cannot be used for manifest checks.
		linkPathFns:            []linkPathFunc{blobLinkPath},
		deleteEnabled:          repo.registry.deleteEnabled,
		resumableDigestEnabled: repo.resumableDigestEnabled,
	}
}

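When running as a mirror it is fairly obvious that registry/proxy/proxyregistry.go is what gets called, but two things still need explaining:

  1. Why does Blobs take a context.Context parameter when what is actually passed in is a blobHandler?
  2. Where is the proxy implementation plugged in?

The first question is answered by Go's type embedding:

A struct can embed other types. If a struct embeds an interface (such as context.Context), the interface's methods are promoted to the struct, so the struct can be used wherever that interface is expected.

blobHandler embeds *Context, and the Context struct embeds context.Context, so *blobHandler can also be passed as a context.Context. For a detailed explanation, see Embedding interfaces in structs.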

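For the second question, going back to Initialization, app.registry is assigned differently depending on the configuration. For a mirror it is the following code. When the registry is configured as a mirror, this assignment takes precedence, because the proxy wraps whatever registry was created before it: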

// registry/handlers/app.go

app.registry, err = proxy.NewRegistryPullThroughCache(ctx, app.registry, app.driver, config.Proxy)

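The code above only sets up app.registry; the repository for a given request is actually created when the request is received (delve makes this easy to find, for example with b app.go:655). The code is: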

// registry/handlers/app.go
func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
	// ...
	repository, err := app.registry.Repository(context, nameRef)
	// ...
}

Therefore, when a request arrives, the Repository that blobDispatcher sees is the one returned by proxyingRegistry.Repository, and from there on the request is handled by the proxy logic.
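The runtime selection described above is the decorator pattern over interfaces: the handler only ever holds a Namespace-like interface, and configuration decides, once at startup, whether that interface is backed by plain storage or by a proxy that wraps it. The trimmed-down interfaces and types below (namespace, repository, blobStore, proxyingRegistry and so on) are hypothetical miniatures, not distribution's real definitions.

```go
package main

import "fmt"

// blobStore, repository and namespace are hypothetical miniatures of
// distribution.BlobStore, distribution.Repository and distribution.Namespace.
type blobStore interface{ Name() string }
type repository interface{ Blobs() blobStore }
type namespace interface{ Repository(name string) repository }

// Plain storage-backed implementations.
type localBlobs struct{}

func (localBlobs) Name() string { return "local" }

type localRepo struct{}

func (localRepo) Blobs() blobStore { return localBlobs{} }

type storageRegistry struct{}

func (storageRegistry) Repository(string) repository { return localRepo{} }

// proxyBlobs wraps a local store, the way proxyBlobStore wraps localStore.
type proxyBlobs struct{ local blobStore }

func (p proxyBlobs) Name() string { return "proxy(" + p.local.Name() + ")" }

type proxiedRepo struct{ blobs blobStore }

func (r proxiedRepo) Blobs() blobStore { return r.blobs }

// proxyingRegistry decorates an embedded namespace, just as
// NewRegistryPullThroughCache receives app.registry and returns a wrapper.
type proxyingRegistry struct{ embedded namespace }

func (p proxyingRegistry) Repository(name string) repository {
	local := p.embedded.Repository(name)
	return proxiedRepo{blobs: proxyBlobs{local: local.Blobs()}}
}

// newRegistry makes the choice once, at startup, from configuration.
func newRegistry(remoteURL string) namespace {
	var reg namespace = storageRegistry{}
	if remoteURL != "" {
		reg = proxyingRegistry{embedded: reg}
	}
	return reg
}

func main() {
	// Per request, the dispatcher only talks to the namespace interface.
	for _, remote := range []string{"", "https://registry-1.docker.io"} {
		blobs := newRegistry(remote).Repository("library/alpine").Blobs()
		fmt.Println(blobs.Name()) // "local", then "proxy(local)"
	}
}
```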

Once blobs is obtained, desc, err := blobs.Stat(bh, bh.Digest) is called to get the blob's metadata, mainly its digest, size and mediatype. The contents of desc are:

github.com/docker/distribution.Descriptor {
MediaType: "application/octet-stream",
Size: 3418409,
Digest: "sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdff...+7 more",
URLs: []string len: 0, cap: 0, nil,
Annotations: map[string]string nil,
Platform: *github.com/docker/distribution/vendor/github.com/opencontainers/image-spec/specs-go/v1.Platform nil,}

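Because of how blobs was wrapped, Stat first goes through notifications.(*blobServiceListener).Stat() and from there reaches registry/proxy.(*proxyBlobStore).Stat(). As the code below shows, it first checks whether the blob exists locally and only goes to the remote if it does not: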

// registry/proxy/proxyblobstore.go
func (pbs *proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
	desc, err := pbs.localStore.Stat(ctx, dgst)
	if err == nil {
		return desc, err
	}

	if err != distribution.ErrBlobUnknown {
		return distribution.Descriptor{}, err
	}

	if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
		return distribution.Descriptor{}, err
	}

	return pbs.remoteStore.Stat(ctx, dgst)
}

(Why this step is needed still deserves a closer look.)

For the local lookup, registry/storage.(*linkedBlobStore).Stat() is called first:

// registry/storage/linkedblobstore.go
func (lbs *linkedBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
	return lbs.blobAccessController.Stat(ctx, dgst)
}

(Why this step is needed still deserves a closer look.)
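What the linked blob store checks on disk is a per-repository link file. The delve session below shows the path it looks for, /docker/registry/v2/repositories/library/alpine/_layers/sha256/<hex>/link; the sketch rebuilds that path from a repository name and a digest. The second helper assumes the usual filesystem layout for blob content, blobs/<algorithm>/<first two hex characters>/<hex>/data. All function names are hypothetical, and strings.Cut needs Go 1.18 or newer.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// splitDigest separates "sha256:<hex>" into its algorithm and hex parts.
func splitDigest(dgst string) (string, string, error) {
	algo, hex, ok := strings.Cut(dgst, ":")
	if !ok || algo == "" || len(hex) < 2 {
		return "", "", fmt.Errorf("malformed digest %q", dgst)
	}
	return algo, hex, nil
}

// layerLinkPath rebuilds the per-repository link file that linkedBlobStatter
// looks for. If it is missing, the local Stat ends in "unknown blob".
func layerLinkPath(root, repo, dgst string) (string, error) {
	algo, hex, err := splitDigest(dgst)
	if err != nil {
		return "", err
	}
	return path.Join(root, "repositories", repo, "_layers", algo, hex, "link"), nil
}

// blobDataPath assumes the usual layout for blob content:
// blobs/<algorithm>/<first two hex chars>/<hex>/data.
func blobDataPath(root, dgst string) (string, error) {
	algo, hex, err := splitDigest(dgst)
	if err != nil {
		return "", err
	}
	return path.Join(root, "blobs", algo, hex[:2], hex, "data"), nil
}

func main() {
	const root = "/docker/registry/v2"
	dgst := "sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
	link, _ := layerLinkPath(root, "library/alpine", dgst)
	data, _ := blobDataPath(root, dgst)
	fmt.Println(link)
	fmt.Println(data)
}
```

Keeping a link per repository while storing content once under blobs/ is what lets several repositories share the same layer data.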

This then calls registry/storage/cache.(*cachedBlobStatter).Stat(), a function that comes up repeatedly later (it is the cache handling logic):

// registry/storage/cache/cachedblobdescriptorstore.go
func (cbds *cachedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
	cacheCount.WithValues("Request").Inc(1)
	desc, err := cbds.cache.Stat(ctx, dgst)
	if err != nil {
		if err != distribution.ErrBlobUnknown {
			logErrorf(ctx, cbds.tracker, "error retrieving descriptor from cache: %v", err)
		}

		goto fallback
	}
	cacheCount.WithValues("Hit").Inc(1)
	if cbds.tracker != nil {
		cbds.tracker.Hit()
	}
	return desc, nil
fallback:
	cacheCount.WithValues("Miss").Inc(1)
	if cbds.tracker != nil {
		cbds.tracker.Miss()
	}
	desc, err = cbds.backend.Stat(ctx, dgst)
	if err != nil {
		return desc, err
	}

	if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil {
		logErrorf(ctx, cbds.tracker, "error adding descriptor %v to cache: %v", desc.Digest, err)
	}

	return desc, err

}

Debugging with delve

The following uses delve for debugging. For details on using delve, see [Go] 07 - The delve debugger.

Debugging steps

# 1. Start a debug session
dlv debug cmd/registry/main.go -- serve cmd/registry/config-debug.yml

# 2. Once inside, set a breakpoint, for example
b handlers.GetBlob

# 3. Start the service
c

# 4. Send a test request
curl http://localhost:5000/v2/library/alpine/blobs/sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885 -o testfile

# 5. Step through: s (step), so (stepout), n (next), c (continue), locals, p blobs (print)

GetBlob desc, err := blobs.Stat(bh, bh.Digest)

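  • When the request arrives, the registry first uses ctx and dgst to check whether the cache holds the blob's descriptor. If it does not, the backend is queried; the result is shown below (this run took the cache-miss path)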

    (dlv) l
    > github.com/docker/distribution/registry/storage/cache.(*cachedBlobStatter).Stat() ./registry/storage/cache/cachedblobdescriptorstore.go:87 (PC: 0xb74ff6)
    82: cacheCount.WithValues("Miss").Inc(1)
    83: if cbds.tracker != nil {
    84: cbds.tracker.Miss()
    85: }
    86: desc, err = cbds.backend.Stat(ctx, dgst)
    => 87: if err != nil {
    88: return desc, err
    89: }
    90:
    91: if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil {
    92: logErrorf(ctx, cbds.tracker, "error adding descriptor %v to cache: %v", desc.Digest, err)
    (dlv) p desc
    github.com/docker/distribution.Descriptor {
    MediaType: "application/octet-stream",
    Size: 3418409,
    Digest: "sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885",
    URLs: []string len: 0, cap: 0, nil,
    Annotations: map[string]string nil,
    Platform: *github.com/docker/distribution/vendor/github.com/opencontainers/image-spec/specs-go/v1.Platform nil,}
  • If there is no local copy, the proxy (the remote registry) is accessed and the data is written locally

    (dlv) l
    > github.com/docker/distribution/registry/storage/driver/filesystem.(*driver).GetContent() /root/go/src/github.com/docker/distribution/registry/storage/driver/filesystem/driver.go:119 (PC: 0xd43a96)
    114: func (d *driver) Name() string {
    115: return driverName
    116: }
    117:
    118: // GetContent retrieves the content stored at "path" as a []byte.
    => 119: func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
    120: rc, err := d.Reader(ctx, path, 0)
    121: if err != nil {
    122: return nil, err
    123: }
    124: defer rc.Close()
    (dlv)
    # ...
    (dlv) l
    > github.com/docker/distribution/registry/storage/driver/filesystem.(*driver).Reader() /root/go/src/github.com/docker/distribution/registry/storage/driver/filesystem/driver.go:151 (PC: 0xd44456)
    146: return writer.Commit()
    147: }
    148:
    149: // Reader retrieves an io.ReadCloser for the content stored at "path" with a
    150: // given byte offset.
    => 151: func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
    152: file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644)
    153: if err != nil {
    154: if os.IsNotExist(err) {
    155: return nil, storagedriver.PathNotFoundError{Path: path}
    156: }
    (dlv)
    # ...
    (dlv) l
    > github.com/docker/distribution/registry/storage/driver/filesystem.(*driver).Reader() /root/go/src/github.com/docker/distribution/registry/storage/driver/filesystem/driver.go:153 (PC: 0xd44560)
    148:
    149: // Reader retrieves an io.ReadCloser for the content stored at "path" with a
    150: // given byte offset.
    151: func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
    152: file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644)
    => 153: if err != nil {
    154: if os.IsNotExist(err) {
    155: return nil, storagedriver.PathNotFoundError{Path: path}
    156: }
    157:
    158: return nil, err
    (dlv) locals
    err = error(*io/fs.PathError) 0xc0003e0e48
    file = *os.File nil
    # ...
    (dlv) l
    > github.com/docker/distribution/registry/storage.(*blobStore).readlink() /root/go/src/github.com/docker/distribution/registry/storage/blobstore.go:144 (PC: 0xb9b009)
    139:
    140: // readlink returns the linked digest at path.
    141: func (bs *blobStore) readlink(ctx context.Context, path string) (digest.Digest, error) {
    142: content, err := bs.driver.GetContent(ctx, path)
    143: if err != nil {
    => 144: return "", err
    145: }
    146:
    147: linked, err := digest.Parse(string(content))
    148: if err != nil {
    149: return "", err
    (dlv) locals
    content = []uint8 len: 0, cap: 0, nil
    err = error(github.com/docker/distribution/registry/storage/driver.PathNotFoundError) {Path: "/docker/registry/v2/repositories/library/alpine/_layers/sha256/4...+68 more", DriverName: "filesystem"}
    (dlv) p err
    error(github.com/docker/distribution/registry/storage/driver.PathNotFoundError) {
    Path: "/docker/registry/v2/repositories/library/alpine/_layers/sha256/44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885/link",
    DriverName: "filesystem",}
    (dlv)
    # ...
    (dlv) l
    > github.com/docker/distribution/registry/storage.(*linkedBlobStatter).Stat() /root/go/src/github.com/docker/distribution/registry/storage/linkedblobstore.go:395 (PC: 0xbaa5ca)
    390: if err == nil {
    391: found = true
    392: break // success!
    393: }
    394:
    => 395: switch err := err.(type) {
    396: case driver.PathNotFoundError:
    397: // do nothing, just move to the next linkPathFn
    398: default:
    399: return distribution.Descriptor{}, err
    400: }
    (dlv)
    # ...
    (dlv) s
    > github.com/docker/distribution/registry/storage/cache.(*cachedBlobStatter).Stat() /root/go/src/github.com/docker/distribution/registry/storage/cache/cachedblobdescriptorstore.go:86 (PC: 0xb74f3a)
    Values returned:
    ~r0: github.com/docker/distribution.Descriptor {
    MediaType: "",
    Size: 0,
    Digest: "",
    URLs: []string len: 0, cap: 0, nil,
    Annotations: map[string]string nil,
    Platform: *github.com/docker/distribution/vendor/github.com/opencontainers/image-spec/specs-go/v1.Platform nil,}
    ~r1: error(*errors.errorString) *{
    s: "unknown blob",}

    81: fallback:
    82: cacheCount.WithValues("Miss").Inc(1)
    83: if cbds.tracker != nil {
    84: cbds.tracker.Miss()
    85: }
    => 86: desc, err = cbds.backend.Stat(ctx, dgst)
    87: if err != nil {
    88: return desc, err
    89: }
    90:
    91: if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil {
    (dlv)
    # ...
    (dlv) l
    > github.com/docker/distribution/registry/proxy.(*proxyBlobStore).Stat() /root/go/src/github.com/docker/distribution/registry/proxy/proxyblobstore.go:169 (PC: 0xbe318e)
    164:
    165: if err != distribution.ErrBlobUnknown {
    166: return distribution.Descriptor{}, err
    167: }
    168:
    => 169: if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
    170: return distribution.Descriptor{}, err
    171: }
    172:
    173: return pbs.remoteStore.Stat(ctx, dgst)
    174: }
    (dlv) s
    DEBU[1994] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=08f6ce04-c812-4ae8-aeb6-40b750dd355a service=registry trace.duration=39.592µs trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=e0d4c1b9-1659-4fad-a556-aeec6b74d624 trace.line=155 version="v2.8.3+unknown"
    > github.com/docker/distribution/registry/proxy.(*remoteAuthChallenger).tryEstablishChallenges() /root/go/src/github.com/docker/distribution/registry/proxy/proxyregistry.go:217 (PC: 0xbe7696)
    212: func (r *remoteAuthChallenger) challengeManager() challenge.Manager {
    213: return r.cm
    214: }
    215:
    216: // tryEstablishChallenges will attempt to get a challenge type for the upstream if none currently exist
    => 217: func (r *remoteAuthChallenger) tryEstablishChallenges(ctx context.Context) error {
    218: r.Lock()
    219: defer r.Unlock()
    220:
    221: remoteURL := r.remoteURL
    222: remoteURL.Path = "/v2/"
    (dlv)
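    # tryEstablishChallenges only establishes the auth challenge with the upstream (pinging its /v2/ endpoint if no challenge is known yet); no blob data has been fetched at this point
    # The actual fetch is done by the calls that follow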
    (dlv) l
    > github.com/docker/distribution/registry/client.(*blobStatter).Stat() /root/go/src/github.com/docker/distribution/registry/client/repository.go:794 (PC: 0xbd7411)
    789: func (bs *blobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
    790: ref, err := reference.WithDigest(bs.name, dgst)
    791: if err != nil {
    792: return distribution.Descriptor{}, err
    793: }
    => 794: u, err := bs.ub.BuildBlobURL(ref)
    795: if err != nil {
    796: return distribution.Descriptor{}, err
    797: }
    798:
    799: resp, err := bs.client.Head(u)
    (dlv)
    # Before fetching data, the scopes are assembled and a token is obtained
    (dlv) l
    > github.com/docker/distribution/registry/client/auth.(*tokenHandler).getToken() /root/go/src/github.com/docker/distribution/registry/client/auth/session.go:278 (PC: 0xbda455)
    273: defer th.tokenLock.Unlock()
    274: scopes := make([]string, 0, len(th.scopes)+len(additionalScopes))
    275: for _, scope := range th.scopes {
    276: scopes = append(scopes, scope.String())
    277: }
    => 278: var addedScopes bool
    279: for _, scope := range additionalScopes {
    280: if hasScope(scopes, scope) {
    281: continue
    282: }
    283: scopes = append(scopes, scope)
    (dlv)
    # The username and password stored in th.creds are used to request a token from the upstream (proxy remote) auth service
    (dlv) l
    > github.com/docker/distribution/registry/client/auth.(*tokenHandler).fetchToken() /root/go/src/github.com/docker/distribution/registry/client/auth/session.go:494 (PC: 0xbdceb2)
    489:
    490: service := params["service"]
    491:
    492: var refreshToken string
    493:
    => 494: if th.creds != nil {
    495: refreshToken = th.creds.RefreshToken(realmURL, service)
    496: }
    497:
    498: if refreshToken != "" || th.forceOAuth {
    499: return th.fetchTokenWithOAuth(realmURL, refreshToken, service, scopes)
    (dlv)
    # In the request, username:password is base64-encoded for the Basic Authorization header (an encoding, not encryption)
    > net/http.basicAuth() /usr/local/go/src/net/http/client.go:423 (PC: 0x869b60)
    418: // "To receive authorization, the client sends the userid and password,
    419: // separated by a single colon (":") character, within a base64
    420: // encoded string in the credentials."
    421: // It is not meant to be urlencoded.
    422: func basicAuth(username, password string) string {
    => 423: auth := username + ":" + password
    424: return base64.StdEncoding.EncodeToString([]byte(auth))
    425: }
    426:
    427: // Get issues a GET to the specified URL. If the response is one of
    428: // the following redirect codes, Get follows the redirect, up to a
    (dlv)
    # Send the request to fetch the data
    (dlv) l
    > github.com/docker/distribution/registry/client/transport.(*transport).RoundTrip() /root/go/src/github.com/docker/distribution/registry/client/transport/transport.go:56 (PC: 0xbc747f)
    51: // access token. If no token exists or token is expired,
    52: // tries to refresh/fetch a new token.
    53: func (t *transport) RoundTrip(req *http.Request) (*http.Response, error) {
    54: req2 := cloneRequest(req)
    55: for _, modifier := range t.Modifiers {
    => 56: if err := modifier.ModifyRequest(req2); err != nil {
    57: return nil, err
    58: }
    59: }
    60:
    61: t.setModReq(req, req2)
    (dlv)
    # Before anything is stored, the cache logic runs; at this point the mirror's local storage holds no data for this blob yet
    (dlv) l
    > github.com/docker/distribution/registry/storage/cache/memory.(*inMemoryBlobDescriptorCacheProvider).SetDescriptor() /root/go/src/github.com/docker/distribution/registry/storage/cache/memory/memory.go:55 (PC: 0xbc8ba9)
    50:
    51: func (imbdcp *inMemoryBlobDescriptorCacheProvider) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
    52: _, err := imbdcp.Stat(ctx, dgst)
    53: if err == distribution.ErrBlobUnknown {
    54:
    => 55: if dgst.Algorithm() != desc.Digest.Algorithm() && dgst != desc.Digest {
    56: // if the digests differ, set the other canonical mapping
    57: if err := imbdcp.global.SetDescriptor(ctx, desc.Digest, desc); err != nil {
    58: return err
    59: }
    60: }
    (dlv) n
    DEBU[6818] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=08f6ce04-c812-4ae8-aeb6-40b750dd355a service=registry trace.duration=36.344µs trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=7c42ff89-d43a-40f5-b91a-32d92f869377 trace.line=155 version="v2.8.3+unknown"
    > github.com/docker/distribution/registry/storage/cache/memory.(*inMemoryBlobDescriptorCacheProvider).SetDescriptor() /root/go/src/github.com/docker/distribution/registry/storage/cache/memory/memory.go:63 (PC: 0xbc8d32)
    58: return err
    59: }
    60: }
    61:
    62: // unknown, just set it
    => 63: return imbdcp.global.SetDescriptor(ctx, dgst, desc)
    64: }
    65:
    66: // we already know it, do nothing
    67: return err
    68: }
    (dlv)
    # The first and second pulls follow the same path until readlink, where they diverge
    (dlv) l
    > github.com/docker/distribution/registry/storage.(*blobStore).readlink() /root/go/src/github.com/docker/distribution/registry/storage/blobstore.go:144 (PC: 0xb9b009)
    139:
    140: // readlink returns the linked digest at path.
    141: func (bs *blobStore) readlink(ctx context.Context, path string) (digest.Digest, error) {
    142: content, err := bs.driver.GetContent(ctx, path)
    143: if err != nil {
    => 144: return "", err
    145: }
    146:
    147: linked, err := digest.Parse(string(content))
    148: if err != nil {
    149: return "", err
    (dlv) s
    DEBU[7454] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=08f6ce04-c812-4ae8-aeb6-40b750dd355a service=registry trace.duration=36.833µs trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=40c6e2e6-6848-406b-b055-7111293ad295 trace.line=155 version="v2.8.3+unknown"
    > github.com/docker/distribution/registry/storage.(*linkedBlobStatter).resolveWithLinkFunc() /root/go/src/github.com/docker/distribution/registry/storage/linkedblobstore.go:449 (PC: 0xbaafc9)
    Values returned:
    ~r0: ""
    ~r1: error(github.com/docker/distribution/registry/storage/driver.PathNotFoundError) {
    Path: "/docker/registry/v2/repositories/library/alpine/_layers/sha256/44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885/link",
    DriverName: "filesystem",}

    444: blobLinkPath, err := linkPathFn(lbs.repository.Named().Name(), dgst)
    445: if err != nil {
    446: return "", err
    447: }
    448:
    => 449: return lbs.blobStore.readlink(ctx, blobLinkPath)
    450: }
    451:
    452: func (lbs *linkedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
    453: // The canonical descriptor for a blob is set at the commit phase of upload
    454: return nil
    (dlv)
    # Compute the local upload path from uploadDataPathSpec; the fetched data will be written there
    (dlv) l
    > github.com/docker/distribution/registry/storage.(*linkedBlobStore).Create() /root/go/src/github.com/docker/distribution/registry/storage/linkedblobstore.go:156 (PC: 0xba7a55)
    151: path, err := pathFor(uploadDataPathSpec{
    152: name: lbs.repository.Named().Name(),
    153: id: uuid,
    154: })
    155:
    => 156: if err != nil {
    157: return nil, err
    158: }
    159:
    160: startedAtPath, err := pathFor(uploadStartedAtPathSpec{
    161: name: lbs.repository.Named().Name(),
    (dlv)
    # Final preparation before writing
    (dlv) locals
    opts = github.com/docker/distribution.CreateOptions {Mount: (*"struct { ShouldMount bool; From github.com/docker/distribution/vendor/github.com/distribution/reference.Canonical; Stat *github.com/docker/distribution.Descriptor }")(0xc0002a0320)}
    uuid = "fd96c777-cacd-4dbf-954b-17f98861599c"
    startedAt = time.Time(2025-02-18T12:00:35Z){wall: 538346343, ext: 63875476835, loc: *time.Location nil}
    err = error nil
    path = "/docker/registry/v2/repositories/library/alpine/_uploads/fd96c77...+34 more"
    startedAtPath = "/docker/registry/v2/repositories/library/alpine/_uploads/fd96c77...+39 more"
    (dlv) p path
    "/docker/registry/v2/repositories/library/alpine/_uploads/fd96c777-cacd-4dbf-954b-17f98861599c/data"
    (dlv) p startedAtPath
    "/docker/registry/v2/repositories/library/alpine/_uploads/fd96c777-cacd-4dbf-954b-17f98861599c/startedat"
    (dlv)

    # lbs.blobStore.driver.PutContent writes the start time into the startedat file
    (dlv) s
    DEBU[8811] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=08f6ce04-c812-4ae8-aeb6-40b750dd355a service=registry trace.duration=16.772555154s trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=ad590e58-4273-4ea7-a6e1-d79fad18fca1 trace.line=155 version="v2.8.3+unknown"
    > github.com/docker/distribution/registry/storage.(*linkedBlobStore).Create() /root/go/src/github.com/docker/distribution/registry/storage/linkedblobstore.go:174 (PC: 0xba7d84)
    169: // Write a startedat file for this upload
    170: if err := lbs.blobStore.driver.PutContent(ctx, startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
    171: return nil, err
    172: }
    173:
    => 174: return lbs.newBlobUpload(ctx, uuid, path, startedAt, false)
    175: }
    176:
    177: func (lbs *linkedBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
    178: dcontext.GetLogger(ctx).Debug("(*linkedBlobStore).Resume")
    179:
    (dlv)

    # The file itself is written through the BlobWriter returned by lbs.newBlobUpload(ctx, uuid, path, startedAt, false); the bytes come from copyContent
    (dlv) l
    > github.com/docker/distribution/registry/proxy.(*proxyBlobStore).copyContent() /root/go/src/github.com/docker/distribution/registry/proxy/proxyblobstore.go:46 (PC: 0xbe141e)
    41: desc, err := pbs.remoteStore.Stat(ctx, dgst)
    42: if err != nil {
    43: return distribution.Descriptor{}, err
    44: }
    45:
    => 46: if w, ok := writer.(http.ResponseWriter); ok {
    47: setResponseHeaders(w, desc.Size, desc.MediaType, dgst)
    48: }
    49:
    50: remoteReader, err := pbs.remoteStore.Open(ctx, dgst)
    51: if err != nil {
    (dlv)
    # ...
    # The _uploads/<uuid>/data file only gets content once io.CopyN(writer, remoteReader, desc.Size) has run
    (dlv) l
    > github.com/docker/distribution/registry/proxy.(*proxyBlobStore).copyContent() /root/go/src/github.com/docker/distribution/registry/proxy/proxyblobstore.go:57 (PC: 0xbe17a5)
    52: return distribution.Descriptor{}, err
    53: }
    54:
    55: defer remoteReader.Close()
    56:
    => 57: _, err = io.CopyN(writer, remoteReader, desc.Size)
    58: if err != nil {
    59: return distribution.Descriptor{}, err
    60: }
    61:
    62: proxyMetrics.BlobPush(uint64(desc.Size))
    (dlv)
    # Finally moveBlob moves the data out of the temporary upload directory into its canonical blob path
    (dlv) l
    > github.com/docker/distribution/registry/storage.(*blobWriter).moveBlob() /root/go/src/github.com/docker/distribution/registry/storage/blobwriter.go:305 (PC: 0xb9e0b6)
    300: if err != nil {
    301: return err
    302: }
    303:
    304: // Check for existence
    => 305: if _, err := bw.blobStore.driver.Stat(ctx, blobPath); err != nil {
    306: switch err := err.(type) {
    307: case storagedriver.PathNotFoundError:
    308: break // ensure that it doesn't exist.
    309: default:
    310: return err
    (dlv)
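    The first-pull sequence traced above (readlink misses, an upload directory _uploads/<uuid>/ is created, io.CopyN streams the upstream body into its data file, and moveBlob moves that file under blobs/) can be modelled with the standard library alone. This is a simplified illustration, not distribution's code: storeBlob, blobDataPath and layerLinkPath are hypothetical names, direct os calls replace the storage driver, the startedat file and any locking are left out, and verifying the digest inline and writing the _layers/.../link file right after the move are assumptions about roughly where those steps happen in the real commit path.

    ```go
    package main

    import (
        "crypto/sha256"
        "encoding/hex"
        "fmt"
        "io"
        "os"
        "path/filepath"
        "strings"
    )

    // blobDataPath follows the layout seen in the session:
    // <root>/blobs/sha256/<first two hex chars>/<hex>/data
    func blobDataPath(root, hexDigest string) string {
        return filepath.Join(root, "blobs", "sha256", hexDigest[:2], hexDigest, "data")
    }

    // layerLinkPath is the per-repository link file that readlink reads.
    func layerLinkPath(root, repo, hexDigest string) string {
        return filepath.Join(root, "repositories", repo, "_layers", "sha256", hexDigest, "link")
    }

    // storeBlob (hypothetical) streams size bytes into a temporary upload file,
    // verifies the sha256 digest, moves the file to its content-addressed path
    // and finally writes the repository link.
    func storeBlob(root, repo, uploadID, hexDigest string, src io.Reader, size int64) error {
        uploadDir := filepath.Join(root, "repositories", repo, "_uploads", uploadID)
        if err := os.MkdirAll(uploadDir, 0o755); err != nil {
            return err
        }
        tmp := filepath.Join(uploadDir, "data")
        f, err := os.Create(tmp)
        if err != nil {
            return err
        }
        h := sha256.New()
        // Write the file and feed the hash in a single pass over the body.
        _, err = io.CopyN(io.MultiWriter(f, h), src, size)
        if cerr := f.Close(); err == nil {
            err = cerr
        }
        if err != nil {
            return err
        }
        if got := hex.EncodeToString(h.Sum(nil)); got != hexDigest {
            return fmt.Errorf("digest mismatch: got %s, want %s", got, hexDigest)
        }
        // Counterpart of moveBlob: the data leaves the temporary upload directory.
        final := blobDataPath(root, hexDigest)
        if err := os.MkdirAll(filepath.Dir(final), 0o755); err != nil {
            return err
        }
        if err := os.Rename(tmp, final); err != nil {
            return err
        }
        link := layerLinkPath(root, repo, hexDigest)
        if err := os.MkdirAll(filepath.Dir(link), 0o755); err != nil {
            return err
        }
        if err := os.WriteFile(link, []byte("sha256:"+hexDigest), 0o644); err != nil {
            return err
        }
        return os.RemoveAll(uploadDir)
    }

    func main() {
        root, err := os.MkdirTemp("", "registry")
        if err != nil {
            panic(err)
        }
        defer os.RemoveAll(root)

        body := "pretend this is a layer"
        sum := sha256.Sum256([]byte(body))
        hexDigest := hex.EncodeToString(sum[:])

        err = storeBlob(root, "library/alpine", "example-upload", hexDigest, strings.NewReader(body), int64(len(body)))
        fmt.Println("stored:", err == nil, blobDataPath(root, hexDigest))
    }
    ```

    The link file is what readlink looks up on the next pull, which is why a second request for the same blob no longer needs the upstream.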
  • If the blob is already stored locally, it is returned directly from local storage

    # This time GetContent succeeds, so the err == nil path below is taken
    (dlv) l
    > github.com/docker/distribution/registry/storage.(*blobStore).readlink() /root/go/src/github.com/docker/distribution/registry/storage/blobstore.go:143 (PC: 0xb9b002)
    138: }
    139:
    140: // readlink returns the linked digest at path.
    141: func (bs *blobStore) readlink(ctx context.Context, path string) (digest.Digest, error) {
    142: content, err := bs.driver.GetContent(ctx, path)
    => 143: if err != nil {
    144: return "", err
    145: }
    146:
    147: linked, err := digest.Parse(string(content))
    148: if err != nil {
    (dlv)
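    Whether a pull counts as first or second comes down to that link lookup. Below is a minimal sketch of the decision, assuming the same on-disk layout as in the session. statLocal and errBlobUnknown are hypothetical stand-ins for the linkedBlobStatter/readlink path and for distribution.ErrBlobUnknown, and plain os calls replace the storage driver:

    ```go
    package main

    import (
        "errors"
        "fmt"
        "io/fs"
        "os"
        "path/filepath"
        "strings"
    )

    // errBlobUnknown stands in for distribution.ErrBlobUnknown.
    var errBlobUnknown = errors.New("unknown blob")

    // statLocal reads the repository link, parses the digest it contains and
    // stats the blob data. A missing file means "not cached yet".
    func statLocal(root, repo, hexDigest string) (string, int64, error) {
        link := filepath.Join(root, "repositories", repo, "_layers", "sha256", hexDigest, "link")
        content, err := os.ReadFile(link)
        if errors.Is(err, fs.ErrNotExist) {
            return "", 0, errBlobUnknown // first pull: the proxy falls back to the upstream
        }
        if err != nil {
            return "", 0, err
        }
        algo, encoded, ok := strings.Cut(strings.TrimSpace(string(content)), ":")
        if !ok || algo != "sha256" || len(encoded) < 2 {
            return "", 0, fmt.Errorf("malformed link content %q", content)
        }
        data := filepath.Join(root, "blobs", "sha256", encoded[:2], encoded, "data")
        fi, err := os.Stat(data)
        if errors.Is(err, fs.ErrNotExist) {
            return "", 0, errBlobUnknown
        }
        if err != nil {
            return "", 0, err
        }
        return data, fi.Size(), nil // second pull: serve from local storage
    }

    func main() {
        root, err := os.MkdirTemp("", "registry")
        if err != nil {
            panic(err)
        }
        defer os.RemoveAll(root)
        d := "44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"

        _, _, err = statLocal(root, "library/alpine", d)
        fmt.Println("before:", err) // unknown blob

        linkDir := filepath.Join(root, "repositories", "library/alpine", "_layers", "sha256", d)
        dataDir := filepath.Join(root, "blobs", "sha256", d[:2], d)
        for _, dir := range []string{linkDir, dataDir} {
            if err := os.MkdirAll(dir, 0o755); err != nil {
                panic(err)
            }
        }
        _ = os.WriteFile(filepath.Join(linkDir, "link"), []byte("sha256:"+d), 0o644)
        _ = os.WriteFile(filepath.Join(dataDir, "data"), []byte("layer"), 0o644)

        path, size, err := statLocal(root, "library/alpine", d)
        fmt.Println("after:", path, size, err)
    }
    ```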
  • The descriptor is then written to the cache, with the following contents

    (dlv) l
    > github.com/docker/distribution/registry/storage/cache/memory.(*mapBlobDescriptorCache).SetDescriptor() ./registry/storage/cache/memory/memory.go:178 (PC: 0xbc9ef3)
    173:
    174: mbdc.mu.Lock()
    175: defer mbdc.mu.Unlock()
    176:
    177: mbdc.descriptors[dgst] = desc
    => 178: return nil
    179: }
    (dlv) p mbdc
    ("*github.com/docker/distribution/registry/storage/cache/memory.mapBlobDescriptorCache")(0xc00036ec40)
    *github.com/docker/distribution/registry/storage/cache/memory.mapBlobDescriptorCache {
    descriptors: map[github.com/docker/distribution/vendor/github.com/opencontainers/go-digest.Digest]github.com/docker/distribution.Descriptor [
    "sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885": (*"github.com/docker/distribution.Descriptor")(0xc0000a63a0),
    ],
    mu: sync.RWMutex {
    w: (*sync.Mutex)(0xc00036ec48),
    writerSem: 0,
    readerSem: 0,
    readerCount: (*"sync/atomic.Int32")(0xc00036ec58),
    readerWait: (*"sync/atomic.Int32")(0xc00036ec5c),},}
    (dlv) p mbdc.descriptors
    map[github.com/docker/distribution/vendor/github.com/opencontainers/go-digest.Digest]github.com/docker/distribution.Descriptor [
    "sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885": {
    MediaType: "application/octet-stream",
    Size: 3418409,
    Digest: "sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885",
    URLs: []string len: 0, cap: 0, nil,
    Annotations: map[string]string nil,
    Platform: *github.com/docker/distribution/vendor/github.com/opencontainers/image-spec/specs-go/v1.Platform nil,},
    ]
    (dlv) s
    > github.com/docker/distribution/registry/storage/cache/memory.(*inMemoryBlobDescriptorCacheProvider).SetDescriptor() ./registry/storage/cache/memory/memory.go:63 (PC: 0xbc8d9e)
    Values returned:
    ~r0: error nil

    58: return err
    59: }
    60: }
    61:
    62: // unknown, just set it
    => 63: return imbdcp.global.SetDescriptor(ctx, dgst, desc)
    64: }
    65:
    66: // we already know it, do nothing
    67: return err
    68: }
    (dlv) s
    DEBU[53825] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=5d53b068-4ba6-440a-8a3b-15a6a6dd7518 service=registry trace.duration=60.197µs trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=22bba5cd-3f4b-4609-8977-5ce26ad5bb7f trace.line=155 version="v2.8.3+unknown"
    DEBU[53825] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=5d53b068-4ba6-440a-8a3b-15a6a6dd7518 service=registry trace.duration=12.473µs trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=651c9e73-4c43-45f9-9134-5b9dbb5fcce6 trace.line=155 version="v2.8.3+unknown"
    > github.com/docker/distribution/registry/storage/cache.(*cachedBlobStatter).Stat() ./registry/storage/cache/cachedblobdescriptorstore.go:91 (PC: 0xb750bc)
    Values returned:
    ~r0: error nil

    86: desc, err = cbds.backend.Stat(ctx, dgst)
    87: if err != nil {
    88: return desc, err
    89: }
    90:
    => 91: if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil {
    92: logErrorf(ctx, cbds.tracker, "error adding descriptor %v to cache: %v", desc.Digest, err)
    93: }
    94:
    95: return desc, err
    96:
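    The cachedBlobStatter.Stat frames follow a cache-aside pattern. Stat consults the descriptor cache first. On a miss it falls back to the backend, then records the backend's answer with SetDescriptor; a failure there is only logged, not returned. The sketch reproduces that control flow with simplified, hypothetical types (Descriptor, memCache and cachedStatter are not distribution's types). The cache is a map behind a sync.RWMutex, the same shape as the mapBlobDescriptorCache printed above:

    ```go
    package main

    import (
        "errors"
        "fmt"
        "sync"
    )

    // Descriptor is a cut-down, hypothetical stand-in for distribution.Descriptor.
    type Descriptor struct {
        MediaType string
        Size      int64
        Digest    string
    }

    var errBlobUnknown = errors.New("unknown blob")

    type statter interface {
        Stat(dgst string) (Descriptor, error)
    }

    // backendFunc adapts a plain function to the statter interface.
    type backendFunc func(string) (Descriptor, error)

    func (f backendFunc) Stat(dgst string) (Descriptor, error) { return f(dgst) }

    // memCache mirrors the shape of mapBlobDescriptorCache: a map behind a RWMutex.
    type memCache struct {
        mu    sync.RWMutex
        descs map[string]Descriptor
    }

    func (c *memCache) Stat(dgst string) (Descriptor, error) {
        c.mu.RLock()
        defer c.mu.RUnlock()
        if d, ok := c.descs[dgst]; ok {
            return d, nil
        }
        return Descriptor{}, errBlobUnknown
    }

    func (c *memCache) SetDescriptor(dgst string, d Descriptor) {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.descs[dgst] = d
    }

    // cachedStatter: cache first, backend on a miss, then remember the answer.
    // The hit/miss counters are plain ints, so this type is not safe for
    // concurrent use; it only illustrates the control flow.
    type cachedStatter struct {
        cache        *memCache
        backend      statter
        hits, misses int
    }

    func (s *cachedStatter) Stat(dgst string) (Descriptor, error) {
        if d, err := s.cache.Stat(dgst); err == nil {
            s.hits++
            return d, nil
        }
        s.misses++
        d, err := s.backend.Stat(dgst)
        if err != nil {
            return Descriptor{}, err // failures (e.g. unknown blob) are not cached
        }
        s.cache.SetDescriptor(dgst, d)
        return d, nil
    }

    func main() {
        backend := backendFunc(func(dgst string) (Descriptor, error) {
            fmt.Println("backend Stat:", dgst)
            return Descriptor{MediaType: "application/octet-stream", Size: 3418409, Digest: dgst}, nil
        })
        s := &cachedStatter{cache: &memCache{descs: map[string]Descriptor{}}, backend: backend}
        dgst := "sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
        for i := 0; i < 2; i++ {
            d, err := s.Stat(dgst)
            fmt.Println(d.Size, err)
        }
        fmt.Println("hits:", s.hits, "misses:", s.misses)
    }
    ```

    Running main prints the backend line only once. The second Stat for the same digest is answered from the map, which is where the speed-up comes from.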

GetBlob blobs.ServeBlob(bh, w, r, desc.Digest)

  • After a series of calls, execution reaches this method, which is where the blob is actually located and read

    (dlv) l
    > github.com/docker/distribution/registry/storage.(*blobStore).path() ./registry/storage/blobstore.go:120 (PC: 0xb9ab93)
    115: })
    116: }
    117:
    118: // path returns the canonical path for the blob identified by digest. The blob
    119: // may or may not exist.
    => 120: func (bs *blobStore) path(dgst digest.Digest) (string, error) {
    121: bp, err := pathFor(blobDataPathSpec{
    122: digest: dgst,
    123: })
    124:
    125: if err != nil {
    # ...
    (dlv) s
    > github.com/docker/distribution/registry/storage.pathFor() ./registry/storage/paths.go:221 (PC: 0xbb2b46)
    216: }
    217:
    218: blobPathPrefix := append(rootPrefix, "blobs")
    219: return path.Join(append(blobPathPrefix, components...)...), nil
    220: case blobDataPathSpec:
    => 221: components, err := digestPathComponents(v.digest, true)
    222: if err != nil {
    223: return "", err
    224: }
    225:
    226: components = append(components, "data")
    (dlv)
    # ...
    (dlv) l
    > github.com/docker/distribution/registry/storage.(*blobServer).ServeBlob() ./registry/storage/blobserver.go:45 (PC: 0xb98eca)
    40: case nil:
    41: // Redirect to storage URL.
    42: http.Redirect(w, r, redirectURL, http.StatusTemporaryRedirect)
    43: return err
    44:
    => 45: case driver.ErrUnsupportedMethod:
    46: // Fallback to serving the content directly.
    47: default:
    48: // Some unexpected error.
    49: return err
    50: }
    (dlv) s
    DEBU[63862] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=5d53b068-4ba6-440a-8a3b-15a6a6dd7518 service=registry trace.duration=54.521µs trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=e7d8e37c-ff6b-4eeb-ab84-8afe24864518 trace.line=155 version="v2.8.3+unknown"
    > github.com/docker/distribution/registry/storage.(*blobServer).ServeBlob() ./registry/storage/blobserver.go:39 (PC: 0xb98ef9)
    34: return err
    35: }
    36:
    37: if bs.redirect {
    38: redirectURL, err := bs.driver.URLFor(ctx, path, map[string]interface{}{"method": r.Method})
    => 39: switch err.(type) {
    40: case nil:
    41: // Redirect to storage URL.
    42: http.Redirect(w, r, redirectURL, http.StatusTemporaryRedirect)
    43: return err
    44:
    (dlv) s
    > github.com/docker/distribution/registry/storage.(*blobServer).ServeBlob() ./registry/storage/blobserver.go:53 (PC: 0xb98fc2)
    48: // Some unexpected error.
    49: return err
    50: }
    51: }
    52:
    => 53: br, err := newFileReader(ctx, bs.driver, path, desc.Size)
    54: if err != nil {
    55: return err
    56: }
    57: defer br.Close()
    58:
    (dlv)
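    The ServeBlob frames show a two-way choice. When redirects are enabled and the driver's URLFor returns a URL, the client gets a 307 pointing at the storage backend. When URLFor returns driver.ErrUnsupportedMethod, as the filesystem driver does here, the registry opens the file and serves the bytes itself. Below is a sketch of that choice using net/http and httptest; blobSource, memSource, redirectSource and errUnsupportedMethod are hypothetical, and storage.example.com is a placeholder host:

    ```go
    package main

    import (
        "errors"
        "fmt"
        "io"
        "net/http"
        "net/http/httptest"
        "strings"
        "time"
    )

    // errUnsupportedMethod stands in for driver.ErrUnsupportedMethod.
    var errUnsupportedMethod = errors.New("unsupported method")

    // blobSource is a hypothetical, minimal view of a storage driver.
    type blobSource interface {
        URLFor(path, method string) (string, error)
        Open(path string) (io.ReadSeeker, error)
    }

    // memSource behaves like the filesystem driver: no URLs, direct reads only.
    type memSource map[string]string

    func (m memSource) URLFor(string, string) (string, error) { return "", errUnsupportedMethod }

    func (m memSource) Open(path string) (io.ReadSeeker, error) {
        s, ok := m[path]
        if !ok {
            return nil, fmt.Errorf("path not found: %s", path)
        }
        return strings.NewReader(s), nil
    }

    // redirectSource behaves like an object store that can hand out URLs.
    type redirectSource struct{ memSource }

    func (redirectSource) URLFor(path, method string) (string, error) {
        return "https://storage.example.com/" + path, nil
    }

    // serveBlob follows the control flow of (*blobServer).ServeBlob shown above.
    func serveBlob(w http.ResponseWriter, r *http.Request, src blobSource, redirect bool, path, digest string, size int64) error {
        if redirect {
            u, err := src.URLFor(path, r.Method)
            switch {
            case err == nil:
                http.Redirect(w, r, u, http.StatusTemporaryRedirect)
                return nil
            case errors.Is(err, errUnsupportedMethod):
                // Fall through and serve the content directly.
            default:
                return err
            }
        }
        rs, err := src.Open(path)
        if err != nil {
            return err
        }
        if c, ok := rs.(io.Closer); ok {
            defer c.Close()
        }
        if w.Header().Get("Content-Length") == "" {
            w.Header().Set("Content-Length", fmt.Sprint(size))
        }
        // ServeContent also handles HEAD and Range requests.
        http.ServeContent(w, r, digest, time.Time{}, rs)
        return nil
    }

    func main() {
        const p = "blobs/sha256/ab/abcd/data" // illustrative path
        req := httptest.NewRequest(http.MethodGet, "/v2/library/alpine/blobs/sha256:abcd", nil)
        for _, src := range []blobSource{memSource{p: "layer-bytes"}, redirectSource{memSource{p: "layer-bytes"}}} {
            rec := httptest.NewRecorder()
            if err := serveBlob(rec, req, src, true, p, "sha256:abcd", 11); err != nil {
                panic(err)
            }
            fmt.Println(rec.Code, rec.Header().Get("Location"), rec.Body.Len())
        }
    }
    ```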
  • File transfer

    (dlv) l
    > github.com/docker/distribution/registry/storage.(*blobServer).ServeBlob() ./registry/storage/blobserver.go:76 (PC: 0xb99630)
    71: if w.Header().Get("Content-Length") == "" {
    72: // Set the content length if not already set.
    73: w.Header().Set("Content-Length", fmt.Sprint(desc.Size))
    74: }
    75:
    => 76: http.ServeContent(w, r, desc.Digest.String(), time.Time{}, br)
    77: return nil
    78: }
    (dlv) s
    DEBU[64560] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=5d53b068-4ba6-440a-8a3b-15a6a6dd7518 service=registry trace.duration=70.19µs trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=b8c1cf63-9f39-4805-9b4b-5258f1c2be67 trace.line=155 version="v2.8.3+unknown"
    > net/http.ServeContent() /usr/local/go/src/net/http/fs.go:240 (PC: 0x872553)
    235: // handling an invalid range request), ServeContent responds with an
    236: // error message. By default, ServeContent strips the Cache-Control,
    237: // Content-Encoding, ETag, and Last-Modified headers from error responses.
    238: // The GODEBUG setting httpservecontentkeepheaders=1 causes ServeContent
    239: // to preserve these headers.
    => 240: func ServeContent(w ResponseWriter, req *Request, name string, modtime time.Time, content io.ReadSeeker) {
    241: sizeFunc := func() (int64, error) {
    242: size, err := content.Seek(0, io.SeekEnd)
    243: if err != nil {
    244: return 0, errSeeker
    245: }
    (dlv)
    # ...
    (dlv) l
    > io.CopyN() /usr/local/go/src/io/io.go:363 (PC: 0x4a29d3)
    358: // It returns the number of bytes copied and the earliest
    359: // error encountered while copying.
    360: // On return, written == n if and only if err == nil.
    361: //
    362: // If dst implements [ReaderFrom], the copy is implemented using it.
    => 363: func CopyN(dst Writer, src Reader, n int64) (written int64, err error) {
    364: written, err = Copy(dst, LimitReader(src, n))
    365: if written == n {
    366: return n, nil
    367: }
    368: if written < n && err == nil {
    (dlv)
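    The transfer ends in io.CopyN, both here inside http.ServeContent and earlier in copyContent with desc.Size. As its doc comment says, written == n if and only if err == nil. CopyN never copies more than n bytes, and it returns io.EOF when the source runs dry first. The small helper below shows how that contract turns a truncated upstream body into an explicit error; copyExactly is a hypothetical name, not part of distribution:

    ```go
    package main

    import (
        "bytes"
        "errors"
        "fmt"
        "io"
        "strings"
    )

    // copyExactly copies exactly size bytes from src to dst. io.CopyN stops after
    // size bytes even if src has more, and returns io.EOF if src has fewer; here
    // that case is reported as io.ErrUnexpectedEOF together with the byte counts.
    func copyExactly(dst io.Writer, src io.Reader, size int64) error {
        n, err := io.CopyN(dst, src, size)
        if errors.Is(err, io.EOF) {
            return fmt.Errorf("short body: copied %d of %d bytes: %w", n, size, io.ErrUnexpectedEOF)
        }
        return err
    }

    func main() {
        var full, short bytes.Buffer
        fmt.Println(copyExactly(&full, strings.NewReader("0123456789"), 4), full.String())
        fmt.Println(copyExactly(&short, strings.NewReader("01"), 4), short.String())
    }
    ```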