这篇“Argo Event云原生的事件驱动架构怎么用”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Argo Event云原生的事件驱动架构怎么用”文章吧。
Event事件大家都很熟悉,可以说Kubernetes就是完全由事件驱动的,不同的controller manager本质就是实现了不同的事件处理函数,比如所有ReplicaSet对象是由ReplicaSetController控制器管理,该控制器通过Informer监听ReplicaSet以及其关联的Pod的事件变化,从而维持运行状态和我们声明spec保持一致。
当然Kubernetes无论是什么Controller,其监听和处理的都是内部事件,而在应用层上我们也有很多外部事件,比如CICD事件、Webhook事件、日志事件等等,如何处理这些事件呢,目前Kubernetes原生是无法实现的。
当然你可以自己实现一个event handler运行在Kubernetes平台,不过实现难度也不小。而Argo Event组件完美解决了这个问题。
如图是Argo Event官方提供的的流程图:
首先事件源EventSource可以是Webhook、S3、Github、SQS等等,中间会经过一个叫Gateway(新版本叫EventBus)的组件,更准确地说老版本原来gateway的配置功能已经合并到EventSource了,EventBus是新引入的组件,后端默认基于高性能分布式消息中间件NATS[1]实现,当然其他中间件比如Kafka也是可以的。
这个EventBus可以看做是事件的一个消息队列,消息生产者连接EvenSource,EventSource又连接到Sensor。更详细地说EvenSource把事件发送给EvenBus,Sensor会订阅EvenBus的消息队列,EvenBus负责把事件转发到已订阅该事件的Sensor组件,EventSorce在上图中没有体现,具体设计文档可以参考Argo-events Enhancement Proposals[2]。
有些人可能会说为什么EventBus不直接到Trigger,中间引入一个Sensor,这主要是两个原因,一是为了使事件转发和处理松耦合,二是为了实现Trigger事件的参数化,通过Sensor不仅可以实现事件的过滤,还可以实现事件的参数化,比如后面的Trigger是创建一个Kubernetes Pod,那这个Pod的metadata、env等,都可以根据事件内容进行填充。
Sensor组件注册关联了一个或者多个触发器,这些触发器可以触发AWS Lambda事件、Argo Workflow事件、Kubernetes Objects等,通俗简单地说,可以执行Lambda函数,可以动态地创建Kubernetes的对象或者创建前面的介绍的Workflow。
还记得前面介绍的Argo Rollout吗,我们演示了手动promote实现应用发布或者回滚,通过Argo Event就可以很完美地和测试平台或者CI/CD事件结合起来,实现自动应用自动发布或者回滚。
关于Argo Event的部署非常简单,直接通过kubecl apply或者helm均可,可以参考文档Installation[3],这里不再赘述。
Argo Event部署完成后注意还需要部署EventBus,官方推荐使用NATS中间件,文档中有部署NATS stateful的文档。
接下来我们以一个最简单的Webhook事件为例,从而了解Argo Event的几个组件功能以及用法。
首先按照前面的介绍,我们需要先定义EventSource:
apiVersion: argoproj.io/v1alpha1 kind: EventSource metadata: name: webhook spec: service: ports: - port: 12000 targetPort: 12000 webhook: webhook_example: port: "12000" endpoint: /webhook method: POST
这个EventSource定义了一个webhook webhook_example
,端口为12000,路径为/webhook
,一般Webhook为POST方法,因此该Webhhok处理器我们配置只接收POST
方法。
为了把这个Webhook EventSource暴露,我们还创建了一个Service,端口也是12000。
此时我们可以手动curl该Service:
# kubectl get svc -l eventsource-name=webhook NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE webhook-eventsource-svc ClusterIP 10.96.93.24 <none> 12000/TCP 5m49s # curl -X POST -d '{}' 10.96.93.24:12000/webhook success
当然此时由于没有注册任何的Sensor,因此什么都不会发生。
接下来我们定义Sensor:
首先在dependencies
中定义了订阅的EventSource以及具体的Webhook,由于一个EventSource可以定义多个Webhook,因此必须同时指定EventSource和Webhook两个参数。
在Trigger中我们定义了对应Action为create一个workflow,这个workflow的spec定义在resource中配置。
最后的parameters
部分定义了workflow的参数,这些参数值从event中获取,这里我们会把整个event都当作workflow的input。当然你可以通过dataKey只汲取body部分:dataKey: body.message
。
此时我们再次curl这个webhook事件:
curl -X POST -d '{"message": "HelloWorld!"}' 10.96.93.24:12000/webhook
此时我们获取argo workflow列表发现新创建了一个实例:
# argo list NAME STATUS AGE DURATION PRIORITY webhook-8xt4s Succeeded 1m 18s 0
查看workflow输出如下:
由于我们是把整个event作为workflow input发过去的,因此data内容部分是base64编码,我们可以查看解码后的内容如下:
{ "header": { "Accept": [ "*/*" ], "Content-Length": [ "26" ], "Content-Type": [ "application/x-www-form-urlencoded" ], "User-Agent": [ "curl/7.58.0" ] }, "body": { "message": "HelloWorld!" } }
从这里我们也可以看出Event包含两个部分,一个是context,一个是data,data中又包含header部分以及body部分,在parameters
中可以通过Key获取任意部分内容。
如上的webhook触发是通过手动curl的,你可以很容易地在github或者bitbucket上配置到webhook中,这样一旦代码有更新就能触发这个事件了。
前面的例子中的EventSource使用了Webhook,除了Webhook,Argo Event还支持很多的EventSource,比如:
amqp
aws-sns
aws-sqs
github/gitlab
hdfs
kafka
Kubernetes resource
...
Trigger也同样支持很多,比如:
aws lambda
amqp
kafka
...
如上官方都提供了非常丰富的例子,可以参考argo events examples[4]。
这里以Kubernetes resource事件源为例,这个事件监听Kubernetes的资源状态,比如Pod创建、删除等,这里以创建Pod为例:
apiVersion: argoproj.io/v1alpha1 kind: EventSource metadata: name: k8s-resource-demo spec: template: serviceAccountName: argo-events-sa resource: pod_demo: namespace: argo-events version: v1 resource: pods eventTypes: - ADD filter: afterStart: true labels: - key: app operation: "==" value: my-pod
如上例子监听Pods的ADD事件,即创建Pod,filter中过滤只有包含app=my-pod标签的Pod,特别需要注意的是使用的serviceaccount argo-events-sa
必须具有Pod的list、watch权限。
接下来我们使用AWS Lambda触发器,Lambda函数已经在AWS提前创建好:
这个Lambda函数很简单,直接返回event本身。
创建Sensor如下:
apiVersion: argoproj.io/v1alpha1 kind: Sensor metadata: name: aws-lambda-trigger-demo spec: template: serviceAccountName: argo-events-sa dependencies: - name: test-dep eventSourceName: k8s-resource-demo eventName: pod_demo triggers: - template: name: lambda-trigger awsLambda: functionName: hello accessKey: name: aws-secret key: accesskey secretKey: name: aws-secret key: secretkey namespace: argo-events region: cn-northwest-1 payload: - src: dependencyName: test-dep dataKey: body.name dest: name
如上AWS access key和access secret需要提前放到aws-secret中。
此时我们创建一个新的Pod my-pod:
apiVersion: v1 kind: Pod metadata: labels: app: my-pod name: my-pod spec: containers: - image: nginx name: my-pod dnsPolicy: ClusterFirst restartPolicy: Always
当Pod启动后,我们发现AWS Lambda函数被触发执行:
前面的例子中webhook中所有的事件都会被sensor触发,我们有时不需要处理所有的事件,Argo Event支持基于data以及context过滤,比如我们只处理message为hello
或者为hey
的事件,其他消息忽略,只需要在原来的dependencies
中test-dep
增加filter即可:
dependencies: - name: test-dep eventSourceName: webhook eventName: webhook_example filters: - name: data-filter data: - path: body.message type: string value: - "hello" - "hey"
filter指定了基于data
过滤,过滤的字段为body.message
,匹配的内容为hello、hey
。
trigger policy主要用来判断最后触发器执行的结果是成功还是失败,如果是创建Kubernetes资源比如Workflow,可以根据Workflow最终状态决定这个Trigger的执行结果,而如果是触发一个HTTP或者AWS Lambda,则需要自定义policy status。
awsLambda: functionName: hello accessKey: name: aws-secret key: accesskey secretKey: name: aws-secret key: secretkey namespace: argo-events region: us-east-1 payload: - src: dependencyName: test-dep dataKey: body.message dest: message policy: status: allow: - 200 - 201
如上表示当AWS Lambda返回200或者201时表示Trigger成功。
以上就是关于“Argo Event云原生的事件驱动架构怎么用”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。