本篇内容主要讲解“hive怎么指定多个字符作为列分隔符”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“hive怎么指定多个字符作为列分隔符”吧!
hive创建表指定分隔符,不支持多个字符作为分隔符,如果想使用多个字符作为分割符的话就需要实现InputFormat.主要重写next方法,代码如下
package hiveStream;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
public class MyHiveInputFormat extends TextInputFormat implements
JobConfigurable {
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new MyRecordReader((FileSplit) genericSplit, job);
}
}
package hiveStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;
public class MyRecordReader implements RecordReader<LongWritable, Text> {
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader lineReader;
int maxLineLength;
// 构造方法
public MyRecordReader(FileSplit inputSplit, Configuration job)
throws IOException {
maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
Integer.MAX_VALUE);
start = inputSplit.getStart();
end = start + inputSplit.getLength();
final Path file = inputSplit.getPath();
// 创建压缩器
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
// 打开文件系统
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(file);
boolean skipFirstLine = false;
if (codec != null) {
lineReader = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
lineReader = new LineReader(fileIn, job);
}
if (skipFirstLine) {
start += lineReader.readLine(new Text(), 0,
(int) Math.min((long) Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
public MyRecordReader(InputStream in, long offset, long endOffset,
int maxLineLength) {
this.maxLineLength = maxLineLength;
this.start = offset;
this.lineReader = new LineReader(in);
this.pos = offset;
this.end = endOffset;
}
public MyRecordReader(InputStream in, long offset, long endOffset,
Configuration job) throws IOException {
this.maxLineLength = job.getInt(
"mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
this.lineReader = new LineReader(in, job);
this.start = offset;
this.end = endOffset;
}
@Override
public void close() throws IOException {
if (lineReader != null)
lineReader.close();
}
@Override
public LongWritable createKey() {
return new LongWritable();
}
@Override
public Text createValue() {
return new Text();
}
@Override
public long getPos() throws IOException {
return pos;
}
@Override
public float getProgress() throws IOException {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - start) / (float) (end - start));
}
}
@Override
public boolean next(LongWritable key, Text value) throws IOException {
while (pos < end) {
key.set(pos);
int newSize = lineReader.readLine(value, maxLineLength,
Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
maxLineLength));
// 把字符串中的"##"转变为"#"
String strReplace = value.toString().replace("##", "#");
Text txtReplace = new Text();
txtReplace.set(strReplace);
value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
if (newSize == 0)
return false;
pos += newSize;
if (newSize < maxLineLength)
return true;
}
return false;
}
}
建表语句:自定义 outputformat/inputformat 后,在建表时需要指定 outputformat/inputformat
create external table testHiveInput( id int,name string,age int) row format delimited fields terminated by '|' stored as INPUTFORMAT 'hiveStream.MyHiveInputFormat'
OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' location '/user/hdfs/hiveInput';
测试数据:
1##Tom##22
2##Jerry##22
3##Jeny##22
测试代码( 通过jdbc来查询数据):
public static void testHive() throws Exception {
String sql = "select id,name,age from testHiveInput";
Class.forName("org.apache.hive.jdbc.HiveDriver");
String url = "jdbc:hive2://xxx.xxx.xxx.xxx:10000";
Connection conn = DriverManager.getConnection(url, "hive", "passwd");
Statement stmt = conn.createStatement();
stmt.execute("add jar /usr/lib/hive/lib/hiveInput.jar");
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
System.out.println(rs.getString("id")+" "+ rs.getString("name") + " " + rs.getString("age"));
}
}
到此,相信大家对“hive怎么指定多个字符作为列分隔符”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/1167806/blog/200808