这篇文章给大家分享的是有关python如何实现公司内项目对接钉钉审批流程的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
上代码:
import requests import json import time from dingtalk.crypto import DingTalkCrypto from django.conf import settings # settings.BASE_DIR class Crypto(object): def __init__(self, token): # 随便填的字符串 self.token = token # 自己生成的43位随机字符串 self.aes_key = settings.DINGDING.get("DINGTALK_AES_TOKEN") # 钉钉企业ID self.corp_id = settings.DINGDING.get("CorpId") # print("corp_id:", self.corp_id) self.nonce = settings.DINGDING.get("nonce") self.crypto = DingTalkCrypto( token=self.nonce, encoding_aes_key=self.aes_key, corpid_or_suitekey=self.corp_id ) def encrypt_success(self): # 返回加密success result = self.crypto.encrypt_message( msg="success", nonce=self.nonce, timestamp=int(time.time()*1000) ) return result class DING(object): def __init__(self, approve_process): self.AgentId = settings.DINGDING.get("AgentId") self.AppKey = settings.DINGDING.get("AppKey") self.AppSecret = settings.DINGDING.get("AppSecret") self.dingding_url = settings.DINGDING.get("URL") self.process_code = settings.DINGDING.get("APPROVE_PROCESS").get(approve_process)['process_code'] self.aes_key = settings.DINGDING.get("DINGTALK_AES_TOKEN") self.nonce = settings.DINGDING.get("nonce") def get_token(self): ''' 获取钉钉的token :return: 钉钉token ''' url = self.dingding_url + '/gettoken?appkey={}&appsecret={}'.format(self.AppKey, self.AppSecret) req = requests.get(url) req = json.loads(req.text) return req['access_token'] # def createCallbackDd(): # ''' # 注册钉钉回调函数 # :return: # ''' # url = 'https://oapi.dingtalk.com/call_back/register_call_back?access_token=' + self.getToken() # data = { # "call_back_tag": ["bpms_task_change", "bpms_instance_change"], #这两个回调种类是审批的 # "token": TOKEN, #自定义的字符串 # "aes_key": AES_KEY, #自定义的43位字符串,密钥 # "url": URL #回调地址 # } # requests.post(url, data=json.dumps(data)) # return ('OK') def create_process(self, originator_user_id, dept_id, form_component_value_vo, approvers, cc_list, has_cc=0): ''' 创建钉钉审批 approvers为list 元素为钉钉userid cc_list同理 ''' url = self.dingding_url + '/topapi/processinstance/create?access_token=' + self.get_token() print("form_component_value_vo:", form_component_value_vo) if has_cc == 0: data = { 'agent_id': self.AgentId, 'process_code': self.process_code, #工单id 'originator_user_id': originator_user_id, 'dept_id': dept_id, #创建人的钉钉部门id 'form_component_values': str(form_component_value_vo), #钉钉后台配置的需要填写的字段, 'approvers': approvers, 'cc_list': cc_list, 'cc_position': 'START_FINISH' # 发起和完成时与抄送 } else: data = { 'agent_id': self.AgentId, 'process_code': self.process_code, #工单id 'originator_user_id': originator_user_id, #创建人的钉钉userid 'dept_id': dept_id, #创建人的钉钉部门id 'form_component_values': str(form_component_value_vo), #钉钉后台配置的需要填写的字段, 'approvers': approvers, } print("dingding_utils:", data) response = requests.post(url, data=data) return response.json() def get_status(self, process_instance_id): url = self.dingding_url + '/topapi/processinstance/get?access_token=' + self.get_token() data = { "process_instance_id": process_instance_id } response = requests.post(url, data=data) return response.json() def register_callback(self, call_back_url): # 注册回调 url = self.dingding_url + '/call_back/register_call_back?access_token=' + self.get_token() print("self.get_token():", self.get_token()) data = { "call_back_tag": ['bpms_task_change', 'bpms_instance_change'], "token": self.nonce, "aes_key": self.aes_key, "url": call_back_url, } response = requests.post(url, data=json.dumps(data)) return response.json() def get_callback(self): url = self.dingding_url + '/call_back/get_call_back?access_token=' + self.get_token() req = requests.get(url) req = json.loads(req.text) return req def create_process_approver_v2(self, originator_user_id, dept_id, form_component_value_vo, approvers, cc_list): ''' 创建钉钉审批 ''' url = self.dingding_url + '/topapi/processinstance/create?access_token=' + self.get_token() data = { 'agent_id': self.AgentId, 'process_code': self.process_code, 'originator_user_id': originator_user_id, 'dept_id': dept_id, 'form_component_values': str(form_component_value_vo), 'approvers_v2': json.dumps(approvers) } if cc_list: data['cc_list'] = cc_list data['cc_position'] = 'FINISH' response = requests.post(url, data=data) return response.json() def create_process_approver_v2_test(self, originator_user_id, dept_id, form_component_value_vo): ''' 创建钉钉审批 ''' url = self.dingding_url + '/topapi/processinstance/create?access_token=' + self.get_token() data = { 'agent_id': self.AgentId, 'process_code': self.process_code, 'originator_user_id': originator_user_id, 'dept_id': dept_id, 'form_component_values': str(form_component_value_vo), 'approvers_v2': json.dumps([ { "task_action_type": "NONE", "user_ids": ["dingding_id"], # 单独审批人 }, { "task_action_type": "OR", "user_ids": ["dingding_id1", "dingding_id2"], # 或签 }, { "task_action_type": "AND", "user_ids": ["dingding_id1", "dingding_id2"], # 会签 } ]) } response = requests.post(url, data=data) return response.json() if __name__ == "__main__": import django, os, sys sys.path.append('xxxxxx') # 项目路径 os.environ['DJANGO_SETTINGS_MODULE'] = 'xx.settings' # print("settings.DINGDING", settings.DINGDING) ding = DING("create_xx") # print(ding.get_token()) # info = [{'name': '单行输入框','value': 'testixxxxxxxx'}] # # print(ding.create_process('11', 11, info)) a = [ {'name': "输入框1", 'value': "value1"}, {'name': "输入框2", 'value': "value2"}, ] # print(ding.create_process_test('11', 11, a)) # print(ding.create_process_approver_v2_test('11', 11, a)) # print(ding.create_process_test2()) # print(ding.get_status('xxx')) print(ding.get_status('xx')) # # 验证 回调 # a = ding.get_token() # print(a) # c = Crypto(a) # print(c.encrypt_success()) # 注册回调 # print(ding.register_callback("http://xxxx.vaiwan.com/xxx")) # print(ding.get_callback())
说明:
1 Crypto类用于对接钉钉回调用的。一个公司只有一个corpId,并且一个corpid只能注册一个回调地址。我司有公共组注册好了回调。只要接入公司内的回调即可。所以我实际没有使用到Crypto。
2 在钉钉管理后台中创建应用后会有这三个东西:AgentId、AppKey,AppSecret 。在创建钉钉审批流程,可以从审批流程浏览器中获取到APPROVE_PROCESS。别忘啦给这个流程审批接口权限。这些官方文档有说。
3 配置setting变量:
DINGDING = { "AgentId": 123, "AppKey": "xx", "AppSecret": "xx", "URL": "https://oapi.dingtalk.com", "APPROVE_PROCESS": { # process_code "create_xx": { "process_code": "abc", # 审批流程的id }, "DINGTALK_AES_TOKEN": "abc", "nonce": "abc", "CorpId": "abc", }
4 接口形式创建的审批流程,与钉钉管理后台创建的流程有一些不同:
1 不能在不同的审批环节设置不同的抄送人
2 不能审批流程前后有相同的人,不能自动显示成 “自动同意”(管理后台设置成去重后,但是接口指定审批人场景,不支持)
5 其他如:审批内容、或签,会签代码里都有示例。
感谢各位的阅读!关于“python如何实现公司内项目对接钉钉审批流程”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。