Reading & Writing Hadoop Sequence Files in Java
Original post: http://noushinb.blogspot.jp/2013/04/reading-writing-hadoop-sequence-files.html
In this post I'll show you how to write a simple Hadoop client application in Java. The app handles reading, writing, and copying Hadoop sequence files on local or remote Hadoop file systems (HDFS).
1. HadoopClient.java
package com.noushin.hadoop.client;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

/**
 * This class handles interactions with Hadoop.
 *
 * @author nbashir
 */
@Component
public class HadoopClient {

    private static Configuration conf = new Configuration();
    private final static Logger logger = Logger.getLogger(HadoopClient.class);

    /**
     * Convert the lines of text in a file to binary and write them to a
     * Hadoop sequence file.
     *
     * @param dataFile         file containing lines of text
     * @param sequenceFileName name of the sequence file to create
     * @param hadoopFS         Hadoop file system URI
     * @throws IOException
     */
    public static void writeToSequenceFile(File dataFile, String sequenceFileName, String hadoopFS) throws IOException {
        conf.set("fs.defaultFS", hadoopFS);
        Path path = new Path(sequenceFileName);

        if (dataFile != null && dataFile.exists()) {
            SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(path),
                    SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, new GzipCodec()),
                    SequenceFile.Writer.keyClass(IntWritable.class),
                    SequenceFile.Writer.valueClass(BytesWritable.class));
            try {
                List<String> lines = FileUtils.readLines(dataFile);
                for (int i = 0; i < lines.size(); i++) {
                    // Key is the line number, value is the line content as bytes.
                    writer.append(new IntWritable(i), new BytesWritable(lines.get(i).getBytes()));
                }
            } finally {
                IOUtils.closeStream(writer);
            }
        }
    }

    /**
     * Read a Hadoop sequence file on HDFS.
     *
     * @param sequenceFileName name of the sequence file to read
     * @param hadoopFS         Hadoop file system URI
     * @throws IOException
     */
    public static void readSequenceFile(String sequenceFileName, String hadoopFS) throws IOException {
        conf.set("fs.defaultFS", hadoopFS);
        Path path = new Path(sequenceFileName);
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
        try {
            IntWritable key = (IntWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            while (reader.next(key, value)) {
                // getBytes() returns the full backing buffer, which may be larger
                // than the record, so only convert the first getLength() bytes.
                logger.info("key : " + key + " - value : " + new String(value.getBytes(), 0, value.getLength()));
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }

    /**
     * Copy a local sequence file to a remote file on HDFS.
     *
     * @param from           name of the sequence file to copy
     * @param to             name of the sequence file to copy to
     * @param remoteHadoopFS HDFS host URI
     * @throws IOException
     */
    public static void copySequenceFile(String from, String to, String remoteHadoopFS) throws IOException {
        conf.set("fs.defaultFS", remoteHadoopFS);
        FileSystem fs = FileSystem.get(conf);
        Path localPath = new Path(from);
        Path hdfsPath = new Path(to);
        boolean deleteSource = true;
        fs.copyFromLocalFile(deleteSource, localPath, hdfsPath);
        logger.info("Copied SequenceFile from: " + from + " to: " + to);
    }

    /**
     * Print all the values in a Hadoop configuration object.
     *
     * @param conf
     */
    public static void listHadoopConfiguration(Configuration conf) {
        int i = 0;
        logger.info("------------------------------------------------------------------------------------------");
        // Note: the original version called iterator.next() twice per loop pass,
        // which skipped every other entry and could throw NoSuchElementException.
        Iterator<Map.Entry<String, String>> iterator = conf.iterator();
        while (iterator.hasNext()) {
            i++;
            logger.info(i + " - " + iterator.next());
        }
        logger.info("------------------------------------------------------------------------------------------");
    }
}
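A note on the writer above: RECORD compression gzips each value on its own. SequenceFile also supports BLOCK compression, which buffers many key/value pairs and compresses them together, usually giving a better ratio. Here is a minimal sketch of the alternative writer configuration, reusing conf and the imports from the listing above (the output path is just an illustrative name, not from the original post):

    // Sketch: same writer setup as above, but with BLOCK compression,
    // which compresses batches of records together instead of one value at a time.
    SequenceFile.Writer blockWriter = SequenceFile.createWriter(conf,
            SequenceFile.Writer.file(new Path("/tmp/nb-block.sgz")), // illustrative path
            SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()),
            SequenceFile.Writer.keyClass(IntWritable.class),
            SequenceFile.Writer.valueClass(BytesWritable.class));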
2. HadoopClientTest.java
package com.noushin.hadoop.client;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class HadoopClientTest {

    @Autowired
    HadoopClient hadoopClient;

    String sequenceFileName = "/tmp/nb.sgz";
    String hadoopLocalFS = "file:///";
    String hadoopRemoteFS = "hdfs://stage-hadoop01:8020";

    @Test
    public void testConfig() {
        Configuration conf = new Configuration();
        HadoopClient.listHadoopConfiguration(conf);
    }

    @Test
    public void testWriteSequenceFile() {
        String dataFileName = "/tmp/test.txt";
        try {
            int numOfLines = 20;
            String baseStr = "....Test...";
            List<String> lines = new ArrayList<String>();
            for (int i = 0; i < numOfLines; i++)
                lines.add(i + baseStr + UUID.randomUUID());
            File dataFile = new File(dataFileName);
            FileUtils.writeLines(dataFile, lines, true);
            Thread.sleep(2000);
            HadoopClient.writeToSequenceFile(dataFile, sequenceFileName, hadoopLocalFS);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testReadSequenceFile() {
        try {
            HadoopClient.readSequenceFile(sequenceFileName, hadoopLocalFS);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testCopySequenceFileToRemoteHDFS() {
        String tempFileName = "/tmp/local-test.txt";
        String sequenceFileName = "/tmp/seqfile-record-compressed.sgz";
        String hadoopLocalFS = "file:///";
        String hadoopRemoteFS = "hdfs://stage-hadoop01:8020";
        try {
            int numOfLines = 5;
            String baseStr = "....Test...";
            List<String> lines = new ArrayList<String>();
            for (int i = 0; i < numOfLines; i++)
                lines.add(i + baseStr + UUID.randomUUID());
            File dataFile = new File(tempFileName);
            FileUtils.writeLines(dataFile, lines, true);
            Thread.sleep(2000);
            HadoopClient.writeToSequenceFile(dataFile, sequenceFileName, hadoopLocalFS);
            HadoopClient.readSequenceFile(sequenceFileName, hadoopLocalFS);
            // copySequenceFile moves the local file to the remote cluster (deleteSource = true).
            HadoopClient.copySequenceFile(sequenceFileName, sequenceFileName, hadoopRemoteFS);
            HadoopClient.readSequenceFile(sequenceFileName, hadoopRemoteFS);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
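Because @ContextConfiguration is used without arguments, Spring's test runner looks for a file named HadoopClientTest-context.xml in the same package on the test classpath. A minimal context that makes the @Autowired HadoopClient resolvable might look like this (a sketch; the file path comment reflects the standard Maven layout, not the original post):

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- src/test/resources/com/noushin/hadoop/client/HadoopClientTest-context.xml -->
    <!-- Minimal sketch: component-scan registers the @Component-annotated
         HadoopClient so it can be autowired into the test. -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans.xsd
               http://www.springframework.org/schema/context
               http://www.springframework.org/schema/context/spring-context.xsd">
        <context:component-scan base-package="com.noushin.hadoop.client" />
    </beans>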
3. pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.noushin.hadoop</groupId>
    <artifactId>client</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>hdpc</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop-client.version>2.0.0-cdh4.2.0</hadoop-client.version>
        <junit.version>4.10</junit.version>
        <log4j.version>1.2.17</log4j.version>
        <spring.version>3.2.0.RELEASE</spring.version>
    </properties>

    <dependencies>
        <!-- Test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- Hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-client.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- Spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <!-- The Maven Central artifactId is spring-test; the original post used
                 the EBR-style id org.springframework.test, which Central does not serve. -->
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
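Note that the hadoop-client version above is a CDH build, which is not published to Maven Central, so the build also needs Cloudera's repository. A sketch of the entry, assuming Cloudera's public repository URL is still current for your setup:

    <!-- Sketch: CDH artifacts are served from Cloudera's repository, not Maven Central. -->
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>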
If you use Eclipse for development and testing, as I do, you need one additional step before you can compress your sequence files with gzip.
As you may have noticed, the pom file uses Cloudera's Hadoop distribution. To use gzip, I need to add the Hadoop native libraries to my development environment, which is Ubuntu 12.10:
sudo apt-get update; sudo apt-get install hadoop
This installs the Hadoop native libraries in /usr/lib/hadoop/lib/native. In Eclipse, open the project's Properties -> Java Build Path -> Libraries -> Maven Dependencies -> Native library location, and set "Location path" to /usr/lib/hadoop/lib/native.
Please make sure the version of the Hadoop client dependency in your pom file matches the version of Hadoop installed on your system; otherwise you will get a runtime error:
ERROR nativeio.NativeIO: Unable to initialize NativeIO libraries
To verify a sequence file was created on HDFS, log into one of your hadoop nodes and run this command:
hadoop fs -ls /tmp/nb.sgz
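If you also want to inspect the records themselves, hadoop fs -text understands sequence files and prints their decompressed contents:
hadoop fs -text /tmp/nb.sgz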
If you run into a problem and need to see what Hadoop is doing, turn on debug logging for the Hadoop classes by adding the following entries to your log4j.properties:
# Turn on Hadoop logging
log4j.logger.org.apache.hadoop=DEBUG
To run Hive, log in as a Hadoop user such as oozie_job, so that the environment is set up correctly:
$ sudo su - oozie_job
Then start Hive:
$ hive
Now you can query data using SQL-like commands:
DESCRIBE my_transactions;
SELECT * FROM my_transactions WHERE year=2013 AND month=3 AND day=14;
To see where a partition points:
DESCRIBE EXTENDED my_transactions PARTITION(year=2013, month=3, day=28);
To create a partition so that Hive can find the data for its queries:
ALTER TABLE my_transactions ADD PARTITION(year=2013, month=3, day=26) LOCATION '/tmp/2013/03/26';
To drop a partition (so it can then be re-added pointing to a new location):
ALTER TABLE my_transactions DROP PARTITION(year=2013, month=3, day=26);
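The statements above assume my_transactions is an external table partitioned by year, month, and day. A hypothetical definition consistent with those queries (the data columns are illustrative, not from the original post):

-- Hypothetical table definition matching the partition columns used above.
CREATE EXTERNAL TABLE my_transactions (
    txn_id STRING,
    amount DOUBLE
)
PARTITIONED BY (year INT, month INT, day INT)
LOCATION '/tmp/my_transactions';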