所需jar包
一、URL API操作方式
import java.io.InputStream;
import java.net.URL;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class HDFSUrlTest {
/**
* HDFS URL API操作方式
* 不需要读取core-site.xml和hdfs-site.xml配置文件
*/
// 让JAVA程序识别HDFS的URL
static {
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
// 查看文件内容
@Test
public void testRead() throws Exception {
InputStream in = null;
// 文件路径
String fileUrl = "hdfs://hadoop-master.dragon.org:9000/opt/data/test/01.data";
try {
// 获取文件输入流
in = new URL(fileUrl).openStream();
// 将文件内容读取出来,打印控制台
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
}
}
}
二、通过FileSystem API操作HDFS
HDFS工具类
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
public class HDFSUtils {
/**
* HDFS工具类
*/
public static FileSystem getFileSystem() {
//声明FileSystem
FileSystem hdfs=null;
try {
//获取文件配置信息
Configuration conf =new Configuration();
//获取文件系统
hdfs=FileSystem.get(conf);
} catch (IOException e) {
e.printStackTrace();
}
return hdfs;
}
}
常用操作实现类
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.gethistory_jsp;
import org.junit.Test;
public class HDFSFsTest {
/**
*
* 通过FileSystem API操作HDFS
*/
// 读取文件内容
@Test
public void testRead() throws Exception {
// 获取文件系统
FileSystem hdfs = HDFSUtils.getFileSystem();
// 文件名称
Path path = new Path("/opt/data/test/touch.data");
// 打开文件输入流
FSDataInputStream inStream = hdfs.open(path);
// 读取文件到控制台显示
IOUtils.copyBytes(inStream, System.out, 4096, false);
// 关闭流
IOUtils.closeStream(inStream);
}
// 查看目录
@Test
public void testList() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();
// 文件名称
Path path = new Path("/opt/data");
FileStatus[] fileStatus = hdfs.listStatus(path);
for (FileStatus file : fileStatus) {
Path p = file.getPath();
String info = file.isDir() ? "目录" : "文件";
System.out.println(info + ":" + p);
}
}
// 创建目录
@Test
public void testDirectory() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();
// 要创建的目录
Path path = new Path("/opt/data/dir");
boolean isSuccessful = hdfs.mkdirs(path);// 相当于 linux下 mkdir -p
// /opt/data/dir
String info = isSuccessful ? "成功" : "失败";
System.out.println("创建目录【" + path + "】" + info);
}
// 上传文件-- put copyFromLocal
@Test
public void testPut() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();
// 本地文件(目录+文件名称)
Path srcPath = new Path("c:/0125.log");
// hdfs文件上传路径
Path dstPath = new Path("/opt/data/dir/");
hdfs.copyFromLocalFile(srcPath, dstPath);
}
// 创建hdfs文件并写入内容
@Test
public void testCreate() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();
Path path = new Path("/opt/data/dir/touch.data");
// 创建文件并获取输出流
FSDataOutputStream fSDataOutputStream = hdfs.create(path);
// 通过输出流写入数据
fSDataOutputStream.write("你好".getBytes());
fSDataOutputStream.writeUTF("hello hadoop!");
IOUtils.closeStream(fSDataOutputStream);
}
// 文件重命名
@Test
public void testRename() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();
Path oldPath = new Path("/opt/data/dir/touch.data");
Path newPath = new Path("/opt/data/dir/rename.data");
boolean flag = hdfs.rename(oldPath, newPath);
System.out.println(flag);
}
// 删除文件
public void testDelete() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();
Path path = new Path("/opt/data/dir/touch.data");
boolean flag = hdfs.deleteOnExit(path);
System.out.println(flag);
}
// 删除目录
public void testDeleteDir() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();
Path path = new Path("/opt/data/dir");
boolean flag = hdfs.delete(path, true);// 如果是目录第二个参数必须为true
System.out.println(flag);
}
// 查找某个文件在hdfs集群的位置
public void testLocation() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();
Path path = new Path("/opt/data/test.file");
FileStatus fileStatus = hdfs.getFileStatus(path);
BlockLocation[] blockLocations = hdfs.getFileBlockLocations(fileStatus,
0, fileStatus.getLen());
for (BlockLocation blockLocation : blockLocations) {
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.print(host + " ");
}
System.out.println();
}
}
// 获取hdfs集群上所有节点名称信息
public void testCluster() throws Exception {
FileSystem hdfs = HDFSUtils.getFileSystem();
DistributedFileSystem distributedFileSystem = (DistributedFileSystem) hdfs;
DatanodeInfo[] datanodeInfos = distributedFileSystem.getDataNodeStats();
for (DatanodeInfo datanodeInfo : datanodeInfos) {
String hostName = datanodeInfo.getHostName();
System.out.println(hostName);
}
}
}
三、上传合并小文件到hdfs
实现思想:循环遍历本地文件输入流
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
*
* 向hdfs上传复制文件的过程中,进行合并文件
*
*/
public class PutMerge {
/**
*
* @param localDir
* 本地要上传的文件目录
* @param hdfsFile
* HDFS上的文件名称,包括路径
*/
public static void put(String localDir, String hdfsFile) throws Exception {
// 获取配置信息
Configuration conf = new Configuration();
Path localPath = new Path(localDir);
Path hdfsPath = new Path(hdfsFile);
// 获取本地文件系统
FileSystem localFs = FileSystem.getLocal(conf);
// 获取HDFS
FileSystem hdfs = FileSystem.get(conf);
// 本地文件系统指定目录中的所有文件
FileStatus[] status = localFs.listStatus(localPath);
// 打开hdfs上文件的输出流
FSDataOutputStream fSDataOutputStream = hdfs.create(hdfsPath);
// 循环遍历本地文件
for (FileStatus fileStatus : status) {
// 获取文件
Path path = fileStatus.getPath();
System.out.println("文件为:" + path.getName());
// 打开文件输入流
FSDataInputStream fSDataInputStream = localFs.open(path);
// 进行流的读写操作
byte[] buff = new byte[1024];
int len = 0;
while ((len = fSDataInputStream.read(buff)) > 0) {
fSDataOutputStream.write(buff, 0, len);
}
fSDataInputStream.close();
}
fSDataOutputStream.close();
}
public static void main(String[] args) {
String localDir="D:/logs";
String hdfsFile="hdfs://hadoop-master.dragon.org:9000/opt/data/logs.data";
try {
put(localDir,hdfsFile);
} catch (Exception e) {
e.printStackTrace();
}
}
}
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。