Spark 导出 TXT 、CSV 文件 小试牛刀
2018-01-19 18:47
295 查看
/** 注意1: Spark saveAsTextFile 保存出来的是一个文件夹,所以才有以下获取文件夹里面的文件并整合成一个文件的操作; 注意2: 在没有main函数的情况下 实现类一定要 implements Serializable,不然会报错不能序列化 */ import org.apache.spark.api.java.JavaRDD; import pass.common.response.Response; import pass.computation.action.response.ActionResponse; import java.io.*; import java.util.ArrayList; import java.util.List; public class SaveFileAction { /** * @param file * @return 成功返回:以part开头的文件列表 */ public static List<String> getFileList(File file) { List<String> result = new ArrayList<String>(); if (!file.isDirectory()) { System.out.println(file.getAbsolutePath()); result.add(file.getAbsolutePath()); } else { File[] directoryList = file.listFiles(new FileFilter() { public boolean accept(File file) { if (file.isFile() && file.getName().indexOf("part") == 0) { return true; } else { return false; } } }); for (int i = 0; i < directoryList.length; i++) { result.add(directoryList[i].getPath()); } } return result; } /** * 删除单个文件 * * @param sPath 被删除文件path * @return 删除成功返回true,否则返回false */ public boolean deleteFile(String sPath) { boolean flag = false; File file = new File(sPath); // 路径为文件且不为空则进行删除 if (file.isFile() && file.exists()) { file.delete(); flag = true; } return flag; } public boolean deleteDirectory(String sPath) { //如果sPath不以文件分隔符结尾,自动添加文件分隔符 if (!sPath.endsWith(File.separator)) { sPath = sPath + File.separator; } File dirFile = new File(sPath); //如果dir对应的文件不存在,或者不是一个目录,则退出 if (!dirFile.exists() || !dirFile.isDirectory()) { return false; } boolean flag = true; //删除文件夹下的所有文件(包括子目录) File[] files = dirFile.listFiles(); for (int i = 0; i < files.length; i++) { //删除子文件 if (files[i].isFile()) { flag = deleteFile(files[i].getAbsolutePath()); if (!flag) break; } //删除子目录 else { flag = deleteDirectory(files[i].getAbsolutePath()); if (!flag) break; } } if (!flag) return false; //删除当前目录 if (dirFile.delete()) { return true; } else { return false; } } /** * @param saveRDD * @param fileType "TXT"、"CSV" * @param filePath * @return * @throws IOException */ public ActionResponse saveFile(JavaRDD saveRDD, String fileType, String filePath) { ActionResponse response = new ActionResponse(); response.setTaskStatus(Response.FAILURE); if (null == saveRDD) { response.setErrorMsg("分布式内存数据集不能为空!"); return response; } if (null == fileType || fileType.length() == 0) { response.setErrorMsg("文件格式不能为空!"); return response; } if (null == filePath || filePath.length() == 0) { response.setErrorMsg("文件不能为空!"); return response; } //写入hdfs if (filePath.contains("hdfs://")) { saveRDD.saveAsTextFile(filePath); response.setTaskStatus(Response.SUCCESS); response.setErrorMsg(null); return response; } else { if (new File(filePath).exists()) { deleteDirectory(filePath); if (deleteDirectory(filePath)) { System.out.println("删除已经存在的文件夹!" + filePath); } else { System.out.println("删除不了!!!" + filePath); } } saveRDD.saveAsTextFile(filePath); // 获取以part开头的文件内容 File f = new File(filePath); List<String> list2 = new ArrayList<String>(); list2 = getFileList(f); String str = ""; try { for (String l : list2) { BufferedReader br = new BufferedReader(new FileReader(new File(l))); String s = ""; while ((s = br.readLine()) != null) { str += s + "\r\n"; //加"\r\n" 写入下面的文件的时候才能换行 } br.close(); } String outPath = filePath + "." + fileType.toLowerCase(); if (new File(outPath).exists()) { deleteFile(outPath); if (deleteFile(outPath)) { System.out.println("删除已经存在的文件!"); } else { System.out.println("删除不了!!!" + outPath); } } //写入 outPath File fileText = new File(outPath); FileWriter fileWriter = new FileWriter(fileText); fileWriter.write(str); fileWriter.close(); } catch (IOException e) { response.setTaskStatus(Response.FAILURE); response.setErrorMsg("写入文件失败"); return response; } response.setTaskStatus(Response.SUCCESS); response.setErrorMsg(null); return response; } } } public class SaveAsFileTest implements Serializable{ @Test public void saveTXTFile() throws IOException { SparkConf conf = new SparkConf() .setAppName("map") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); List list1 = Arrays.asList("1","2","3","a","4","c"); String fileType1 = "TXT"; String filePath1 = "C:\\Users\\Administrator\\Desktop\\t2"; String filePath2 = "hdfs://master:9000/hbase/test"; SaveFileAction saveFileAction = new SaveFileAction(); JavaRDD saveRDD1 = 9650 sc.parallelize(list1); Response response = saveFileAction.saveFile(saveRDD1,fileType1,filePath1); System.out.println("message: "+response.getErrorMsg()); sc.close(); } @Test public void saveCSVFile() throws IOException { SparkConf conf = new SparkConf() .setAppName("map") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); String fileType2 = "CSV"; String filePath1 = "C:\\Users\\Administrator\\Desktop\\t2"; String filePath2 = "hdfs://master:9000/hbase/test"; SaveFileAction saveFileAction = new SaveFileAction(); JavaRDD saveRDD1 = sc.parallelize(list1); JavaRDD saveRDD2 = sc.textFile( "C:\\Users\\Administrator\\Desktop\\im.csv"); Response response = saveFileAction.saveFile(saveRDD2,fileType2,filePath1); System.out.println("message: "+response.getErrorMsg()); sc.close(); }
}
相关文章推荐
- 利用pig导出hbase文件为txt或者csv格式
- c#实现将Excel文件导出为csv和UTF8格式的txt文件
- 批量改名软件----Win7xfolder 的另一特色是可以批量导出文件的名字及其文件大小到 txt/CSV 文件,很方便
- Python处理unicode编码的txt文件(Python中文处理)——解决to_excel()和to_csv()导出文件内容为空的问题
- (qsf文件 、 tcl文件 和 csv(txt)文件的区别) FPGA管脚分配文件保存、导入导出方法
- 导出CSV格式文件,用Excel打开乱码的解决办法
- [转]PL/SQL Developer 导入导出csv文件
- 数据写入到csv文件或者通过浏览器导出到下载文件
- 应用Java泛型和反射导出CSV文件的方法
- MySql csv文件导入导出
- PL/SQL Developer 导入导出csv文件
- oracle数据库PLSQL导出csv文件,excel打开解决科学计数法问题
- nodejs利用csv导出文件
- PHP导入导出CSV文件
- oracle导出多CSV文件的靠谱的
- Spark Q&A : Spark利用databricks读取CSV文件报错 CSVFormat NoClassDefFoundError
- PHP导出一个txt文本文件
- mysql SQLyog导入导出csv文件,解决编码问题
- C# 导出csv文件
- 页面数据导出CSV格式文件