HDFS文件系统内的文件格式转换(zip格式转化成gzip格式)
2013-02-27 14:44
465 查看
这篇主要介绍利用hdfs接口,使用java编程向hdfs写入数据。
一、模仿hadoop fs -put 和 -copyFromLoca命令,实现本地复制文件到hdfs:
View Code
二、写一个与hadoop fs -getmerge相对应的一个简单程序: putmerge 。
我们知道,getmerge命令是从hdfs上获取大量文件组合成一个文件放到本地文件系统中的命令。但是hadoop没有提供与这一过程相逆的命令。不幸的是我们会在处理apache日志过程中常用到这样的一个命令,比如有很多按日期分的apache日志。
我们想传到hdfs中使用MepReduce来处理的话,我们只能用笨办法先本地合成大文件,然后上传这个大文件到hdfs,这种方法很低效。我们接下来给出一个程序,利用hdfs提供的JavaAPI来编写一个上传多个文件的过程中合成一个大文件的程序:
将代码打包成putMerge.jar格式后放在个人常用的路径下
执行的时候采用:
[root@JueFan pconline]#hadoop jar putMerge.jar putMerge /home/juefan/*(本地目录名) /user/juefan/(HDFS文件系统目录)
三、有时候我们想合并hdfs中的文件,并存在hdfs里,又不想经过下载到local文件系统里这一过程,我们可以书写这样的程序,并且实现递归合并:
四、更不幸的是我们经常遇到的并非正常的文本文件,因为直接存储文本文件比较浪费空间,所以大部分服务器运维人员针对该类日志文件都是进行压缩打包存放的,所以我们有时候,或者说更多情况下需要的是对大量压缩包进行解压缩合并上传到hdfs的命令,为了方便我们同样只能自己搞生产了。
以下代码的主要作用是将HDFS内的zip格式文件转换成gzip格式
目的:hive在进行外部表数据读取的时候不能读取zip格式文件,能读取文本文件
先前的作法是:先从HDFS中把zip文件取到本地中,再将zip文件在本地中解压出来,通过加载数据的形式将本地的文件回到到HDFS中
其弊端主要有:1、可操作性弱,要来回折腾HDFS与本地上的文件 2、zip文件解压出来加载回HDFS后占用的空间较大
通过正面这种形式能够解决以上两个问题
具体的执行方式如下:
[root@JueFan pconline]# hadoop jar zip_to_gzip.jar filesmerge /zip/(待转换文件路径,在HDFS上) /user/juefan/pconline/(转换完成后的文件存储地址,也在HDFS上) pconline(待转换的文件名包含的字符)
如果要实现递归的话,可以在filesmerge后面加上 -r
本文的前三个要点转载自三江小渡的博客:http://blog.pureisle.net/archives/1701.html
该博客的第四个要点没有写出具体实现,本人就帮忙完善了
一、模仿hadoop fs -put 和 -copyFromLoca命令,实现本地复制文件到hdfs:
View Code
二、写一个与hadoop fs -getmerge相对应的一个简单程序: putmerge 。
我们知道,getmerge命令是从hdfs上获取大量文件组合成一个文件放到本地文件系统中的命令。但是hadoop没有提供与这一过程相逆的命令。不幸的是我们会在处理apache日志过程中常用到这样的一个命令,比如有很多按日期分的apache日志。
我们想传到hdfs中使用MepReduce来处理的话,我们只能用笨办法先本地合成大文件,然后上传这个大文件到hdfs,这种方法很低效。我们接下来给出一个程序,利用hdfs提供的JavaAPI来编写一个上传多个文件的过程中合成一个大文件的程序:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class putMerge { public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); FileSystem hdfs = FileSystem.get(conf); FileSystem local = FileSystem.getLocal(conf); Path inputDir = new Path(args[0]); Path hdfsFile = new Path(args[1]); try { FileStatus[] inputFiles = local.listStatus(inputDir); FSDataOutputStream out = hdfs.create(hdfsFile); for (int i = 0; i < inputFiles.length; i++) { System.out.println(inputFiles[i].getPath().getName()); FSDataInputStream in = local.open(inputFiles[i].getPath()); byte buffer[] = new byte[256]; int bytesRead = 0; while ((bytesRead = in.read(buffer)) > 0) { out.write(buffer, 0, bytesRead); } in.close(); } out.close(); } catch (IOException e) { e.printStackTrace(); } } }
将代码打包成putMerge.jar格式后放在个人常用的路径下
执行的时候采用:
[root@JueFan pconline]#hadoop jar putMerge.jar putMerge /home/juefan/*(本地目录名) /user/juefan/(HDFS文件系统目录)
三、有时候我们想合并hdfs中的文件,并存在hdfs里,又不想经过下载到local文件系统里这一过程,我们可以书写这样的程序,并且实现递归合并:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class filesmerge { public static boolean isRecur = false; public static void merge(Path inputDir, Path hdfsFile, FileSystem hdfs,FSDataOutputStream out) { try { FileStatus[] inputFiles = hdfs.listStatus(inputDir); for (int i = 0; i < inputFiles.length; i++) { if (!hdfs.isFile(inputFiles[i].getPath())) { if (isRecur){ merge(inputFiles[i].getPath(), hdfsFile, hdfs,out); return ; } else { System.out.println(inputFiles[i].getPath().getName() + "is not file and not allow recursion, skip!"); continue; } } System.out.println(inputFiles[i].getPath().getName()); FSDataInputStream in = hdfs.open(inputFiles[i].getPath()); byte buffer[] = new byte[256]; int bytesRead = 0; while ((bytesRead = in.read(buffer)) > 0) { out.write(buffer, 0, bytesRead); } in.close(); } out.close(); } catch (IOException e) { e.printStackTrace(); } } public static void errorMessage(String str) { System.out.println("Error Message: " + str); System.exit(1); } public static void main(String[] args) throws IOException { if (args.length == 0) errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>"); if (args[0].matches("^-[rR]$")) { isRecur = true; } if ((isRecur && args.length != 3) || ( !isRecur && args.length != 2)) { errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>"); } Configuration conf = new Configuration(); FileSystem hdfs = FileSystem.get(conf); Path inputDir; Path hdfsFile; if(isRecur){ inputDir = new Path(args[1]); hdfsFile = new Path(args[2]); } else{ inputDir = new Path(args[0]); hdfsFile = new Path(args[1]); } if (!hdfs.exists(inputDir)) { errorMessage("hdfsTargetDir not exist!"); } if (hdfs.exists(hdfsFile)) { errorMessage("hdfsFileName exist!"); } FSDataOutputStream out = hdfs.create(hdfsFile); merge(inputDir, hdfsFile, hdfs,out); System.exit(0); } }
四、更不幸的是我们经常遇到的并非正常的文本文件,因为直接存储文本文件比较浪费空间,所以大部分服务器运维人员针对该类日志文件都是进行压缩打包存放的,所以我们有时候,或者说更多情况下需要的是对大量压缩包进行解压缩合并上传到hdfs的命令,为了方便我们同样只能自己搞生产了。
以下代码的主要作用是将HDFS内的zip格式文件转换成gzip格式
目的:hive在进行外部表数据读取的时候不能读取zip格式文件,能读取文本文件
先前的作法是:先从HDFS中把zip文件取到本地中,再将zip文件在本地中解压出来,通过加载数据的形式将本地的文件回到到HDFS中
其弊端主要有:1、可操作性弱,要来回折腾HDFS与本地上的文件 2、zip文件解压出来加载回HDFS后占用的空间较大
通过正面这种形式能够解决以上两个问题
import java.io.File; import java.io.IOException; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; public class filesmerge { //判断是否递归执行 public static boolean isRecur = false; /** * @author JueFan * @param inputDir zip文件的存储地址 * @param hdfsFile 解压结果的存储地址 * @param hdfs 分布式文件系统数据流 * @param pcgroupText 需要解压缩的文件关键名 */ public static void merge(Path inputDir, Path hdfsFile, FileSystem hdfs, Text pcgroupText) { try { //文件系统地址inputDir下的FileStatus FileStatus[] inputFiles = hdfs.listStatus(inputDir); for (int i = 0; i < inputFiles.length; i++) { if (!hdfs.isFile(inputFiles[i].getPath())) { if (isRecur){ merge(inputFiles[i].getPath(), hdfsFile, hdfs,pcgroupText); return ; } else { System.out.println(inputFiles[i].getPath().getName() + "is not file and not allow recursion, skip!"); continue; } } //判断文件名是否在需要解压缩的关键名内 if(inputFiles[i].getPath().getName().contains(pcgroupText.toString()) == true){ //输出待解压的文件名 System.out.println(inputFiles[i].getPath().getName()); //将数据流指向待解压文件 FSDataInputStream in = hdfs.open(inputFiles[i].getPath()); /** *数据的解压执行过程 */ ZipInputStream zipInputStream = null; try{ zipInputStream = new ZipInputStream(in); ZipEntry entry; //解压后有多个文件一并解压出来并实现合并 //合并后的地址 FSDataOutputStream mergerout = hdfs.create(new Path(hdfsFile + File.separator + inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf(".")))); while((entry = zipInputStream.getNextEntry()) != null){ byte[] buffer1 = new byte[2048]; int nNumber; while((nNumber = zipInputStream.read(buffer1, 0, buffer1.length)) != -1) mergerout.write(buffer1, 0, nNumber); } mergerout.flush(); mergerout.close(); zipInputStream.close(); }catch(IOException e){ continue; } in.close(); /** *将解压合并后的数据压缩成gzip格式 */ GZIPOutputStream gzipOutputStream = null; try{ FSDataOutputStream outputStream = null; outputStream = hdfs.create(new Path(hdfsFile + File.separator + inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf(".")) + ".gz")); FSDataInputStream inputStream = null; gzipOutputStream = new GZIPOutputStream(outputStream); inputStream = hdfs.open(new Path(hdfsFile + File.separator + inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf(".")))); byte[] buffer = new byte[1024]; int len; while((len = inputStream.read(buffer)) > 0){ gzipOutputStream.write(buffer, 0, len); } inputStream.close(); gzipOutputStream.finish(); gzipOutputStream.flush(); outputStream.close(); }catch (Exception exception){ exception.printStackTrace(); } gzipOutputStream.close(); //删除zip文件解压合并后的临时文件 String tempfiles = hdfsFile + File.separator + inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf(".")); try{ if(hdfs.exists(new Path(tempfiles))){ hdfs.delete(new Path(tempfiles), true); } }catch(IOException ie){ ie.printStackTrace(); } } } }catch (IOException e) { e.printStackTrace(); } } public static void errorMessage(String str) { System.out.println("Error Message: " + str); System.exit(1); } @SuppressWarnings("null") public static void main(String[] args) throws IOException { if (args.length == 0) errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>"); if (args[0].matches("^-[rR]$")) { isRecur = true; } if ((isRecur && args.length != 4) || ( !isRecur && args.length != 3)) { errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>"); } Configuration conf = new Configuration(); FileSystem hdfs = FileSystem.get(conf); Path inputDir; Path hdfsFile; Text pcgroupText; if(isRecur){ inputDir = new Path(args[1]); hdfsFile = new Path(args[2]); pcgroupText = new Text(args[3]); } else{ inputDir = new Path(args[0]); hdfsFile = new Path(args[1]); pcgroupText = new Text(args[2]); } if (!hdfs.exists(inputDir)) { errorMessage("hdfsTargetDir not exist!"); } if (hdfs.exists(hdfsFile)) { errorMessage("hdfsFileName exist!"); } merge(inputDir, hdfsFile, hdfs, pcgroupText); System.exit(0); } }
具体的执行方式如下:
[root@JueFan pconline]# hadoop jar zip_to_gzip.jar filesmerge /zip/(待转换文件路径,在HDFS上) /user/juefan/pconline/(转换完成后的文件存储地址,也在HDFS上) pconline(待转换的文件名包含的字符)
如果要实现递归的话,可以在filesmerge后面加上 -r
本文的前三个要点转载自三江小渡的博客:http://blog.pureisle.net/archives/1701.html
该博客的第四个要点没有写出具体实现,本人就帮忙完善了
相关文章推荐
- java/python中获取当前系统时间,并与字符串相互转换格式,或者转化成秒数,天数等整数
- 不用格式化转换文件系统格式
- Mahout分布式运行实例:基于矩阵分解的协同过滤评分系统(一个命令实现文件格式的转换)
- [开发总结]系统架构及数据模型----AutoDesk文件格式转换篇(五)
- 用openoffice 在windows和linux系统中 转换文件格式 注意地方!
- Java实现文件压缩与解压[zip格式,gzip格式]
- Java实现文件压缩与解压[zip格式,gzip格式]
- Spark读取HDFS文件,文件格式为GB2312,转换为UTF-8
- Java实现文件压缩与解压[zip格式,gzip格式]
- 如何把FAT32格式的文件系统转换成NTFS?
- linux下转换windows文件格式为unix sed -i 's/\r//' <filename> 转化为unix格式
- Java实现文件压缩与解压[zip格式,gzip格式]
- linux系统下文件编码格式的转换
- 数据分析:基于Python的自定义文件格式转换系统
- Java实现文件压缩与解压[zip格式,gzip格式]
- Java实现文件压缩与解压[zip格式,gzip格式]
- Java实现文件压缩与解压[zip格式,gzip格式]
- Java实现文件压缩与解压的示例[zip格式,gzip格式]
- 使用ffmpeg转换文件格式,及ffmpeg参数说明
- hadoop之hdfs文件系统