温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Flink入门wordCount

发布时间:2020-06-04 11:25:14 来源:网络 阅读:362 作者:qq513283439 栏目:大数据

Flink的编程模型
1、获取Flink上下文;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2、加载、创建数据;
DataSet
3、数据转换;
Transformation
4、数据结果存放;
5、触发执行。
env.execution

下面为flink输出wordcount数据:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkMain {

@SuppressWarnings("serial")
public static class LineSplit implements FlatMapFunction<String,Tuple2<String, Integer>>{

    @SuppressWarnings("rawtypes")
    @Override
    /**
     * @param value 原数据
     * @param out 输出的数据
     */
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] tokens = value.split(" ");
        for (String token : tokens) {
            if(token!=null && token.length()>0){
                Tuple2 t = new Tuple2<String, Integer>(token,1);
                out.collect(t);
            }
        }
    }

}

public static void main(String[] args) throws Exception {
    //创建flink上下文
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    //创建数据集
    DataSet<String> text = env.fromElements("to be","or no to be","is question");
    //对数据集转换
    DataSet<Tuple2<String, Integer>> count = text.flatMap(new LineSplit());
    //输出转换后的数据集(print中包含了env.execute执行)
    count.print();
    System.out.println("-----------------------");
    //对数据集分组统计转换,0,1是下标,对应Tuple2类中的参数
    count = count.groupBy(0).sum(1);
    //控制台输出数据集
    count.print();
    System.out.println("-----------------------");
}

}

Flink使用sql方式转换数据
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class FlinkMain2 {

@SuppressWarnings({ "unchecked", "rawtypes" })
public static void main(String[] args) throws Exception {

    //创建flink上下文
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    List<WordCount> list = new ArrayList();
    String workStr = "to be or no to be is question";
    String[] tokens = workStr.split(" ");
    for (String token : tokens) {
        if(token!=null && token.length()>0){
            list.add( new WordCount(token,1));
        }
    }
    //创建数据集
    DataSet<WordCount> input = env.fromCollection(list);
    //注册为数据表wordCount为数据库表,word,frequency为wordCount表字段
    tEnv.registerDataSet("wordCount", input, "word, frequency");

    Table table = tEnv.sqlQuery(" SELECT word, SUM(frequency) as frequency FROM wordCount GROUP BY word" );

    DataSet<WordCount> res = tEnv.toDataSet(table, WordCount.class);
    //控制台输出
    res.print();

}

public static class WordCount    {
    public String word;
    public long frequency;
    public WordCount(){}

    public WordCount(String word, long frequency) {
        this.word = word;
        this.frequency = frequency;
    }

    @Override
    public String toString() {
        return "词语:" + word + ",词频:" + frequency;
    }
}

}

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI