This article walks through how to implement a MapReduce job that reads data and stores the results in HBase. The walkthrough is fairly detailed; readers who are interested can use it as a reference.
The input is a vehicle-position data file, one record per line, in the format: vehicle id speed:fuel consumption:current mileage.
The MapReduce job computes each vehicle's average speed, fuel consumption, and mileage.
Sample data:
vid1 78:8:120
vid1 56:11:124
vid1 98:5:130
vid1 72:6:131
vid2 78:4:281
vid2 58:9:298
vid2 67:15:309
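The per-record parsing the job relies on can be sketched in plain Java first. The class and method names below are illustrative only, not part of the job itself:

```java
public class RecordParseDemo {
    // Parse "<vid> <speed>:<fuel>:<mileage>" into "vid speed fuel mileage".
    static String parse(String record) {
        String[] kv = record.split("\\s+");  // -> key "vid1", value "78:8:120"
        String[] fields = kv[1].split(":");  // -> speed, fuel, mileage
        return kv[0] + " " + Integer.parseInt(fields[0]) + " "
                + Integer.parseInt(fields[1]) + " " + Integer.parseInt(fields[2]);
    }

    public static void main(String[] args) {
        System.out.println(parse("vid1 78:8:120")); // vid1 78 8 120
    }
}
```

The vehicle id becomes the map output key, and the colon-separated triple becomes the value, so all records for one vehicle meet in the same reduce call.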
Create the Mapper class and its map() method
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class VehicleMapper extends Mapper<Object, Text, Text, Text> {
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String vehicle = value.toString(); // convert the incoming text to a String
        // Split the input into lines; with the default TextInputFormat each call
        // receives a single line, so this loop normally runs once.
        StringTokenizer tokenizerArticle = new StringTokenizer(vehicle, "\n");
        while (tokenizerArticle.hasMoreTokens()) {
            // split each line on whitespace
            StringTokenizer tokenizer = new StringTokenizer(tokenizerArticle.nextToken());
            String vehicleId = tokenizer.nextToken();   // vehicle id, e.g. "vid1"
            String vehicleInfo = tokenizer.nextToken(); // "speed:fuel:mileage"
            context.write(new Text(vehicleId), new Text(vehicleInfo));
        }
    }
}
Create the Reducer class
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
public class VehicleReduce extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int speed = 0;
        int oil = 0;
        int mile = 0;
        int count = 0;
        for (Text val : values) {
            // each value has the form "speed:fuel:mileage"
            String[] arr = val.toString().split(":");
            speed += Integer.parseInt(arr[0]);
            oil += Integer.parseInt(arr[1]);
            mile += Integer.parseInt(arr[2]);
            count++;
        }
        // integer averages over all records for this vehicle
        speed /= count;
        oil /= count;
        mile /= count;
        String result = speed + ":" + oil + ":" + mile;
        // Use the exact bytes of the key: Text.getBytes() can return a padded
        // internal buffer, so convert via toString() instead.
        byte[] rowKey = Bytes.toBytes(key.toString());
        Put put = new Put(rowKey);
        put.add(Bytes.toBytes("info"), Bytes.toBytes("property"), Bytes.toBytes(result));
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}
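The averaging the reducer is meant to perform can be checked by hand against the sample data. A standalone sketch of the same arithmetic, independent of Hadoop (class and method names here are illustrative):

```java
public class AverageDemo {
    // Integer average of each colon-separated field across all records.
    static String aggregate(String[] infos) {
        int speed = 0, oil = 0, mile = 0;
        for (String info : infos) {
            String[] arr = info.split(":");
            speed += Integer.parseInt(arr[0]);
            oil += Integer.parseInt(arr[1]);
            mile += Integer.parseInt(arr[2]);
        }
        int n = infos.length;
        return (speed / n) + ":" + (oil / n) + ":" + (mile / n);
    }

    public static void main(String[] args) {
        // vid1's four records from the sample data:
        // speeds sum to 304, fuel to 30, mileage to 505; divided by 4 -> 76:7:126
        String[] vid1 = {"78:8:120", "56:11:124", "98:5:130", "72:6:131"};
        System.out.println(aggregate(vid1)); // 76:7:126
    }
}
```

So for the sample data, row "vid1" should end up holding the value 76:7:126 in HBase.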
Create the driver and run the job
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class VehicleMapReduceJob {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf = HBaseConfiguration.create(conf); // pull in hbase-site.xml settings
        Job job = new Job(conf, "HBase_VehicleInfo");
        job.setJarByClass(VehicleMapReduceJob.class);
        job.setMapperClass(VehicleMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0])); // input file path
        // write reducer output to the HBase table "vehicle" (must already exist)
        TableMapReduceUtil.initTableReducerJob("vehicle", VehicleReduce.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Export the code as vehicle.jar, place it under the hadoop-1.2.1 directory, and run:
./bin/hadoop jar vehicle.jar com.xh.vehicle.VehicleMapReduceJob input/vehicle.txt
Querying the result in HBase:
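The target table has to exist before the job runs, and afterwards the stored averages can be inspected from the HBase shell. A sketch of both steps (table, column-family, and qualifier names follow the reducer code above):

```
# before running the job: create the table with column family "info"
create 'vehicle', 'info'

# after the job completes: inspect the stored averages
scan 'vehicle'
get 'vehicle', 'vid1'
```

For the sample data, the get on 'vid1' should show the column info:property holding the averaged triple.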
That covers how to implement a MapReduce job that reads data and stores it in HBase. Hopefully the material above is useful; if you found the article helpful, feel free to share it so more people can see it.
Original article: https://my.oschina.net/xhgogogo/blog/470610