温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何分析SuperEdge 拓扑算法

发布时间:2022-01-17 10:53:49 阅读:150 作者:柒染 栏目:云计算
开发者测试专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

这篇文章主要为大家分析了如何分析SuperEdge 拓扑算法的相关知识点,内容详细易懂,操作细节合理,具有一定参考价值。如果感兴趣的话,不妨跟着跟随小编一起来看看,下面跟着小编一起深入学习“如何分析SuperEdge 拓扑算法”的知识吧。

前言

 

SuperEdge 介绍

SuperEdge 是基于原生 Kubernetes 的边缘容器管理系统。该系统把云原生能力扩展到边缘侧,很好的实现了云端对边缘端的管理和控制。同时 superedge 自研了 service group 实现了基于边缘计算的服务访问控制,极大简化了应用从云端部署到边缘端的过程。  

SuperEdge service group拓扑感知特性

SuperEdge service group 利用 application-grid-wrapper 实现拓扑感知,完成了同一个 nodeunit 内服务的闭环访问。

在深入分析 application-grid-wrapper 之前,这里先简单介绍一下社区 Kubernetes 原生支持的拓扑感知特性[1]

Kubernetes service topology awareness 特性于v1.17发布 alpha 版本,用于实现路由拓扑以及就近访问特性。用户需要在service 中添加 topologyKeys 字段标示拓扑key类型,只有具有相同拓扑域的 endpoint 会被访问到,目前有三种 topologyKeys 可供选择:

  • "kubernetes.io/hostname":访问本节点内(     kubernetes.io/hostname      label value相同)的endpoint,如果没有则service访问失败
  • "topology.kubernetes.io/zone":访问相同zone域内(     topology.kubernetes.io/zone      label value相同)的endpoint,如果没有则service访问失败
  • "topology.kubernetes.io/region":访问相同region域内(     topology.kubernetes.io/region      label value 相同)的 endpoint,如果没有则 service 访问失败

除了单独填写如上某一个拓扑key之外,还可以将这些key构造成列表进行填写,例如:["kubernetes.io/hostname", "topology.kubernetes.io/zone", "topology.kubernetes.io/region"],这表示:优先访问本节点内的 endpoint;如果不存在,则访问同一个 zone 内的 endpoint;如果再不存在,则访问同一个 region 内的 endpoint,如果都不存在则访问失败。

另外,还可以在列表最后(只能最后一项)添加"*"表示:如果前面拓扑域都失败,则访问任何有效的 endpoint,也即没有限制拓扑了,示例如下:  
A Service that prefers node local, zonal, then regional endpoints but falls back to cluster wide endpoints.apiVersion: v1kind: Servicemetadata:    name: my-servicespec:    selector:        app: my-app    ports:        - protocol: TCP            port: 80            targetPort: 9376    topologyKeys:        - "kubernetes.io/hostname"    - "topology.kubernetes.io/zone"     - "topology.kubernetes.io/region"    - "*"
 

而service group 实现的拓扑感知和社区对比,有如下区别:

  • service group 拓扑 key 可以自定义,也即为 gridUniqKey,使用起来更加灵活;而社区实现目前只有三种选择:"kubernetes.io/hostname","topology.kubernetes.io/zone"以及"topology.kubernetes.io/region"。
  • service group 只能填写一个拓扑 key,也即只能访问本拓扑域内有效的 endpoint,无法访问其它拓扑域的 endpoint;而社区可以通过 topologyKey 列表以及"*"实现其它备选拓扑域 endpoint 的访问。

service group 实现的拓扑感知,service 配置如下:

# A Service that only prefers node zone1al endpoints.apiVersion: v1kind: Servicemetadata:    annotations:        topologyKeys: '["zone1"]'    labels:        superedge.io/grid-selector: servicegrid-demo    name: servicegrid-demo-svcspec:    ports:    - port: 80        protocol: TCP        targetPort: 8080    selector:        appGrid: echo
 

在介绍完 service group 实现的拓扑感知后,我们深入到源码分析实现细节。同样的,这里以一个使用示例开始分析:

# step1: labels edge nodes$ kubectl  get nodesNAME    STATUS   ROLES    AGE   VERSIONnode0   Ready    <none>   16d   v1.16.7node1    Ready    <none>   16d   v1.16.7node2    Ready    <none>   16d   v1.16.7# nodeunit1(nodegroup and servicegroup zone1)$ kubectl --kubeconfig config label nodes node0 zone1=nodeunit1  # nodeunit2(nodegroup and servicegroup zone1)$ kubectl --kubeconfig config label nodes node1 zone1=nodeunit2$ kubectl --kubeconfig config label nodes node2 zone1=nodeunit2...# step3: deploy echo ServiceGrid$ cat <<EOF | kubectl --kubeconfig config apply --apiVersion: superedge.io/v1kind: ServiceGridmetadata:    name: servicegrid-demo    namespace: defaultspec:    gridUniqKey: zone1    template:        selector:            appGrid: echo        ports:        - protocol: TCP            port: 80            targetPort: 8080EOFservicegrid.superedge.io/servicegrid-demo created# note that there is only one relevant service generated$ kubectl  get svcNAME                TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)   AGEkubernetes          ClusterIP   192.168.0.1       <none>        443/TCP   16dservicegrid-demo-svc   ClusterIP   192.168.6.139     <none>        80/TCP    10m    # step4: access servicegrid-demo-svc(service topology and closed-looped)# execute on node0$ curl 192.168.6.139|grep "node name"        node name:      node0# execute on node1 and node2$ curl 192.168.6.139|grep "node name"        node name:      node2$ curl 192.168.6.139|grep "node name"       node name:      node1
 

在创建完 ServiceGrid CR 后,ServiceGrid Controller 负责根据 ServiceGrid产生对应的 service (包含由 serviceGrid.Spec.GridUniqKey 构成的 topologyKeys annotations);而 application-grid-wrapper 根据 service 实现拓扑感知,下面依次分析。

 

ServiceGrid Controller 分析

ServiceGrid Controller 逻辑和 DeploymentGrid Controller 整体一致,如下:

  • 1、创建并维护 service group 需要的若干 CRDs(包括:ServiceGrid)
  • 2、监听 ServiceGrid event,并填充 ServiceGrid 到工作队列中;循环从队列中取出 ServiceGrid 进行解析,创建并且维护对应的 service
  • 3、监听 service event,并将相关的  ServiceGrid 塞到工作队列中进行上述处理,协助上述逻辑达到整体 reconcile 逻辑

注意这里区别于 DeploymentGrid Controller:

  • 一个 ServiceGrid 对象只产生一个 service
  • 只需额外监听 service event,无需监听 node 事件。因为 node 的CRUD与 ServiceGrid 无关
  • ServiceGrid 对应产生的 service,命名为:     {ServiceGrid}-svc
func (sgc *ServiceGridController) syncServiceGrid(key stringerror {        startTime := time.Now()        klog.V(4).Infof("Started syncing service grid %q (%v)", key, startTime)        defer func() {              klog.V(4).Infof("Finished syncing service grid %q (%v)", key, time.Since(startTime))        }()            namespace, name, err := cache.SplitMetaNamespaceKey(key)        if err != nil {              return err        }            sg, err := sgc.svcGridLister.ServiceGrids(namespace).Get(name)        if errors.IsNotFound(err) {                      klog.V(2).Infof("service grid %v has been deleted", key)                return nil        }        if err != nil {                return err        }            if sg.Spec.GridUniqKey == "" {               sgc.eventRecorder.Eventf(sg, corev1.EventTypeWarning, "Empty""This service grid has an empty grid key")                return nil        }            // get service workload list of this grid        svcList, err := sgc.getServiceForGrid(sg)        if err != nil {                return err        }            if sg.DeletionTimestamp != nil {                return nil        }            // sync service grid relevant services workload        return sgc.reconcile(sg, svcList)    }    func (sgc *ServiceGridController) getServiceForGrid(sg *crdv1.ServiceGrid) ([]*corev1.Service, error) {      svcList, err := sgc.svcLister.Services(sg.Namespace).List(labels.Everything())      if err != nil {            return nil, err      }        labelSelector, err := common.GetDefaultSelector(sg.Name)      if err != nil {              return nil, err      }          canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error)  {                 fresh, err :=   sgc.crdClient.SuperedgeV1().ServiceGrids(sg.Namespace).Get(context.TODO(), sg.Name, metav1.GetOptions{})              if err != nil {                    return nil, err              }              if fresh.UID != sg.UID {                      return nil, fmt.Errorf("orignal service grid %v/%v is gone: got uid %v, wanted %v", sg.Namespace,                                sg.Name, fresh.UID, sg.UID)              }              return fresh, nil        })              cm := controller.NewServiceControllerRefManager(sgc.svcClient, sg, labelSelector, util.ControllerKind, canAdoptFunc)        return cm.ClaimService(svcList)}    func (sgc *ServiceGridController) reconcile(g *crdv1.ServiceGrid, svcList []*corev1.Service) error {        var (                adds    []*corev1.Service             updates []*corev1.Service             deletes []*corev1.Service        )            sgTargetSvcName := util.GetServiceName(g)        isExistingSvc := false        for _, svc := range svcList {             if svc.Name == sgTargetSvcName {                        isExistingSvc = true                 template := util.KeepConsistence(g, svc)                        if !apiequality.Semantic.DeepEqual(template, svc) {                           updates = append(updates, template)                        }                } else {                        deletes = append(deletes, svc)                }        }            if !isExistingSvc {                adds = append(adds, util.CreateService(g))        }            return sgc.syncService(adds, updates, deletes)}func CreateService(sg *crdv1.ServiceGrid) *corev1.Service {        svc := &corev1.Service{                ObjectMeta: metav1.ObjectMeta{                      Name:      GetServiceName(sg),                      Namespace: sg.Namespace,             // Append existed ServiceGrid labels to service to be created            Labels: func() map[string]string {                              if sg.Labels != nil {                   newLabels := sg.Labels                                      newLabels[common.GridSelectorName] = sg.Name                               newLabels[common.GridSelectorUniqKeyName] = sg.Spec.GridUniqKey                                      return newLabels                           } else {                                 return map[string]string{                                              common.GridSelectorName:        sg.Name,                                   common.GridSelectorUniqKeyName: sg.Spec.GridUniqKey,               }                         }                  }(),                  Annotations: make(map[string]string),           },            Spec: sg.Spec.Template,     }        keys := make([]string1)      keys[0] = sg.Spec.GridUniqKey      keyData, _ := json.Marshal(keys)       svc.Annotations[common.TopologyAnnotationsKey] = string(keyData)        return svc}
 

由于逻辑与 DeploymentGrid 类似,这里不展开细节,重点关注 application-grid-wrapper 部分。

 

application-grid-wrapper 分析

在 ServiceGrid Controller 创建完 service 之后,application-grid-wrapper 的作用就开始启动了:

apiVersion: v1kind: Servicemetadata:    annotations:        topologyKeys: '["zone1"]'    creationTimestamp: "2021-03-03T07:33:30Z"    labels:        superedge.io/grid-selector: servicegrid-demo      name: servicegrid-demo-svc      namespace: default      ownerReferences:      - apiVersion: superedge.io/v1          blockOwnerDeletion: true          controller: true          kind: ServiceGrid          name: servicegrid-demo          uid78c74d3c-72ac-4e68-8c79-f1396af5a581      resourceVersion"127987090"      selfLink: /api/v1/namespaces/default/services/servicegrid-demo-svc      uid8130ba7b-c27e-4c3a-8ceb-4f6dd0178dfcspec:      clusterIP192.168.161.1      ports:      - port80        protocol: TCP        targetPort8080    selector:        appGrid: echo    sessionAffinity: None    typeClusterIPstatus:    loadBalancer: {}
 

为了实现 Kubernetes 零侵入,需要在 kube-proxy 与 apiserver 通信之间添加一层 wrapper,架构如下:

如何分析SuperEdge 拓扑算法  

调用链路如下:

kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver
 

因此 application-grid-wrapper 会起服务,接受来自 kube-proxy 的请求,如下:

func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile stringerror {        ...        klog.Infof("Start to run interceptor server")        /* filter         */        server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}            if insecure {                return server.ListenAndServe()        }        ...        server.TLSConfig = tlsConfig        return server.ListenAndServeTLS("""")}func (s *interceptorServer) buildFilterChains(debug bool) http.Handler {       handler := http.Handler(http.NewServeMux())            handler = s.interceptEndpointsRequest(handler)        handler = s.interceptServiceRequest(handler)        handler = s.interceptEventRequest(handler)        handler = s.interceptNodeRequest(handler)        handler = s.logger(handler)          if debug {              handler = s.debugger(handler)     }        return handler}
 
这里会首先创建 interceptorServer,然后注册处理函数,由外到内依次如下:  
 
  • debug:接受 debug 请求,返回 wrapper pprof 运行信息

  • logger:打印请求日志

  • node:接受 kube-proxy node GET(/api/v1/nodes/{node})请求,并返回 node信息

  • event:接受 kube-proxy events POST(/events)请求,并将请求转发给 lite-apiserver

func (s *interceptorServer) interceptEventRequest(handler http.Handler) http.Handler {     return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {           if r.Method != http.MethodPost || !strings.HasSuffix(r.URL.Path, "/events") {                   handler.ServeHTTP(w, r)              return           }                targetURL, _ := url.Parse(s.restConfig.Host)           reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)           reverseProxy.Transport, _ = rest.TransportFor(s.restConfig)           reverseProxy.ServeHTTP(w, r)     })  }
 
  • service:接受 kube-proxy service List&Watch(/api/v1/services)请求,并根据 storageCache 内容返回(GetServices)

  • endpoint:接受 kube-proxy endpoint List&Watch (/api/v1/endpoints)请求,并根据 storageCache 内容返回(GetEndpoints)

下面先重点分析 cache 部分的逻辑,然后再回过头来分析具体的 http handler List&Watch 处理逻辑。

wrapper 为了实现拓扑感知,自己维护了一个 cache,包括:node,service,endpoint。可以看到在 setupInformers 中注册了这三类资源的处理函数:

type storageCache struct {        // hostName is the nodeName of node which application-grid-wrapper deploys on        hostName         string        wrapperInCluster bool            // mu lock protect the following map structure        mu           sync.RWMutex        servicesMap  map[types.NamespacedName]*serviceContainer        endpointsMap map[types.NamespacedName]*endpointsContainer        nodesMap     map[types.NamespacedName]*nodeContainer            // service watch channel        serviceChan chan<- watch.Event       // endpoints watch channel        endpointsChan chan<- watch.Event}...func NewStorageCache(hostName string, wrapperInCluster bool, serviceNotifier, endpointsNotifier chan watch.Event) *storageCache {       msc := &storageCache{               hostName:         hostName,          wrapperInCluster: wrapperInCluster,               servicesMap:      make(map[types.NamespacedName]*serviceContainer),        endpointsMap:     make(map[types.NamespacedName]*endpointsContainer),               nodesMap:         make(map[types.NamespacedName]*nodeContainer),         serviceChan:      serviceNotifier,               endpointsChan:    endpointsNotifier,        }            return msc}...func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile stringerror {        ...        if err := s.setupInformers(ctx.Done()); err != nil {                return err        }         klog.Infof("Start to run interceptor server")       /* filter        */      server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}      ...      return server.ListenAndServeTLS("""")}func (s *interceptorServer) setupInformers(stop <-chan struct{}) error {       klog.Infof("Start to run service and endpoints informers")        noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)        if err != nil {                klog.Errorf("can't parse proxy label, %v", err)                return err        }            noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)        if err != nil {                klog.Errorf("can't parse headless label, %v", err)                return err        }                labelSelector := labels.NewSelector()        labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)         resyncPeriod := time.Minute * 5       client := kubernetes.NewForConfigOrDie(s.restConfig)        nodeInformerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)        informerFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,              informers.WithTweakListOptions(func(options *metav1.ListOptions) {                    options.LabelSelector = labelSelector.String()               }))                 nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer()        serviceInformer := informerFactory.Core().V1().Services().Informer()        endpointsInformer := informerFactory.Core().V1().Endpoints().Informer()           /*        */        nodeInformer.AddEventHandlerWithResyncPeriod(s.cache.NodeEventHandler(), resyncPeriod)        serviceInformer.AddEventHandlerWithResyncPeriod(s.cache.ServiceEventHandler(), resyncPeriod)           endpointsInformer.AddEventHandlerWithResyncPeriod(s.cache.EndpointsEventHandler(), resyncPeriod)            go nodeInformer.Run(stop)        go serviceInformer.Run(stop)        go endpointsInformer.Run(stop)            if !cache.WaitForNamedCacheSync("node", stop,                nodeInformer.HasSynced,                serviceInformer.HasSynced,             endpointsInformer.HasSynced) {              return fmt.Errorf("can't sync informers")        }            return nil}func (sc *storageCache) NodeEventHandler() cache.ResourceEventHandler {        return &nodeHandler{cache: sc}}func (sc *storageCache) ServiceEventHandler() cache.ResourceEventHandler {        return &serviceHandler{cache: sc}}func (sc *storageCache) EndpointsEventHandler() cache.ResourceEventHandler {        return &endpointsHandler{cache: sc}}
 
这里依次分析 NodeEventHandler,ServiceEventHandler 以及 EndpointsEventHandler,如下:  
 

1、NodeEventHandler

NodeEventHandler 负责监听 node 资源相关 event,并将 node 以及 node Labels 添加到 storageCache.nodesMap 中(key为nodeName,value为node以及node labels)。

func (nh *nodeHandler) add(node *v1.Node) {        sc := nh.cache            sc.mu.Lock()            nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}        klog.Infof("Adding node %v", nodeKey)        sc.nodesMap[nodeKey] = &nodeContainer{                node:   node,                labels: node.Labels,        }        // update endpoints        changedEps := sc.rebuildEndpointsMap()            sc.mu.Unlock()            for _, eps := range changedEps {         sc.endpointsChan <- eps            }}func (nh *nodeHandler) update(node *v1.Node) {        sc := nh.cache            sc.mu.Lock()            nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}        klog.Infof("Updating node %v", nodeKey)        nodeContainer, found := sc.nodesMap[nodeKey]        if !found {                sc.mu.Unlock()                klog.Errorf("Updating non-existed node %v", nodeKey)                return        }            nodeContainer.node = node        // return directly when labels of node stay unchanged        if reflect.DeepEqual(node.Labels, nodeContainer.labels) {                sc.mu.Unlock()                return        }        nodeContainer.labels = node.Labels            // update endpoints        changedEps := sc.rebuildEndpointsMap()            sc.mu.Unlock()            for _, eps := range changedEps {                sc.endpointsChan <- eps      }}...
 

同时由于 node 的改变会影响 endpoint,因此会调用 rebuildEndpointsMap 刷新 storageCache.endpointsMap。

// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified eventsfunc (sc *storageCache) rebuildEndpointsMap() []watch.Event {       evts := make([]watch.Event, 0)       for name, endpointsContainer := range sc.endpointsMap {               newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)           if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {                        continue                }                sc.endpointsMap[name].modified = newEps                evts = append(evts, watch.Event{                        Type:   watch.Modified,                  Object: newEps,            })       }       return evts}
 

rebuildEndpointsMap 是 cache 的核心函数,同时也是拓扑感知的算法实现:

// pruneEndpoints filters endpoints using serviceTopology rules combined by services topologyKeys and node labelsfunc pruneEndpoints(hostName string,    nodes map[types.NamespacedName]*nodeContainer,        services map[types.NamespacedName]*serviceContainer,        eps *v1.Endpoints, wrapperInCluster bool) *v1.Endpoints {           epsKey := types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}          if wrapperInCluster {               eps = genLocalEndpoints(eps)     }           // dangling endpoints        svc, ok := services[epsKey]     if !ok {                klog.V(4).Infof("Dangling endpoints %s, %+#v", eps.Name, eps.Subsets)                return eps        }               // normal service        if len(svc.keys) == 0 {             klog.V(4).Infof("Normal endpoints %s, %+#v", eps.Name, eps.Subsets)            return eps       }            // topology endpoints        newEps := eps.DeepCopy()        for si := range newEps.Subsets {         subnet := &newEps.Subsets[si]        subnet.Addresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.Addresses)                subnet.NotReadyAddresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.NotReadyAddresses)        }        klog.V(4).Infof("Topology endpoints %s: subnets from %+#v to %+#v", eps.Name, eps.Subsets, newEps.Subsets)         return newEps}// filterConcernedAddresses aims to filter out endpoints addresses within the same node unitfunc filterConcernedAddresses(topologyKeys []string, hostName string, nodes map[types.NamespacedName]*nodeContainer,        addresses []v1.EndpointAddress) []v1.EndpointAddress {        hostNode, found := nodes[types.NamespacedName{Name: hostName}]        if !found {                return nil        }            filteredEndpointAddresses := make([]v1.EndpointAddress, 0)        for i := range addresses {               addr := addresses[i]                if nodeName := addr.NodeName; nodeName != nil {                         epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]                        if !found {                              continue                         }                         if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {                                 filteredEndpointAddresses = append(filteredEndpointAddresses, addr)                         }                }        }            return filteredEndpointAddresses }func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {     if n1 == nil || n2 == nil {              return false        }            for _, key := range keys {               val1, v1found := n1[key]             val2, v2found := n2[key]                    if v1found && v2found && val1 == val2 {                    return true                }        }            return false   }
 

算法逻辑如下:

  • 判断 endpoint 是否为 default kubernetes service,如果是,则将该 endpoint 转化为 wrapper 所在边缘节点的 lite-apiserver 地址(127.0.0.1)和端口(5100     3)。
apiVersion: v1kind: Endpointsmetadata:    annotations:        superedge.io/local-endpoint: 127.0.0.1        superedge.io/local-port: "51003"   name: kubernetes    namespace: defaultsubsets:- addresses:   - ip: 172.31.0.60    ports:    - name: https      port: xxx      protocol: TCP
 
func genLocalEndpoints(eps *v1.Endpoints) *v1.Endpoints {        if eps.Namespace != metav1.NamespaceDefault || eps.Name != MasterEndpointName {                return eps        }            klog.V(4).Infof("begin to gen local ep %v", eps)        ipAddress, e := eps.Annotations[EdgeLocalEndpoint]        if !e {                return eps        }                portStr, e := eps.Annotations[EdgeLocalPort]        if !e {                return eps        }            klog.V(4).Infof("get local endpoint %s:%s", ipAddress, portStr)        port, err := strconv.ParseInt(portStr, 1032)        if err != nil {                klog.Errorf("parse int %s err %v", portStr, err)                return eps        }          ip := net.ParseIP(ipAddress)       if ip == nil {                klog.Warningf("parse ip %s nil", ipAddress)                return eps        }            nep := eps.DeepCopy()        nep.Subsets = []v1.EndpointSubset{               {                       Addresses: []v1.EndpointAddress{                              {                                       IP: ipAddress,                   },                        },                        Ports: []v1.EndpointPort{                                 {                                         Protocol: v1.ProtocolTCP,                                         Port:     int32(port),                                        Name:     "https",                                },                      },               },       }            klog.V(4).Infof("gen new endpoint complete %v", nep)          return nep}
 
这样做的目的是使边缘节点上的服务采用集群内 (InCluster) 方式访问的 apiserver 为本地的 lite-apiserver,而不是云端的 apiserver。  
 
  • 从 storageCache.servicesMap cache 中根据 endpoint 名称(namespace/name) 取出对应 service,如果该 service 没有 topologyKeys 则无需做拓扑转化(非 service group)。

func getTopologyKeys(objectMeta *metav1.ObjectMeta) []string {       if !hasTopologyKey(objectMeta) {         return nil        }           var keys []string       keyData := objectMeta.Annotations[TopologyAnnotationsKey]        if err := json.Unmarshal([]byte(keyData), &keys); err != nil {                klog.Errorf("can't parse topology keys %s, %v", keyData, err)               return nil        }            return keys}
 
  • 调用 filterConcernedAddresses 过滤 endpoint.Subsets Addresses 以及 NotReadyAddresses,只保留同一个 service topologyKeys 中的 endpoint。

// filterConcernedAddresses aims to filter out endpoints addresses within the same node unitfunc filterConcernedAddresses(topologyKeys []string, hostName string, nodes map[types.NamespacedName]*nodeContainer,        addresses []v1.EndpointAddress) []v1.EndpointAddress {        hostNode, found := nodes[types.NamespacedName{Name: hostName}]        if !found {                return nil       }            filteredEndpointAddresses := make([]v1.EndpointAddress, 0)        for i := range addresses {               addr := addresses[i]                if nodeName := addr.NodeName; nodeName != nil {                        epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]               if !found {                                continue                        }                       if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {                                filteredEndpointAddresses = append(filteredEndpointAddresses, addr)                        }               }       }            return filteredEndpointAddresses}    func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {           if n1 == nil || n2 == nil {                   return false            }              for _, key := range keys {                  val1, v1found := n1[key]                  val2, v2found := n2[key]                 if v1found && v2found && val1 == val2 {                     return true               }       }         return false}
 
注意:如果 wrapper 所在边缘节点没有 service topologyKeys 标签,则也无法访问该 service。  
回到 rebuildEndpointsMap,在调用 pruneEndpoints 刷新了同一个拓扑域内的 endpoint 后,会将修改后的 endpoints 赋值给 storageCache .endpointsMap [endpoint]. modified (该字段记录了拓扑感知后修改的endpoints)。  
func (nh *nodeHandler) add(node *v1.Node) {        sc := nh.cache           sc.mu.Lock()            nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}        klog.Infof("Adding node %v", nodeKey)        sc.nodesMap[nodeKey] = &nodeContainer{                node:   node,                labels: node.Labels,        }        // update endpoints        changedEps := sc.rebuildEndpointsMap()            sc.mu.Unlock()            for _, eps := range changedEps {         sc.endpointsChan <- eps        }}// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified eventsfunc (sc *storageCache) rebuildEndpointsMap() []watch.Event {        evts := make([]watch.Event, 0)      for name, endpointsContainer := range sc.endpointsMap {                newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)                if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {                        continue                }                sc.endpointsMap[name].modified = newEps                evts = append(evts, watch.Event{                        Type:   watch.Modified,               Object: newEps,                })        }        return evts}
 
另外,如果 endpoints (拓扑感知后修改的 endpoints)发生改变,会构建 watch event,传递给 endpoints handler (interceptEndpointsRequest)处理。  

2、ServiceEventHandler

storageCache.servicesMap 结构体 key 为 service 名称(namespace/name),value 为 serviceContainer,包含如下数据:

  • svc:service对象
  • keys:service topologyKeys
对于 service 资源的改动,这里用 Update event 说明:  
func (sh *serviceHandler) update(service *v1.Service) {        sc := sh.cache           sc.mu.Lock()        serviceKey := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}        klog.Infof("Updating service %v", serviceKey)        newTopologyKeys := getTopologyKeys(&service.ObjectMeta)        serviceContainer, found := sc.servicesMap[serviceKey]        if !found {                sc.mu.Unlock()                klog.Errorf("update non-existed service, %v", serviceKey)                return        }          sc.serviceChan <- watch.Event{           Type:   watch.Modified,               Object: service,        }            serviceContainer.svc = service       // return directly when topologyKeys of service stay unchanged        if reflect.DeepEqual(serviceContainer.keys, newTopologyKeys) {                sc.mu.Unlock()                return        }            serviceContainer.keys = newTopologyKeys            // update endpoints        changedEps := sc.rebuildEndpointsMap()        sc.mu.Unlock()            for _, eps := range changedEps {         sc.endpointsChan <- eps        }}
 
逻辑如下:  
 
  • 获取 service topologyKeys
  • 构建 service event.Modified event
  • 比较 service topologyKeys 与已经存在的是否有差异
  • 如果有差异则更新 topologyKeys,且调用 rebuildEndpointsMap 刷新该 service 对应的endpoints,如果 endpoints 发生变化,则构建 endpoints watch event,传递给 endpoints handler (interceptEndpointsRequest)处理。

3、EndpointsEventHandler

storageCache.endpointsMap 结构体 key 为 endpoints 名称(namespace/name),value 为 endpointsContainer,包含如下数据:

  • endpoints:拓扑修改前的 endpoints
  • modified:拓扑修改后的 endpoints
对于 endpoints 资源的改动,这里用 Update event 说   明:  
func (eh *endpointsHandler) update(endpoints *v1.Endpoints) {        sc := eh.cache            sc.mu.Lock()       endpointsKey := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}        klog.Infof("Updating endpoints %v", endpointsKey)            endpointsContainer, found := sc.endpointsMap[endpointsKey]        if !found {                sc.mu.Unlock()               klog.Errorf("Updating non-existed endpoints %v", endpointsKey)            return        }        endpointsContainer.endpoints = endpoints        newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpoints, sc.wrapperInCluster)        changed := !apiequality.Semantic.DeepEqual(endpointsContainer.modified, newEps)        if changed {                endpointsContainer.modified = newEps       }        sc.mu.Unlock()            if changed {               sc.endpointsChan <- watch.Event{                        Type:   watch.Modified,               Object: newEps,               }       }}
 
逻辑如下:  
 
  • 更新 endpointsContainer.endpoint 为新的 endpoints 对象
  • 调用 pruneEndpoints 获取拓扑刷新后的 endpoints
  • 比较 endpointsContainer.modified 与新刷新后的 endpoints
  • 如果有差异则更新 endpointsContainer.modified,则构建 endpoints watch event,传递给 endpoints handler (interceptEndpointsRequest)处理。
在分析完 NodeEventHandler,ServiceEventHandler 以及 EndpointsEventHandler 之后,我们回到具体的 http handler List&Watch 处理逻辑上,这里以 endpoints 为例:      
func (s *interceptorServer) interceptEndpointsRequest(handler http.Handler) http.Handler {        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {         if r.Method != http.MethodGet || !strings.HasPrefix(r.URL.Path, "/api/v1/endpoints") {                        handler.ServeHTTP(w, r)               return               }                        queries := r.URL.Query()             acceptType := r.Header.Get("Accept")                info, found := s.parseAccept(acceptType, s.mediaSerializer)               if !found {                        klog.Errorf("can't find %s serializer", acceptType)                       w.WriteHeader(http.StatusBadRequest)                        return                }                        encoder := scheme.Codecs.EncoderForVersion(info.Serializer, v1.SchemeGroupVersion)           // list request                if queries.Get("watch") == "" {                       w.Header().Set("Content-Type", info.MediaType)                        allEndpoints := s.cache.GetEndpoints()                       epsItems := make([]v1.Endpoints, 0len(allEndpoints))                    for _, eps := range allEndpoints {                                epsItems = append(epsItems, *eps)                        }                                    epsList := &v1.EndpointsList{                                Items: epsItems,                  }                                  err := encoder.Encode(epsList, w)                      if err != nil {                           klog.Errorf("can't marshal endpoints list, %v", err)                      w.WriteHeader(http.StatusInternalServerError)                             return                      }                              return             }                      // watch request             timeoutSecondsStr := r.URL.Query().Get("timeoutSeconds")         timeout := time.Minute              if timeoutSecondsStr != "" {            timeout, _ = time.ParseDuration(fmt.Sprintf("%ss", timeoutSecondsStr))             }                      timer := time.NewTimer(timeout)            defer timer.Stop()                   flusher, ok := w.(http.Flusher)            if !ok {                      klog.Errorf("unable to start watch - can't get http.Flusher: %#v", w)                      w.WriteHeader(http.StatusMethodNotAllowed)                      return                }                    e := restclientwatch.NewEncoder(                       streaming.NewEncoder(info.StreamSerializer.Framer.NewFrameWriter(w),                        scheme.Codecs.EncoderForVersion(info.StreamSerializer, v1.SchemeGroupVersion)),                      encoder)              if info.MediaType == runtime.ContentTypeProtobuf {                   w.Header().Set("Content-Type",     runtime.ContentTypeProtobuf+";stream=watch")             } else {                     w.Header().Set("Content-Type", runtime.ContentTypeJSON)              }              w.Header().Set("Transfer-Encoding""chunked")              w.WriteHeader(http.StatusOK)         flusher.Flush()              for {                      select {                      case <-r.Context().Done():               return                      case <-timer.C:                          return                     case evt := <-s.endpointsWatchCh:                             klog.V(4).Infof("Send endpoint watch event: %+#v", evt)                   err := e.Encode(&evt)                 if err != nil {                       klog.Errorf("can't encode watch event, %v", err)                           return                          }                                   if len(s.endpointsWatchCh) == 0 {                                  flusher.Flush()                     }                     }               }      })}
 

逻辑如下:

  • 如果为 List请求,则调用 GetEndpoints 获取拓扑修改后的 endpoints 列表,并返回

func (sc *storageCache) GetEndpoints() []*v1.Endpoints {        sc.mu.RLock()       defer sc.mu.RUnlock()           epList := make([]*v1.Endpoints, 0len(sc.endpointsMap))        for _, v := range sc.endpointsMap {                epList = append(epList, v.modified)        }        return epList}
 
  • 如果为 Watch 请求,则不断从 storageCache.endpointsWatchCh 管道中接受 watch event,并返回 interceptServiceRequest 逻辑与 interceptEndpointsRequest 一致,这里不再赘述 。

 

总结

  • SuperEdge service group 利用 application-grid-wrapper 实现拓扑感知,完成了同一个 nodeunit 内服务的闭环访问
  • service group 实现的拓扑感知和 Kubernetes 社区原生实现对比,有如下区别:
    • service group 拓扑 key 可以自定义,也即为 gridUniqKey,使用起来更加灵活;而社区实现目前只有三种选择:"kubernetes.io/hostname","topology.kubernetes.io/zone"以及"topology.kubernetes.io/region"
    • service group 只能填写一个拓扑 key,也即只能访问本拓扑域内有效的 endpoint,无法访问其它拓扑域的 endpoint;而社区可以通过 topologyKey 列表以及"*"实现其它备选拓扑域 endpoint的访问
  • ServiceGrid Controller 负责根据 ServiceGrid 产生对应的 service(包含由serviceGrid.Spec.GridUniqKey 构成的 topologyKeys annotations),逻辑和 DeploymentGrid Controller 整体一致,如下:
    • 创建并维护 service group 需要的若干CRDs(包括:ServiceGrid)
    • 监听 ServiceGrid event,并填充 ServiceGrid到工作队列中;循环从队列中取出 ServiceGrid 进行解析,创建并且维护对应的 service
    • 监听 service event,并将相关的 ServiceGrid 塞到工作队列中进行上述处理,协助上述逻辑达到整体 reconcile 逻辑
  • 为了实现 Kubernetes 零侵入,需要在 kube-proxy 与 apiserver 通信之间添加一层 wrapper,调用链路如下:     kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver
  • application-grid-wrapper 是一个 http server,接受来自 kube-proxy 的请求,同时维护一个资源缓存,处理函数由外到内依次如下:
    • debug:接受 debug 请求,返回 wrapper pprof 运行信息
    • logger:打印请求日志
    • node:接受 kube-proxy node GET (/api/v1/nodes/{node}) 请求,并返回 node 信息
    • event:接受 kube-proxy events POST (/events) 请求,并将请求转发给 lite-apiserver
    • service:接受 kube-proxy service List&Watch (/api/v1/services) 请求,并根据 storageCache 内容返回 (GetServices)
    • endpoint:接受 kube-proxy endpoint List&Watch (/api/v1/endpoints) 请求,并根据 storageCache 内容返回(GetEndpoints)
  • wrapper 为了实现拓扑感知,维护了一个资源 cache,包括:node,service,endpoint,同时注册了相关 event 处理函数。核心拓扑算法逻辑为:调用 filterConcernedAddresses 过滤 endpoint.Subsets Addresses 以及 NotReadyAddresses,只保留同一个 service topologyKeys 中的 endpoint。另外,如果 wrapper 所在边缘节点没有 service topologyKeys 标签,则也无法访问该service
  • wrapper 接受来自 kube-proxy 对 endpoints 以及 service 的 List&Watch 请求,以endpoints 为例:如果为List 请求,则调用 GetEndpoints 获取拓扑修改后的 endpoints 列表,并返回;如果为 Watch 请求,则不断从storageCache.endpointsWatchCh 管道中接受 watch event,并返回。service 逻辑与 endpoints 一致

关于“如何分析SuperEdge 拓扑算法”就介绍到这了,更多相关内容可以搜索亿速云以前的文章,希望能够帮助大家答疑解惑,请多多支持亿速云网站!

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

原文链接:https://my.oschina.net/cncf/blog/4988144

AI

开发者交流群×