【Distribution】02-mirror
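This post addresses the following questions, based on the source of v2.8.3, the latest release at the time of writing (the latest tag is v3.0.0-rc.2):
- How does distribution work as a mirror (i.e. a pull-through cache)? On the first pull, how is the data persisted to disk, and on the second pull, when the data is already local, how is it returned to the client?
- What does the cache in storage hold, and how does it speed things up?
- How can you see the actual request addresses used during docker pull?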
TL;DR
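Question 1
- Taking a blob as an example: when a client request arrives, a chain of calls leads to GetBlob. On the first request, the registry fetches the blob from the upstream configured under proxy, writes it to disk, and then returns it to the client; on later requests it returns the data directly from disk. If a cache is configured, it is consulted before each lookup to speed up indexing.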
Question 2
For the blob tested in this example, the following data is stored in Redis:
127.0.0.1:6379> keys *
1) "repository::library/alpine::blobs::sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
2) "repository::library/alpine::blobs"
3) "blobs::sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
127.0.0.1:6379> HGETALL "repository::library/alpine::blobs::sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
1) "mediatype"
2) "application/octet-stream"
127.0.0.1:6379> SMEMBERS "repository::library/alpine::blobs"
1) "sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
127.0.0.1:6379> HGETALL "blobs::sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
1) "size"
2) "3418409"
3) "mediatype"
4) "application/octet-stream"
5) "digest"
6) "sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
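127.0.0.1:6379>
The first time a request URL is handled, these values have to be computed through a chain of lookups. Once they are stored in Redis, later requests can build the HTTP response from them directly, which is where the speedup comes from.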
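The key layout above can be reproduced with a small sketch. It only mirrors the key names and fields visible in the redis-cli output; fakeRedis, setDescriptor, and the key helpers are hypothetical, and plain Go maps stand in for Redis hashes and sets.

```go
package main

import (
	"fmt"
	"strconv"
)

// Key helpers reproducing the key names seen in the redis-cli output.
func globalBlobKey(dgst string) string        { return "blobs::" + dgst }
func repoBlobSetKey(repo string) string        { return "repository::" + repo + "::blobs" }
func repoBlobKey(repo, dgst string) string     { return repoBlobSetKey(repo) + "::" + dgst }

// fakeRedis is an in-memory stand-in for Redis hashes and sets.
type fakeRedis struct {
	hashes map[string]map[string]string
	sets   map[string]map[string]bool
}

func newFakeRedis() *fakeRedis {
	return &fakeRedis{hashes: map[string]map[string]string{}, sets: map[string]map[string]bool{}}
}

func (r *fakeRedis) hset(key, field, val string) {
	if r.hashes[key] == nil {
		r.hashes[key] = map[string]string{}
	}
	r.hashes[key][field] = val
}

func (r *fakeRedis) sadd(key, member string) {
	if r.sets[key] == nil {
		r.sets[key] = map[string]bool{}
	}
	r.sets[key][member] = true
}

// setDescriptor writes the three keys observed above for one blob in one repository.
func (r *fakeRedis) setDescriptor(repo, dgst, mediaType string, size int64) {
	g := globalBlobKey(dgst) // global hash: digest, size, mediatype
	r.hset(g, "digest", dgst)
	r.hset(g, "size", strconv.FormatInt(size, 10))
	r.hset(g, "mediatype", mediaType)
	r.sadd(repoBlobSetKey(repo), dgst)                      // set of digests in the repository
	r.hset(repoBlobKey(repo, dgst), "mediatype", mediaType) // repository-scoped hash
}

func main() {
	r := newFakeRedis()
	d := "sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885"
	r.setDescriptor("library/alpine", d, "application/octet-stream", 3418409)
	fmt.Println(len(r.hashes) + len(r.sets))        // 3
	fmt.Println(r.hashes[globalBlobKey(d)]["size"]) // 3418409
}
```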
Question 3
- After the registry starts, request logs can be viewed in proxy.log
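Before we start
The official documentation describes how a mirror works as follows: the first time content is requested, the mirror fetches it from upstream and caches it in local storage; subsequent requests are served directly from the mirror's local storage.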
How does it work?
The first time you request an image from your local registry mirror, it pulls the image from the public Docker registry and stores it locally before handing it back to you. On subsequent requests, the local registry mirror is able to serve the image from its own storage.
What if the content changes on the Hub?
When a pull is attempted with a tag, the Registry checks the remote to ensure if it has the latest version of the requested content. Otherwise, it fetches and caches the latest content.
What about my disk?
In environments with high churn rates, stale data can build up in the cache. When running as a pull through cache the Registry periodically removes old content to save disk space. Subsequent requests for removed content causes a remote fetch and local re-caching.
To ensure best performance and guarantee correctness the Registry cache should be configured to use the filesystem driver for storage.
As for configuration, the following must be added to the config file before the registry can be used as a mirror:
The easiest way to run a registry as a pull through cache is to run the official Registry image. At least, you need to specify proxy.remoteurl within /etc/distribution/config.yml as described in the following subsection. Multiple registry caches can be deployed over the same back-end. A single registry cache ensures that concurrent requests do not pull duplicate data, but this property does not hold true for a registry cache cluster.
proxy:
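The quoted claim that a single registry cache avoids pulling duplicate data for concurrent requests can be illustrated with an in-flight map guarded by a mutex: only the first request for a digest stores it locally, while every request still streams the blob to its own client. This is a minimal sketch under that assumption; inflightTracker, tryStart, and done are hypothetical names, not the registry's code.

```go
package main

import (
	"fmt"
	"sync"
)

// inflightTracker records digests that are currently being stored locally.
type inflightTracker struct {
	mu      sync.Mutex
	pending map[string]struct{}
}

func newInflightTracker() *inflightTracker {
	return &inflightTracker{pending: make(map[string]struct{})}
}

// tryStart reports whether the caller is the first to start storing dgst.
func (t *inflightTracker) tryStart(dgst string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.pending[dgst]; ok {
		return false
	}
	t.pending[dgst] = struct{}{}
	return true
}

// done clears dgst once the local copy is committed (or has failed).
func (t *inflightTracker) done(dgst string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.pending, dgst)
}

func main() {
	t := newInflightTracker()
	var wg sync.WaitGroup
	var mu sync.Mutex
	stores := 0
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if t.tryStart("sha256:abc") {
				mu.Lock()
				stores++ // only the winner would copy upstream data into local storage
				mu.Unlock()
			}
			// every request would still stream the blob to its own client here
		}()
	}
	wg.Wait()
	fmt.Println(stores) // 1
}
```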
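However, the documentation doesn't explain in detail where a pull request enters the service or how it flows through the code. The rest of this post analyzes that by walking through the source. Some basic concepts were already covered in 【Distribution】01-Overview and are not repeated here.
Image pull
As described in the post on how image pull works in Harbor, for distribution an image pull calls the following endpoints in order:
/v2/*/manifests/*
/v2/*/blobs/*
The corresponding API documentation is in docs/spec/api.md; the source analysis below follows this call chain.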
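To make the two endpoints concrete, the sketch below classifies a request path and extracts the repository name and reference. The patterns are simplified assumptions; the real routes are declared in registry/api/v2/descriptors.go with much stricter name and digest patterns, and route is a hypothetical helper.

```go
package main

import (
	"fmt"
	"regexp"
)

// Simplified, hypothetical patterns for the repository name and references.
const namePart = `[a-z0-9]+(?:[._/-][a-z0-9]+)*`

var (
	manifestRoute = regexp.MustCompile(`^/v2/(` + namePart + `)/manifests/([A-Za-z0-9_.:-]+)$`)
	blobRoute     = regexp.MustCompile(`^/v2/(` + namePart + `)/blobs/([a-z0-9]+:[a-f0-9]+)$`)
)

// route reports which pull endpoint a path hits, plus the repository name and reference.
func route(path string) (kind, name, ref string, ok bool) {
	if m := manifestRoute.FindStringSubmatch(path); m != nil {
		return "manifest", m[1], m[2], true
	}
	if m := blobRoute.FindStringSubmatch(path); m != nil {
		return "blob", m[1], m[2], true
	}
	return "", "", "", false
}

func main() {
	for _, p := range []string{
		"/v2/library/alpine/manifests/latest",
		"/v2/library/alpine/blobs/sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885",
	} {
		kind, name, ref, _ := route(p)
		fmt.Println(kind, name, ref)
	}
}
```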
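Initialization
Before that, there is some initialization that distribution performs when used as a mirror, and it is worth understanding first. The source is: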
// registry/handlers/app.go
Here NewRegistryPullThroughCache is called to initialize the mirror's behavior. Its source is below; its main job is to:
- initialize scheduler-state.json, which stores the expiry schedule for blobs and manifests
// registry/proxy/proxyregistry.go
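Request path
As mentioned in 【Distribution】01-Overview, NewApp is where requests are actually handled, in the following steps:
- Register dispatchers to handle the different kinds of requests. The routes are defined in registry/api/v2/descriptors.go, and the corresponding handlers are in registry/handlers/{handler}.go
- Configure the corresponding components from the config file
In app.go, the handler registered for fetching blobs is:
// registry/handlers/app.go
The behavior is first defined in routeDescriptors. For pulls this is RouteNameBlob, which supports two methods, GET and DELETE; GET is further split into Fetch Blob and Fetch Blob Part. According to the descriptions, which one applies depends on how the client makes the request, so in the common case the Fetch Blob path is taken, and that is the path the rest of this post follows.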
Name: "Fetch Blob Part",
Description: "This endpoint may also support RFC7233 compliant range requests. Support can be detected by issuing a HEAD request. If the header Accept-Range: bytes is returned, range requests can be used to fetch partial content.",
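The relevant source:
// registry/api/v2/descriptors.go
Next, the blobDispatcher code:
// registry/handlers/blob.go
The key step is the call to GetBlob:
// registry/handlers/blob.go
This is where it gets interesting: starting from blobs := bh.Repository.Blobs(bh), the code relies on several layers of abstraction.
First, bh is a blobHandler, defined below. It is a struct that embeds another struct, so the fields defined on Context can be used directly. In Go, embedding a struct means the outer struct can access the embedded struct's fields and methods directly, without naming the embedded struct explicitly.
// registry/handlers/blob.go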
// blobHandler serves http blob requests.
type blobHandler struct {
*Context
Digest digest.Digest
}
Context, in turn, is a struct that contains the Repository field:
// registry/handlers/context.go
// Context should contain the request specific context for use in across
// handlers. Resources that don't need to be shared across handlers should not
// be on this object.
type Context struct {
// App points to the application structure that created this context.
*App
context.Context
// Repository is the repository for the current request. All requests
// should be scoped to a single repository. This field may be nil.
Repository distribution.Repository
// RepositoryRemover provides method to delete a repository
RepositoryRemover distribution.RepositoryRemover
// Errors is a collection of errors encountered during the request to be
// returned to the client API. If errors are added to the collection, the
// handler *must not* start the response via http.ResponseWriter.
Errors errcode.Errors
urlBuilder *v2.URLBuilder
// TODO(stevvooe): The goal is too completely factor this context and
// dispatching out of the web application. Ideally, we should lean on
// context.Context for injection of these resources.
}
Repository is an interface, defined as:
// registry.go
// Repository is a named collection of manifests and layers.
type Repository interface {
// Named returns the name of the repository.
Named() reference.Named
// Manifests returns a reference to this repository's manifest service.
// with the supplied options applied.
Manifests(ctx context.Context, options ...ManifestServiceOption) (ManifestService, error)
// Blobs returns a reference to this repository's blob service.
Blobs(ctx context.Context) BlobStore
// TODO(stevvooe): The above BlobStore return can probably be relaxed to
// be a BlobService for use with clients. This will allow such
// implementations to avoid implementing ServeBlob.
// Tags returns a reference to this repositories tag service
Tags(ctx context.Context) TagService
}
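So, to support different scenarios, a type only needs to implement the Repository interface, including its Blobs method, to be used through the interface. A global search turns up the following implementations:
notifications/listener.go
registry/client/repository.go
registry/proxy/proxyregistry.go
registry/storage/registry.go
// notifications/listener.go
Although it's fairly obvious that a mirror ends up in registry/proxy/proxyregistry.go, two questions still need answering:
- Why does the Blobs implementation take a context.Context parameter when what's actually passed in is a blobHandler?
- When is the proxy implementation defined and passed in?
The first is answered by type embedding in Go: a struct can embed other types, and if it embeds an interface (such as context.Context), the struct itself can be used as an implementation of that interface.
blobHandler embeds *Context, and the Context struct embeds context.Context, so *blobHandler can also be passed as a context.Context. For details, see Embedding interfaces in structs.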
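For the second question, going back to Initialization shows that app.registry is assigned differently depending on the configuration. For a mirror it's the code below; note that the mirror setting takes the highest priority.
// registry/handlers/app.go
The code above only prepares things; the per-request initialization actually runs when a request is received (this is easy to find by debugging with delve, e.g. b app.go:655). The code is:
// registry/handlers/app.go
So when a request arrives, blobDispatcher obtains at runtime a Repository implemented by proxyingRegistry, and from then on the request is handled by the proxy logic.
With blobs in hand, desc, err := blobs.Stat(bh, bh.Digest) is called to get the blob's information, mainly its digest, size, and mediatype. desc looks like this:
github.com/docker/distribution.Descriptor {
Here Stat first goes through the blobs wrapper notifications.(*blobServiceListener).Stat() and then, based on ctx, enters registry/proxy.(*proxyBlobStore).Stat(). As the code below shows, it first checks whether the blob exists locally and only falls back to the upstream if it doesn't:
// registry/proxy/proxyblobstore.go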
This step deserves a closer look. Locally, registry/storage.(*linkedBlobStore).Stat() is called first:
// registry/storage/linkedblobstore.go
Going one level further, it then calls registry/storage/cache.(*cachedBlobStatter).Stat(), which comes up often later (this is the cache handling):
func (cbds *cachedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
Debugging with delve
The following uses delve for debugging; for details on using delve, see 【Go】07-Debugging tool delve.
Debugging procedure
1. debug: start the debugging session
GetBlob desc, err := blobs.Stat(bh, bh.Digest)
When the request arrives, the cache is first checked, using ctx and dgst, for information about the file. If nothing is there, a lookup is performed, which yields the following (this trace follows the cache-miss path):
(dlv) l
github.com/docker/distribution/registry/storage/cache.(*cachedBlobStatter).Stat() ./registry/storage/cache/cachedblobdescriptorstore.go:87 (PC: 0xb74ff6)
82: cacheCount.WithValues("Miss").Inc(1)
83: if cbds.tracker != nil {
84: cbds.tracker.Miss()
85: }
86: desc, err = cbds.backend.Stat(ctx, dgst)
=> 87: if err != nil {
88: return desc, err
89: }
90:
91: if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil {
92: logErrorf(ctx, cbds.tracker, "error adding descriptor %v to cache: %v", desc.Digest, err)
(dlv) p desc
github.com/docker/distribution.Descriptor {
MediaType: "application/octet-stream",
Size: 3418409,
Digest: "sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885",
URLs: []string len: 0, cap: 0, nil,
Annotations: map[string]string nil,
Platform: *github.com/docker/distribution/vendor/github.com/opencontainers/image-spec/specs-go/v1.Platform nil,}
If the blob isn't in local storage either, the upstream is contacted and the data is written locally:
(dlv) l
github.com/docker/distribution/registry/storage/driver/filesystem.(*driver).GetContent() /root/go/src/github.com/docker/distribution/registry/storage/driver/filesystem/driver.go:119 (PC: 0xd43a96)
114: func (d *driver) Name() string {
115: return driverName
116: }
117:
118: // GetContent retrieves the content stored at "path" as a []byte.
=> 119: func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
120: rc, err := d.Reader(ctx, path, 0)
121: if err != nil {
122: return nil, err
123: }
124: defer rc.Close()
(dlv)
...
(dlv) l
github.com/docker/distribution/registry/storage/driver/filesystem.(*driver).Reader() /root/go/src/github.com/docker/distribution/registry/storage/driver/filesystem/driver.go:151 (PC: 0xd44456)
146: return writer.Commit()
147: }
148:
149: // Reader retrieves an io.ReadCloser for the content stored at "path" with a
150: // given byte offset.
=> 151: func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
152: file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644)
153: if err != nil {
154: if os.IsNotExist(err) {
155: return nil, storagedriver.PathNotFoundError{Path: path}
156: }
(dlv)
...
(dlv) l
github.com/docker/distribution/registry/storage/driver/filesystem.(*driver).Reader() /root/go/src/github.com/docker/distribution/registry/storage/driver/filesystem/driver.go:153 (PC: 0xd44560)
148:
149: // Reader retrieves an io.ReadCloser for the content stored at "path" with a
150: // given byte offset.
151: func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
152: file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644)
=> 153: if err != nil {
154: if os.IsNotExist(err) {
155: return nil, storagedriver.PathNotFoundError{Path: path}
156: }
157:
158: return nil, err
(dlv) locals
err = error(*io/fs.PathError) 0xc0003e0e48
file = *os.File nil
...
(dlv) l
github.com/docker/distribution/registry/storage.(*blobStore).readlink() /root/go/src/github.com/docker/distribution/registry/storage/blobstore.go:144 (PC: 0xb9b009)
139:
140: // readlink returns the linked digest at path.
141: func (bs *blobStore) readlink(ctx context.Context, path string) (digest.Digest, error) {
142: content, err := bs.driver.GetContent(ctx, path)
143: if err != nil {
=> 144: return "", err
145: }
146:
147: linked, err := digest.Parse(string(content))
148: if err != nil {
149: return "", err
(dlv) locals
content = []uint8 len: 0, cap: 0, nil
err = error(github.com/docker/distribution/registry/storage/driver.PathNotFoundError) {Path: "/docker/registry/v2/repositories/library/alpine/_layers/sha256/4...+68 more", DriverName: "filesystem"}
(dlv) p err
error(github.com/docker/distribution/registry/storage/driver.PathNotFoundError) {
Path: "/docker/registry/v2/repositories/library/alpine/_layers/sha256/44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885/link",
DriverName: "filesystem",}
(dlv)
...
(dlv) l
github.com/docker/distribution/registry/storage.(*linkedBlobStatter).Stat() /root/go/src/github.com/docker/distribution/registry/storage/linkedblobstore.go:395 (PC: 0xbaa5ca)
390: if err == nil {
391: found = true
392: break // success!
393: }
394:
=> 395: switch err := err.(type) {
396: case driver.PathNotFoundError:
397: // do nothing, just move to the next linkPathFn
398: default:
399: return distribution.Descriptor{}, err
400: }
(dlv)
...
(dlv) s
github.com/docker/distribution/registry/storage/cache.(*cachedBlobStatter).Stat() /root/go/src/github.com/docker/distribution/registry/storage/cache/cachedblobdescriptorstore.go:86 (PC: 0xb74f3a)
Values returned:
~r0: github.com/docker/distribution.Descriptor {
MediaType: "",
Size: 0,
Digest: "",
URLs: []string len: 0, cap: 0, nil,
Annotations: map[string]string nil,
Platform: *github.com/docker/distribution/vendor/github.com/opencontainers/image-spec/specs-go/v1.Platform nil,}
~r1: error(*errors.errorString) *{
s: "unknown blob",}
81: fallback:
82: cacheCount.WithValues("Miss").Inc(1)
83: if cbds.tracker != nil {
84: cbds.tracker.Miss()
85: }
=> 86: desc, err = cbds.backend.Stat(ctx, dgst)
87: if err != nil {
88: return desc, err
89: }
90:
91: if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil {
(dlv)
...
(dlv) l
github.com/docker/distribution/registry/proxy.(*proxyBlobStore).Stat() /root/go/src/github.com/docker/distribution/registry/proxy/proxyblobstore.go:169 (PC: 0xbe318e)
164:
165: if err != distribution.ErrBlobUnknown {
166: return distribution.Descriptor{}, err
167: }
168:
=> 169: if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
170: return distribution.Descriptor{}, err
171: }
172:
173: return pbs.remoteStore.Stat(ctx, dgst)
174: }
(dlv) s
DEBU[1994] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=08f6ce04-c812-4ae8-aeb6-40b750dd355a service=registry trace.duration=39.592µs trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=e0d4c1b9-1659-4fad-a556-aeec6b74d624 trace.line=155 version="v2.8.3+unknown"
github.com/docker/distribution/registry/proxy.(*remoteAuthChallenger).tryEstablishChallenges() /root/go/src/github.com/docker/distribution/registry/proxy/proxyregistry.go:217 (PC: 0xbe7696)
212: func (r *remoteAuthChallenger) challengeManager() challenge.Manager {
213: return r.cm
214: }
215:
216: // tryEstablishChallenges will attempt to get a challenge type for the upstream if none currently exist
=> 217: func (r *remoteAuthChallenger) tryEstablishChallenges(ctx context.Context) error {
218: r.Lock()
219: defer r.Unlock()
220:
221: remoteURL := r.remoteURL
222: remoteURL.Path = "/v2/"
(dlv)
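tryEstablishChallenges only tries to obtain the upstream's auth challenge (by requesting its /v2/ endpoint); no data has been fetched yet.
The actual lookup is done by the following call: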
(dlv) l
github.com/docker/distribution/registry/client.(*blobStatter).Stat() /root/go/src/github.com/docker/distribution/registry/client/repository.go:794 (PC: 0xbd7411)
789: func (bs *blobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
790: ref, err := reference.WithDigest(bs.name, dgst)
791: if err != nil {
792: return distribution.Descriptor{}, err
793: }
=> 794: u, err := bs.ub.BuildBlobURL(ref)
795: if err != nil {
796: return distribution.Descriptor{}, err
797: }
798:
799: resp, err := bs.client.Head(u)
(dlv)
Before fetching data, the scope has to be set and a token generated:
(dlv) l
github.com/docker/distribution/registry/client/auth.(*tokenHandler).getToken() /root/go/src/github.com/docker/distribution/registry/client/auth/session.go:278 (PC: 0xbda455)
273: defer th.tokenLock.Unlock()
274: scopes := make([]string, 0, len(th.scopes)+len(additionalScopes))
275: for _, scope := range th.scopes {
276: scopes = append(scopes, scope.String())
277: }
=> 278: var addedScopes bool
279: for _, scope := range additionalScopes {
280: if hasScope(scopes, scope) {
281: continue
282: }
283: scopes = append(scopes, scope)
(dlv)
The token is requested from the upstream using the username and password stored in th.creds:
(dlv) l
github.com/docker/distribution/registry/client/auth.(*tokenHandler).fetchToken() /root/go/src/github.com/docker/distribution/registry/client/auth/session.go:494 (PC: 0xbdceb2)
489:
490: service := params["service"]
491:
492: var refreshToken string
493:
=> 494: if th.creds != nil {
495: refreshToken = th.creds.RefreshToken(realmURL, service)
496: }
497:
498: if refreshToken != "" || th.forceOAuth {
499: return th.fetchTokenWithOAuth(realmURL, refreshToken, service, scopes)
(dlv)
During the request, the username and password are base64-encoded (an encoding, not encryption):
net/http.basicAuth() /usr/local/go/src/net/http/client.go:423 (PC: 0x869b60)
418: // "To receive authorization, the client sends the userid and password,
419: // separated by a single colon (":") character, within a base64
420: // encoded string in the credentials."
421: // It is not meant to be urlencoded.
422: func basicAuth(username, password string) string {
=> 423: auth := username + ":" + password
424: return base64.StdEncoding.EncodeToString([]byte(auth))
425: }
426:
427: // Get issues a GET to the specified URL. If the response is one of
428: // the following redirect codes, Get follows the redirect, up to a
(dlv)
The request is sent to fetch the data:
(dlv) l
github.com/docker/distribution/registry/client/transport.(*transport).RoundTrip() /root/go/src/github.com/docker/distribution/registry/client/transport/transport.go:56 (PC: 0xbc747f)
51: // access token. If no token exists or token is expired,
52: // tries to refresh/fetch a new token.
53: func (t *transport) RoundTrip(req *http.Request) (*http.Response, error) {
54: req2 := cloneRequest(req)
55: for _, modifier := range t.Modifiers {
=> 56: if err := modifier.ModifyRequest(req2); err != nil {
57: return nil, err
58: }
59: }
60:
61: t.setModReq(req, req2)
(dlv)
Before the data is stored, the cache logic runs; at this point the mirror's local storage doesn't have the data yet:
(dlv) l
github.com/docker/distribution/registry/storage/cache/memory.(*inMemoryBlobDescriptorCacheProvider).SetDescriptor() /root/go/src/github.com/docker/distribution/registry/storage/cache/memory/memory.go:55 (PC: 0xbc8ba9)
50:
51: func (imbdcp *inMemoryBlobDescriptorCacheProvider) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
52: _, err := imbdcp.Stat(ctx, dgst)
53: if err == distribution.ErrBlobUnknown {
54:
=> 55: if dgst.Algorithm() != desc.Digest.Algorithm() && dgst != desc.Digest {
56: // if the digests differ, set the other canonical mapping
57: if err := imbdcp.global.SetDescriptor(ctx, desc.Digest, desc); err != nil {
58: return err
59: }
60: }
(dlv) n
DEBU[6818] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=08f6ce04-c812-4ae8-aeb6-40b750dd355a service=registry trace.duration=36.344µs trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=7c42ff89-d43a-40f5-b91a-32d92f869377 trace.line=155 version="v2.8.3+unknown"
github.com/docker/distribution/registry/storage/cache/memory.(*inMemoryBlobDescriptorCacheProvider).SetDescriptor() /root/go/src/github.com/docker/distribution/registry/storage/cache/memory/memory.go:63 (PC: 0xbc8d32)
58: return err
59: }
60: }
61:
62: // unknown, just set it
=> 63: return imbdcp.global.SetDescriptor(ctx, dgst, desc)
64: }
65:
66: // we already know it, do nothing
67: return err
68: }
(dlv)
The first and second pulls follow the same logic up to readlink, where they diverge:
(dlv) l
github.com/docker/distribution/registry/storage.(*blobStore).readlink() /root/go/src/github.com/docker/distribution/registry/storage/blobstore.go:144 (PC: 0xb9b009)
139:
140: // readlink returns the linked digest at path.
141: func (bs *blobStore) readlink(ctx context.Context, path string) (digest.Digest, error) {
142: content, err := bs.driver.GetContent(ctx, path)
143: if err != nil {
=> 144: return "", err
145: }
146:
147: linked, err := digest.Parse(string(content))
148: if err != nil {
149: return "", err
(dlv) s
DEBU[7454] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=08f6ce04-c812-4ae8-aeb6-40b750dd355a service=registry trace.duration=36.833µs trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=40c6e2e6-6848-406b-b055-7111293ad295 trace.line=155 version="v2.8.3+unknown"
github.com/docker/distribution/registry/storage.(*linkedBlobStatter).resolveWithLinkFunc() /root/go/src/github.com/docker/distribution/registry/storage/linkedblobstore.go:449 (PC: 0xbaafc9)
Values returned:
~r0: ""
~r1: error(github.com/docker/distribution/registry/storage/driver.PathNotFoundError) {
Path: "/docker/registry/v2/repositories/library/alpine/_layers/sha256/44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885/link",
DriverName: "filesystem",}
444: blobLinkPath, err := linkPathFn(lbs.repository.Named().Name(), dgst)
445: if err != nil {
446: return "", err
447: }
448:
=> 449: return lbs.blobStore.readlink(ctx, blobLinkPath)
450: }
451:
452: func (lbs *linkedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
453: // The canonical descriptor for a blob is set at the commit phase of upload
454: return nil
(dlv)
The local write location is derived from uploadDataPathSpec:
(dlv) l
github.com/docker/distribution/registry/storage.(*linkedBlobStore).Create() /root/go/src/github.com/docker/distribution/registry/storage/linkedblobstore.go:156 (PC: 0xba7a55)
151: path, err := pathFor(uploadDataPathSpec{
152: name: lbs.repository.Named().Name(),
153: id: uuid,
154: })
155:
=> 156: if err != nil {
157: return nil, err
158: }
159:
160: startedAtPath, err := pathFor(uploadStartedAtPathSpec{
161: name: lbs.repository.Named().Name(),
(dlv)
The last preparation before writing:
(dlv) locals
opts = github.com/docker/distribution.CreateOptions {Mount: (*"struct { ShouldMount bool; From github.com/docker/distribution/vendor/github.com/distribution/reference.Canonical; Stat *github.com/docker/distribution.Descriptor }")(0xc0002a0320)}
uuid = "fd96c777-cacd-4dbf-954b-17f98861599c"
startedAt = time.Time(2025-02-18T12:00:35Z){wall: 538346343, ext: 63875476835, loc: *time.Location nil}
err = error nil
path = "/docker/registry/v2/repositories/library/alpine/_uploads/fd96c77...+34 more"
startedAtPath = "/docker/registry/v2/repositories/library/alpine/_uploads/fd96c77...+39 more"
(dlv) p path
"/docker/registry/v2/repositories/library/alpine/_uploads/fd96c777-cacd-4dbf-954b-17f98861599c/data"
(dlv) p startedAtPath
"/docker/registry/v2/repositories/library/alpine/_uploads/fd96c777-cacd-4dbf-954b-17f98861599c/startedat"
(dlv)
lbs.blobStore.driver.PutContent writes the start time to startedat:
(dlv) s
DEBU[8811] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=08f6ce04-c812-4ae8-aeb6-40b750dd355a service=registry trace.duration=16.772555154s trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=ad590e58-4273-4ea7-a6e1-d79fad18fca1 trace.line=155 version="v2.8.3+unknown"
github.com/docker/distribution/registry/storage.(*linkedBlobStore).Create() /root/go/src/github.com/docker/distribution/registry/storage/linkedblobstore.go:174 (PC: 0xba7d84)
169: // Write a startedat file for this upload
170: if err := lbs.blobStore.driver.PutContent(ctx, startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
171: return nil, err
172: }
173:
=> 174: return lbs.newBlobUpload(ctx, uuid, path, startedAt, false)
175: }
176:
177: func (lbs *linkedBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
178: dcontext.GetLogger(ctx).Debug("(*linkedBlobStore).Resume")
179:
(dlv)
lbs.newBlobUpload(ctx, uuid, path, startedAt, false) returns the upload writer; the file content itself is written through it in copyContent:
(dlv) l
github.com/docker/distribution/registry/proxy.(*proxyBlobStore).copyContent() /root/go/src/github.com/docker/distribution/registry/proxy/proxyblobstore.go:46 (PC: 0xbe141e)
41: desc, err := pbs.remoteStore.Stat(ctx, dgst)
42: if err != nil {
43: return distribution.Descriptor{}, err
44: }
45:
=> 46: if w, ok := writer.(http.ResponseWriter); ok {
47: setResponseHeaders(w, desc.Size, desc.MediaType, dgst)
48: }
49:
50: remoteReader, err := pbs.remoteStore.Open(ctx, dgst)
51: if err != nil {
(dlv)
...
The data file under the upload uuid only gets content once io.CopyN(writer, remoteReader, desc.Size) is called:
(dlv) l
github.com/docker/distribution/registry/proxy.(*proxyBlobStore).copyContent() /root/go/src/github.com/docker/distribution/registry/proxy/proxyblobstore.go:57 (PC: 0xbe17a5)
52: return distribution.Descriptor{}, err
53: }
54:
55: defer remoteReader.Close()
56:
=> 57: _, err = io.CopyN(writer, remoteReader, desc.Size)
58: if err != nil {
59: return distribution.Descriptor{}, err
60: }
61:
62: proxyMetrics.BlobPush(uint64(desc.Size))
(dlv)
Finally, moveBlob is called to move the data out of the temporary upload directory into the blob store:
(dlv) l
github.com/docker/distribution/registry/storage.(*blobWriter).moveBlob() /root/go/src/github.com/docker/distribution/registry/storage/blobwriter.go:305 (PC: 0xb9e0b6)
300: if err != nil {
301: return err
302: }
303:
304: // Check for existence
=> 305: if _, err := bw.blobStore.driver.Stat(ctx, blobPath); err != nil {
306: switch err := err.(type) {
307: case storagedriver.PathNotFoundError:
308: break // ensure that it doesn't exist.
309: default:
310: return err
(dlv)
If the blob is already in local storage, it is returned directly from there. Execution takes the err == nil branch below:
(dlv) l
github.com/docker/distribution/registry/storage.(*blobStore).readlink() /root/go/src/github.com/docker/distribution/registry/storage/blobstore.go:143 (PC: 0xb9b002)
138: }
139:
140: // readlink returns the linked digest at path.
141: func (bs *blobStore) readlink(ctx context.Context, path string) (digest.Digest, error) {
142: content, err := bs.driver.GetContent(ctx, path)
=> 143: if err != nil {
144: return "", err
145: }
146:
147: linked, err := digest.Parse(string(content))
148: if err != nil {
(dlv)
The information is then written to the cache:
(dlv) l
github.com/docker/distribution/registry/storage/cache/memory.(*mapBlobDescriptorCache).SetDescriptor() ./registry/storage/cache/memory/memory.go:178 (PC: 0xbc9ef3)
173:
174: mbdc.mu.Lock()
175: defer mbdc.mu.Unlock()
176:
177: mbdc.descriptors[dgst] = desc
=> 178: return nil
179: }
(dlv) p mbdc
("*github.com/docker/distribution/registry/storage/cache/memory.mapBlobDescriptorCache")(0xc00036ec40)
*github.com/docker/distribution/registry/storage/cache/memory.mapBlobDescriptorCache {
descriptors: map[github.com/docker/distribution/vendor/github.com/opencontainers/go-digest.Digest]github.com/docker/distribution.Descriptor [
"sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885": (*"github.com/docker/distribution.Descriptor")(0xc0000a63a0),
],
mu: sync.RWMutex {
w: (*sync.Mutex)(0xc00036ec48),
writerSem: 0,
readerSem: 0,
readerCount: (*"sync/atomic.Int32")(0xc00036ec58),
readerWait: (*"sync/atomic.Int32")(0xc00036ec5c),},}
(dlv) p mbdc.descriptors
map[github.com/docker/distribution/vendor/github.com/opencontainers/go-digest.Digest]github.com/docker/distribution.Descriptor [
"sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885": {
MediaType: "application/octet-stream",
Size: 3418409,
Digest: "sha256:44cf07d57ee4424189f012074a59110ee2065adfdde9c7d9826bebdffce0a885",
URLs: []string len: 0, cap: 0, nil,
Annotations: map[string]string nil,
Platform: *github.com/docker/distribution/vendor/github.com/opencontainers/image-spec/specs-go/v1.Platform nil,},
]
(dlv) s
github.com/docker/distribution/registry/storage/cache/memory.(*inMemoryBlobDescriptorCacheProvider).SetDescriptor() ./registry/storage/cache/memory/memory.go:63 (PC: 0xbc8d9e)
Values returned:
~r0: error nil
58: return err
59: }
60: }
61:
62: // unknown, just set it
=> 63: return imbdcp.global.SetDescriptor(ctx, dgst, desc)
64: }
65:
66: // we already know it, do nothing
67: return err
68: }
(dlv) s
DEBU[53825] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=5d53b068-4ba6-440a-8a3b-15a6a6dd7518 service=registry trace.duration=60.197µs trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=22bba5cd-3f4b-4609-8977-5ce26ad5bb7f trace.line=155 version="v2.8.3+unknown"
DEBU[53825] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=5d53b068-4ba6-440a-8a3b-15a6a6dd7518 service=registry trace.duration=12.473µs trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=651c9e73-4c43-45f9-9134-5b9dbb5fcce6 trace.line=155 version="v2.8.3+unknown"
github.com/docker/distribution/registry/storage/cache.(*cachedBlobStatter).Stat() ./registry/storage/cache/cachedblobdescriptorstore.go:91 (PC: 0xb750bc)
Values returned:
~r0: error nil
86: desc, err = cbds.backend.Stat(ctx, dgst)
87: if err != nil {
88: return desc, err
89: }
90:
=> 91: if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil {
92: logErrorf(ctx, cbds.tracker, "error adding descriptor %v to cache: %v", desc.Digest, err)
93: }
94:
95: return desc, err
96:
GetBlob blobs.ServeBlob(bh, w, r, desc.Digest)
After a chain of calls, execution reaches the method below, which is where the data is actually read:
(dlv) l
github.com/docker/distribution/registry/storage.(*blobStore).path() ./registry/storage/blobstore.go:120 (PC: 0xb9ab93)
115: })
116: }
117:
118: // path returns the canonical path for the blob identified by digest. The blob
119: // may or may not exist.
=> 120: func (bs *blobStore) path(dgst digest.Digest) (string, error) {
121: bp, err := pathFor(blobDataPathSpec{
122: digest: dgst,
123: })
124:
125: if err != nil {
...
(dlv) s
github.com/docker/distribution/registry/storage.pathFor() ./registry/storage/paths.go:221 (PC: 0xbb2b46)
216: }
217:
218: blobPathPrefix := append(rootPrefix, "blobs")
219: return path.Join(append(blobPathPrefix, components...)...), nil
220: case blobDataPathSpec:
=> 221: components, err := digestPathComponents(v.digest, true)
222: if err != nil {
223: return "", err
224: }
225:
226: components = append(components, "data")
(dlv)
...
(dlv) l
github.com/docker/distribution/registry/storage.(*blobServer).ServeBlob() ./registry/storage/blobserver.go:45 (PC: 0xb98eca)
40: case nil:
41: // Redirect to storage URL.
42: http.Redirect(w, r, redirectURL, http.StatusTemporaryRedirect)
43: return err
44:
=> 45: case driver.ErrUnsupportedMethod:
46: // Fallback to serving the content directly.
47: default:
48: // Some unexpected error.
49: return err
50: }
(dlv) s
DEBU[63862] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=5d53b068-4ba6-440a-8a3b-15a6a6dd7518 service=registry trace.duration=54.521µs trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=e7d8e37c-ff6b-4eeb-ab84-8afe24864518 trace.line=155 version="v2.8.3+unknown"
github.com/docker/distribution/registry/storage.(*blobServer).ServeBlob() ./registry/storage/blobserver.go:39 (PC: 0xb98ef9)
34: return err
35: }
36:
37: if bs.redirect {
38: redirectURL, err := bs.driver.URLFor(ctx, path, map[string]interface{}{"method": r.Method})
=> 39: switch err.(type) {
40: case nil:
41: // Redirect to storage URL.
42: http.Redirect(w, r, redirectURL, http.StatusTemporaryRedirect)
43: return err
44:
(dlv) s
github.com/docker/distribution/registry/storage.(*blobServer).ServeBlob() ./registry/storage/blobserver.go:53 (PC: 0xb98fc2)
48: // Some unexpected error.
49: return err
50: }
51: }
52:
=> 53: br, err := newFileReader(ctx, bs.driver, path, desc.Size)
54: if err != nil {
55: return err
56: }
57: defer br.Close()
58:
(dlv)
File transfer
(dlv) l
github.com/docker/distribution/registry/storage.(*blobServer).ServeBlob() ./registry/storage/blobserver.go:76 (PC: 0xb99630)
71: if w.Header().Get("Content-Length") == "" {
72: // Set the content length if not already set.
73: w.Header().Set("Content-Length", fmt.Sprint(desc.Size))
74: }
75:
=> 76: http.ServeContent(w, r, desc.Digest.String(), time.Time{}, br)
77: return nil
78: }
(dlv) s
DEBU[64560] filesystem.Stat("/") environment=development go.version=go1.24.0 instance.id=5d53b068-4ba6-440a-8a3b-15a6a6dd7518 service=registry trace.duration=70.19µs trace.file="/root/go/src/github.com/docker/distribution/registry/storage/driver/base/base.go" trace.func="github.com/docker/distribution/registry/storage/driver/base.(*Base).Stat" trace.id=b8c1cf63-9f39-4805-9b4b-5258f1c2be67 trace.line=155 version="v2.8.3+unknown"
net/http.ServeContent() /usr/local/go/src/net/http/fs.go:240 (PC: 0x872553)
235: // handling an invalid range request), ServeContent responds with an
236: // error message. By default, ServeContent strips the Cache-Control,
237: // Content-Encoding, ETag, and Last-Modified headers from error responses.
238: // The GODEBUG setting httpservecontentkeepheaders=1 causes ServeContent
239: // to preserve these headers.
=> 240: func ServeContent(w ResponseWriter, req *Request, name string, modtime time.Time, content io.ReadSeeker) {
241: sizeFunc := func() (int64, error) {
242: size, err := content.Seek(0, io.SeekEnd)
243: if err != nil {
244: return 0, errSeeker
245: }
(dlv)
...
(dlv) l
io.CopyN() /usr/local/go/src/io/io.go:363 (PC: 0x4a29d3)
358: // It returns the number of bytes copied and the earliest
359: // error encountered while copying.
360: // On return, written == n if and only if err == nil.
361: //
362: // If dst implements [ReaderFrom], the copy is implemented using it.
=> 363: func CopyN(dst Writer, src Reader, n int64) (written int64, err error) {
364: written, err = Copy(dst, LimitReader(src, n))
365: if written == n {
366: return n, nil
367: }
368: if written < n && err == nil {
(dlv)




