In distributed data processing, serialization and deserialization are used mainly in two areas: interprocess communication and persistent storage.
Serialization is the process of converting structured objects into a byte stream so they can be transmitted over the network or written to disk for persistent storage; deserialization is the reverse process of converting a byte stream back into structured objects.
Hadoop implements communication between node processes with RPC. It does not use Java's built-in serialization mechanism; instead it defines two serialization-related interfaces, Writable and Comparable, from which the WritableComparable interface is derived.
In Hadoop, the Writable interface defines two methods:
write the object's fields to a binary-format DataOutput stream;
read the object's fields from a binary-format DataInput stream.
package org.apache.hadoop.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public interface Writable {
    /**
     * Serialize the fields of this object to <code>out</code>.
     */
    void write(DataOutput out) throws IOException;
    /**
     * Deserialize the fields of this object from <code>in</code>.
     */
    void readFields(DataInput in) throws IOException;
}
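To make the two methods concrete, the following minimal sketch (the class name WritableRoundTrip is illustrative and not part of the original post) serializes an IntWritable into a byte array with write() and then restores it with readFields():
package com.lucl.hadoop.hdfs;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        IntWritable original = new IntWritable(163);
        // Serialization: write() pushes the value into a DataOutput stream.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));
        byte[] serialized = bytes.toByteArray();    // an IntWritable occupies 4 bytes
        // Deserialization: readFields() repopulates a reusable instance.
        IntWritable restored = new IntWritable();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(serialized)));
        System.out.println(serialized.length + " bytes, restored value = " + restored.get());
    }
}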
The structure of the Writable class hierarchy in Hadoop:
Writable
  WritableComparable<T>
    IntWritable        int (fixed-length)
    VIntWritable       int (variable-length)
    BooleanWritable    boolean
    ByteWritable       byte (single byte)
    ShortWritable      short
    FloatWritable      float
    DoubleWritable     double
    LongWritable       long (fixed-length)
    VLongWritable      long (variable-length)
    Text               Writable for UTF-8 sequences; generally treated as the equivalent of java.lang.String
    BytesWritable      byte sequence
  ArrayWritable        array of Writables
  TwoDArrayWritable    two-dimensional array of Writables
  MapWritable          implements Map<Writable, Writable>
  SortedMapWritable    implements SortedMap<WritableComparable, Writable>
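As a quick illustration of the fixed-length vs. variable-length distinction in the list above, this small sketch (the class name WritableSizeDemo is assumed for illustration) prints the serialized size of a few wrappers; an IntWritable always takes 4 bytes, while a VIntWritable takes between 1 and 5 bytes depending on the value:
package com.lucl.hadoop.hdfs;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.Writable;
public class WritableSizeDemo {
    // Serialize any Writable into an in-memory byte array and return its length.
    static int serializedLength(Writable w) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        w.write(new DataOutputStream(bytes));
        return bytes.toByteArray().length;
    }
    public static void main(String[] args) throws IOException {
        System.out.println("IntWritable(5)     : " + serializedLength(new IntWritable(5)) + " bytes");
        System.out.println("VIntWritable(5)    : " + serializedLength(new VIntWritable(5)) + " bytes");
        System.out.println("VIntWritable(10^6) : " + serializedLength(new VIntWritable(1000000)) + " bytes");
        // Text stores a vint length prefix followed by the UTF-8 bytes of the string.
        System.out.println("Text(\"hadoop\")     : " + serializedLength(new Text("hadoop")) + " bytes");
    }
}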
The WritableComparable interface adds type comparison on top of Writable, and comparing types is crucial to MapReduce, where keys are sorted against each other.
package org.apache.hadoop.io;
public interface WritableComparable<T> extends Writable, Comparable<T> {}
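A type that is used as a MapReduce key therefore normally implements WritableComparable rather than plain Writable. A minimal sketch, with an assumed class name (TimestampKey) and field that do not come from the original post:
package com.lucl.hadoop.mapreduce.serialize;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
// Hypothetical key type: a single long wrapped as a WritableComparable, so that
// MapReduce can both serialize it and sort on it during the shuffle.
public class TimestampKey implements WritableComparable<TimestampKey> {
    private long timestamp;
    public TimestampKey() {
        // no-arg constructor required for reflective instantiation
    }
    public TimestampKey(long timestamp) {
        this.timestamp = timestamp;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(timestamp);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        timestamp = in.readLong();
    }
    @Override
    public int compareTo(TimestampKey other) {
        return Long.compare(this.timestamp, other.timestamp);
    }
}
In practice such a key class should also override hashCode() and equals() so that the default HashPartitioner sends equal keys to the same reducer.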
The WritableComparator class is a general-purpose implementation that serves two purposes:
1. it provides a default implementation of the raw compare() method (it deserializes the two objects and calls their compareTo());
2. it acts as a factory for RawComparator instances that Writable implementations have registered.
package org.apache.hadoop.io;
public class WritableComparator implements RawComparator, Configurable {
    /** For backwards compatibility. **/
    public static WritableComparator get(Class<? extends WritableComparable> c) {
        return get(c, null);
    }
    /** Get a comparator for a {@link WritableComparable} implementation. */
    public static WritableComparator get(Class<? extends WritableComparable> c, Configuration conf) {
        // ......
    }
}
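The "factory" part means a Writable implementation can register an optimized RawComparator through the static WritableComparator.define() method; WritableComparator.get() then returns the registered instance instead of the default deserialize-and-compare implementation. A hedged sketch, reusing the hypothetical TimestampKey from above:
package com.lucl.hadoop.mapreduce.serialize;
import org.apache.hadoop.io.WritableComparator;
// Hypothetical raw comparator for TimestampKey: it compares the serialized
// bytes directly, without deserializing the objects first.
public class TimestampKeyComparator extends WritableComparator {
    public TimestampKeyComparator() {
        super(TimestampKey.class);
    }
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // A TimestampKey is serialized as a single long at the start of its record.
        long t1 = readLong(b1, s1);
        long t2 = readLong(b2, s2);
        return Long.compare(t1, t2);
    }
    static {
        // Register the comparator so that WritableComparator.get(TimestampKey.class)
        // returns this raw implementation.
        WritableComparator.define(TimestampKey.class, new TimestampKeyComparator());
    }
}
In Hadoop's built-in types (for example Text and IntWritable) this registration block lives in a static initializer of the Writable class itself, so it runs as soon as the key class is loaded.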
A simple example
package com.lucl.hadoop.hdfs;
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;
/**
 *
 * @author lucl
 *
 */
public class CustomizeComparator {
    public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);
        IntWritable int003 = new IntWritable(300);
        IntWritable int004 = new IntWritable(400);
        // Serialize the first IntWritable to a byte array through an in-memory stream.
        ByteArrayOutputStream bytes001 = new ByteArrayOutputStream();
        DataOutput out001 = new DataOutputStream(bytes001);
        try {
            int003.write(out001);
        } catch (IOException e) {
            e.printStackTrace();
        }
        byte[] b1 = bytes001.toByteArray();
        // Serialize the second IntWritable to a byte array the same way.
        ByteArrayOutputStream bytes002 = new ByteArrayOutputStream();
        DataOutput out002 = new DataOutputStream(bytes002);
        try {
            int004.write(out002);
        } catch (IOException e) {
            e.printStackTrace();
        }
        byte[] b2 = bytes002.toByteArray();
        // Compare the two values on their raw serialized bytes.
        int comvalue = comparator.compare(b1, 0, b1.length, b2, 0, b2.length);
        System.out.println("comvalue : " + comvalue);
        // Compare the original int values for reference.
        int value1 = int003.get();
        int value2 = int004.get();
        if (value1 > value2) {
            System.out.println("value1 > value2");
        } else {
            System.out.println("value1 <= value2");
        }
    }
}
The MapReduce program
The data to process (page views and traffic per site category):
[hadoop@nnode code]$ hdfs dfs -text /data/HTTP_SITE_FLOW.log
视频网站 15 1527
信息安全 20 3156
站点统计 24 6960
搜索引擎 28 3659
站点统计 3 1938
综合门户 15 1938
搜索引擎 21 9531
搜索引擎 63 11058
The custom serialization class
package com.lucl.hadoop.mapreduce.serialize;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class CustomizeWritable implements Writable {
    private Long pv;
    private Long flow;
    public CustomizeWritable() {
        // The no-argument constructor is required: Hadoop instantiates Writables
        // via reflection during deserialization (see the error at the end of this post).
    }
    public CustomizeWritable(String pv, String flow) {
        this(Long.valueOf(pv), Long.valueOf(flow));
    }
    public CustomizeWritable(Long pv, Long flow) {
        this.pv = pv;
        this.flow = flow;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(pv);
        out.writeLong(flow);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        pv = in.readLong();
        flow = in.readLong();
    }
    public Long getPv() {
        return pv;
    }
    public void setPv(Long pv) {
        this.pv = pv;
    }
    public Long getFlow() {
        return flow;
    }
    public void setFlow(Long flow) {
        this.flow = flow;
    }
    @Override
    public String toString() {
        return this.pv + "\t" + this.flow;
    }
}
The Mapper code
package com.lucl.hadoop.mapreduce.serialize;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WritableMapper extends Mapper<LongWritable, Text, Text, CustomizeWritable> {
    Text text = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is: <site category>\t<pv>\t<flow>
        String[] splited = value.toString().split("\t");
        text.set(splited[0]);
        CustomizeWritable wr = new CustomizeWritable(splited[1], splited[2]);
        context.write(text, wr);
    }
}
The Reducer code
package com.lucl.hadoop.mapreduce.serialize;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WritableReducer extends Reducer<Text, CustomizeWritable, Text, CustomizeWritable> {
    @Override
    protected void reduce(Text key, Iterable<CustomizeWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum page views and traffic for each site category.
        Long pv = 0L;
        Long flow = 0L;
        for (CustomizeWritable customizeWritable : values) {
            pv += customizeWritable.getPv();
            flow += customizeWritable.getFlow();
        }
        CustomizeWritable wr = new CustomizeWritable(pv, flow);
        context.write(key, wr);
    }
}
The driver class
package com.lucl.hadoop.mapreduce.serialize;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
/**
 * @author lucl
 */
public class CustomizeWritableMRApp extends Configured implements Tool {
    private static final Logger logger = Logger.getLogger(CustomizeWritableMRApp.class);
    public static void main(String[] args) {
        try {
            ToolRunner.run(new CustomizeWritableMRApp(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            logger.info("Usage: " + this.getClass().getSimpleName() + " <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(CustomizeWritableMRApp.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        /**
         * map
         */
        job.setMapperClass(WritableMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomizeWritable.class);
        /**
         * reduce
         */
        job.setReducerClass(WritableReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CustomizeWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Running the program
[hadoop@nnode code]$ hadoop jar WRApp.jar /data/HTTP_SITE_FLOW.log /201511291404
15/11/29 14:44:13 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/11/29 14:44:15 INFO input.FileInputFormat: Total input paths to process : 1
15/11/29 14:44:15 INFO mapreduce.JobSubmitter: number of splits:1
15/11/29 14:44:15 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448763754600_0004
15/11/29 14:44:15 INFO impl.YarnClientImpl: Submitted application application_1448763754600_0004
15/11/29 14:44:15 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448763754600_0004/
15/11/29 14:44:15 INFO mapreduce.Job: Running job: job_1448763754600_0004
15/11/29 14:44:45 INFO mapreduce.Job: Job job_1448763754600_0004 running in uber mode : false
15/11/29 14:44:45 INFO mapreduce.Job: map 0% reduce 0%
15/11/29 14:45:14 INFO mapreduce.Job: map 100% reduce 0%
15/11/29 14:45:34 INFO mapreduce.Job: map 100% reduce 100%
15/11/29 14:45:34 INFO mapreduce.Job: Job job_1448763754600_0004 completed successfully
15/11/29 14:45:34 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=254
FILE: Number of bytes written=216031
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=277
HDFS: Number of bytes written=107
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=27010
Total time spent by all reduces in occupied slots (ms)=16429
Total time spent by all map tasks (ms)=27010
Total time spent by all reduce tasks (ms)=16429
Total vcore-seconds taken by all map tasks=27010
Total vcore-seconds taken by all reduce tasks=16429
Total megabyte-seconds taken by all map tasks=27658240
Total megabyte-seconds taken by all reduce tasks=16823296
Map-Reduce Framework
Map input records=8
Map output records=8
Map output bytes=232
Map output materialized bytes=254
Input split bytes=103
Combine input records=0
Combine output records=0
Reduce input groups=5
Reduce shuffle bytes=254
Reduce input records=8
Reduce output records=5
Spilled Records=16
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=167
CPU time spent (ms)=2320
Physical memory (bytes) snapshot=304205824
Virtual memory (bytes) snapshot=1695969280
Total committed heap usage (bytes)=136450048
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=174
File Output Format Counters
Bytes Written=107
[hadoop@nnode code]$
Viewing the results
[hadoop@nnode code]$ hdfs dfs -ls /201511291404
Found 2 items
-rw-r--r-- 2 hadoop hadoop 0 2015-11-29 14:45 /201511291404/_SUCCESS
-rw-r--r-- 2 hadoop hadoop 107 2015-11-29 14:45 /201511291404/part-r-00000
[hadoop@nnode code]$ hdfs dfs -text /201511291404/part-r-00000
信息安全 20 3156
搜索引擎 112 24248
站点统计 27 8898
综合门户 15 1938
视频网站 15 1527
[hadoop@nnode code]$
Note: the first run failed with the following error:
15/11/29 14:41:28 INFO mapreduce.Job: map 100% reduce 0%
15/11/29 14:42:04 INFO mapreduce.Job: Task Id : attempt_1448763754600_0003_r_000000_0, Status : FAILED
Error: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.lucl.hadoop.mapreduce.serialize.CustomizeWritable.<init>()
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:66)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:146)
at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:121)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:302)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.NoSuchMethodException: com.lucl.hadoop.mapreduce.serialize.CustomizeWritable.<init>()
at java.lang.Class.getConstructor0(Class.java:2892)
at java.lang.Class.getDeclaredConstructor(Class.java:2058)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)
... 13 more
Searching online turned up the cause: when defining a custom Writable, a no-argument constructor must be provided, because the framework instantiates the class through reflection during deserialization.
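The stack trace confirms this: ReflectionUtils.newInstance() looks up the declared no-argument constructor before readFields() is ever called. As a quick local check (the class name NoArgConstructorCheck is illustrative, not part of the original post), the custom Writable can be instantiated the same way the framework does:
package com.lucl.hadoop.mapreduce.serialize;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
public class NoArgConstructorCheck {
    public static void main(String[] args) {
        // If CustomizeWritable lacked a no-argument constructor, this call would
        // throw the RuntimeException/NoSuchMethodException shown above.
        CustomizeWritable w = ReflectionUtils.newInstance(CustomizeWritable.class, new Configuration());
        System.out.println("instantiated: " + w.getClass().getName());
    }
}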