概述

  • 本系列将以 v2.8.3 为例来进行分析

  • 本地编译:由于老版本的 distibution 不是用 go.mod 运行,而是 GOPATH 的编译运行方式,因此需要做以下设置才可正常按照 Building the registry source 编译代码

    1. 禁用 Go Modules: export GO111MODULE=off

    2. 通过 go env 查看 GOPATH,比如 /root/go

    3. 下载 distribution/distribution 代码,按以下结构放入 GOPATH,即 /root/go/src/github.com/docker/distribution

      1
      2
      3
      4
      5
      6
      7
      $GOPATH/
      └── src/
      └── github.com/
      └── user/
      └── project/
      ├── main.go
      └── vendor/
    4. 编译运行 make,即可在 bin 文件夹下得到二进制文件

  • 镜像编译:已经提供了 Dockerfile,直接 docker build -t registry:local . 即可

    • 其中用到了 alpine 下载部分软件包,可以做如下修改,加快构建

      1
      2
      -RUN apk add --no-cache bash coreutils file git
      +RUN echo -e 'https://mirrors.aliyun.com/alpine/v3.18/main/\nhttps://mirrors.aliyun.com/alpine/v3.18/community/'>/etc/apk/repositories && apk add --no-cache bash coreutils file git

distribution 介绍

服务入口

Dockerfile 中可以看到,主服务位于 cmd/registry/main.go 中,并利用 ENTRYPOINTCMD 实现读取配置文件并启动(配置文件中的一些默认值在调用相应函数才会赋值,并不在统一的地方,感觉这点还是 harbor 写法更优秀)

1
2
3
4
# Dockerfile
# ...
ENTRYPOINT ["registry"]
CMD ["serve", "/etc/docker/registry/config.yml"]

其中服务端的调用逻辑实现方式如下,其中 ServeCmd 会解析配置文件,并启动一个 server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// cmd/registry/main.go
func main() {
registry.RootCmd.Execute()
}

// registry/root.go
func init() {
RootCmd.AddCommand(ServeCmd)
// ...
}

// registry/registry.go
var ServeCmd = &cobra.Command {
// ...
config, err := resolveConfiguration(args)
registry, err := NewRegistry(ctx, config)
if err = registry.ListenAndServe(); err != nil {
log.Fatalln(err)
}
// ...
}

其中比较重要的就是配置文件解析 resolveConfiguration 及服务注册 NewRegistry,以下是服务注册的关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// registry/registry.go
// NewRegistry creates a new registry from a context and configuration struct.
func NewRegistry(ctx context.Context, config *configuration.Configuration) (*Registry, error) {
// ...
app := handlers.NewApp(ctx, config)
// TODO(aaronl): The global scope of the health checks means NewRegistry
// can only be called once per process.
app.RegisterHealthChecks()
handler := configureReporting(app)
handler = alive("/", handler)
handler = health.Handler(handler)
handler = panicHandler(handler)

server := &http.Server{
Handler: handler,
}

return &Registry{
app: app,
config: config,
server: server,
}, nil
}

其中 NewApp 用于真正处理请求,分为以下几个步骤

  1. 注册分发器,用于处理不同的请求,分发器的实际处理代码位于 registry/api/v2/descriptors.go 中,对应的 handler 位于 registry/handlers/{handler}.go
  2. 通过配置文件配置相应的组件

具体代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
// registry/handlers/app.go
// NewApp takes a configuration and returns a configured app, ready to serve
// requests. The app only implements ServeHTTP and can be wrapped in other
// handlers accordingly.
func NewApp(ctx context.Context, config *configuration.Configuration) *App {
app := &App{
Config: config,
Context: ctx,
router: v2.RouterWithPrefix(config.HTTP.Prefix),
isCache: config.Proxy.RemoteURL != "",
}

// Register the handler dispatchers.
app.register(v2.RouteNameBase, func(ctx *Context, r *http.Request) http.Handler {
return http.HandlerFunc(apiBase)
})
app.register(v2.RouteNameManifest, manifestDispatcher)
app.register(v2.RouteNameCatalog, catalogDispatcher)
app.register(v2.RouteNameTags, tagsDispatcher)
app.register(v2.RouteNameBlob, blobDispatcher)
app.register(v2.RouteNameBlobUpload, blobUploadDispatcher)
app.register(v2.RouteNameBlobUploadChunk, blobUploadDispatcher)

// override the storage driver's UA string for registry outbound HTTP requests
storageParams := config.Storage.Parameters()
if storageParams == nil {
storageParams = make(configuration.Parameters)
}
storageParams["useragent"] = fmt.Sprintf("docker-distribution/%s %s", version.Version, runtime.Version())

var err error
app.driver, err = factory.Create(config.Storage.Type(), storageParams)
if err != nil {
// TODO(stevvooe): Move the creation of a service into a protected
// method, where this is created lazily. Its status can be queried via
// a health check.
panic(err)
}

purgeConfig := uploadPurgeDefaultConfig()
if mc, ok := config.Storage["maintenance"]; ok {
if v, ok := mc["uploadpurging"]; ok {
purgeConfig, ok = v.(map[interface{}]interface{})
if !ok {
panic("uploadpurging config key must contain additional keys")
}
}
if v, ok := mc["readonly"]; ok {
readOnly, ok := v.(map[interface{}]interface{})
if !ok {
panic("readonly config key must contain additional keys")
}
if readOnlyEnabled, ok := readOnly["enabled"]; ok {
app.readOnly, ok = readOnlyEnabled.(bool)
if !ok {
panic("readonly's enabled config key must have a boolean value")
}
}
}
}

startUploadPurger(app, app.driver, dcontext.GetLogger(app), purgeConfig)

app.driver, err = applyStorageMiddleware(app.driver, config.Middleware["storage"])
if err != nil {
panic(err)
}

app.configureSecret(config)
app.configureEvents(config)
app.configureRedis(config)
app.configureLogHook(config)

options := registrymiddleware.GetRegistryOptions()
if config.Compatibility.Schema1.TrustKey != "" {
app.trustKey, err = libtrust.LoadKeyFile(config.Compatibility.Schema1.TrustKey)
if err != nil {
panic(fmt.Sprintf(`could not load schema1 "signingkey" parameter: %v`, err))
}
} else {
// Generate an ephemeral key to be used for signing converted manifests
// for clients that don't support schema2.
app.trustKey, err = libtrust.GenerateECP256PrivateKey()
if err != nil {
panic(err)
}
}

options = append(options, storage.Schema1SigningKey(app.trustKey))

if config.Compatibility.Schema1.Enabled {
options = append(options, storage.EnableSchema1)
}

if config.HTTP.Host != "" {
u, err := url.Parse(config.HTTP.Host)
if err != nil {
panic(fmt.Sprintf(`could not parse http "host" parameter: %v`, err))
}
app.httpHost = *u
}

if app.isCache {
options = append(options, storage.DisableDigestResumption)
}

// configure deletion
if d, ok := config.Storage["delete"]; ok {
e, ok := d["enabled"]
if ok {
if deleteEnabled, ok := e.(bool); ok && deleteEnabled {
options = append(options, storage.EnableDelete)
}
}
}

// configure redirects
var redirectDisabled bool
if redirectConfig, ok := config.Storage["redirect"]; ok {
v := redirectConfig["disable"]
switch v := v.(type) {
case bool:
redirectDisabled = v
default:
panic(fmt.Sprintf("invalid type for redirect config: %#v", redirectConfig))
}
}
if redirectDisabled {
dcontext.GetLogger(app).Infof("backend redirection disabled")
} else {
options = append(options, storage.EnableRedirect)
}

if !config.Validation.Enabled {
config.Validation.Enabled = !config.Validation.Disabled
}

// configure validation
if config.Validation.Enabled {
if len(config.Validation.Manifests.URLs.Allow) == 0 && len(config.Validation.Manifests.URLs.Deny) == 0 {
// If Allow and Deny are empty, allow nothing.
options = append(options, storage.ManifestURLsAllowRegexp(regexp.MustCompile("^$")))
} else {
if len(config.Validation.Manifests.URLs.Allow) > 0 {
for i, s := range config.Validation.Manifests.URLs.Allow {
// Validate via compilation.
if _, err := regexp.Compile(s); err != nil {
panic(fmt.Sprintf("validation.manifests.urls.allow: %s", err))
}
// Wrap with non-capturing group.
config.Validation.Manifests.URLs.Allow[i] = fmt.Sprintf("(?:%s)", s)
}
re := regexp.MustCompile(strings.Join(config.Validation.Manifests.URLs.Allow, "|"))
options = append(options, storage.ManifestURLsAllowRegexp(re))
}
if len(config.Validation.Manifests.URLs.Deny) > 0 {
for i, s := range config.Validation.Manifests.URLs.Deny {
// Validate via compilation.
if _, err := regexp.Compile(s); err != nil {
panic(fmt.Sprintf("validation.manifests.urls.deny: %s", err))
}
// Wrap with non-capturing group.
config.Validation.Manifests.URLs.Deny[i] = fmt.Sprintf("(?:%s)", s)
}
re := regexp.MustCompile(strings.Join(config.Validation.Manifests.URLs.Deny, "|"))
options = append(options, storage.ManifestURLsDenyRegexp(re))
}
}
}

// configure storage caches
if cc, ok := config.Storage["cache"]; ok {
v, ok := cc["blobdescriptor"]
if !ok {
// Backwards compatible: "layerinfo" == "blobdescriptor"
v = cc["layerinfo"]
}

switch v {
case "redis":
if app.redis == nil {
panic("redis configuration required to use for layerinfo cache")
}
cacheProvider := rediscache.NewRedisBlobDescriptorCacheProvider(app.redis)
localOptions := append(options, storage.BlobDescriptorCacheProvider(cacheProvider))
app.registry, err = storage.NewRegistry(app, app.driver, localOptions...)
if err != nil {
panic("could not create registry: " + err.Error())
}
dcontext.GetLogger(app).Infof("using redis blob descriptor cache")
case "inmemory":
cacheProvider := memorycache.NewInMemoryBlobDescriptorCacheProvider()
localOptions := append(options, storage.BlobDescriptorCacheProvider(cacheProvider))
app.registry, err = storage.NewRegistry(app, app.driver, localOptions...)
if err != nil {
panic("could not create registry: " + err.Error())
}
dcontext.GetLogger(app).Infof("using inmemory blob descriptor cache")
default:
if v != "" {
dcontext.GetLogger(app).Warnf("unknown cache type %q, caching disabled", config.Storage["cache"])
}
}
}

if app.registry == nil {
// configure the registry if no cache section is available.
app.registry, err = storage.NewRegistry(app.Context, app.driver, options...)
if err != nil {
panic("could not create registry: " + err.Error())
}
}

app.registry, err = applyRegistryMiddleware(app, app.registry, config.Middleware["registry"])
if err != nil {
panic(err)
}

authType := config.Auth.Type()

if authType != "" && !strings.EqualFold(authType, "none") {
accessController, err := auth.GetAccessController(config.Auth.Type(), config.Auth.Parameters())
if err != nil {
panic(fmt.Sprintf("unable to configure authorization (%s): %v", authType, err))
}
app.accessController = accessController
dcontext.GetLogger(app).Debugf("configured %q access controller", authType)
}

// configure as a pull through cache
if config.Proxy.RemoteURL != "" {
app.registry, err = proxy.NewRegistryPullThroughCache(ctx, app.registry, app.driver, config.Proxy)
if err != nil {
panic(err.Error())
}
app.isCache = true
dcontext.GetLogger(app).Info("Registry configured as a proxy cache to ", config.Proxy.RemoteURL)
}
var ok bool
app.repoRemover, ok = app.registry.(distribution.RepositoryRemover)
if !ok {
dcontext.GetLogger(app).Warnf("Registry does not implement RempositoryRemover. Will not be able to delete repos and tags")
}

return app
}

其他

  • distribution 的 tag 和 commit 关系比较迷惑,v3.0.0 有一些 2020 年的 commit,但是 v2.8.3 却没有
  • distribution 默认的配置启动路径是 /etc/docker/registry/config.ymlv3.0.0-beta.1 改为 /etc/distribution/config.yml),但 harbor 用的时候改为了 /etc/registry/config.yml,见 goharbor/harbor

参考