
MapReduce Notes (5): SequenceFile and MapFile


SequenceFile

SequenceFile is a binary file format provided by Hadoop that can serialize many small files into one large file: each file name is stored as the key and the file content as the value. Its advantages are support for compression (CompressionType.BLOCK and CompressionType.RECORD) and fast deserialization; its drawback is that the packed small files are hard to locate afterwards, since retrieving one requires scanning through all of them.

The following example uses SequenceFile to upload small files to HDFS and then read them back.

package mapreduce;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileExample {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // 1: get the FileSystem object
        Configuration conf = new Configuration();
        URI uri = new URI("hdfs://192.168.79.139:9000");
        FileSystem fileSystem = FileSystem.get(uri, conf);

        // 2: SequenceFile write: upload local (Windows) files to HDFS
        sequenceFileWrite(conf, fileSystem);

        // 3: SequenceFile read
        sequenceFileRead(conf, fileSystem);

        // close the FileSystem only after both the write and the read are done
        fileSystem.close();
    }

    @SuppressWarnings("deprecation")
    private static void sequenceFileRead(Configuration conf,
            FileSystem fileSystem) throws IOException {
        SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, new Path("/sequence.seq"), conf);
        Text key = new Text();
        Text value = new Text();
        while (reader.next(key, value)) {
            System.out.println(key.toString() + " " + value.toString());
        }
        IOUtils.closeStream(reader);
    }

    @SuppressWarnings("deprecation")
    private static void sequenceFileWrite(Configuration conf,
            FileSystem fileSystem) throws IOException {
        /**
         * This Writer constructor is deprecated but still usable.
         * The third argument is the target file name in HDFS;
         * the fourth and fifth arguments are the key and value types,
         * both Text here.
         */
        SequenceFile.Writer writer = new SequenceFile.Writer(fileSystem, conf,
                new Path("/sequence.seq"), Text.class, Text.class);

        // 3: upload the files
        // the local folder F:/sequencefile holds several thousand small files
        Collection<File> listFiles = FileUtils.listFiles(new File("F:/sequencefile"), null, true);
        Text key = null;
        Text value = null;
        for (File file : listFiles) {
            key = new Text(file.getName());
            value = new Text(FileUtils.readFileToString(file));
            writer.append(key, value);
        }

        IOUtils.closeStream(writer);
        System.out.println("ok");
    }

}



The sequence.seq file in HDFS is the file uploaded through SequenceFile above.
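
To illustrate the lookup drawback mentioned earlier: because a SequenceFile has no index, finding one particular small file means scanning records from the beginning until the key matches. A minimal, untested sketch of such a lookup (the target name "a.txt" is hypothetical):

package mapreduce;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileLookupExample {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.79.139:9000"), conf);

        // no index: scan records one by one until the key matches
        SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, new Path("/sequence.seq"), conf);
        Text key = new Text();
        Text value = new Text();
        Text target = new Text("a.txt");   // hypothetical file name
        while (reader.next(key, value)) {
            if (key.equals(target)) {
                System.out.println("found " + target + ", " + value.getLength() + " bytes");
                break;
            }
        }
        IOUtils.closeStream(reader);
        fileSystem.close();
    }

}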

SequenceFile compression

SequenceFile supports two compression types, CompressionType.BLOCK and CompressionType.RECORD. Below is an example I wrote, although I have not run it myself.

package mapreduce;

import java.io.File;
import java.net.URI;
import java.util.Collection;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;

public class SequenceFileZipExample {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        // 1: get the FileSystem object
        Configuration conf = new Configuration();
        URI uri = new URI("hdfs://192.168.79.139:9000");
        FileSystem fileSystem = FileSystem.get(uri, conf);

        // 2: SequenceFile write with compression
        Path path = new Path("/sequence.block");
        FSDataOutputStream out = fileSystem.create(path);
        // type can be CompressionType.RECORD or CompressionType.BLOCK
        CompressionType type = CompressionType.BLOCK;
        SequenceFile.Writer createWriter = SequenceFile.createWriter(conf, out, Text.class, Text.class, type, new GzipCodec());
        Collection<File> listFiles = FileUtils.listFiles(new File("F:/sequencefile"), null, true);
        Text key = null;
        Text value = null;
        for (File file : listFiles) {
            key = new Text(file.getName());
            value = new Text(FileUtils.readFileToString(file));
            createWriter.append(key, value);
        }
        IOUtils.closeStream(createWriter);
        // the writer was built on an existing output stream, so close that too
        IOUtils.closeStream(out);
        fileSystem.close();
        System.out.println("ok");
    }

}
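
Reading a compressed SequenceFile should not require any extra code: the compression type and codec are recorded in the file header, and SequenceFile.Reader decompresses transparently. A minimal, untested sketch that reads back /sequence.block and prints whether it is block-compressed:

package mapreduce;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileZipReadExample {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.79.139:9000"), conf);

        // the codec and compression type come from the file header,
        // so this looks the same as reading an uncompressed file
        SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, new Path("/sequence.block"), conf);
        System.out.println("block compressed: " + reader.isBlockCompressed());

        Text key = new Text();
        Text value = new Text();
        while (reader.next(key, value)) {
            System.out.println(key.toString());
        }
        IOUtils.closeStream(reader);
        fileSystem.close();
    }

}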


MapFile

A MapFile is a sorted SequenceFile stored as a directory containing data and index files. The index records record keys together with their offsets in the data file, so a MapFile can locate a record quickly, at the cost of the extra space needed to store the index.

Two additional points to note:

1. A MapFile does not index every record's key, because that would take too much space; by default one key is stored for every 128 records. The interval can be changed with MapFile.Writer's setIndexInterval(interval), as shown in the sketch after this list.

2. The MapFile key class must implement WritableComparable so that keys can be compared (and kept sorted).
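
For example, the interval can be lowered so that more keys are indexed (faster lookups, larger index). A small untested sketch, reusing the conf and fs objects and the /mapfile.map path from the example below:

// Index one key for every 32 records instead of the default 128.
// Either set it on the writer instance...
MapFile.Writer writer = new MapFile.Writer(conf, fs, "/mapfile.map", Text.class, Text.class);
writer.setIndexInterval(32);
// ...or set it in the configuration before the writer is created:
// MapFile.Writer.setIndexInterval(conf, 32);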

The following example writes and then reads a MapFile.

package mapreduce;

import java.io.File;
import java.net.URI;
import java.util.Collection;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileExample {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        // 1: get the FileSystem object
        Configuration conf = new Configuration();
        URI uri = new URI("hdfs://192.168.79.139:9000");
        FileSystem fs = FileSystem.get(uri, conf);
        System.out.println(fs);

        // 2: write the small files into a MapFile
        // note: MapFile.Writer requires keys to be appended in increasing order,
        // otherwise append() throws "key out of order"
        MapFile.Writer writer = new MapFile.Writer(conf, fs, "/mapfile.map", Text.class, Text.class);
        Collection<File> listFiles = FileUtils.listFiles(new File("F:/sequencefile"), null, true);
        Text key = null;
        Text value = null;
        for (File file : listFiles) {
            key = new Text(file.getName());
            value = new Text(FileUtils.readFileToString(file));
            writer.append(key, value);
        }
        IOUtils.closeStream(writer);

        // 3: read the MapFile back
        MapFile.Reader reader = new MapFile.Reader(fs, "/mapfile.map", conf);
        Text keyr = new Text();
        Text valuer = new Text();
        while (reader.next(keyr, valuer)) {
            System.out.println(keyr.toString() + " " + valuer.toString());
        }
        IOUtils.closeStream(reader);

        fs.close();
    }

}
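
The read loop above goes through the MapFile sequentially with next(), which does not use the index at all. The sketch below (untested) shows the random lookup that the index enables: MapFile.Reader's get() seeks to the nearest indexed key and scans forward from there instead of reading the whole file. The key "a.txt" is a made-up example, and the fs and conf objects are the ones from the example above.

MapFile.Reader reader = new MapFile.Reader(fs, "/mapfile.map", conf);
Text value = new Text();
// get() returns the value if the key exists, or null otherwise
if (reader.get(new Text("a.txt"), value) != null) {   // "a.txt" is a hypothetical key
    System.out.println("found: " + value.toString());
}
IOUtils.closeStream(reader);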