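ReplicationManager is the ReplicationController controller object. It carries a different name so that the code can tell it apart from the ReplicationController resource API object. The code below is the structure definition of ReplicationManager.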
pkg/controller/replication/replication_controller.go:75

// ReplicationManager is responsible for synchronizing ReplicationController objects stored
// in the system with actual running pods.
type ReplicationManager struct {
	kubeClient clientset.Interface
	podControl controller.PodControlInterface

	// internalPodInformer is used to hold a personal informer. If we're using
	// a normal shared informer, then the informer will be started for us. If
	// we have a personal informer, we must start it ourselves. If you start
	// the controller using NewReplicationManager(passing SharedInformer), this
	// will be null
	internalPodInformer cache.SharedIndexInformer

	// An rc is temporarily suspended after creating/deleting these many replicas.
	// It resumes normal action after observing the watch events for them.
	burstReplicas int
	// To allow injection of syncReplicationController for testing.
	syncHandler func(rcKey string) error

	// A TTLCache of pod creates/deletes each rc expects to see.
	expectations *controller.UIDTrackingControllerExpectations

	// A store of replication controllers, populated by the rcController
	rcStore cache.StoreToReplicationControllerLister
	// Watches changes to all replication controllers
	rcController *cache.Controller
	// A store of pods, populated by the podController
	podStore cache.StoreToPodLister
	// Watches changes to all pods
	podController cache.ControllerInterface
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced func() bool

	lookupCache *controller.MatchingCache

	// Controllers that need to be synced
	queue workqueue.RateLimitingInterface

	// garbageCollectorEnabled denotes if the garbage collector is enabled. RC
	// manager behaves differently if GC is enabled.
	garbageCollectorEnabled bool
}
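The following fields deserve a closer look:
podControl: the interface for creating and deleting Pods.
burstReplicas: the maximum number of Pods that may be created or deleted concurrently in one batch.
syncHandler: the function that actually performs the replica sync.
expectations: a cache of the Pod UIDs the controller expects to see created or deleted, together with the interface for adjusting that cache.
rcStore: the Indexer for ReplicationController resource objects; its data is supplied and maintained by rcController.
rcController: watches all ReplicationController resources and writes the observed changes into rcStore.
podStore: the Indexer for Pods; its data is supplied and maintained by podController.
podController: watches all Pod resources and writes the observed changes into podStore.
queue: holds the RCs waiting to be synced; it is a rate-limited queue.
lookupCache: a cache of Pod-to-RC matching information that speeds up lookups.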
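Readers of my earlier post on the internal implementation and source code of the Kubernetes ResourceQuota Controller may remember that it described how the controller manager starts the ResourceQuotaController. The ReplicationController is started the same way. When kube-controller-manager calls newControllerInitializers to initialize the controllers, it registers startReplicationController, which is used to start the ReplicationController controller.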
cmd/kube-controller-manager/app/controllermanager.go:224

func newControllerInitializers() map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefuleset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["certificatesigningrequests"] = startCSRController

	return controllers
}
Following the code into startReplicationController, the function is simple: it starts a goroutine that calls replicationcontroller.NewReplicationManager to create a ReplicationManager and then invokes its Run method to begin work.
cmd/kube-controller-manager/app/core.go:55

func startReplicationController(ctx ControllerContext) (bool, error) {
	go replicationcontroller.NewReplicationManager(
		ctx.InformerFactory.Pods().Informer(),
		ctx.ClientBuilder.ClientOrDie("replication-controller"),
		ResyncPeriod(&ctx.Options),
		replicationcontroller.BurstReplicas,
		int(ctx.Options.LookupCacheSizeForRC),
		ctx.Options.EnableGarbageCollector,
	).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop)
	return true, nil
}
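The registration-and-start pattern used here can be sketched in isolation. This is a simplified illustration, not the controller manager's code: controllerContext, initFunc, newInitializers, and startAll are hypothetical names, and the started channel exists only so the example can observe that the goroutine ran.

```go
package main

import "fmt"

// controllerContext is a hypothetical stand-in for ControllerContext.
type controllerContext struct {
	stop chan struct{}
}

// initFunc mirrors the shape of InitFunc: it starts one controller and
// reports whether that controller was actually started.
type initFunc func(ctx controllerContext) (bool, error)

// newInitializers registers one start function per controller name.
func newInitializers(started chan<- string) map[string]initFunc {
	return map[string]initFunc{
		"replicationcontroller": func(ctx controllerContext) (bool, error) {
			// Like startReplicationController, launch the controller in a
			// goroutine and return immediately.
			go func() {
				started <- "replicationcontroller"
				<-ctx.stop
			}()
			return true, nil
		},
	}
}

// startAll invokes every registered initializer and returns the names of
// the controllers that reported they were started.
func startAll(ctx controllerContext, inits map[string]initFunc) ([]string, error) {
	var names []string
	for name, start := range inits {
		ok, err := start(ctx)
		if err != nil {
			return names, fmt.Errorf("starting %s: %v", name, err)
		}
		if ok {
			names = append(names, name)
		}
	}
	return names, nil
}

func main() {
	ctx := controllerContext{stop: make(chan struct{})}
	started := make(chan string, 1)
	names, err := startAll(ctx, newInitializers(started))
	fmt.Println(names, err, <-started)
	close(ctx.stop)
}
```

Keeping the start functions in a map keyed by name lets the caller iterate over them uniformly, which is what makes adding a controller a one-line registration.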
As shown above, controller-manager creates a ReplicationManager object through NewReplicationManager; that object is, in effect, the ReplicationController controller.
pkg/controller/replication/replication_controller.go:122

// NewReplicationManager creates a replication manager
func NewReplicationManager(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
	return newReplicationManager(
		eventBroadcaster.NewRecorder(v1.EventSource{Component: "replication-controller"}),
		podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
}

pkg/controller/replication/replication_controller.go:132

// newReplicationManager configures a replication manager with the specified event recorder
func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
	if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter())
	}

	rm := &ReplicationManager{
		kubeClient: kubeClient,
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventRecorder,
		},
		burstReplicas: burstReplicas,
		expectations:  controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicationmanager"),
		garbageCollectorEnabled: garbageCollectorEnabled,
	}

	rm.rcStore.Indexer, rm.rcController = cache.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options)
			},
		},
		&v1.ReplicationController{},
		// TODO: Can we have much longer period here?
		FullControllerResyncPeriod,
		cache.ResourceEventHandlerFuncs{
			AddFunc:    rm.enqueueController,
			UpdateFunc: rm.updateRC,
			// This will enter the sync loop and no-op, because the controller has been deleted from the store.
			// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
			// way of achieving this is by performing a `stop` operation on the controller.
			DeleteFunc: rm.enqueueController,
		},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)

	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: rm.addPod,
		// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
		// the most frequent pod update is status, and the associated rc will only list from local storage, so
		// it should be ok.
		UpdateFunc: rm.updatePod,
		DeleteFunc: rm.deletePod,
	})
	rm.podStore.Indexer = podInformer.GetIndexer()
	rm.podController = podInformer.GetController()

	rm.syncHandler = rm.syncReplicationController
	rm.podStoreSynced = rm.podController.HasSynced
	rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
	return rm
}
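newReplicationManager mainly configures the ReplicationManager, for example:
It configures queue through workqueue.NewNamedRateLimitingQueue.
It configures expectations through controller.NewUIDTrackingControllerExpectations.
It configures rcStore, podStore, rcController, and podController.
It sets syncHandler to rm.syncReplicationController. This one matters enough to list on its own: as described later, syncReplicationController is the method that does the core work, and the automatic maintenance of replicas is essentially all done by it.
With the ReplicationManager created, it is time to put it to work. The Run method is the starting point; it begins watching and syncing.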
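The struct comment notes that syncHandler is kept as a field "to allow injection of syncReplicationController for testing". A minimal sketch of that pattern follows; the names manager, syncOne, and process are hypothetical and chosen only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// manager is a hypothetical, stripped-down controller that keeps its sync
// function in a field so that it can be swapped out.
type manager struct {
	syncHandler func(key string) error
}

// newManager wires the field to the real method, as a constructor would.
func newManager() *manager {
	m := &manager{}
	m.syncHandler = m.syncOne
	return m
}

// syncOne stands in for the real reconciliation logic.
func (m *manager) syncOne(key string) error {
	if key == "" {
		return errors.New("empty key")
	}
	return nil
}

// process is what a worker would call; it only knows about the field.
func (m *manager) process(key string) error {
	return m.syncHandler(key)
}

func main() {
	m := newManager()
	fmt.Println(m.process("default/web")) // <nil>

	// A test can replace the handler to record which keys were synced.
	var seen []string
	m.syncHandler = func(key string) error {
		seen = append(seen, key)
		return nil
	}
	_ = m.process("default/api")
	fmt.Println(seen) // [default/api]
}
```

Because the worker calls the field rather than the method directly, a test can exercise queueing and retry behaviour without touching the API server.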
pkg/controller/replication/replication_controller.go:217

// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	glog.Infof("Starting RC Manager")
	go rm.rcController.Run(stopCh)
	go rm.podController.Run(stopCh)
	for i := 0; i < workers; i++ {
		go wait.Until(rm.worker, time.Second, stopCh)
	}

	if rm.internalPodInformer != nil {
		go rm.internalPodInformer.Run(stopCh)
	}

	<-stopCh
	glog.Infof("Shutting down RC Manager")
	rm.queue.ShutDown()
}
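watching
go rm.rcController.Run(stopCh)
Watches all rc objects.
go rm.podController.Run(stopCh)
Watches all pods.
syncing
Starts as many goroutines as the workers argument specifies.
Each goroutine runs rm.worker through wait.Until. rm.worker loops on its own, taking rc keys from the queue and calling syncHandler to sync them; if it ever returns, wait.Until calls it again after a 1s pause.
Each goroutine ends only once stopCh is signalled.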
Below is the Run method implementation behind rcController and podController; its job is to carry out the watch on rc and pod objects.
pkg/client/cache/controller.go:84

// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	r.RunUntil(stopCh)

	wait.Until(c.processLoop, time.Second, stopCh)
}
The key part of syncing is implemented in the worker method of ReplicationManager, shown below.
pkg/controller/replication/replication_controller.go:488

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rm *ReplicationManager) worker() {
	workFunc := func() bool {
		key, quit := rm.queue.Get()
		if quit {
			return true
		}
		defer rm.queue.Done(key)

		err := rm.syncHandler(key.(string))
		if err == nil {
			rm.queue.Forget(key)
			return false
		}

		rm.queue.AddRateLimited(key)
		utilruntime.HandleError(err)
		return false
	}
	for {
		if quit := workFunc(); quit {
			glog.Infof("replication controller worker shutting down")
			return
		}
	}
}
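The main logic in worker is:
Take the key of an rc from rm's rate-limited queue.
Call syncHandler to sync that rc. On success the key is cleared with queue.Forget; on failure it is requeued with queue.AddRateLimited.
newReplicationManager registered syncReplicationController as the syncHandler through rm.syncHandler = rm.syncReplicationController, so the logic for syncing an rc lives in syncReplicationController.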
pkg/controller/replication/replication_controller.go:639

// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (rm *ReplicationManager) syncReplicationController(key string) error {
	trace := util.NewTrace("syncReplicationController: " + key)
	defer trace.LogIfLong(250 * time.Millisecond)

	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
	}()

	if !rm.podStoreSynced() {
		// Sleep so we give the pod reflector goroutine a chance to run.
		time.Sleep(PodStoreSyncedPollPeriod)
		glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key)
		rm.queue.Add(key)
		return nil
	}

	obj, exists, err := rm.rcStore.Indexer.GetByKey(key)
	if !exists {
		glog.Infof("Replication Controller has been deleted %v", key)
		rm.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}
	rc := *obj.(*v1.ReplicationController)

	trace.Step("ReplicationController restored")
	rcNeedsSync := rm.expectations.SatisfiedExpectations(key)
	trace.Step("Expectations restored")

	// NOTE: filteredPods are pointing to objects from cache - if you need to
	// modify them, you need to copy it first.
	// TODO: Do the List and Filter in a single pass, or use an index.
	var filteredPods []*v1.Pod
	if rm.garbageCollectorEnabled {
		// list all pods to include the pods that don't match the rc's selector
		// anymore but has the stale controller ref.
		pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
		if err != nil {
			glog.Errorf("Error getting pods for rc %q: %v", key, err)
			rm.queue.Add(key)
			return err
		}
		cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind())
		matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods)
		// Adopt pods only if this replication controller is not going to be deleted.
		if rc.DeletionTimestamp == nil {
			for _, pod := range matchesNeedsController {
				err := cm.AdoptPod(pod)
				// continue to next pod if adoption fails.
				if err != nil {
					// If the pod no longer exists, don't even log the error.
					if !errors.IsNotFound(err) {
						utilruntime.HandleError(err)
					}
				} else {
					matchesAndControlled = append(matchesAndControlled, pod)
				}
			}
		}
		filteredPods = matchesAndControlled
		// remove the controllerRef for the pods that no longer have matching labels
		var errlist []error
		for _, pod := range controlledDoesNotMatch {
			err := cm.ReleasePod(pod)
			if err != nil {
				errlist = append(errlist, err)
			}
		}
		if len(errlist) != 0 {
			aggregate := utilerrors.NewAggregate(errlist)
			// push the RC into work queue again. We need to try to free the
			// pods again otherwise they will stuck with the stale
			// controllerRef.
			rm.queue.Add(key)
			return aggregate
		}
	} else {
		pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
		if err != nil {
			glog.Errorf("Error getting pods for rc %q: %v", key, err)
			rm.queue.Add(key)
			return err
		}
		filteredPods = controller.FilterActivePods(pods)
	}

	var manageReplicasErr error
	if rcNeedsSync && rc.DeletionTimestamp == nil {
		manageReplicasErr = rm.manageReplicas(filteredPods, &rc)
	}
	trace.Step("manageReplicas done")

	newStatus := calculateStatus(rc, filteredPods, manageReplicasErr)

	// Always updates status as pods come up or die.
	if err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, newStatus); err != nil {
		// Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
		return err
	}

	return manageReplicasErr
}
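The main logic of syncReplicationController is:
1. If podStore has not yet been synced at least once, sleep briefly, put the rc's key back into the queue to wait for podStore to sync, and return. Otherwise continue.
2. Look up the rc object in rcStore by its key. If it does not exist, the rc has been deleted; remove its entry from expectations by key and return. If it exists, continue.
3. Check expectations to decide whether the rc needs a sync: the rc needs one when its pending add and del counts are fulfilled, or when more than 5 minutes have passed since the recorded timestamp.
4. If GC is enabled, list all pods in the rc's namespace from podStore and classify them. The matchesAndControlled pods, plus the matchesNeedsController pods that are adopted successfully, become filteredPods; pods in controlledDoesNotMatch are released. If GC is not enabled, list the pods in that namespace matching rc.Spec.Selector and keep the active ones as filteredPods. (For the meaning of matchesAndControlled and matchesNeedsController, see the PodControllerRefManager.Classify function defined at pkg/controller/controller_ref_manager.go:57.)
5. If step 3 found that the rc needs a sync and its DeletionTimestamp is nil, call manageReplicas so that the number of active pods managed by the rc matches the desired count.
6. After manageReplicas, recalculate the rc's status right away, updating Conditions, Replicas, FullyLabeledReplicas, ReadyReplicas, and AvailableReplicas.
7. Call the kube-apiserver through updateReplicationControllerStatus to update the rc's status to the value calculated in the previous step, and return.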
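The three groups used in step 4 can be illustrated with a simplified classifier. This is a sketch of the idea only, not the PodControllerRefManager.Classify implementation: the pod type, the matches helper, and classify are hypothetical, selectors are plain equality maps, and the active-pod filtering that the real code may perform is left out.

```go
package main

import "fmt"

// pod is a hypothetical, minimal pod: its labels plus the UID of the
// controller that owns it ("" when it has no controller reference).
type pod struct {
	name     string
	labels   map[string]string
	ownerUID string
}

// matches reports whether every selector key/value pair is present in labels.
func matches(selector, labels map[string]string) bool {
	for k, v := range selector {
		if got, ok := labels[k]; !ok || got != v {
			return false
		}
	}
	return true
}

// classify splits pods into the three groups named in the sync function.
// Pods owned by a different controller fall into none of them.
func classify(rcUID string, selector map[string]string, pods []pod) (matchesAndControlled, matchesNeedsController, controlledDoesNotMatch []pod) {
	for _, p := range pods {
		owned := p.ownerUID == rcUID
		matched := matches(selector, p.labels)
		switch {
		case owned && matched:
			matchesAndControlled = append(matchesAndControlled, p)
		case owned && !matched:
			controlledDoesNotMatch = append(controlledDoesNotMatch, p)
		case p.ownerUID == "" && matched:
			matchesNeedsController = append(matchesNeedsController, p)
		}
	}
	return
}

func main() {
	selector := map[string]string{"app": "web"}
	pods := []pod{
		{name: "a", labels: map[string]string{"app": "web"}, ownerUID: "rc-1"},
		{name: "b", labels: map[string]string{"app": "web"}},
		{name: "c", labels: map[string]string{"app": "db"}, ownerUID: "rc-1"},
		{name: "d", labels: map[string]string{"app": "web"}, ownerUID: "rc-2"},
	}
	owned, orphans, stale := classify("rc-1", selector, pods)
	fmt.Println(len(owned), len(orphans), len(stale)) // 1 1 1
}
```

In the sync function, the first group is kept, the second is a candidate for adoption, and the third has its controller reference released.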
In the syncReplicationController flow described above, a key step is the call to manageReplicas in step 5, which repairs the rc's replicas by adding or deleting pods.
pkg/controller/replication/replication_controller.go:516

// manageReplicas checks and updates replicas for the given replication controller.
// Does NOT modify <filteredPods>.
func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.ReplicationController) error {
	diff := len(filteredPods) - int(*(rc.Spec.Replicas))
	rcKey, err := controller.KeyFunc(rc)
	if err != nil {
		return err
	}
	if diff == 0 {
		return nil
	}

	if diff < 0 {
		diff *= -1
		if diff > rm.burstReplicas {
			diff = rm.burstReplicas
		}
		// TODO: Track UIDs of creates just like deletes. The problem currently
		// is we'd need to wait on the result of a create to record the pod's
		// UID, which would require locking *across* the create, which will turn
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		errCh := make(chan error, diff)
		rm.expectations.ExpectCreations(rcKey, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
		for i := 0; i < diff; i++ {
			go func() {
				defer wg.Done()
				var err error
				if rm.garbageCollectorEnabled {
					var trueVar = true
					controllerRef := &metav1.OwnerReference{
						APIVersion: getRCKind().GroupVersion().String(),
						Kind:       getRCKind().Kind,
						Name:       rc.Name,
						UID:        rc.UID,
						Controller: &trueVar,
					}
					err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
				} else {
					err = rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc)
				}
				if err != nil {
					// Decrement the expected number of creates because the informer won't observe this pod
					glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
					rm.expectations.CreationObserved(rcKey)
					errCh <- err
					utilruntime.HandleError(err)
				}
			}()
		}
		wg.Wait()

		select {
		case err := <-errCh:
			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
			if err != nil {
				return err
			}
		default:
		}

		return nil
	}

	if diff > rm.burstReplicas {
		diff = rm.burstReplicas
	}
	glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
	// No need to sort pods if we are about to delete all of them
	if *(rc.Spec.Replicas) != 0 {
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		sort.Sort(controller.ActivePods(filteredPods))
	}
	// Snapshot the UIDs (ns/name) of the pods we're expecting to see
	// deleted, so we know to record their expectations exactly once either
	// when we see it as an update of the deletion timestamp, or as a delete.
	// Note that if the labels on a pod/rc change in a way that the pod gets
	// orphaned, the rs will only wake up after the expectations have
	// expired even if other pods are deleted.
	deletedPodKeys := []string{}
	for i := 0; i < diff; i++ {
		deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
	}
	// We use pod namespace/name as a UID to wait for deletions, so if the
	// labels on a pod/rc change in a way that the pod gets orphaned, the
	// rc will only wake up after the expectation has expired.
	errCh := make(chan error, diff)
	rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
	var wg sync.WaitGroup
	wg.Add(diff)
	for i := 0; i < diff; i++ {
		go func(ix int) {
			defer wg.Done()
			if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
				// Decrement the expected number of deletes because the informer won't observe this deletion
				podKey := controller.PodKey(filteredPods[ix])
				glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
				rm.expectations.DeletionObserved(rcKey, podKey)
				errCh <- err
				utilruntime.HandleError(err)
			}
		}(i)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
		if err != nil {
			return err
		}
	default:
	}

	return nil
}
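The main logic of the manageReplicas code above is:
First compute diff, the number of pods in filteredPods minus the desired count defined in rc.Spec.Replicas.
If diff is 0, the current state already equals the desired state; return immediately.
If diff is negative, there are too few active pods, so the following steps run:
Compare |diff| with burstReplicas so that at most burstReplicas pods are created in this pass.
Call expectations.ExpectCreations to set the add count in expectations to |diff|, meaning |diff| new pods must be created to reach the desired state.
Use a sync.WaitGroup to start |diff| goroutines; each one calls podControl.CreatePods (or podControl.CreatePodsWithControllerRef when GC is enabled) to create one pod from the rc's spec Template in the rc's namespace.
Once all goroutines have finished, return the first error if one or more pod creations failed; otherwise return nil.
If diff is positive, there are more active pods than desired, so the following steps run:
Compare |diff| with burstReplicas so that at most burstReplicas pods are deleted in this pass.
Sort the pods in filteredPods so that not-ready < ready, unscheduled < scheduled, and pending < running; pods in earlier stages are deleted first. The sort is skipped when the desired count is 0, because every pod is going to be deleted anyway.
After sorting, pick the first |diff| pods as the pods to delete.
Call expectations.ExpectDeletions with the keys of those pods, which sets the del count in expectations to |diff|, meaning |diff| pods must be deleted to reach the desired state.
Use a sync.WaitGroup to start |diff| goroutines; each one calls podControl.DeletePod to delete one of the selected pods.
Once all goroutines have finished, return the first error if one or more pod deletions failed; otherwise return nil.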