温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

怎么用python requests实现上传excel数据流

发布时间:2022-02-14 09:44:45 来源:亿速云 阅读:197 作者:iii 栏目:开发技术

本篇内容主要讲解“怎么用python requests实现上传excel数据流”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么用python requests实现上传excel数据流”吧!

requests上传excel数据流

headers=self.headers
        #获取导入模版
        file_home = self.import_template
        log.info(file_home)
        
        wb = load_workbook(filename=file_home)
        ws = wb['sheet1']
        # 修改产废单位名称,以及备注
        ws['b3'] = 'itest产废单位'+self.dic["t"]
        ws['s3'] = 'i原料销售'+self.dic["t"]
        wb.save(file_home)
        url=self.url_1+"/companies/import?companyType=2"
        payload={}
        
        m=MultipartEncoder(
            fields={
                "parent_dir":'/',
                "name":'file',
                "filename":'name.xlsx',
                'file':('name.xlsx',open(file_home,'rb'),'application/vnd.ms-excel')
            }
        )
        headers['Content-Type']=m.content_type
        r=requests.post(url,headers=headers,data=m)
        log.info(r.json())

数据驱动之python+requests+excel

数据驱动

是根据数据来测试的,如读取 excel表中的测试用例自动填写测试结果,发送测试报告包括以下模块:

  • 1.获取用例

  • 2.调用接口

  • 3.校验结果

  • 4.发送测试报告

  • 5.异常处理

  • 6.日志模块

1. 首先设计好测试用例

怎么用python requests实现上传excel数据流

2.建立文件结构

该自动化测试框架命名为:ATP,bin目录下写主程序,cases目录下放测试用例,conf目录下放配置文件,lib目录下放各个封装好的模块,logs目录下放日志文件,和readme文件。

怎么用python requests实现上传excel数据流

3.封装模块

common.py:封装读取excel用例、调用接口、检验结果、写入报告这几个模块。

"""
第一步:读取excel中用例
第二步:根据用例发送请求
第三步:校验结果
第四步:将测试结果、返回报文写入excel
"""
import xlrd,requests
from xlutils import copy
from lib.log import atp_log
class OpCase(object):
    def get_case(self,file_path):
        cases= []   #定义一个列表存放所有的cases
        if file_path.endswith('.xls') or file_path.endswith('.xlsx'):
           try:
               book = xlrd.open_workbook(file_path)
               sheet = book.sheet_by_index(0)
               for i in range(1,sheet.nrows):
                   row_data = sheet.row_values(i)   #获取的每一行数据存到列表row_data
                   cases.append(row_data[4:8])
               atp_log.info('共读取%s条用例'%(len(cases)))
               self.file_path = file_path   #因为该函数已经传了参数路径,为方便write_excel引用,在此实例化
           except Exception as e:
               atp_log.error('[%s]用例获取失败,错误信息:%s'%(file_path,e))
        else:
            atp_log.error('用例文件不合法,%s'%file_path)
        return cases
    def my_request(self,url,method,data):
        data = self.dataToDict(data)
        try:
            if method.upper() == 'POST':
                res = requests.post(url,data).text
            elif method.uper() == 'GET':
                res = requests.get(url,params=data).text
            else:
                atp_log.warning('该请求方式暂不支持')
                res = '该请求方式暂不支持'
        except Exception as e:
            msg = '【%s】接口调用失败,%s'%(url,e)
            atp_log.error(msg)
            res = msg
        return res
    def dataToDict(self,data):  #把数据转成字典。
        res = {}
        data = data.split(',')
        for d in data:  #
            k, v = d.split('=')
            res[k] = v
    def check_res(self,res,check):  #res:实际结果,check:预期结果
        res = res.replace('": "','=').replace('": ','=')
        for c in check.split(','):
            if c not in res:
                atp_log.info('结果校验失败,预期结果:【%s】,实际结果【%s】'%(c,res))
                return '失败'
            return '成功'
    def write_excel(self,case_res):
        book = xlrd.open_workbook(self.file_path)
        new_book = copy.copy(book)
        sheet = new_book.get_sheet(0)
        row = 1
        for case_case in case_res:
            sheet.write(row,8,case_case[0])
            sheet.write(row,9,case_case[1])
            row += 1
        new_book.save(self.file_path.replace('xlsx','xls'))

log.py:封装日志模块

import logging,os
from logging import handlers
from conf import setting
class Mylogger():
    def __init__(self,file_name,level='info',backCount=5,when='D'):
        logger = logging.getLogger()  # 先实例化一个logger对象,先创建一个办公室
        logger.setLevel(self.get_level(level))  # 设置日志的级别
        # f1 = logging.FileHandler(filename='a.log',mode='a',encoding='utf-8')    #找到写日志文件的这个人
        c1 = logging.StreamHandler()  # 负责往控制台输出的
        b1 = handlers.TimedRotatingFileHandler(filename=file_name, when=when, interval=1, backupCount=backCount, encoding='utf-8')
        fmt = logging.Formatter('%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')
        c1.setFormatter(fmt)
        b1.setFormatter(fmt)
        logger.addHandler(c1)
        logger.addHandler(b1)
        self.logger = logger
    def get_level(self,str):
        level = {
            'debug':logging.DEBUG,
            'info':logging.INFO,
            'warm':logging.WARNING,
            'error':logging.ERROR
        }
        str = str.lower()
        return level.get(str)
path = os.path.join(setting.LOG_PATH,setting.LOG_NAME)
atp_log = Mylogger(path,'debug').logger
#直接在这里实例化,用的时候不用再实例化了
#别的地方用的时候,直接atp_log.warnning('xxxx')

send_mail.py:封装发送邮件模块

import yagmail
from conf import setting
from lib.log import atp_log
def sendmail(title,content,attrs=None):
    m = yagmail.SMTP(host=setting.MAIL_HOST,user=setting.MAIL_USER,
                 password=setting.MAIL_PASSWRD,smtp_ssl=True)
    m.send(to=setting.TO,
           subject=title,
           contents = content,
           attachments = attrs)
    atp_log.info('发送邮件完成')

4.配置文件

setting.py,配置文件:设置邮件地址、日志默认级别、用例存放路径、日志存放路径、日志文件名

import os
BASE_PATH = os.path.dirname(
    os.path.dirname(os.path.abspath(__file__))
)   #三层目录定位到ATP目录
MAIL_HOST = 'smtp.qq.com'
MAIL_USER='12*****89@qq.com'
MAIL_PASSWRD = 'gjn*****bcgh'
TO = [
    '12*****9@qq.com'
]
LEVEL = 'debug' #设置日志默认级别
LOG_PATH = os.path.join(BASE_PATH,'logs')   #日志文件在logs目录下
CASE_PATH = os.path.join(BASE_PATH,'cases') #用例文件在cases目录下
LOG_NAME = 'atp_log'    #设置日志文件名

5.将ATP文件

Mark directory  as Sources Root

6.编写主程序

start.py

import os,sys
BASE_PATH = os.path.dirname(
    os.path.dirname(os.path.abspath(__file__))
)
sys.path.insert(0,BASE_PATH)
from lib.common import OpCase
from lib.send_mail import sendmail
from conf import setting
class CaseRun(object):
    def find_case(self):
        op = OpCase()
        for f in os.listdir(setting.CASE_PATH): #每次循环的时候读一个excel
            abs_path = os.path.join(setting.CASE_PATH,f)
            case_list = op.get_case(abs_path)
            res_list = []
            pass_count,fail_count= 0,0
            for case in case_list:  #循环每一个excel里面的所有用例
                url,method,req_data,check = case
                res = op.my_request(url,method,req_data)    #调用完接口返回的结果
                status = op.check_res(res,check)
                res_list.append([res,status])
                if status == '通过':
                    pass_count += 1
                else:
                    fail_count += 1
            op.write_excel(res_list)
            msg = '''
            xx你好,
                本次共运行%s条用例,通过%s条,失败%s条。
            '''%(len(res_list),pass_count,fail_count)
            sendmail('测试用例运行结果',content=msg,attrs=abs_path)
CaseRun().find_case()

OK,数据驱动自动化测试框架编写完成,运行 start.py 程序,收到邮件内容如下:

怎么用python requests实现上传excel数据流

到此,相信大家对“怎么用python requests实现上传excel数据流”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI