温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Python的logger怎么配置

发布时间:2021-12-01 11:14:38 来源:亿速云 阅读:184 作者:iii 栏目:大数据

这篇文章主要介绍“Python的logger怎么配置”,在日常操作中,相信很多人在Python的logger怎么配置问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Python的logger怎么配置”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

传递的数据结构如何考虑(是否对调用方有先验知识的要求,比如返回一个 Tuple,则需要用户了解 tuple 中元素的顺序,这样情况是否应该进行封装;),数据结构定义清楚了,很多东西也就清楚了。

如何操作数据库(可以学习 sqlalchemy,包括 core 和 orm 两种 api)

异常如何处理(异常应该分开捕获 — 可以清楚的知道什么情况下导致的,异常之后应该打印日志说明出现什么问题,如果情况恶劣需要进行异常再次抛出或者报警)

所有获取资源的地方都应该做 check(a. 没有获取到会怎么办;b.获取到异常的怎么办)

所有操作资源的地方都应该检查是否操作成功

每个函数都应该简短,如果函数过长应该进行拆分(有个建议值,函数包含的行数应该在 20-30 行之间,具体按照这个规范做过一次之后就会发现这样真好)

使用 class 之后,考虑重构 __str__ 函数,用户打印输出(如果不实现 __str__,会调用 __repr__ ),如果对象放到 collection 中之后,需要实现 __repr__ 函数,用于打印整个 collection 的时候,直观显示

如果有些资源会发生变化,可以单独抽取出来,做成函数,这样后续调用就可以不用改变了

附上一份 Python2.7 代码(将一些私有的东西进行了修改)

# -*- coding:utf-8 -*-   from sqlalchemy import create_engine import logging from logging.config import fileConfig import requests import Clinet # 私有的模块   fileConfig("logging_config.ini") logger = logging.getLogger("killduplicatedjob")   #配置可以单独放到一个模块中 DB_USER = "xxxxxxx" DB_PASSWORD = "xxxxxxxx" DB_PORT = 111111 DB_HOST_PORT = "xxxxxxxxxx" DB_DATA_BASE = "xxxxxxxxxxx"   REST_API_URL = "http://sample.com"   engine = create_engine("mysql://%s:%s@%s:%s/%s" % (DB_USER, DB_PASSWORD, DB_HOST_PORT, DB_PORT, DB_DATA_BASE))   # 这个 class 是为了在函数间传递时,不需要使用方了解属性的具体顺序而写的,也可以放到一个单独的模块中 class DuplicatedJobs(object):  def __init__(self, app_id, app_name, user):  self.app_id = app_id  self.app_name = app_name  self.user = user   def __repr__(self):  return '[appid:%s, app_name:%s, user:%s]' % (self.app_id, self.app_name, self.user)   def find_duplicated_jobs():  logger.info("starting find duplicated jobs")  (running_apps, app_name_to_user) = get_all_running_jobs()  all_apps_on_yarn = get_apps_from_yarn_with_queue(get_resource_queue())   duplicated_jobs = []  for app in all_apps_on_yarn:  (app_id, app_name) = app   if app_id not in running_apps:  if not app_name.startswith("test"):  logger.info("find a duplicated job, prefixed_name[%s] with appid[%s]" % (app_name, app_id))  user = app_name_to_user[app_name]  duplicated_jobs.append(DuplicatedJobs(app_id, app_name, user))  else:  logger.info("Job[%s] is a test job, would not kill it" % app_name)   logger.info("Find duplicated jobs [%s]" % duplicated_jobs)   return duplicated_jobs   def get_apps_from_yarn_with_queue(queue):  param = {"queue": queue}  r = requests.get(REST_API_URL, params=param)  apps_on_yarn = []  try:  jobs = r.json().get("apps")  app_list = jobs.get("app", [])  for app in app_list:  app_id = app.get("id")  name = app.get("name")  apps_on_yarn.append((app_id, name))   except Exception as e: #Exception ***进行单独的分开,针对每一种 Exception 进行不同的处理  logger.error("Get apps from Yarn Error, message[%s]" % e.message)   logger.info("Fetch all apps from Yarn [%s]" % apps_on_yarn)   return apps_on_yarn   def get_all_running_jobs():  job_infos = get_result_from_mysql("select * from xxxx where xx=yy")   app_ids = []  app_name_to_user = {}  for (topology_id, topology_name) in job_infos:  status_set = get_result_from_mysql("select * from xxxx where xx=yy")  application_id = status_set[0][0]  if "" != application_id:  configed_resource_queue = get_result_from_mysql(  "select * from xxxx where xx=yy")  app_ids.append(application_id)  app_name_to_user[topology_name] = configed_resource_queue[0][0].split(".")[1]   logger.info("All running jobs appids[%s] topology_name2user[%s]" % (app_ids, app_name_to_user))  return app_ids, app_name_to_user   def kill_duplicated_jobs(duplicated_jobs):  for job in duplicated_jobs:  app_id = job.app_id  app_name = job.app_name  user = job.user  logger.info("try to kill job[%s] with appid[%s] for user[%s]" % (app_name, app_id, user))  try:  Client.kill_job(app_id, user)  logger.info("Job[%s] with appid[%s] for user[%s] has been killed" % (app_name, app_id, user))  except Exception as e:  logger.error("Can't kill job[%s] with appid[%s] for user[%s]" % (app_name, app_id, user))   def get_result_from_mysql(sql):  a = engine.execute(sql)  return a.fetchall()   # 因为下面的资源可能发生变化,而且可能包含一些具体的逻辑,因此单独抽取出来,独立成一个函数 def get_resource_queue():  return "xxxxxxxxxxxxx"   if __name__ == "__main__":  kill_duplicated_jobs(find_duplicated_jobs())

其中 logger 配置文件如下(对于 Python 的 logger,官方文档写的非常好,建议读一次,并且实践一次)

[loggers] keys=root, simpleLogger   [handlers] keys=consoleHandler, logger_handler   [formatters] keys=formatter   [logger_root] level=WARN handlers=consoleHandler   [logger_simpleLogger] level=INFO handlers=logger_handler propagate=0 qualname=killduplicatedjob   [handler_consoleHandler] class=StreamHandler level=WARN formatter=formatter args=(sys.stdout,)   [handler_logger_handler] class=logging.handlers.RotatingFileHandler level=INFO formatter=formatter args=("kill_duplicated_streaming.log", "a", 52428800, 3,)   [formatter_formatter] format=%(asctime)s %(name)-12s %(levelname)-5s %(message)s

到此,关于“Python的logger怎么配置”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI