Reading & Writing Hadoop Sequence Files in Java

Original post:
http://noushinb.blogspot.jp/2013/04/reading-writing-hadoop-sequence-files.html
In this post I'll show you how to write a simple Hadoop client application in Java. The app handles reading, writing, and copying Hadoop sequence files on local or remote Hadoop file systems (HDFS).

1. HadoopClient.java

package com.noushin.hadoop.client;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

/**
 * This class handles interactions with Hadoop.
 *
 * @author nbashir
 */
@Component
public class HadoopClient {

    private static Configuration conf = new Configuration();
    private final static Logger logger = Logger.getLogger(HadoopClient.class);

    /**
     * Convert the lines of text in a file to binary and write them to a
     * Hadoop sequence file.
     *
     * @param dataFile File containing lines of text
     * @param sequenceFileName Name of the sequence file to create
     * @param hadoopFS Hadoop file system URI
     *
     * @throws IOException
     */
    public static void writeToSequenceFile(File dataFile, String sequenceFileName, String hadoopFS) throws IOException {
        conf.set("fs.defaultFS", hadoopFS);
        Path path = new Path(sequenceFileName);

        if ((dataFile != null) && dataFile.exists()) {
            // Record-level gzip compression; keys are line numbers, values are line contents.
            SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(path),
                    SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, new GzipCodec()),
                    SequenceFile.Writer.keyClass(IntWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class));

            List<String> lines = FileUtils.readLines(dataFile);
            for (int i = 0; i < lines.size(); i++) {
                IntWritable key = new IntWritable(i);
                BytesWritable value = new BytesWritable(lines.get(i).getBytes());
                writer.append(key, value);
            }
            IOUtils.closeStream(writer);
        }
    }

    /**
     * Read a Hadoop sequence file on HDFS and log its records.
     *
     * @param sequenceFileName Name of the sequence file to read
     * @param hadoopFS Hadoop file system URI
     *
     * @throws IOException
     */
    public static void readSequenceFile(String sequenceFileName, String hadoopFS) throws IOException {
        conf.set("fs.defaultFS", hadoopFS);
        Path path = new Path(sequenceFileName);
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
        IntWritable key = (IntWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
        while (reader.next(key, value)) {
            // getBytes() returns the backing buffer, which may be padded beyond the
            // record, so only the first getLength() bytes are valid.
            logger.info("key : " + key + " - value : " + new String(value.getBytes(), 0, value.getLength()));
        }
        IOUtils.closeStream(reader);
    }

    /**
     * Copy a local sequence file to a remote file on HDFS.
     *
     * @param from Name of the local sequence file to copy
     * @param to Name of the sequence file to create on HDFS
     * @param remoteHadoopFS HDFS host URI
     *
     * @throws IOException
     */
    public static void copySequenceFile(String from, String to, String remoteHadoopFS) throws IOException {
        conf.set("fs.defaultFS", remoteHadoopFS);
        FileSystem fs = FileSystem.get(conf);

        Path localPath = new Path(from);
        Path hdfsPath = new Path(to);
        boolean deleteSource = true;

        fs.copyFromLocalFile(deleteSource, localPath, hdfsPath);
        logger.info("Copied SequenceFile from: " + from + " to: " + to);
    }

    /**
     * Print all the values in a Hadoop configuration object.
     *
     * @param conf the configuration to list
     */
    public static void listHadoopConfiguration(Configuration conf) {
        int i = 0;
        logger.info("------------------------------------------------------------------------------------------");
        Iterator<Map.Entry<String, String>> iterator = conf.iterator();
        while (iterator.hasNext()) {
            // Call next() exactly once per iteration; calling it twice skips every other entry.
            i++;
            logger.info(i + " - " + iterator.next());
        }
        logger.info("------------------------------------------------------------------------------------------");
    }
}
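
The test class in the next section drives this client through Spring, but for a quick manual check the static methods can be called from a plain main method too. Here is a minimal sketch; the HadoopClientDriver class name is mine, and the paths and file-system URI simply mirror the values used in the tests below:

package com.noushin.hadoop.client;

import java.io.File;

// Minimal driver for trying HadoopClient without Spring or JUnit.
// The paths and file-system URI are placeholders; adjust for your setup.
public class HadoopClientDriver {

    public static void main(String[] args) throws Exception {
        File dataFile = new File("/tmp/test.txt");  // plain-text input file
        String seqFile = "/tmp/nb.sgz";             // sequence file to create
        String localFS = "file:///";                // local file system URI

        HadoopClient.writeToSequenceFile(dataFile, seqFile, localFS);
        HadoopClient.readSequenceFile(seqFile, localFS);
    }
}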

2. HadoopClientTest.java

package com.noushin.hadoop.client;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class HadoopClientTest {

    @Autowired
    HadoopClient hadoopClient;

    String sequenceFileName = "/tmp/nb.sgz";
    String hadoopLocalFS = "file:///";
    String hadoopRemoteFS = "hdfs://stage-hadoop01:8020";

    @Test
    public void testConfig() {
        Configuration conf = new Configuration();
        HadoopClient.listHadoopConfiguration(conf);
    }

    @Test
    public void testWriteSequenceFile() {
        String dataFileName = "/tmp/test.txt";

        try {
            int numOfLines = 20;
            String baseStr = "....Test...";
            List<String> lines = new ArrayList<String>();
            for (int i = 0; i < numOfLines; i++)
                lines.add(i + baseStr + UUID.randomUUID());

            File dataFile = new File(dataFileName);
            FileUtils.writeLines(dataFile, lines, true);
            Thread.sleep(2000);
            HadoopClient.writeToSequenceFile(dataFile, sequenceFileName, hadoopLocalFS);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testReadSequenceFile() {
        try {
            HadoopClient.readSequenceFile(sequenceFileName, hadoopLocalFS);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testCopySequenceFileToRemoteHDFS() {
        String tempFileName = "/tmp/local-test.txt";
        String sequenceFileName = "/tmp/seqfile-record-compressed.sgz";
        String hadoopLocalFS = "file:///";
        String hadoopRemoteFS = "hdfs://stage-hadoop01:8020";

        try {
            int numOfLines = 5;
            String baseStr = "....Test...";
            List<String> lines = new ArrayList<String>();
            for (int i = 0; i < numOfLines; i++)
                lines.add(i + baseStr + UUID.randomUUID());

            File dataFile = new File(tempFileName);
            FileUtils.writeLines(dataFile, lines, true);
            Thread.sleep(2000);
            HadoopClient.writeToSequenceFile(dataFile, sequenceFileName, hadoopLocalFS);
            HadoopClient.readSequenceFile(sequenceFileName, hadoopLocalFS);
            HadoopClient.copySequenceFile(sequenceFileName, sequenceFileName, hadoopRemoteFS);
            HadoopClient.readSequenceFile(sequenceFileName, hadoopRemoteFS);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
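
A note on the bare @ContextConfiguration annotation above: with no attributes, Spring's test runner looks for a default context file named HadoopClientTest-context.xml in the same package on the classpath. A minimal sketch of such a file, assuming classpath scanning is used to register the HadoopClient bean:

<?xml version="1.0" encoding="UTF-8"?>
<!-- src/test/resources/com/noushin/hadoop/client/HadoopClientTest-context.xml -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Picks up @Component classes such as HadoopClient -->
    <context:component-scan base-package="com.noushin.hadoop.client"/>
</beans>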

3. pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.noushin.hadoop</groupId>
  <artifactId>client</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>hdpc</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop-client.version>2.0.0-cdh4.2.0</hadoop-client.version>
    <junit.version>4.10</junit.version>
    <log4j.version>1.2.17</log4j.version>
    <spring.version>3.2.0.RELEASE</spring.version>
  </properties>

  <!-- CDH artifacts are not in Maven Central; Cloudera's repository is needed to resolve them. -->
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <dependencies>
    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
      <scope>test</scope>
    </dependency>

    <!-- Hadoop -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop-client.version}</version>
      <scope>provided</scope>
    </dependency>

    <!-- Logging -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>${log4j.version}</version>
    </dependency>

    <!-- Spring -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <!-- spring-test is the Maven Central artifact for Spring's test support -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>${spring.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

If you are using Eclipse for development and testing like I do, you need one additional step so that your sequence files can be compressed with GZip.

As you may have noticed, the pom file uses Cloudera's Hadoop distribution. To use GZip, I need to add the Hadoop native libraries to my development environment, which is Ubuntu 12.10.

sudo apt-get update; sudo apt-get install hadoop

This installs the Hadoop native libraries in /usr/lib/hadoop/lib/native. Now, in Eclipse, open the project's Properties->Java Build Path->Libraries->Maven Dependencies->Native library location and set the location path to /usr/lib/hadoop/lib/native.
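
If you prefer not to click through the Eclipse UI, setting the JVM's library path directly should have the same effect; for example, add this VM argument to your run configuration (assuming the same install location as above):

-Djava.library.path=/usr/lib/hadoop/lib/native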

Please make sure the version of the hadoop-client dependency in your pom file matches the version of Hadoop installed on your system; otherwise you will get a runtime error:

ERROR nativeio.NativeIO: Unable to initialize NativeIO libraries

To verify that a sequence file was created on HDFS, log into one of your Hadoop nodes and run this command:

hadoop fs -ls /tmp/nb.sgz
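
To also inspect its contents, hadoop fs -text understands sequence files and will decompress the records for you:

hadoop fs -text /tmp/nb.sgz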

And, if you run into a problem and need to see what Hadoop is doing, turn on debug logging for the Hadoop classes by adding the following entry to your log4j.properties:

#Turn on hadoop logging
log4j.logger.org.apache.hadoop=DEBUG

To run Hive, log in as a Hadoop user such as oozie_job so that the environment is set up correctly:

$ sudo su - oozie_job

To use Hive:

$ hive

Now you can query data using SQL-like commands:

DESCRIBE my_transactions;

SELECT * FROM my_transactions WHERE year=2013 AND month=3 AND day=14;

To see where a partition points:

DESCRIBE EXTENDED my_transactions PARTITION(year=2013, month=3, day=28);

To create a partition so that Hive can find the data for its queries:

ALTER TABLE my_transactions ADD PARTITION(year=2013, month=3, day=26) LOCATION '/tmp/2013/03/26';

To drop a partition (for example, before re-adding it with a new location, as shown below):

ALTER TABLE my_transactions DROP PARTITION(year=2013, month=3, day=26);
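
To then point the partition at a new location, re-add it with the new path; the path below is just an illustrative example:

ALTER TABLE my_transactions ADD PARTITION(year=2013, month=3, day=26) LOCATION '/tmp/2013/03/26_v2';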

