您的位置:首页 > 编程语言

Hadoop-0.20.0源代码分析(08)

2009-09-23 20:48 316 查看
这里,继续对FsShell类中一些命令进行阅读分析,主要是看与拷贝文件有关的几个命令。

cp命令

该命令实现对文件的拷贝操作,并且支持在不同的文件系统之间进行文件的拷贝。拷贝文件涉及的操作比较复杂,核心拷贝操作还是调用了org.apache.hadoop.fs.FileUtil类的copy方法实现的。 先看该类中定义的其中一个copy方法的实现:

private int copy(String argv[], Configuration conf) throws IOException {
int i = 0;
int exitCode = 0;
String cmd = argv[i++];
String dest = argv[argv.length-1]; // 命令行中最后一个参数
// 若指定了多个输入源文件,则最后一个参数必须是一个目录
if (argv.length > 3) {
Path dst = new Path(dest);
if (!fs.isDirectory(dst)) { // 最后一个参数必须是目录
throw new IOException("When copying multiple files, " + "destination " + dest + " should be a directory.");
}
}
// 循环对每一个文件进行拷贝操作
for (; i < argv.length - 1; i++) {
try {
copy(argv[i], dest, conf); // 将文件argv[i]拷贝到dest目录中
} catch (RemoteException e) {
// 捕获命令执行发生的异常信息
exitCode = -1;
try {
String[] content;
content = e.getLocalizedMessage().split("/n");
System.err.println(cmd.substring(1) + ": " +
content[0]);
} catch (Exception ex) {
System.err.println(cmd.substring(1) + ": " +
ex.getLocalizedMessage());
}
} catch (IOException e) {
// 捕获IO异常信息
exitCode = -1;
System.err.println(cmd.substring(1) + ": " + e.getLocalizedMessage());
}
}
return exitCode;
}


该命令的实现与mv命令类似,这里调用了一个重载的copy命令,实现对一个文件执行拷贝操作。该重载的拷贝方法如下所示:

void copy(String srcf, String dstf, Configuration conf) throws IOException {
Path srcPath = new Path(srcf); // 构造Path
FileSystem srcFs = srcPath.getFileSystem(getConf()); // 获取到srcPath所在的文件系统srcFs
Path dstPath = new Path(dstf);
FileSystem dstFs = dstPath.getFileSystem(getConf()); // 获取到dstPath所在的文件系统dstFs
Path [] srcs = FileUtil.stat2Paths(srcFs.globStatus(srcPath), srcPath); // 获取到srcFs中满足srcPath模式的FileStatus[]并转换成为Path[]
if (srcs.length > 1 && !dstFs.isDirectory(dstPath)) {
throw new IOException("When copying multiple files, " + "destination should be a directory.");
}
for(int i=0; i<srcs.length; i++) { // 循环拷贝操作
FileUtil.copy(srcFs, srcs[i], dstFs, dstPath, false, conf); // 调用FileUtil类的拷贝方法copy完成文件在srcFs与dstFs文件系统之间拷贝文件的操作
}
}


现在,我们开始追踪 org.apache.hadoop.fs.FileUtil类的copy方法,看一看拷贝到底是如何实现的。FileUtil类中定义了多个重载的拷贝方法copy,我们只从FsShell类中调用的copy方法开始追踪其实现。上面调用的FileUtil类的copy实现如下所示:

public static boolean copy(FileSystem srcFS, Path src,
FileSystem dstFS, Path dst,
boolean deleteSource,
Configuration conf) throws IOException {
return copy(srcFS, src, dstFS, dst, deleteSource, true, conf); // 调用了一个重载的copy方法实现文件在srcFS与dstFS之间进行复制
}


看重载的copy方法的实现,如下所示:

public static boolean copy(FileSystem srcFS, Path src,
FileSystem dstFS, Path dst,
boolean deleteSource,
boolean overwrite,
Configuration conf) throws IOException {
dst = checkDest(src.getName(), dstFS, dst, overwrite); // 检查目的文件系统dstFS中dst目录是否合法(参照src)
if (srcFS.getFileStatus(src).isDir()) { // 若源文件系统srcFS中src是目录
checkDependencies(srcFS, src, dstFS, dst); // 检查文件依赖性:如果srcFS=dstFS,并且dst不是src的子目录,检查通过;如果srcFS与dstFS不是同一个文件系统,依赖性检查通过
if (!dstFS.mkdirs(dst)) { // 在dstFS中创建dst目录,准备向其中拷贝数据
return false;
}
FileStatus contents[] = srcFS.listStatus(src); // 获取srcFS中src目录下的文件列表
for (int i = 0; i < contents.length; i++) { // 分治思想:分治后执行递归拷贝文件操作
copy(srcFS, contents[i].getPath(), dstFS,
new Path(dst, contents[i].getPath().getName()),
deleteSource, overwrite, conf); // 递归调用
}
} else if (srcFS.isFile(src)) { // 递归出口(如果src是一个普通文件)
InputStream in=null;
OutputStream out = null;
try {
in = srcFS.open(src); // 打开srcFS中的src文件,并返回输入流对象
out = dstFS.create(dst, overwrite); // 在目的文件系统dstFS中创建dst文件,并返回输出流,等待写入
IOUtils.copyBytes(in, out, conf, true); // 调用:通过调用IOUtils类的copyBytes方法实现流式拷贝
} catch (IOException e) {
IOUtils.closeStream(out); // 关闭out
IOUtils.closeStream(in); // 关闭in
throw e;
}
} else {
throw new IOException(src.toString() + ": No such file or directory");
}
if (deleteSource) { // 如果设置了拷贝完成后删除源文件选项
return srcFS.delete(src, true); // 删除源文件系统srcFS的源文件src
} else {
return true;
}

}


IOUtils类中实现了Hadoop文件系统中文件的 流式拷贝操作,我们追踪该工具类的copyBytes方法,分析实现的过程。该方法如下所示:

/**
* 从一个流拷贝到另一个流中
*/
public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
throws IOException {
copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096),  close); // 调用:重载的copyBytes方法实现流式拷贝
}


我们看重载流式拷贝实现方法copyBytes,如下所示:

public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException {
PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null; // 使用PrintStream为out流增加便捷功能
byte buf[] = new byte[buffSize]; // 字节缓冲区
try {
int bytesRead = in.read(buf); // 从输入流in读取bytesRead个字节到buf缓冲区中
while (bytesRead >= 0) { // 确实读取到了字节
out.write(buf, 0, bytesRead); // 将从in中读取到的字节,通过buf缓冲区写入到输出流out中
if ((ps != null) && ps.checkError()) { // 如果ps=(PrintStream)out,测试内部标志,并自动刷新
throw new IOException("Unable to write to output stream.");
}
bytesRead = in.read(buf); // 继续从in读取字节
}
} finally {
if(close) {
out.close(); // 关闭out
in.close(); // 关闭in
}
}
}


上面在从InputStream in拷贝到OutputStream out中的过程中,使用了更加高效的PrintStream流类,它能够为OutputStream增加方便打印各种数据值的表示形式,而且,它不会抛出IO异常,而是将流式拷贝过程中发生的异常,设置为通过调用checkError方法来检测内部的标志。另外,它还可以实现自动刷新,在向输出流中写入字节(通过字节缓冲区)之后,自动刷新。

cp命令的具体实现都在上面进行分析了,应该能够理解在Hadoop中如何在不同文件系统之间执行流式拷贝文件的过程。

copyFromLocal命令

该命令实现了从本地文件系统拷贝文件的操作。实现方法为,如下所示:

/**
* 从本地文件系统(srcs在本地文件系统中)拷贝srcs到目的文件系统(对应Path为dstf)
*/
void copyFromLocal(Path[] srcs, String dstf) throws IOException {
Path dstPath = new Path(dstf);
FileSystem dstFs = dstPath.getFileSystem(getConf()); // 获取到目的文件系统dstFs
if (srcs.length == 1 && srcs[0].toString().equals("-")) // 如果只指定了一个参数“-”
copyFromStdin(dstPath, dstFs); // 调用:从标准输入流中进行流式拷贝操作
else // 否则
dstFs.copyFromLocalFile(false, false, srcs, dstPath); // 调用目的文件系统dstFs的copyFromLocalFile方法执行拷贝操作
}


我们关注一下copyFromStdin方法拷贝的实现,如下所示:

private void copyFromStdin(Path dst, FileSystem dstFs) throws IOException {
if (dstFs.isDirectory(dst)) { // 如果目的文件是目录,不支持源为标准输入流的情况
throw new IOException("When source is stdin, destination must be a file.");
}
if (dstFs.exists(dst)) { // 如果目的文件系统dstFs中存在文件dst,出错
throw new IOException("Target " + dst.toString() + " already exists.");
}
FSDataOutputStream out = dstFs.create(dst); // 满足拷贝要求,执行流式拷贝操作
try {
IOUtils.copyBytes(System.in, out, getConf(), false); // 调用IOUtils类的copyBytes方法实现,前面已经分析过拷贝过程
}
finally {
out.close(); // 拷贝完成,关闭输出流out
}
}


再看一下,如果指定的是待拷贝的文件源不是标准输入流的情况,文件系统FileSystem是如何实现拷贝操作的。实现的方法copyFromLocalFile如下所示:

/**
* 将本地的srcs,拷贝到目的文件系统中的dst
* delSrc指示了拷贝文件完成之后,是否删除源文件srcs
*/
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path[] srcs, Path dst)
throws IOException {
Configuration conf = getConf();
FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf); // 调用FileUtil工具类实现拷贝操作
}


关于FileUtil的copy方法,前面已经详细分析过,不再累述。

像moveFromLocal、moveFromLocal、copyToLocal、moveToLocal、copyMergeToLocal等命令的实现都非常类似,也不做过多的解释了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: