温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

怎么用flink 1.11使sql客户端支持执行sql文件

发布时间:2021-12-31 10:38:30 阅读:341 作者:iii 栏目:大数据
开发者测试专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

这篇文章主要讲解了“怎么用flink 1.11使sql客户端支持执行sql文件”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么用flink 1.11使sql客户端支持执行sql文件”吧!

背景

目前flink的sql客户端提供了一种交互式的sql查询服务,用户可以使用sql客户端执行一些sql的批任务或者流任务。但是当我想执行一些sql的定时任务时,flink却没有提供一个合适的方式,所以综合考虑了一下,我决定在sql的客户端基础上给加一个 '-filename (-f)' 参数,就像类似'hive -f abc.sql' 一样,可以执行一批sql任务。 

源码修改

目前我只是想通过sql客户端执行一些批任务,再加上flink sql 客户端本身的一些设计,所以目前修改后的sql client 执行sql文件的时候支持 SET,DDL,INSERT INTO SELECT ...等语句,其他比如select暂不支持。

修改后执行的方式为:

/home/flink/bin/sql-client.sh embedded -f flink.sql  
  

CliOptionsParser.java

在这个sql 客户端参数解析类里添加一个选项,用于解析-f参数。

 public static final Option OPTION_FILENAME = Option  .builder("f")  .required(false)  .longOpt("filename")  .numberOfArgs(1)  .argName("the path of the sql file")  .desc("SQL from files")  .build();  
  

CliOptions.java

在这里添加一个变量filename

private final String filename;  
  

SqlClient.java

在SqlClient里添加对于-filename的处理

  if (options.getUpdateStatement() != null){    // execute  update statement    final boolean success = cli.submitUpdate(options.getUpdateStatement());    if (!success) {     throw new SqlClientException("Could not submit given SQL update statement to cluster.");    }   } else if (options.getFilename() != null){    final boolean success = cli.executeFile(options.getFilename());    if (!success) {     throw new SqlClientException("Could not submit given SQL file  to cluster.");    }   } else {    cli.open();   }  
  

SqlClient#executeFile

添加具体的执行sql文件的方法,sql文件里的所有sql以分号切分,然后分别判断是什么类型,调用不同的方法来执行。

 public boolean executeFile(String filename){  File file = new File(filename);  if (!file.exists()){   printError("the file do not exist");   return false;  } else {   String statement = null;   try {    statement = FileUtils.readFileToString(file);   } catch (IOException e){    printError("read the sql file error , " + e.getMessage());    return false;   }   String[] sqls = statement.split(";");   for (String sql : sqls){    if (sql == null || "".equals(sql.trim())){     continue;    }    final Optional<SqlCommandCall> parsedStatement = parseCommand(sql);    if (parsedStatement.isPresent()){     SqlCommandCall cmdCall = parsedStatement.get();     switch (cmdCall.command) {      case SET:       callSet(cmdCall);       break;                        ................      case INSERT_INTO:      case INSERT_OVERWRITE:       callInsert(cmdCall);       break;      case CREATE_TABLE:       callDdl(cmdCall.operands[0], CliStrings.MESSAGE_TABLE_CREATED);       break;                            .....................       throw new SqlClientException("Unsupported command: " + cmdCall.command);     }    }   }  }  return true; }

感谢各位的阅读,以上就是“怎么用flink 1.11使sql客户端支持执行sql文件”的内容了,经过本文的学习后,相信大家对怎么用flink 1.11使sql客户端支持执行sql文件这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

原文链接:https://my.oschina.net/u/4596020/blog/4465602

AI

开发者交流群×