温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Argo Event云原生的事件驱动架构怎么用

发布时间:2022-01-14 18:02:10 来源:亿速云 阅读:200 作者:iii 栏目:云计算

这篇“Argo Event云原生的事件驱动架构怎么用”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Argo Event云原生的事件驱动架构怎么用”文章吧。

1、Argo Event简介

Event事件大家都很熟悉,可以说Kubernetes就是完全由事件驱动的,不同的controller manager本质就是实现了不同的事件处理函数,比如所有ReplicaSet对象是由ReplicaSetController控制器管理,该控制器通过Informer监听ReplicaSet以及其关联的Pod的事件变化,从而维持运行状态和我们声明spec保持一致。

当然Kubernetes无论是什么Controller,其监听和处理的都是内部事件,而在应用层上我们也有很多外部事件,比如CICD事件、Webhook事件、日志事件等等,如何处理这些事件呢,目前Kubernetes原生是无法实现的。

当然你可以自己实现一个event handler运行在Kubernetes平台,不过实现难度也不小。而Argo Event组件完美解决了这个问题。

如图是Argo Event官方提供的的流程图:

Argo Event云原生的事件驱动架构怎么用

首先事件源EventSource可以是Webhook、S3、Github、SQS等等,中间会经过一个叫Gateway(新版本叫EventBus)的组件,更准确地说老版本原来gateway的配置功能已经合并到EventSource了,EventBus是新引入的组件,后端默认基于高性能分布式消息中间件NATS[1]实现,当然其他中间件比如Kafka也是可以的。

这个EventBus可以看做是事件的一个消息队列,消息生产者连接EvenSource,EventSource又连接到Sensor。更详细地说EvenSource把事件发送给EvenBus,Sensor会订阅EvenBus的消息队列,EvenBus负责把事件转发到已订阅该事件的Sensor组件,EventSorce在上图中没有体现,具体设计文档可以参考Argo-events Enhancement Proposals[2]

有些人可能会说为什么EventBus不直接到Trigger,中间引入一个Sensor,这主要是两个原因,一是为了使事件转发和处理松耦合,二是为了实现Trigger事件的参数化,通过Sensor不仅可以实现事件的过滤,还可以实现事件的参数化,比如后面的Trigger是创建一个Kubernetes Pod,那这个Pod的metadata、env等,都可以根据事件内容进行填充。

Sensor组件注册关联了一个或者多个触发器,这些触发器可以触发AWS Lambda事件、Argo Workflow事件、Kubernetes Objects等,通俗简单地说,可以执行Lambda函数,可以动态地创建Kubernetes的对象或者创建前面的介绍的Workflow。

还记得前面介绍的Argo Rollout吗,我们演示了手动promote实现应用发布或者回滚,通过Argo Event就可以很完美地和测试平台或者CI/CD事件结合起来,实现自动应用自动发布或者回滚。

2、一个简单的Webhook例子

关于Argo Event的部署非常简单,直接通过kubecl apply或者helm均可,可以参考文档Installation[3],这里不再赘述。

Argo Event部署完成后注意还需要部署EventBus,官方推荐使用NATS中间件,文档中有部署NATS stateful的文档。

接下来我们以一个最简单的Webhook事件为例,从而了解Argo Event的几个组件功能以及用法。

首先按照前面的介绍,我们需要先定义EventSource:

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: webhook
spec:
  service:
    ports:
      - port: 12000
        targetPort: 12000
  webhook:
    webhook_example:
      port: "12000"
      endpoint: /webhook
      method: POST

这个EventSource定义了一个webhook webhook_example,端口为12000,路径为/webhook,一般Webhook为POST方法,因此该Webhhok处理器我们配置只接收POST方法。

为了把这个Webhook EventSource暴露,我们还创建了一个Service,端口也是12000。

此时我们可以手动curl该Service:

# kubectl get svc -l eventsource-name=webhook
NAME                      TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)     AGE
webhook-eventsource-svc   ClusterIP   10.96.93.24   <none>        12000/TCP   5m49s
# curl -X POST -d '{}' 10.96.93.24:12000/webhook
success

当然此时由于没有注册任何的Sensor,因此什么都不会发生。

接下来我们定义Sensor:

Argo Event云原生的事件驱动架构怎么用

首先在dependencies中定义了订阅的EventSource以及具体的Webhook,由于一个EventSource可以定义多个Webhook,因此必须同时指定EventSource和Webhook两个参数。

在Trigger中我们定义了对应Action为create一个workflow,这个workflow的spec定义在resource中配置。

最后的parameters部分定义了workflow的参数,这些参数值从event中获取,这里我们会把整个event都当作workflow的input。当然你可以通过dataKey只汲取body部分:dataKey: body.message

此时我们再次curl这个webhook事件:

curl -X POST -d '{"message": "HelloWorld!"}' 10.96.93.24:12000/webhook

此时我们获取argo workflow列表发现新创建了一个实例:

# argo list
NAME            STATUS      AGE   DURATION   PRIORITY
webhook-8xt4s   Succeeded   1m    18s        0

查看workflow输出如下:

Argo Event云原生的事件驱动架构怎么用

由于我们是把整个event作为workflow input发过去的,因此data内容部分是base64编码,我们可以查看解码后的内容如下:

{
  "header": {
    "Accept": [
      "*/*"
    ],
    "Content-Length": [
      "26"
    ],
    "Content-Type": [
      "application/x-www-form-urlencoded"
    ],
    "User-Agent": [
      "curl/7.58.0"
    ]
  },
  "body": {
    "message": "HelloWorld!"
  }
}

从这里我们也可以看出Event包含两个部分,一个是context,一个是data,data中又包含header部分以及body部分,在parameters中可以通过Key获取任意部分内容。

如上的webhook触发是通过手动curl的,你可以很容易地在github或者bitbucket上配置到webhook中,这样一旦代码有更新就能触发这个事件了。

3、Kubernetes触发AWS Lambda函数

前面的例子中的EventSource使用了Webhook,除了Webhook,Argo Event还支持很多的EventSource,比如:

  • amqp

  • aws-sns

  • aws-sqs

  • github/gitlab

  • hdfs

  • kafka

  • redis

  • Kubernetes resource

  • ...

Trigger也同样支持很多,比如:

  • aws lambda

  • amqp

  • kafka

  • ...

如上官方都提供了非常丰富的例子,可以参考argo events examples[4]

这里以Kubernetes resource事件源为例,这个事件监听Kubernetes的资源状态,比如Pod创建、删除等,这里以创建Pod为例:

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: k8s-resource-demo
spec:
  template:
    serviceAccountName: argo-events-sa
  resource:
    pod_demo:
      namespace: argo-events
      version: v1
      resource: pods
      eventTypes:
        - ADD
      filter:
        afterStart: true
        labels:
          - key: app
            operation: "=="
            value: my-pod

如上例子监听Pods的ADD事件,即创建Pod,filter中过滤只有包含app=my-pod标签的Pod,特别需要注意的是使用的serviceaccount argo-events-sa必须具有Pod的list、watch权限。

接下来我们使用AWS Lambda触发器,Lambda函数已经在AWS提前创建好:

Argo Event云原生的事件驱动架构怎么用

这个Lambda函数很简单,直接返回event本身。

创建Sensor如下:

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: aws-lambda-trigger-demo
spec:
  template:
    serviceAccountName: argo-events-sa
  dependencies:
    - name: test-dep
      eventSourceName: k8s-resource-demo
      eventName: pod_demo
  triggers:
    - template:
        name: lambda-trigger
        awsLambda:
          functionName: hello
          accessKey:
            name: aws-secret
            key: accesskey
          secretKey:
            name: aws-secret
            key: secretkey
          namespace: argo-events
          region: cn-northwest-1
          payload:
            - src:
                dependencyName: test-dep
                dataKey: body.name
              dest: name

如上AWS access key和access secret需要提前放到aws-secret中。

此时我们创建一个新的Pod my-pod:

apiVersion: v1
kind: Pod
metadata:
  labels:
    app: my-pod
  name: my-pod
spec:
  containers:
  - image: nginx
    name: my-pod
  dnsPolicy: ClusterFirst
  restartPolicy: Always

当Pod启动后,我们发现AWS Lambda函数被触发执行:

Argo Event云原生的事件驱动架构怎么用

4、event filter

前面的例子中webhook中所有的事件都会被sensor触发,我们有时不需要处理所有的事件,Argo Event支持基于data以及context过滤,比如我们只处理message为hello或者为hey的事件,其他消息忽略,只需要在原来的dependenciestest-dep增加filter即可:

dependencies:
- name: test-dep
  eventSourceName: webhook
  eventName: webhook_example
  filters:
  - name: data-filter
    data:
    - path: body.message
      type: string
      value:
      - "hello"
      - "hey"

filter指定了基于data过滤,过滤的字段为body.message,匹配的内容为hello、hey

5、trigger policy

trigger policy主要用来判断最后触发器执行的结果是成功还是失败,如果是创建Kubernetes资源比如Workflow,可以根据Workflow最终状态决定这个Trigger的执行结果,而如果是触发一个HTTP或者AWS Lambda,则需要自定义policy status。

awsLambda:
  functionName: hello
  accessKey:
    name: aws-secret
    key: accesskey
  secretKey:
    name: aws-secret
    key: secretkey
  namespace: argo-events
  region: us-east-1
  payload:
  - src:
      dependencyName: test-dep
      dataKey: body.message
    dest: message
  policy:
    status:
        allow:
         - 200
         - 201

如上表示当AWS Lambda返回200或者201时表示Trigger成功。

以上就是关于“Argo Event云原生的事件驱动架构怎么用”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI