本篇内容主要讲解“ORC文件读写工具类和Flink输出ORC格式文件的方法”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“ORC文件读写工具类和Flink输出ORC格式文件的方法”吧!
压缩
压缩比约为 1:7 到 1:10,即压缩后数据只占原来的约 1/7~1/10;在 3 副本存储下,每个副本同样缩小,整体可节省接近 10 倍的存储空间
调查数据周末要给出
数据压缩后要注意各节点间数据分布的负载均衡问题,可以尝试执行 rebalance(例如 HDFS 的 `hdfs balancer`)
导出
将 Hive 中 ORC 格式的表通过 Sqoop 导出到 MySQL 时,可借助 HCatalog 直接读取 ORC 数据,只需在 sqoop export 命令中增加 --hcatalog-database 和 --hcatalog-table 等配置参数即可
查看
以json方式查看orc文件
hive --orcfiledump -j -p /user/hive/warehouse/dim.db/dim_province/000000_0
下载
以KV形式查看orc文件
hive --orcfiledump -d /user/hive/warehouse/dim.db/dim_province/000000_0 > myfile.txt
ORC 文件中为各数据块(stripe / 行组)记录了字段的 min 和 max 统计信息;读取时会先判断查询条件的值是否落在 [min, max] 区间内,不在区间内的数据块直接跳过,因此读取速度更快
注意事项:在 Windows 上读写 ORC 文件时,请务必确保 classpath 和 PATH 中没有 Hadoop 相关的环境变量!如果有,请先删除,然后重启 IDE。
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.OrcFile; import org.apache.orc.Reader; import org.apache.orc.RecordReader; import org.apache.orc.TypeDescription; import java.io.IOException; public class CoreReader { public static void main(Configuration conf, String[] args) throws IOException { // Get the information from the file footer Reader reader = OrcFile.createReader(new Path("my-file.orc"), OrcFile.readerOptions(conf)); System.out.println("File schema: " + reader.getSchema()); System.out.println("Row count: " + reader.getNumberOfRows()); // Pick the schema we want to read using schema evolution TypeDescription readSchema = TypeDescription.fromString("struct<z:int,y:string,x:bigint>"); // Read the row data VectorizedRowBatch batch = readSchema.createRowBatch(); RecordReader rowIterator = reader.rows(reader.options() .schema(readSchema)); LongColumnVector z = (LongColumnVector) batch.cols[0]; BytesColumnVector y = (BytesColumnVector) batch.cols[1]; LongColumnVector x = (LongColumnVector) batch.cols[2]; while (rowIterator.nextBatch(batch)) { for(int row=0; row < batch.size; ++row) { int zRow = z.isRepeating ? 0: row; int xRow = x.isRepeating ? 0: row; System.out.println("z: " + (z.noNulls || !z.isNull[zRow] ? z.vector[zRow] : null)); System.out.println("y: " + y.toString(row)); System.out.println("x: " + (x.noNulls || !x.isNull[xRow] ? x.vector[xRow] : null)); } } rowIterator.close(); } public static void main(String[] args) throws IOException { main(new Configuration(), args); } }
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;import org.apache.orc.OrcFile;import org.apache.orc.TypeDescription;import org.apache.orc.Writer;import java.io.IOException;import java.nio.charset.StandardCharsets;public class CoreWriter { public static void main(Configuration conf, String[] args) throws IOException { TypeDescription schema = TypeDescription.fromString("struct<x:int,y:string>"); Writer writer = OrcFile.createWriter(new Path("my-file.orc"), OrcFile.writerOptions(conf) .setSchema(schema)); VectorizedRowBatch batch = schema.createRowBatch(); LongColumnVector x = (LongColumnVector) batch.cols[0]; BytesColumnVector y = (BytesColumnVector) batch.cols[1];for(int r=0; r < 10000; ++r) { int row = batch.size++; x.vector[row] = r; byte[] buffer = ("Last-" + (r * 3)).getBytes(StandardCharsets.UTF_8); y.setRef(row, buffer, 0, buffer.length); // If the batch is full, write it out and start over. if (batch.size == batch.getMaxSize()) { writer.addRowBatch(batch); batch.reset(); } }if (batch.size != 0) { writer.addRowBatch(batch); } writer.close(); } public static void main(String[] args) throws IOException {main(new Configuration(), args); } }
import org.apache.flink.core.fs.Path; import org.apache.flink.orc.OrcSplitReaderUtil; import org.apache.flink.orc.vector.RowDataVectorizer; import org.apache.flink.orc.writer.OrcBulkWriterFactory; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.DoubleType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; import org.apache.hadoop.conf.Configuration; import org.apache.orc.TypeDescription; import java.util.Properties; public class StreamingWriteFileOrc { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(10000); env.setParallelism(1); DataStream<RowData> dataStream = env.addSource( new MySource()); //写入orc格式的属性 final Properties writerProps = new Properties(); writerProps.setProperty("orc.compress", "LZ4"); //定义类型和字段名 LogicalType[] orcTypes = new LogicalType[]{ new IntType(), new DoubleType(), new VarCharType()}; String[] fields = new String[]{"a", "b", "c"}; TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(RowType.of( orcTypes, fields)); //构造工厂类OrcBulkWriterFactory final OrcBulkWriterFactory<RowData> factory = new OrcBulkWriterFactory<>( new RowDataVectorizer(typeDescription.toString(), orcTypes), writerProps, new Configuration()); StreamingFileSink orcSink = StreamingFileSink .forBulkFormat(new Path("file:///tmp/aaaa"), factory) .build(); dataStream.addSink(orcSink); env.execute(); } 
public static class MySource implements SourceFunction<RowData>{ @Override public void run(SourceContext<RowData> sourceContext) throws Exception{ while (true){ GenericRowData rowData = new GenericRowData(3); rowData.setField(0, (int) (Math.random() * 100)); rowData.setField(1, Math.random() * 100); rowData.setField(2, org.apache.flink.table.data.StringData.fromString(String.valueOf(Math.random() * 100))); sourceContext.collect(rowData); Thread.sleep(1); } } @Override public void cancel(){ } } }
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <encoding>UTF-8</encoding>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.tools.version>2.11</scala.tools.version>
    <!-- Scala binary version, used as the suffix of Scala-dependent Flink artifacts. -->
    <scala.version>2.11</scala.version>
    <flink.cluster.version>1.12.3</flink.cluster.version>
    <hadoop.version>2.7.3</hadoop.version>
    <logback.version>1.2.0</logback.version>
    <slf4j.version>1.7.21</slf4j.version>
    <hbase.version>1.3.1</hbase.version>
    <!-- Scope applied to most Flink/Hadoop dependencies below. -->
    <scope.value>compile</scope.value>
</properties>

<dependencies>
    <dependency>
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
        <version>1.4</version>
    </dependency>
    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.15</version>
    </dependency>
    <!-- Unit testing -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-yarn-common</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-yarn-api</artifactId>
            </exclusion>
            <exclusion>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <groupId>org.apache.hadoop</groupId>
            </exclusion>
            <exclusion>
                <artifactId>hadoop-auth</artifactId>
                <groupId>org.apache.hadoop</groupId>
            </exclusion>
            <exclusion>
                <artifactId>hadoop-common</artifactId>
                <groupId>org.apache.hadoop</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
    <!-- NOTE(review): fastjson 1.2.28 is affected by publicly disclosed autoType
         deserialization vulnerabilities fixed in later releases; consider upgrading. -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.28</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.cluster.version}</version>
        <scope>${scope.value}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table</artifactId>
        <version>${flink.cluster.version}</version>
        <type>pom</type>
        <scope>${scope.value}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
        <version>${flink.cluster.version}</version>
        <scope>${scope.value}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
        <version>${flink.cluster.version}</version>
        <scope>${scope.value}</scope>
    </dependency>
    <!-- Declared once: the previous _2.11 and _${scala.version} entries were the same artifact.
         NOTE(review): pinned to 1.11.3 while the other Flink artifacts use ${flink.cluster.version};
         the StreamingFileSink used here is imported from org.apache.flink.streaming.api.functions.sink.filesystem,
         presumably provided by flink-streaming-java — confirm this artifact is still needed. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_${scala.version}</artifactId>
        <version>1.11.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-orc -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-orc_${scala.version}</artifactId>
        <version>${flink.cluster.version}</version>
        <scope>${scope.value}</scope>
    </dependency>
    <!-- NOTE(review): flink-ml is pinned to 1.8.1, unlike the other Flink artifacts; confirm it is compatible or still needed. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-ml_${scala.version}</artifactId>
        <version>1.8.1</version>
        <scope>${scope.value}</scope>
    </dependency>
    <!-- New Blink planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
        <version>${flink.cluster.version}</version>
        <scope>${scope.value}</scope>
    </dependency>
    <!-- Needed to implement custom formats (e.g. for interacting with Kafka) or user-defined functions -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.cluster.version}</version>
        <scope>${scope.value}</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.cluster.version}</version>
        <scope>${scope.value}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.version}</artifactId>
        <version>${flink.cluster.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>commons-lang3</artifactId>
                <groupId>org.apache.commons</groupId>
            </exclusion>
            <exclusion>
                <artifactId>commons-cli</artifactId>
                <groupId>commons-cli</groupId>
            </exclusion>
        </exclusions>
        <scope>${scope.value}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.cluster.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.value}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.value}</scope>
        <exclusions>
            <exclusion>
                <groupId>xml-apis</groupId>
                <artifactId>xml-apis</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet_${scala.version}</artifactId>
        <version>${flink.cluster.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.cluster.version}</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <!-- Redis access -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.5.0</version>
    </dependency>
    <!-- Alibaba Druid database connection pool -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.0.11</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.cluster.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>1.2.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
到此,相信大家对“ORC文件读写工具类和Flink输出ORC格式文件的方法”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。