private final static Logger logger = LoggerFactory.getLogger(GetData.class);
public static void main(String[] arg) throws Exception {
TypeInformation[] fieldTypes = new TypeInformation[] {
BasicTypeInfo.STRING_TYPE_INFO
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://ip:3306/tablename?characterEncoding=utf8")
.setUsername("*")
.setPassword("*")
.setQuery("select name from words")
.setRowTypeInfo(rowTypeInfo)
.finish();
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource s = env.createInput(jdbcInputFormat); // datasource
BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT());
tableEnv.registerDataSet("t2", s);
tableEnv.sqlQuery("select * from t2").printSchema();
Table query = tableEnv.sqlQuery("select * from t2");
DataSet result = tableEnv.toDataSet(query, Row.class);
result.print();
System.out.println(s.count());
}
通过插件将所需的类打到一个jar中
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 此处指定main方法入口的class -->
<mainClass>*</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
然后执行
./bin/flink run /flink-1.8.0/collector-api-0.1.jar
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。