您的位置:首页 > 编程语言 > Java开发

scala和java解压zip的文件,并上传到hdfs服务器

2015-09-17 16:58 686 查看
之前一直使用的事java开发的,后来学习hadoop,spark以后,需要做一个解压文件到服务器的例子,由于学习scala时间不长,故先用java代码写出来,在改成scala的代码,过程中出现了很多问题,我将会在最后说明,请大家注意不要少了包。

环境声明:windows8.1,eclipse,idea14,hadoop2.6,scala2.11,请自行配置好hadoop的环境。

java代码如下:

<span style="font-size:12px;">public class TestFile {
public static void main(String[] args) {
String zipDir = "G:/zip";
File zipFile = new File(zipDir);
File[] zipFiles = zipFile.listFiles();
for(File zip:zipFiles)
{
if(zip.getName().endsWith(".zip"))
{
System.out.println("name is "+zip.getName()+" , path = "+zip.getAbsolutePath());
//解压zip压缩文件
boolean result = unZipFile(zip,zipDir);
if(result)
{
String filedir = zip.getName().substring(0, zip.getName().indexOf("."));
String localDir = zipDir+filedir;
System.out.println("文件f.getName()解压成功 filedir = "+filedir);
//上传文件到HDFS
String cloudDir = "hdfs://10.132.10.235:9000/zcd/";
upload(localDir,cloudDir);
//删除本地目录
File fileDir = new File(localDir);
deleteFile(fileDir);
fileDir.delete();
zip.delete();

}
else
{
System.out.println("文件f.getName()解压失败");
}
}
}
}

private static boolean unZipFile(File zipFile, String descDir)
{
boolean flag = true;

try {
File pathFile = new File(descDir);
if(!pathFile.exists()){
pathFile.mkdirs();
}
ZipFile zip = new ZipFile(zipFile);
for(Enumeration entries = zip.getEntries();entries.hasMoreElements();){
ZipEntry entry = (ZipEntry)entries.nextElement();
String zipEntryName = entry.getName();
InputStream in = zip.getInputStream(entry);
String outPath = (descDir+zipEntryName).replaceAll("\\*", "/");;
//判断路径是否存在,不存在则创建文件路径
File file = new File(outPath.substring(0, outPath.lastIndexOf('/')));
if(!file.exists()){
file.mkdirs();
}
//判断文件全路径是否为文件夹,如果是上面已经上传,不需要解压
if(new File(outPath).isDirectory()){
continue;
}
//输出文件路径信息
System.out.println(outPath);

OutputStream out = new FileOutputStream(outPath);
byte[] buf1 = new byte[1024];
int len;
while((len=in.read(buf1))>0){
out.write(buf1,0,len);
}
in.close();
out.close();
}
} catch (ZipException e) {
flag = false;
e.printStackTrace();
} catch (FileNotFoundException e) {
flag = false;
e.printStackTrace();
} catch (IOException e) {
flag = false;
e.printStackTrace();
}
System.out.println("******************解压完毕********************");
return flag;
}

private static void upload(String localDir,String cloudDir)
{
try {
System.out.println("hadoop上传文件开始...");
// 获取一个conf对象
Configuration conf = new Configuration();
File fileDir = new File(localDir);
File[] files = fileDir.listFiles();
InputStream in = null;
FileSystem fs = null;
OutputStream out = null;
for(File file:files)
{
// 本地文件存取的位置
String LOCAL_SRC = file.getAbsolutePath();
// 存放到云端HDFS的位置
String CLOUD_DEST = cloudDir+file.getName();
in = new BufferedInputStream(new FileInputStream(LOCAL_SRC));
// 文件系统
fs = FileSystem.get(URI.create(CLOUD_DEST), conf);
// 输出流
out = fs.create(new Path(CLOUD_DEST));
// 连接两个流,形成通道,使输入流向输出流传输数据
IOUtils.copyBytes(in, out, 1024, true);
}
in.close();
fs.close();
out.close();
System.out.println("hadoop上传文件结束...");
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}

private static void deleteFile(File file)
{
if(file.exists())
{
if(file.isFile())
{
file.delete();
}
else if(file.isDirectory())
{
File files[] = file.listFiles();
for(int i=0;i<files.length;i++)
{
deleteFile(files[i]);
}
}
}
}
}</span>


scala代码 如下:

object zip {
def main(args: Array[String]) {
val zipDir = "G:/zip"
val zipFile = new File(zipDir)
val zipFiles = zipFile.listFiles()
for(zip <- zipFiles)
{
if(zip.getName.endsWith(".zip"))
{
println("name is "+zip.getName+" , path = "+zip.getAbsolutePath)
//解压zip压缩文件
val result = unZipFile(zip,zipDir)
if(result)
{
val filedir = zip.getName.substring(0, zip.getName.indexOf("."))
val localDir = zipDir+filedir
println("文件f.getName()解压成功 filedir = = "+filedir)
//上传文件到HDFS
val cloudDir = "hdfs://10.132.10.235:9000/zcd/"
upload(localDir,cloudDir);
//删除本地目录
val fileDir = new File(localDir)
deleteFile(fileDir)
fileDir.delete()
zip.delete()
}
else{
println("文件f.getName()解压失败")
}
}
}
}

/**
* 解压
* @param zipFile
* @param descDir
* @return
*/
def unZipFile(zipFile:File , descDir:String ):Boolean ={
var flag = true
    try {
val pathFile:File = new File(descDir)
if (!pathFile.exists) {
pathFile.mkdirs
}
val zip = new ZipFile(zipFile)
val entries = zip.getEntries
while (entries.hasMoreElements){
// val entry = entries.nextElement.asInstanceOf[ZipFile]
val entry = entries.nextElement
val zipEntryName = entry.getName
val in = zip.getInputStream(entry)
val outPath = (descDir + zipEntryName).replaceAll("\\*", "/")
//判断路径是否存在,不存在则创建文件路径
val file = new File(outPath.substring(0, outPath.lastIndexOf('/')))
if (!file.exists) {
file.mkdirs
}
//判断文件全路径是否为文件夹,如果是上面已经上传,不需要解压
if (!new File(outPath).isDirectory()) {
//输出文件路径信息
println(outPath)
val out = new FileOutputStream(outPath)
//val writer = new PrintWriter(new File(outPath))
val buf1 = Array[Byte]()
while((in.read(buf1))>0){
in.read
out.write(buf1)
}
// out.close
}
in.close
}
}catch{
case e: ZipException =>
flag = false
      case e: FileNotFoundException =>
flag = false
      case e: IOException =>
flag = false
}
println("******************解压完毕********************")
flag
}

/**
* 上传
* @param localDir
* @param cloudDir
*/
def upload(localDir:String,cloudDir:String): Unit ={
try {
println("hadoop上传文件开始...");
// 获取一个conf对象
val conf = new Configuration();
val fileDir = new File(localDir);
val files = fileDir.listFiles();
var in:InputStream = null
      var fs:FileSystem=null
      var out:OutputStream=null
      for(file <- files){
// 本地文件存取的位置
val LOCAL_SRC = file.getAbsolutePath
// 存放到云端HDFS的位置
val CLOUD_DEST = cloudDir+file.getName
in = new BufferedInputStream(new FileInputStream(LOCAL_SRC))
// 文件系统
fs = FileSystem.get(URI.create(CLOUD_DEST), conf)
// 输出流
out = fs.create(new Path(CLOUD_DEST))
// 连接两个流,形成通道,使输入流向输出流传输数据
IOUtils.copyBytes(in, out, 1024, true)
}
in.close
fs.close
out.close
println("hadoop上传文件结束...");
} catch {
case e: FileNotFoundException =>
e.printStackTrace
case e: IllegalArgumentException =>
e.printStackTrace
case e: IOException =>
e.printStackTrace
}
}
/**
* 删除
* @param file
*/
def deleteFile(file:File ):Unit={
if(file.exists()){
if(file.isFile()){
file.delete()
}else if(file.isDirectory()){
val files = file.listFiles()
for(file <- files){
deleteFile(file)
}
}
}
}
}


在这里主要说scala代码编写过程中出现的问题:

scala可以自动解析对象的类型,所以不需要我们自己声明对象的类型,一般都用val来声明,除非是一定要改变的值,就用var,必须初始化。

在scala中一开始这句话是报错的,提示不能解析,也就是找不到entries找不到nextElement 这个方法。

val entry = entries.nextElement

我们需要做如下的装换:

val entry = entries.nextElement.asInstanceOf[ZipFile]

上面这个原因是因为我使用了一个ant1.8.jar的jar包,最后发现是这个包的原因,在scala下运行还有警告。用ant.jar替换ant1.8.jar包,就可以了。

在声明对象的时候,对象是必须初始化的,如果可以为null,但是有时候你必须声明它的类型,可以如下声明,加上它的类型即可。

var in:InputStream = null


往hdfs写文件时可以用方法即可

// 本地文件存取的位置
val LOCAL_SRC = file.getAbsolutePath
// 存放到云端HDFS的位置
val CLOUD_DEST = cloudDir+file.getName
in = new BufferedInputStream(new FileInputStream(LOCAL_SRC))
// 文件系统
fs = FileSystem.get(URI.create(CLOUD_DEST), conf)
// 输出流
out = fs.create(new Path(CLOUD_DEST))
// 连接两个流,形成通道,使输入流向输出流传输数据
IOUtils.copyBytes(in, out, 1024, true)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: