Kubernetes ReplicaSet controller 详解

与 ReplicationController 区别

  • ReplicaSet 支持 Set-based requirement 和 Equality-based requirement 类型的 Label selectors,而 ReplicationController 只支持 Equality-based requirement 类型的 Label selectors
  • ReplicaSet 会通过 ownerReference 接管 Pod,而 ReplicationController 只通过 Label selectors 接管 Pod。

Read more...

2020-03-03

Kubernetes Deployment Controller 详解

创建与启动

  • ctx.AvailableResources:可用的 GVR,由 cmd/kube-controller-manager/app.GetAvailableResources 通过 pkg/controller.SimpleControllerClientBuilder 创建的 client-go/kubernetes.Clientset 调用 kube-apiserver 的接口获得。
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
        return nil, false, nil
    }
    // 创建控制器
    dc, err := deployment.NewDeploymentController(
        ctx.InformerFactory.Apps().V1().Deployments(),
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("deployment-controller"),
    )
    if err != nil {
        return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
    }
    // 启动控制器
    go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
    return nil, true, nil
}

Read more...

2020-02-22

Kubernetes API 资源对象的版本控制

一个 API 资源对象的 Schema 的唯一标识由 apiVersion 和 kind 组成,其中 apiVersion 又分为 Group 和 Version。例如:

Group    Version    Kind
apps     v1         Deployment

Read more...

2020-02-17

golang sync.WaitGroup 底层实现

数据结构

// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
    noCopy noCopy

    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    // 64-bit atomic operations require 64-bit alignment, but 32-bit
    // compilers do not ensure it. So we allocate 12 bytes and then use
    // the aligned 8 bytes in them as state, and the other 4 as storage
    // for the sema.
    state1 [3]uint32
}
  • noCopy:用于 go vet 检查 sync.WaitGroup 类型变量是否采用了值传递的方式
    • 如果采用了值传递,go vet 检查会抛出错误:call of foo copies lock value: sync.WaitGroup contains sync.noCopy
    • 因为如果采用值传递,那么 state1 就会被复制一份,而对应的信号量并不会跟着复制,所以值传递后复制出来的是一个不可用的 WaitGroup
  • state1:12字节内存
    • 4字节用于 wg.ADD 和 wg.Done 的计数
    • 4字节用于 wg.Wait 的计数
    • 4字节用于信号量的唤醒和等待


Read more...

2020-02-15

Kubernetes ApiServer 并发安全机制

问题定义

笔者在这篇文章中想要探讨的问题是:当向 Kubernetes ApiServer 并发的发起多个请求,对同一个 API 资源对象进行更新时,Kubernetes ApiServer 是如何确保更新不丢失,以及如何做冲突检测的?


Read more...

2020-01-30

Kubernetes ControllerManager 源码学习

Kubernetes 版本:1.15.7

服务的创建

入口文件:cmd/kube-controller-manager/app/controllermanager.go,启动的主要逻辑都在该文件的 Run 方法中。

  1. 如果开启了 –leader-elect 参数,添加对应的健康检查,该功能是用于启动多个 kube-controller-manager 实例时,选举出一个领导者负责执行控制循环
  2. 启动安全和非安全的 HTTP 服务,提供了健康检查、性能分析、查看配置的接口
  3. run 闭包函数,如果开启了 –leader-elect,会在 leaderelection.RunOrDie 中被调用,否则直接调用
    1. 准备 rootClientBuilder 和 clientBuilder
    2. 创建服务的主要配置 app.ControllerContext
      1. 创建 Informer 工厂
        1. InformerFactory:用于常规 API 资源对象的控制器
        2. GenericInformerFactory:用于 garbagecollector 和 resourcequota 控制器
      2. 创建 RESTMapper
      3. 创建 CloudProvider
    3. 创建 serviceAccountTokenController,并启动
    4. 创建其他的 Controller,并启动
    5. 启动 Informer:InformerFactory.Start(app.ControllerContext.Stop)、GenericInformerFactory.Start(app.ControllerContext.Stop)

Read more...

2020-01-01

Kubernetes APIServer 源码学习

Kubernetes 版本:1.15.7

服务的创建

入口文件:cmd/kube-apiserver/apiserver.go,主要函数 CreateServerChain 在 cmd/kube-apiserver/app/server.go 文件中,负责整个服务的构建,主要逻辑如下:

CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error)

  • CreateKubeAPIServerConfig
    • 创建核心服务配置
    • 参数:
    • completedOptions completedServerRunOptions
    • 返回值:
    • kubeAPIServerConfig *master.Config // genericapiserver.Config + master.ExtraConfig
    • insecureServingInfo *genericapiserver.DeprecatedInsecureServingInfo // 是否监听 localhost:8080,没有鉴权的接口地址
    • serviceResolver aggregatorapiserver.ServiceResolver // 扩展API的服务路由
    • pluginInitializers []admission.PluginInitializer // webhookPluginInitializer + kubePluginInitializer
    • admissionPostStartHook genericapiserver.PostStartHookFunc // discoveryRESTMapper.Reset()
    • lastErr error
  • createAPIExtensionsConfig
    • 创建提供 Custom Resources API 服务的配置
    • 参数:
    • kubeAPIServerConfig genericapiserver.Config // master.Config.GenericConfig
    • externalInformers kubeexternalinformers.SharedInformerFactory // master.Config.ExtraConfig.VersionedInformers
    • pluginInitializers []admission.PluginInitializer // webhookPluginInitializer + kubePluginInitializer
    • commandOptions *options.ServerRunOptions // completedOptions.ServerRunOptions
    • masterCount int, // completedOptions.MasterCount 控制节点的数量
    • serviceResolver webhook.ServiceResolver // 扩展API的服务路由
    • authResolverWrapper webhook.AuthenticationInfoResolverWrapper // webhook.AuthenticationInfoResolverDelegator
  • createAPIExtensionsServer
    • 创建 Custom Resources API 服务
    • 参数:
    • apiextensionsConfig *apiextensionsapiserver.Config
    • delegateAPIServer genericapiserver.DelegationTarget // genericapiserver.NewEmptyDelegate()
    • 返回值:
    • *apiextensionsapiserver.CustomResourceDefinitions // apiExtensionsServer
    • error
  • CreateKubeAPIServer
    • 创建核心服务
    • 参数:
    • kubeAPIServerConfig *master.Config
    • delegateAPIServer genericapiserver.DelegationTarget // apiExtensionsServer.GenericAPIServer
    • admissionPostStartHook genericapiserver.PostStartHookFunc // webhookPluginInitializer + kubePluginInitializer
    • 返回值:
    • *master.Master // kubeAPIServer
    • error
  • kubeAPIServer.GenericAPIServer.PrepareRun()
  • apiExtensionsServer.GenericAPIServer.PrepareRun()
    • 配置开启 swagger
    • 安装/healthz健康检查处理器
    • 设置在服务关闭之前调用 GenericAPIServer.AuditBackend.Shutdown()
  • createAggregatorConfig
    • 创建聚合服务配置
    • 参数:
    • kubeAPIServerConfig genericapiserver.Config
    • commandOptions *options.ServerRunOptions
    • externalInformers kubeexternalinformers.SharedInformerFactory // master.Config.ExtraConfig.VersionedInformers
    • serviceResolver aggregatorapiserver.ServiceResolver // 扩展API的服务路由
    • proxyTransport *http.Transport
    • pluginInitializers []admission.PluginInitializer // webhookPluginInitializer + kubePluginInitializer
    • 返回值:
    • *aggregatorapiserver.Config // aggregatorConfig
    • error
  • createAggregatorServer
    • 创建聚合服务
    • 参数:
    • aggregatorConfig *aggregatorapiserver.Config
    • delegateAPIServer genericapiserver.DelegationTarget // kubeAPIServer.GenericAPIServer
    • apiExtensionInformers apiextensionsinformers.SharedInformerFactory // apiExtensionsServe.Informers
    • 返回值:
    • *aggregatorapiserver.APIAggregator // aggregatorServer
    • error
  • BuildInsecureHandlerChain
    • 监听不安全的localhost:8080地址
    • 参数:
    • apiHandler http.Handler, // aggregatorServer.GenericAPIServer.UnprotectedHandler()
    • c *server.Config // master.Config.GenericConfig
    • 返回值:
    • http.Handler // insecureHandlerChain
  • insecureServingInfo.Serve
    • 启动监听localhost:8080的服务
  • aggregatorServer.GenericAPIServer.PrepareRun().Run(stopCh)
  • preparedGenericAPIServer.NonBlockingRun(stopCh <-chan struct{})
    • 聚合服务启动
    • preparedGenericAPIServer.AuditBackend.Run(auditStopCh)
    • preparedGenericAPIServer.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
      • server.RunServer(server *http.Server, ln net.Listener, shutDownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error)
    • preparedGenericAPIServer.RunPostStartHooks(stopCh)

CreateServerChain 创建的3个 Server,都采用了类似 config->complete()->completedConfig->new()->server 的调用完成,如下图:


Read more...

2019-12-27