Hadoop Deep Dive (2): Accessing HDFS from Java
2013-12-02 20:37
Please credit the source when reposting: http://blog.csdn.net/lastsweetop/article/details/9001467
All the source code is on GitHub: https://github.com/lastsweetop/styhadoop
Reading Data
Reading with a Hadoop URL
One of the simplest ways to read data from HDFS is to open a stream through java.net.URL. Before that, though, you must first call URL.setURLStreamHandlerFactory and install an FsUrlStreamHandlerFactory; this factory is what resolves the hdfs scheme. The method can only be called once per JVM, so it has to go in a static block. We then call IOUtils.copyBytes to copy the HDFS stream to the standard output stream System.out. The first two parameters of copyBytes are easy to understand: one input, one output. The third is the buffer size, and the fourth specifies whether to close the streams once the copy finishes. We set it to false here so that System.out stays open, and we close the input stream ourselves.
package com.sweetop.styhadoop;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

import java.io.InputStream;
import java.net.URL;

/**
 * Created with IntelliJ IDEA.
 * User: lastsweetop
 * Date: 13-5-31
 * Time: 10:16 AM
 */
public class URLCat {

    // Register the factory exactly once per JVM so URL understands hdfs://.
    static {
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) throws Exception {
        InputStream in = null;
        try {
            in = new URL(args[0]).openStream();
            // false: leave System.out open; we close the input stream ourselves.
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
Reading with the FileSystem API
First we instantiate a FileSystem object through the FileSystem.get factory method, passing in a java.net.URI and a Configuration. The FileSystem can then open a stream from a Path object; everything after that is the same as in the previous example.
package com.sweetop.styhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.InputStream;
import java.net.URI;

/**
 * Created with IntelliJ IDEA.
 * User: lastsweetop
 * Date: 13-5-31
 * Time: 11:24 AM
 */
public class FileSystemCat {
    public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        InputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
FSDataInputStream
The object returned when FileSystem opens a stream is an FSDataInputStream, a class that implements the Seekable interface:

public interface Seekable {
    void seek(long pos) throws java.io.IOException;
    long getPos() throws java.io.IOException;
    boolean seekToNewSource(long targetPos) throws java.io.IOException;
}
The seek method can jump to any position in the file; here we jump back to the beginning and read the file a second time:
package com.sweetop.styhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.net.URI;

public class FileSystemDoubleCat {
    public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
            in.seek(0); // jump back to the start of the file
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
FSDataInputStream also implements the PositionedReadable interface:
public interface PositionedReadable {
    int read(long position, byte[] buffer, int offset, int length) throws java.io.IOException;
    void readFully(long position, byte[] buffer, int offset, int length) throws java.io.IOException;
    void readFully(long position, byte[] buffer) throws java.io.IOException;
}
These methods read a given length (fourth parameter) from an arbitrary position in the file (first parameter) into a buffer (second parameter) starting at a given offset (third parameter), without disturbing the stream's current position. I won't build a full example here; a quick sketch follows, and you should try it yourself too.
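As a rough illustration (my own sketch, not part of the original code; the class name PositionedReadCat and the 100-byte buffer are arbitrary choices, and the file is assumed to hold at least 100 bytes), this uses readFully to grab the start of a file without moving the stream's read position:

package com.sweetop.styhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.net.URI;

public class PositionedReadCat {
    public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(new Path(uri));
            byte[] buffer = new byte[100];              // arbitrary demo size
            // Read 100 bytes starting at file position 0; positioned reads
            // leave the stream's own position untouched.
            in.readFully(0, buffer, 0, buffer.length);
            System.out.write(buffer);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}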
Writing Data
The FileSystem class offers many ways to create a file. The simplest is:

public FSDataOutputStream create(Path f) throws IOException

There are many overloads that let you specify whether to overwrite an existing file, the replication factor, the write buffer size, the block size, the file permissions, and more.
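For instance, one of the fuller overloads bundles the overwrite flag, buffer size, replication factor, and block size into a single call. A minimal sketch, assuming a running HDFS; the class name CreateWithOptions and the chosen values are illustrative, not from the original code:

package com.sweetop.styhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;

public class CreateWithOptions {
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        FSDataOutputStream out = fs.create(new Path(uri),
                true,                 // overwrite the file if it already exists
                4096,                 // write buffer size in bytes
                (short) 3,            // replication factor
                128 * 1024 * 1024L);  // block size in bytes
        out.writeUTF("hello hdfs");
        out.close();
    }
}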
You can also pass in a callback interface to report write progress:

public interface Progressable {
    void progress();
}
Like an ordinary file system, HDFS also supports the append operation, most often used when writing logs:
public FSDataOutputStream append(Path f) throws IOException
Not every Hadoop file system supports append, however: HDFS does, but S3 does not.
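A minimal append sketch (my own, not from the original post; the class name AppendDemo is hypothetical, append must be enabled on the cluster, and the target file must already exist):

package com.sweetop.styhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;

public class AppendDemo {
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        // Open the existing file for append and add one log line.
        FSDataOutputStream out = fs.append(new Path(uri));
        out.write("one more log line\n".getBytes("UTF-8"));
        out.close();
    }
}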
Below is an example that copies a local file to HDFS, printing a dot for every progress callback:
package com.sweetop.styhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

/**
 * Created with IntelliJ IDEA.
 * User: lastsweetop
 * Date: 13-6-2
 * Time: 4:54 PM
 */
public class FileCopyWithProgress {
    public static void main(String[] args) throws Exception {
        String localSrc = args[0];
        String dst = args[1];
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            @Override
            public void progress() {
                System.out.print(".");  // called periodically as data is written
            }
        });
        // true: close both streams when the copy completes
        IOUtils.copyBytes(in, out, 4096, true);
    }
}
Directories
The method for creating a directory is:

public boolean mkdirs(Path f) throws IOException

mkdirs automatically creates any missing parent directories.
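A quick sketch (the class name MkdirsDemo is my own):

package com.sweetop.styhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;

public class MkdirsDemo {
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        // Creates the directory and any missing parents in one call,
        // much like java.io.File.mkdirs; returns true on success.
        boolean created = fs.mkdirs(new Path(uri));
        System.out.println("created = " + created);
    }
}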
Querying the File System
Listing directories and inspecting file and directory metadata are indispensable features of any file system, and HDFS is no exception, though it has a few quirks of its own.
FileStatus
FileStatus encapsulates the metadata of HDFS files and directories: file length, block size, replication factor, modification time, owner, permissions, and so on. FileSystem's getFileStatus method retrieves it:
package com.sweetop.styhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;

/**
 * Created with IntelliJ IDEA.
 * User: lastsweetop
 * Date: 13-6-2
 * Time: 8:58 PM
 */
public class ShowFileStatus {
    public static void main(String[] args) throws IOException {
        Path path = new Path(args[0]);
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(args[0]), conf);
        FileStatus status = fs.getFileStatus(path);
        System.out.println("path = " + status.getPath());
        System.out.println("owner = " + status.getOwner());
        System.out.println("block size = " + status.getBlockSize());
        System.out.println("permission = " + status.getPermission());
        System.out.println("replication = " + status.getReplication());
    }
}
Listing files
Sometimes you need to find a set of files that match some criteria, and the following example can help. FileSystem's listStatus method returns the matching files as an array of FileStatus objects; it is overloaded to accept multiple paths and to take a PathFilter for filtering, which we cover below. Another important helper here is FileUtil.stat2Paths, which conveniently converts an array of FileStatus objects into an array of Path objects:
package com.sweetop.styhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;

/**
 * Created with IntelliJ IDEA.
 * User: lastsweetop
 * Date: 13-6-2
 * Time: 10:09 PM
 */
public class ListStatus {
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path[] paths = new Path[args.length];
        for (int i = 0; i < paths.length; i++) {
            paths[i] = new Path(args[i]);
        }
        FileStatus[] status = fs.listStatus(paths);
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path p : listedPaths) {
            System.out.println(p);
        }
    }
}
PathFilter
Now for the PathFilter interface mentioned above. There is only one method to implement, accept, which returns true when a path should be kept in the results. Below we implement a regex-based exclusion filter, which the next example puts to work:
package com.sweetop.styhadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

/**
 * Created with IntelliJ IDEA.
 * User: lastsweetop
 * Date: 13-6-3
 * Time: 2:49 PM
 */
public class RegexExludePathFilter implements PathFilter {
    private final String regex;

    public RegexExludePathFilter(String regex) {
        this.regex = regex;
    }

    @Override
    public boolean accept(Path path) {
        // Keep only the paths that do NOT match the regex.
        return !path.toString().matches(regex);
    }
}
File patterns
When you need many files, listing each path one by one is hardly convenient. HDFS offers wildcard listing through FileSystem's globStatus method, which is also overloaded to take a PathFilter for further filtering. Let's combine the two:
package com.sweetop.styhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;

/**
 * Created with IntelliJ IDEA.
 * User: lastsweetop
 * Date: 13-6-3
 * Time: 2:37 PM
 */
public class GlobStatus {
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        // Expand the glob, then exclude any path matching ^.*/1901.
        FileStatus[] status = fs.globStatus(new Path(uri), new RegexExludePathFilter("^.*/1901"));
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path p : listedPaths) {
            System.out.println(p);
        }
    }
}
Deleting Data
Deleting data is simple:

public abstract boolean delete(Path f, boolean recursive) throws IOException

The first parameter is self-explanatory. The second controls recursive deletion: it is ignored when the path is a file or an empty directory, but when the path is a non-empty directory and recursive is false, delete throws an IOException.
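A minimal sketch (my own; the class name DeleteDemo and the hard-coded recursive flag are illustrative):

package com.sweetop.styhadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;

public class DeleteDemo {
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        // true: delete recursively, so non-empty directories are removed too
        boolean deleted = fs.delete(new Path(uri), true);
        System.out.println("deleted = " + deleted);
    }
}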