小编给大家分享一下Hadoop Outline的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
Read by hadoop FsURLStreamHandlerFactory
Read/Write by hadoop DistributeFileSystem
package com.jinbao.hadoop.hdfs; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import java.net.URL; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import org.apache.hadoop.fs.Path; /** * */ /** * @author cloudera * */ public class HdfsClient { static String sFileUrl = "hdfs://quickstart.cloudera/gis/gistool/README.md"; /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { if(args.length >= 2){ String sUrl = sFileUrl; if(args[0].equalsIgnoreCase("-r-url")){ sUrl = args[1]; //test read by hadoop FsURLStreamHandlerFactory readHdfsFileByDfsUrl(sUrl); } else if(args[0].equalsIgnoreCase("-r-file")){ sUrl = args[1]; //test read by hadoop dfsFile readHdfsFileByDfsFileApi(sUrl); } else if(args[0].equalsIgnoreCase("-w-file")){ sUrl = args[1]; //test read by hadoop dfsFile writeHdfsFileByDfsFileApi(sUrl); } else if(args[0].equalsIgnoreCase("-w-del")){ sUrl = args[1]; //test read by hadoop dfsFile deleteHdfsFileByDfsFileApi(sUrl); } } } private static void deleteHdfsFileByDfsFileApi(String sUrl) { Configuration conf = new Configuration(); try { FileSystem fs = FileSystem.get(URI.create(sUrl),conf); Path path = new Path(sUrl); fs.delete(path,true); } catch (IOException e) { e.printStackTrace(); } finally{ } } private static void writeHdfsFileByDfsFileApi(String sUrl) { Configuration conf = new Configuration(); OutputStream out = null; byte[] data = "Writing Test".getBytes(); // Get a FSDataInputStream object try { // Get a FSDataInputStream object, actually is HdfsDataInputSteam FileSystem fs = FileSystem.get(URI.create(sUrl),conf); Path path = new Path(sUrl); if(fs.exists(path)){ out = fs.append(path); IOUtils.write(data, out); } else{ out = fs.create(path); out.write(data); // flush buffer to OS out.flush(); FSDataOutputStream fsout = FSDataOutputStream.class.cast(out); // Sync data to disk fsout.hsync(); // call sync implicitly out.close(); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally{ IOUtils.closeQuietly(out); } } public static void readHdfsFileByDfsUrl(String sUrl){ URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); InputStream in = null; try{ URL url = new URL(sUrl); in = url.openStream(); IOUtils.copy(in,System.out); } catch(IOException ioe){ ioe.printStackTrace(); } finally{ IOUtils.closeQuietly(in); } } private static void readHdfsFileByDfsFileApi(String sUrl) { Configuration conf = new Configuration(); InputStream in = null; try{ FileSystem fs = FileSystem.get(URI.create(sUrl),conf); //Get a FSDataInputStream object, actually HdfsDataInputStream in = fs.open(new Path(sUrl)); IOUtils.copy(in,System.out); } catch(IOException ioe){ ioe.printStackTrace(); } finally{ IOUtils.closeQuietly(in); } } }
1. FileSystem to get DistributeFileSystem
2. DistributeFileSystem通过和namenode调用,获得前面几个块位置,并返回datanode的地址,包括备用节点。
3. 客户端调用DistributeFileSystem获取FSDataInputStream,这个流对象连接距离最近的datanode,通过调用Read读取数据。
4. 如果当前块已经读完,则FSDataInputStream关闭这个块,并寻找下一个最佳的datanode并继续读取。
5. 读完最后一个块后,调用close方法关闭数据流。
容错处理
1. FSDataInputStream如果与datanode通信遇到错误,则尝试下一个最佳备用节点。另外,也会记住那个坏节点,以后不会在它上面读数据。并且,会通知namenode哪一个节点有问题。
1. FileSystem to get DistributeFileSystem
2. DistributeFileSystem通过和namenode的RPC调用create()方法,创建文件原数据。如果已经存在,报出IOException.
3. 客户端调用DistributeFileSystem获取FSDataOutputStream,它封装了一个DFSOutputStream对象,来处理datanode和namenode通信。
4. 客户端开始写数据,则FSDataOutputStream将当前块数据分成一个个的数据包packet,并写入一个数据包队列(Data packet queue), 然后datastreamer来根据datanode列表,要求namenode分配合适的datanode,DataStreamer把这些datanode组成数据管线 (datanode pipeline),数目有dfs.replication决定。
5. 开始写数据,每写入一个包都将它备份到另一个确认队列(Data Ack queue),第一个被写入的节点,会把数据写入第二个节点,然后第三个。如果收到都写完的通知,则从确认队列中删除。
6. 写完一个块后,重复4-5,直到最后写完,调用close方法关闭数据流。
容错处理
FSDataOutputStream如果与datanode通信遇到错误
1. 关闭pipeline
2. 把确认队列的数据包,添加到数据包队列,以防止下游节点(downstream node)丢失数据。
3. 为存储正常的datanode的当前数据制定一个新的标示(identifier), 并把这个标示传递给namenode,以便namenode删除故障node的部分数据。
4. 把剩余的数据写入剩下的好的datanode。namenode会创建新的节点,来复制数据,以达到复本量。对于当前写过程,如果写入成功的节点达到dfs.replication.min就算成功,其他的由namenode进行复制。
复本的布局
照顾稳定性和负载均衡
Hadoop的默认布局策略是在运行客户端上放置第1个复本,如果客户端在cluster外,则在集群中随机选择一个节点.
第2个和第3个会随机选择另外一个相同Rack上的两个节点.
% hadoop distcp hdfs://namenode/foo hdfs://namenode2/foo
distcp使用map-reduce的作业来实现,非常适用于两个数据中心同步数据。
如果两个数据中心版本不一致,可以试用hftp协议,使得作业之运行在目标系统上
% hadoop distcp hftp://namenode:50070/foo hdfs://namenode2/foo
注:需要指定hftp端口 50070
为了使map平衡集群,可以参考N*20的设置:-m 20*N,N是节点总数.
balancer还没看。
减小namenode的内存,适合管理小文件,它还是透明的,对map-reduce也是适用的。
%hadoop archive -archivename files.har /myfiles/ /my
不足:
har相当于tar功能,可以打包文件,不支持压缩。仅仅节省的namenode的内存。
一旦创建就不能修改,想添加和删除文件,必须重新建立har.
看完了这篇文章,相信你对“Hadoop Outline的示例分析”有了一定的了解,如果想了解更多相关知识,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。