
Spark: a first try at exporting TXT and CSV files

2018-01-19 18:47
/**
 * Note 1: Spark's saveAsTextFile writes a directory, not a single file, which is why
 * the code below lists the part files inside that directory and merges them into one file.
 * Note 2: when there is no main method (e.g. when running under JUnit), the test class
 * must implement Serializable, otherwise Spark fails with a serialization error.
 */
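For reference (not from the original post): a local call such as rdd.saveAsTextFile("C:\\out") typically leaves a directory of the form below, with one part file per partition plus a _SUCCESS marker; exact names vary.

C:\out\_SUCCESS
C:\out\part-00000
C:\out\part-00001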

import org.apache.spark.api.java.JavaRDD;
import pass.common.response.Response;
import pass.computation.action.response.ActionResponse;

import java.io.*;
import java.util.ArrayList;
import java.util.List;

public class SaveFileAction {

    /**
     * @param file a single file, or the output directory produced by saveAsTextFile
     * @return on success, the list of files whose names start with "part"
     */
    public static List<String> getFileList(File file) {

        List<String> result = new ArrayList<String>();

        if (!file.isDirectory()) {
            System.out.println(file.getAbsolutePath());
            result.add(file.getAbsolutePath());
        } else {
            File[] directoryList = file.listFiles(new FileFilter() {
                public boolean accept(File file) {
                    // keep only regular files whose names start with "part"
                    return file.isFile() && file.getName().indexOf("part") == 0;
                }
            });
            if (directoryList != null) {
                for (File part : directoryList) {
                    result.add(part.getPath());
                }
            }
        }

        return result;
    }

    /**
     * Delete a single file.
     *
     * @param sPath path of the file to delete
     * @return true if the deletion succeeded, false otherwise
     */
    public boolean deleteFile(String sPath) {
        boolean flag = false;
        File file = new File(sPath);
        // delete only if the path exists and refers to a regular file
        if (file.isFile() && file.exists()) {
            file.delete();
            flag = true;
        }
        return flag;
    }

    public boolean deleteDirectory(String sPath) {
        // append a trailing file separator if sPath does not end with one
        if (!sPath.endsWith(File.separator)) {
            sPath = sPath + File.separator;
        }
        File dirFile = new File(sPath);
        // bail out if the path does not exist or is not a directory
        if (!dirFile.exists() || !dirFile.isDirectory()) {
            return false;
        }
        boolean flag = true;
        // delete everything inside the directory, including subdirectories
        File[] files = dirFile.listFiles();
        if (files != null) {
            for (File child : files) {
                if (child.isFile()) {
                    flag = deleteFile(child.getAbsolutePath());
                } else {
                    flag = deleteDirectory(child.getAbsolutePath());
                }
                if (!flag) break;
            }
        }
        if (!flag) return false;
        // finally delete the (now empty) directory itself
        return dirFile.delete();
    }
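    /**
     * Untested sketch, not part of the original post: on Java 7+ the recursive
     * delete above can also be expressed with NIO instead of manual recursion.
     */
    public boolean deleteDirectoryNio(String sPath) {
        try {
            java.nio.file.Files.walk(java.nio.file.Paths.get(sPath))
                    .sorted(java.util.Comparator.reverseOrder()) // children before parents
                    .map(java.nio.file.Path::toFile)
                    .forEach(File::delete);
            return true;
        } catch (IOException e) {
            return false;
        }
    }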

    /**
     * @param saveRDD  the RDD to save
     * @param fileType "TXT" or "CSV" (only affects the extension of the merged file)
     * @param filePath output path; paths containing "hdfs://" are written to HDFS, all others to the local filesystem
     * @return SUCCESS on success, FAILURE with an error message otherwise
     */
    public ActionResponse saveFile(JavaRDD<String> saveRDD, String fileType, String filePath) {
        ActionResponse response = new ActionResponse();
        response.setTaskStatus(Response.FAILURE);
        if (null == saveRDD) {
            response.setErrorMsg("The input RDD must not be null!");
            return response;
        }
        if (null == fileType || fileType.length() == 0) {
            response.setErrorMsg("The file type must not be empty!");
            return response;
        }
        if (null == filePath || filePath.length() == 0) {
            response.setErrorMsg("The file path must not be empty!");
            return response;
        }

        // write to HDFS
        if (filePath.contains("hdfs://")) {
            saveRDD.saveAsTextFile(filePath);
            response.setTaskStatus(Response.SUCCESS);
            response.setErrorMsg(null);
            return response;
        } else {
            // saveAsTextFile refuses to overwrite, so remove any existing output directory first
            if (new File(filePath).exists()) {
                if (deleteDirectory(filePath)) {
                    System.out.println("Deleted existing directory: " + filePath);
                } else {
                    System.out.println("Could not delete: " + filePath);
                }
            }

            saveRDD.saveAsTextFile(filePath);
            // read back the content of every file starting with "part"
            File f = new File(filePath);
            List<String> partFiles = getFileList(f);
            StringBuilder str = new StringBuilder();
            try {
                for (String l : partFiles) {
                    BufferedReader br = new BufferedReader(new FileReader(new File(l)));
                    String s;
                    while ((s = br.readLine()) != null) {
                        str.append(s).append("\r\n"); // "\r\n" keeps the line breaks in the merged file
                    }
                    br.close();
                }
                String outPath = filePath + "." + fileType.toLowerCase();

                // remove any previously merged file
                if (new File(outPath).exists()) {
                    if (deleteFile(outPath)) {
                        System.out.println("Deleted existing file: " + outPath);
                    } else {
                        System.out.println("Could not delete: " + outPath);
                    }
                }
                // write the merged content to outPath
                File fileText = new File(outPath);
                FileWriter fileWriter = new FileWriter(fileText);
                fileWriter.write(str.toString());
                fileWriter.close();
            } catch (IOException e) {
                response.setTaskStatus(Response.FAILURE);
                response.setErrorMsg("Failed to write file");
                return response;
            }
            response.setTaskStatus(Response.SUCCESS);
            response.setErrorMsg(null);
            return response;

        }
    }
}
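A side note, not from the original post: if the goal is simply one output file, Spark can do the merge itself by collapsing the RDD to a single partition before saving. A minimal sketch (this serializes the write through one task, so it only suits small outputs):

JavaRDD<String> single = saveRDD.coalesce(1);
single.saveAsTextFile(filePath); // the directory then contains a single part-00000

For HDFS destinations, the part files can also be merged after the fact from the command line with hdfs dfs -getmerge <srcDir> <localFile>.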

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Test;
import pass.common.response.Response;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class SaveAsFileTest implements Serializable {

    @Test
    public void saveTXTFile() throws IOException {
        SparkConf conf = new SparkConf()
                .setAppName("map")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> list1 = Arrays.asList("1", "2", "3", "a", "4", "c");

        String fileType1 = "TXT";

        String filePath1 = "C:\\Users\\Administrator\\Desktop\\t2";
        String filePath2 = "hdfs://master:9000/hbase/test"; // HDFS variant
        SaveFileAction saveFileAction = new SaveFileAction();

        JavaRDD<String> saveRDD1 = sc.parallelize(list1);
        Response response = saveFileAction.saveFile(saveRDD1, fileType1, filePath1);

        System.out.println("message: " + response.getErrorMsg());

        sc.close();
    }

    @Test
    public void saveCSVFile() throws IOException {
        SparkConf conf = new SparkConf()
                .setAppName("map")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String fileType2 = "CSV";
        String filePath1 = "C:\\Users\\Administrator\\Desktop\\t2";
        String filePath2 = "hdfs://master:9000/hbase/test"; // HDFS variant
        SaveFileAction saveFileAction = new SaveFileAction();

        JavaRDD<String> saveRDD2 = sc.textFile("C:\\Users\\Administrator\\Desktop\\im.csv");
        Response response = saveFileAction.saveFile(saveRDD2, fileType2, filePath1);
        System.out.println("message: " + response.getErrorMsg());

        sc.close();
    }

}
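One caveat worth spelling out: the fileType argument only changes the extension of the merged file; saveFile never converts the content. To get genuinely comma-separated output, the RDD has to contain comma-joined lines before it is saved. A hedged sketch, where rowsRDD (a JavaRDD<String[]> of field arrays) is a hypothetical input:

JavaRDD<String> csvLines = rowsRDD.map(fields -> String.join(",", fields)); // join each row's fields with commas
Response response = saveFileAction.saveFile(csvLines, "CSV", filePath1);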