scala和java解压zip的文件,并上传到hdfs服务器
2015-09-17 16:58
686 查看
之前一直使用的事java开发的,后来学习hadoop,spark以后,需要做一个解压文件到服务器的例子,由于学习scala时间不长,故先用java代码写出来,在改成scala的代码,过程中出现了很多问题,我将会在最后说明,请大家注意不要少了包。
环境声明:windows8.1,eclipse,idea14,hadoop2.6,scala2.11,请自行配置好hadoop的环境。
java代码如下:
scala代码 如下:
在这里主要说scala代码编写过程中出现的问题:
scala可以自动解析对象的类型,所以不需要我们自己声明对象的类型,一般都用val来声明,除非是一定要改变的值,就用var,必须初始化。
在scala中一开始这句话是报错的,提示不能解析,也就是找不到entries找不到nextElement 这个方法。
我们需要做如下的装换:
上面这个原因是因为我使用了一个ant1.8.jar的jar包,最后发现是这个包的原因,在scala下运行还有警告。用ant.jar替换ant1.8.jar包,就可以了。
在声明对象的时候,对象是必须初始化的,如果可以为null,但是有时候你必须声明它的类型,可以如下声明,加上它的类型即可。
往hdfs写文件时可以用方法即可
环境声明:windows8.1,eclipse,idea14,hadoop2.6,scala2.11,请自行配置好hadoop的环境。
java代码如下:
<span style="font-size:12px;">public class TestFile { public static void main(String[] args) { String zipDir = "G:/zip"; File zipFile = new File(zipDir); File[] zipFiles = zipFile.listFiles(); for(File zip:zipFiles) { if(zip.getName().endsWith(".zip")) { System.out.println("name is "+zip.getName()+" , path = "+zip.getAbsolutePath()); //解压zip压缩文件 boolean result = unZipFile(zip,zipDir); if(result) { String filedir = zip.getName().substring(0, zip.getName().indexOf(".")); String localDir = zipDir+filedir; System.out.println("文件f.getName()解压成功 filedir = "+filedir); //上传文件到HDFS String cloudDir = "hdfs://10.132.10.235:9000/zcd/"; upload(localDir,cloudDir); //删除本地目录 File fileDir = new File(localDir); deleteFile(fileDir); fileDir.delete(); zip.delete(); } else { System.out.println("文件f.getName()解压失败"); } } } } private static boolean unZipFile(File zipFile, String descDir) { boolean flag = true; try { File pathFile = new File(descDir); if(!pathFile.exists()){ pathFile.mkdirs(); } ZipFile zip = new ZipFile(zipFile); for(Enumeration entries = zip.getEntries();entries.hasMoreElements();){ ZipEntry entry = (ZipEntry)entries.nextElement(); String zipEntryName = entry.getName(); InputStream in = zip.getInputStream(entry); String outPath = (descDir+zipEntryName).replaceAll("\\*", "/");; //判断路径是否存在,不存在则创建文件路径 File file = new File(outPath.substring(0, outPath.lastIndexOf('/'))); if(!file.exists()){ file.mkdirs(); } //判断文件全路径是否为文件夹,如果是上面已经上传,不需要解压 if(new File(outPath).isDirectory()){ continue; } //输出文件路径信息 System.out.println(outPath); OutputStream out = new FileOutputStream(outPath); byte[] buf1 = new byte[1024]; int len; while((len=in.read(buf1))>0){ out.write(buf1,0,len); } in.close(); out.close(); } } catch (ZipException e) { flag = false; e.printStackTrace(); } catch (FileNotFoundException e) { flag = false; e.printStackTrace(); } catch (IOException e) { flag = false; e.printStackTrace(); } System.out.println("******************解压完毕********************"); return flag; } private static void upload(String localDir,String cloudDir) { try { System.out.println("hadoop上传文件开始..."); // 获取一个conf对象 Configuration conf = new Configuration(); File fileDir = new File(localDir); File[] files = fileDir.listFiles(); InputStream in = null; FileSystem fs = null; OutputStream out = null; for(File file:files) { // 本地文件存取的位置 String LOCAL_SRC = file.getAbsolutePath(); // 存放到云端HDFS的位置 String CLOUD_DEST = cloudDir+file.getName(); in = new BufferedInputStream(new FileInputStream(LOCAL_SRC)); // 文件系统 fs = FileSystem.get(URI.create(CLOUD_DEST), conf); // 输出流 out = fs.create(new Path(CLOUD_DEST)); // 连接两个流,形成通道,使输入流向输出流传输数据 IOUtils.copyBytes(in, out, 1024, true); } in.close(); fs.close(); out.close(); System.out.println("hadoop上传文件结束..."); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } private static void deleteFile(File file) { if(file.exists()) { if(file.isFile()) { file.delete(); } else if(file.isDirectory()) { File files[] = file.listFiles(); for(int i=0;i<files.length;i++) { deleteFile(files[i]); } } } } }</span>
scala代码 如下:
object zip { def main(args: Array[String]) { val zipDir = "G:/zip" val zipFile = new File(zipDir) val zipFiles = zipFile.listFiles() for(zip <- zipFiles) { if(zip.getName.endsWith(".zip")) { println("name is "+zip.getName+" , path = "+zip.getAbsolutePath) //解压zip压缩文件 val result = unZipFile(zip,zipDir) if(result) { val filedir = zip.getName.substring(0, zip.getName.indexOf(".")) val localDir = zipDir+filedir println("文件f.getName()解压成功 filedir = = "+filedir) //上传文件到HDFS val cloudDir = "hdfs://10.132.10.235:9000/zcd/" upload(localDir,cloudDir); //删除本地目录 val fileDir = new File(localDir) deleteFile(fileDir) fileDir.delete() zip.delete() } else{ println("文件f.getName()解压失败") } } } } /** * 解压 * @param zipFile * @param descDir * @return */ def unZipFile(zipFile:File , descDir:String ):Boolean ={ var flag = true try { val pathFile:File = new File(descDir) if (!pathFile.exists) { pathFile.mkdirs } val zip = new ZipFile(zipFile) val entries = zip.getEntries while (entries.hasMoreElements){ // val entry = entries.nextElement.asInstanceOf[ZipFile] val entry = entries.nextElement val zipEntryName = entry.getName val in = zip.getInputStream(entry) val outPath = (descDir + zipEntryName).replaceAll("\\*", "/") //判断路径是否存在,不存在则创建文件路径 val file = new File(outPath.substring(0, outPath.lastIndexOf('/'))) if (!file.exists) { file.mkdirs } //判断文件全路径是否为文件夹,如果是上面已经上传,不需要解压 if (!new File(outPath).isDirectory()) { //输出文件路径信息 println(outPath) val out = new FileOutputStream(outPath) //val writer = new PrintWriter(new File(outPath)) val buf1 = Array[Byte]() while((in.read(buf1))>0){ in.read out.write(buf1) } // out.close } in.close } }catch{ case e: ZipException => flag = false case e: FileNotFoundException => flag = false case e: IOException => flag = false } println("******************解压完毕********************") flag } /** * 上传 * @param localDir * @param cloudDir */ def upload(localDir:String,cloudDir:String): Unit ={ try { println("hadoop上传文件开始..."); // 获取一个conf对象 val conf = new Configuration(); val fileDir = new File(localDir); val files = fileDir.listFiles(); var in:InputStream = null var fs:FileSystem=null var out:OutputStream=null for(file <- files){ // 本地文件存取的位置 val LOCAL_SRC = file.getAbsolutePath // 存放到云端HDFS的位置 val CLOUD_DEST = cloudDir+file.getName in = new BufferedInputStream(new FileInputStream(LOCAL_SRC)) // 文件系统 fs = FileSystem.get(URI.create(CLOUD_DEST), conf) // 输出流 out = fs.create(new Path(CLOUD_DEST)) // 连接两个流,形成通道,使输入流向输出流传输数据 IOUtils.copyBytes(in, out, 1024, true) } in.close fs.close out.close println("hadoop上传文件结束..."); } catch { case e: FileNotFoundException => e.printStackTrace case e: IllegalArgumentException => e.printStackTrace case e: IOException => e.printStackTrace } } /** * 删除 * @param file */ def deleteFile(file:File ):Unit={ if(file.exists()){ if(file.isFile()){ file.delete() }else if(file.isDirectory()){ val files = file.listFiles() for(file <- files){ deleteFile(file) } } } } }
在这里主要说scala代码编写过程中出现的问题:
scala可以自动解析对象的类型,所以不需要我们自己声明对象的类型,一般都用val来声明,除非是一定要改变的值,就用var,必须初始化。
在scala中一开始这句话是报错的,提示不能解析,也就是找不到entries找不到nextElement 这个方法。
val entry = entries.nextElement
我们需要做如下的装换:
val entry = entries.nextElement.asInstanceOf[ZipFile]
上面这个原因是因为我使用了一个ant1.8.jar的jar包,最后发现是这个包的原因,在scala下运行还有警告。用ant.jar替换ant1.8.jar包,就可以了。
在声明对象的时候,对象是必须初始化的,如果可以为null,但是有时候你必须声明它的类型,可以如下声明,加上它的类型即可。
var in:InputStream = null
往hdfs写文件时可以用方法即可
// 本地文件存取的位置 val LOCAL_SRC = file.getAbsolutePath // 存放到云端HDFS的位置 val CLOUD_DEST = cloudDir+file.getName in = new BufferedInputStream(new FileInputStream(LOCAL_SRC)) // 文件系统 fs = FileSystem.get(URI.create(CLOUD_DEST), conf) // 输出流 out = fs.create(new Path(CLOUD_DEST)) // 连接两个流,形成通道,使输入流向输出流传输数据 IOUtils.copyBytes(in, out, 1024, true)
相关文章推荐
- list请求参数作为springmvc方法参数
- MyEclipse Servers视窗出现“Could not create the view: An unexpected exception was thrown”错误解决办法
- eclipse配置maven环境简记
- java 中 log4j 的概念、作用以及如何使用
- java 回调函数的写法
- Java字符串转换为ASCII
- 配置SDK时遇到的一系列问题
- Java中-----HTML(网页)的设计
- 五年磨一剑:Java 开源博客 Solo 1.0.0 发布了!
- 五年磨一剑:Java 开源博客 Solo 1.0.0 发布了!
- 五年磨一剑:Java 开源博客 Solo 1.0.0 发布了!
- 五年磨一剑:Java 开源博客 Solo 1.0.0 发布了!
- java解惑之try catch finally
- Struts 2 creating own interceptor
- 【译】StackOverflow——Java 中的 finally 代码块是否总会被执行?
- Java多线程中start()和run()的区别
- 使用myeclipse web browser 中文显示乱码
- java反射介绍
- java web项目读取文件
- java基础学习--最近几天看韩顺平资料学习心得(二)