
Pro Hadoop (Part 8) - The Basics of a MapReduce Job - Configuring the Job


1.1 Configuring the Job

Every Hadoop job has a main program that configures the actual MapReduce job and submits it to the Hadoop framework. The JobConf object is used to handle this configuration. The MapReduceIntro example class provides a template for using the JobConf class and submitting a job to the Hadoop framework. All of that code relies on the MapReduceIntroConfig class, shown in Listing 2-4, which ensures that the input and output directories are set up correctly.

Listing 2-4. MapReduceIntroConfig.java

package com.apress.hadoopbook.examples.ch2;
import java.io.IOException;
import java.util.Formatter;
import java.util.Random;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
/** A simple class to handle the housekeeping for the MapReduceIntro
* example job.
* <p>
* This job explicitly configures the job to run, locally and without a
* distributed file system, as a stand alone application.
* <p>
* The input is read from the directory /tmp/MapReduceIntroInput and
* the output is written to the directory /tmp/MapReduceIntroOutput.
* If the directory /tmp/MapReduceIntroInput is missing or empty, it is
* created and some input data files generated. If the directory
* /tmp/MapReduceIntroOutput is present, it is removed.
*
* @author Jason Venner
*/
public class MapReduceIntroConfig {
/**
* Log4j is the recommended way to provide textual information to the user
* about the job.
*/
protected static Logger logger =
Logger.getLogger(MapReduceIntroConfig.class);
/** Some simple defaults for the job input and job output. */
/**
* This is the directory that the framework will look for input files in.
* The search is recursive if the entry is a directory.
*/
protected static Path inputDirectory =
new Path("file:///tmp/MapReduceIntroInput");
/**
* This is the directory that the job output will be written to. It must not
* exist at Job Submission time.
*/
protected static Path outputDirectory =
new Path("file:///tmp/MapReduceIntroOutput");
/**
* Ensure that there is some input in the <code>inputDirectory</code>,
* the <code>outputDirectory</code> does not exist and that this job will
* be run as a local stand alone application.
*
* @param conf
* The {@link JobConf} object that is required for doing file
* system access.
* @param inputDirectory
* The directory the input will reside in.
* @param outputDirectory
* The directory that the output will reside in
* @throws IOException
*/
protected static void exampleHouseKeeping(final JobConf conf,
final Path inputDirectory, final Path outputDirectory)
throws IOException {
/**
* Ensure that this job will be run stand alone rather than relying on
* the services of an external JobTracker.
*/
conf.set("mapred.job.tracker", "local");
/** Ensure that no global file system is required to run this job. */
conf.set("fs.default.name", "file:///");
/**
* Reduce the in ram sort space, so that the user does not need to
* increase the jvm memory size. This sets the sort space to 1 Mbyte,
* which is very small for a real job.
*/
conf.setInt("io.sort.mb", 1);
/**
* Generate some sample input if the <code>inputDirectory</code> is
* empty or absent.
*/
generateSampleInputIf(conf, inputDirectory);
/**
* Remove the file system item at <code>outputDirectory</code> if it
* exists.
*/
if (!removeIf(conf, outputDirectory)) {
logger.error("Unable to remove " + outputDirectory + " job aborted");
System.exit(1);
}
}
/**
* Generate <code>fileCount</code> files in the directory
* <code>inputDirectory</code>, where the individual lines of the file
* are a random integer TAB file name.
*
* The file names will be file-N where N is between 0 and
* <code>fileCount</code> - 1. There will be between 1 and
* <code>maxLines</code> + 1 lines in each file.
*
* @param fs
* The file system that <code>inputDirectory</code> exists in.
* @param inputDirectory
* The directory to create the files in. This directory must
* already exist.
* @param fileCount
* The number of files to create.
* @param maxLines
* The maximum number of lines to write to the file.
* @throws IOException
*/
protected static void generateRandomFiles(final FileSystem fs,
final Path inputDirectory, final int fileCount, final int maxLines)
throws IOException {
final Random random = new Random();
logger.info("Generating 3 input files of random data,"
+ " each record is a random number TAB the input file name");
for (int file = 0; file < fileCount; file++) {
final Path outputFile = new Path(inputDirectory, "file-" + file);
final String qualifiedOutputFile = outputFile.makeQualified(fs)
.toUri().toASCIIString();
FSDataOutputStream out = null;
try {
/**
* This is the standard way to create a file using the Hadoop
* Framework. An error will be thrown if the file already
* exists.
*/
out = fs.create(outputFile);
final Formatter fmt = new Formatter(out);
final int lineCount = (int) (Math.abs(random.nextFloat())
* maxLines + 1);
for (int line = 0; line < lineCount; line++) {
fmt.format("%d\t%s%n", Math.abs(random.nextInt()),
qualifiedOutputFile);
}
fmt.flush();
} finally {
/**
* It is very important to ensure that file descriptors are
* closed. The distributed file system code can run out of file
* descriptors and the errors generated in that case are
* misleading.
*/
if (out != null) {
out.close();
}
}
}
}
/**
* This method will generate some sample input, if the
* <code>inputDirectory</code> is missing or empty.
*
* This method also demonstrates some of the basic APIs for interacting
* with file systems and files. Note: the code has no particular knowledge
* of the type of file system.
*
* @param conf
* The Job Configuration object, used for acquiring the
* {@link FileSystem} objects.
* @param inputDirectory
* The directory to ensure has sample files.
* @throws IOException
*/
protected static void generateSampleInputIf(final JobConf conf,
final Path inputDirectory) throws IOException {
boolean inputDirectoryExists;
final FileSystem fs = inputDirectory.getFileSystem(conf);
if ((inputDirectoryExists = fs.exists(inputDirectory))
&& !isEmptyDirectory(fs, inputDirectory)) {
if (logger.isDebugEnabled()) {
logger
.debug("The inputDirectory "
+ inputDirectory
+ " exists and is either a"
+ " file or a non empty directory");
}
return;
}
/**
* We should only get here if <code>inputDirectory</code> does not
* exist, or is an empty directory.
*/
if (!inputDirectoryExists) {
if (!fs.mkdirs(inputDirectory)) {
logger.error("Unable to make the inputDirectory "
+ inputDirectory.makeQualified(fs) + " aborting job");
System.exit(1);
}
}
final int fileCount = 3;
final int maxLines = 100;
generateRandomFiles(fs, inputDirectory, fileCount, maxLines);
}
/**
* bean access getter to the {@link #inputDirectory} field.
*
* @return the value of inputDirectory.
*/
public static Path getInputDirectory() {
return inputDirectory;
}
/**
* bean access getter to the {@link #outputDirectory} field.
*
* @return the value of outputDirectory.
*/
public static Path getOutputDirectory() {
return outputDirectory;
}
/**
* Determine if a directory has any non zero files in it or its descendant
* directories.
*
* @param fs
* The {@link FileSystem} object to use for access.
* @param inputDirectory
* The root of the directory tree to search
* @return true if the directory is missing or does not contain at least one
* non empty file.
* @throws IOException
*/
private static boolean isEmptyDirectory(final FileSystem fs,
final Path inputDirectory) throws IOException {
/**
* This is the standard way to read a directory's contents. This can be
* quite expensive for a large directory.
*/
final FileStatus[] statai = fs.listStatus(inputDirectory);
/**
* This method returns null under some circumstances, in particular if
* the directory does not exist.
*/
if ((statai == null) || (statai.length == 0)) {
if (logger.isDebugEnabled()) {
logger.debug(inputDirectory.makeQualified(fs).toUri()
+ " is empty or missing");
}
return true;
}
if (logger.isDebugEnabled()) {
logger.debug(inputDirectory.makeQualified(fs).toUri()
+ " is not empty");
}
/** Try to find a file in the top level that is not empty. */
for (final FileStatus status : statai) {
if (!status.isDir() && (status.getLen() != 0)) {
if (logger.isDebugEnabled()) {
logger.debug("A non empty file "
+ status.getPath().makeQualified(fs).toUri()
+ " was found");
}
return false;
}
}
/** Recurse if there are sub directories,
* looking for a non empty file.
*/
for (final FileStatus status : statai) {
if (status.isDir() && isEmptyDirectory(fs, status.getPath())) {
continue;
}
/**
* If status is a directory it must not be empty or the previous
* test block would have triggered.
*/
if (status.isDir()) {
return false;
}
}
/**
* Only get here if no non empty files were found in the entire subtree
* of <code>inputPath</code>.
*/
return true;
}
/**
* Ensure that the <code>outputDirectory</code> does not exist.
* <p>
* The framework requires that the output directory not be present at job
* submission time.
* <p>
* This method also demonstrates how to remove a directory using the
* {@link FileSystem} API.
*
* @param conf
* The configuration object. This is needed to know what file
* systems and file system plugins are being used.
* @param outputDirectory
* The directory that must be removed if present.
* @return true if the <code>outputPath</code> is now missing, or
* false if the <code>outputPath</code> is present and was unable
* to be removed.
* @throws IOException
* If there is an error loading or configuring the FileSystem
* plugin, or other IO error when attempting to access or remove
* the <code>outputDirectory</code>.
*/
protected static boolean removeIf(final JobConf conf,
final Path outputDirectory) throws IOException {
/** This is standard way to acquire a FileSystem object. */
final FileSystem fs = outputDirectory.getFileSystem(conf);
/**
* If the <code>outputDirectory</code> does not exist this method is
* done.
*/
if (!fs.exists(outputDirectory)) {
if (logger.isDebugEnabled()) {
logger .debug("The output directory does not exist,"
+ " no removal needed.");
}
return true;
}
/**
* The getFileStatus command will throw an IOException if the path does
* not exist.
*/
final FileStatus status = fs.getFileStatus(outputDirectory);
logger.info("The job output directory "
+ outputDirectory.makeQualified(fs) + " exists"
+ (status.isDir() ? "" : " and is not a directory")
+ " and will be removed");
/**
* Attempt to delete the file or directory. delete recursively just in
* case <code>outputDirectory</code> is a directory with
* sub-directories.
*/
if (!fs.delete(outputDirectory, true)) {
logger.error("Unable to delete the configured output directory "
+ outputDirectory);
return false;
}
/** The outputDirectory did exist, but has now been removed. */
return true;
}
/**
* bean access setter to the {@link #inputDirectory} field.
*
* @param inputDirectory
* The value to set inputDirectory to.
*/
public static void setInputDirectory(final Path inputDirectory) {
MapReduceIntroConfig.inputDirectory = inputDirectory;
}
/**
* bean access setter for the {@link #outputDirectory} field.
*
* @param outputDirectory
* The value to set outputDirectory to.
*/
public static void setOutputDirectory(final Path outputDirectory) {
MapReduceIntroConfig.outputDirectory = outputDirectory;
}
}

First, you must create a JobConf object. It is a best practice to pass in a class that is contained in the JAR file holding your map and reduce functions. This ensures that the framework can find that JAR file when it runs your map and reduce tasks.

JobConf conf = new JobConf(MapReduceIntro.class);


Now that you have created an instance of the JobConf class, conf, you need to set the parameters for your job. These include the input and output directory locations, the input and output formats, and the mapper and reducer classes.

Every job needs a map phase, which is responsible for processing the job input. Configuring the map phase requires you to specify the input locations and the class that will produce key/value pairs from the input, the mapper class, and possibly the suggested number of map tasks, the map output types, and the number of threads per map task, as shown in Table 2-2.

Table 2-2. Map Phase Configuration

Element | Required? | Default
Input path(s) | Yes |
Class to convert the input path elements into key/value pairs | Yes |
Map output key class | No | Job output key class
Map output value class | No | Job output value class
Class supplying the map function | Yes |
Suggested minimum number of map tasks | No | Cluster default
Number of threads to run in each map task | No | 1
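
To make the table concrete, the following fragment is a sketch, not one of the book's listings, that maps each element of Table 2-2 onto its JobConf call. It reuses the classes from this chapter's example; the value passed to setNumMapTasks is an arbitrary illustration.

/** A sketch of the map-phase settings from Table 2-2. */
FileInputFormat.setInputPaths(conf,
MapReduceIntroConfig.getInputDirectory()); // input path(s): required
conf.setInputFormat(KeyValueTextInputFormat.class); // class converting input to key/value pairs: required
conf.setMapperClass(IdentityMapper.class); // class supplying the map function
conf.setMapOutputKeyClass(Text.class); // optional; defaults to the job output key class
conf.setMapOutputValueClass(Text.class); // optional; defaults to the job output value class
conf.setNumMapTasks(2); // optional suggested minimum; cluster default otherwise
// The per-map-task thread count is discussed later in this section.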

The input to most Hadoop Core jobs is a set of files, which are either in a line-based format containing text key/value pairs or in a Hadoop-specific binary format containing serialized key/value pairs. The class that handles text key/value pairs is KeyValueTextInputFormat. The class that handles the Hadoop-specific binary files is SequenceFileInputFormat.

1.1.1 Specifying Input Formats

The Hadoop framework provides a large number of input formats, of which the text input formats and the binary input formats are the two major categories. The available formats include the following:

KeyValueTextInputFormat: One key/value pair per line.

TextInputFormat: The key is the position of the line in the file (its byte offset), and the value is the line itself.

NLineInputFormat: Similar to KeyValueTextInputFormat, but the input splits are computed from N lines of input rather than from N bytes of input.

MultiFileInputFormat: An abstract class that lets the user combine multiple files into a single input split.

SequenceFileInputFormat: The input is a Hadoop sequence file containing serialized key/value pairs.

KeyValueTextInputFormat and SequenceFileInputFormat are the most commonly used input formats. The examples in this chapter use KeyValueTextInputFormat, because its input files are human-readable.

The following code block provides the framework with the type and location of the job input.

/** This section is the actual job configuration portion */
/**
* Configure the inputDirectory and the type of input. In this case
* we are stating that the input is text, and each record is a
* single line, and the first TAB is the separator between the key
* and the value of the record.
*/
conf.setInputFormat(KeyValueTextInputFormat.class);
FileInputFormat.setInputPaths(conf,
MapReduceIntroConfig.getInputDirectory());


The line conf.setInputFormat(KeyValueTextInputFormat.class) tells the framework that all of the input files are line-based and contain text key/value pairs.

KeyValueTextInputFormat

The KeyValueTextInputFormat class reads a text file and splits it into records, one record per line. Each line is further split into a key and a value at the first tab character. If a line contains no tab character, the entire line becomes the key and the value object is empty. If the tab character is the last character on the line, the framework ignores it.

Suppose an input file contains the following three lines, where TAB represents the US-ASCII horizontal tab character (0x09):

key1TABvalue1

key2

key3TABvalue3TABvalue4

The framework will deliver the following key/value pairs to your mapper:

key1, value1

key2

key3, value3TABvalue4

The order in which the pairs are delivered to your map function is not defined. In a real cluster, which of the many machines running the map function receives which key/value pairs is also not defined. It is very likely, however, that a contiguous set of records from the input will be processed by a single map task, because each map task receives a single input split to work on.

The framework assumes that the input bytes are in the UTF-8 character encoding. As of Hadoop 0.18.2, the character set used by the KeyValueTextInputFormat class for input files is not configurable.

Now that the framework knows where to look for the input files and which class will produce key/value pairs from the input, you need to tell the framework which map function to use.

/** Inform the framework that the mapper class will be the {@link
* IdentityMapper}. This class simply passes the input key-value
* pairs directly to its output, which in our case will be the
* shuffle.
*/
conf.setMapperClass(IdentityMapper.class);


Note that the examples in this chapter do not use the optional configuration parameters. If your map function requires input key and value classes that differ from those of the job output, you can set those types here. In addition, Hadoop supports multithreaded map functions, which are ideal for jobs that otherwise cannot make full use of the resources allocated to a map task. A map task that performs DNS lookups on server log records collected from many machines, for example, can use multiple threads to improve its throughput.
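
As an illustration of the multithreaded option (this fragment is not part of the book's example), the old mapred API provides a MultithreadedMapRunner that can be swapped in for the default map runner; the property name mapred.map.multithreadedrunner.threads is the one used by that class in this Hadoop generation and should be treated as an assumption.

/** Hedged sketch: run each map task with a pool of threads. Requires
* org.apache.hadoop.mapred.lib.MultithreadedMapRunner, and is only
* useful when the map function is I/O bound (for example, DNS lookups).
*/
conf.setMapRunnerClass(MultithreadedMapRunner.class);
conf.setInt("mapred.map.multithreadedrunner.threads", 10);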

1.1.2 Setting the Output Parameters

Even if a job does not need to produce any output, the framework still requires you to set the output parameters. The framework collects the output of the job (the output of the map tasks if there are no reduce tasks, or the output of the reduce tasks otherwise) and places it in the configured output directory. To avoid file name collisions when writing the job output to that directory, you must ensure that the output directory does not exist before the job starts.

In our simple example, the MapReduceIntroConfig class ensures that the output directory does not exist and supplies the output directory to the framework. It also specifies the output format and the types of the output key/value pairs.

The Text class is similar to the Java String class. It implements the WritableComparable interface (which keys must implement) and the Writable interface (which values must implement). Writable is a subset of WritableComparable; that is, it is the parent interface of WritableComparable. Unlike String, Text is mutable, and it has a number of methods for manipulating its UTF-8 byte data.
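
As a quick illustration (not taken from the book's listings), a Text object can be reused and inspected like this:

Text t = new Text("key1"); // construct from a String
t.set("key2"); // Text is mutable, so the same object can be reused
int utf8Length = t.getLength(); // length in bytes of the UTF-8 encoding
String asString = t.toString(); // convert back to a java.lang.String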

The main advantage of Writable is that the framework knows how to serialize and deserialize any object that implements Writable. The WritableComparable interface adds the compareTo method, so the framework also knows how to sort objects that implement WritableComparable. Listings 2-5 and 2-6 show the WritableComparable and Writable interfaces.

The following code provides an example of the minimal configuration required for the output of a MapReduce job:

/** Configure the output of the job to go to the output directory.
* Inform the framework that the Output Key and Value classes will be
* {@link Text} and the output file format will be {@link
* TextOutputFormat}. The TextOutputFormat class produces a record of
* output for each Key,Value pair, with the following format:
* Formatter.format( "%s\t%s%n", key.toString(), value.toString() );.
*
* In addition indicate to the framework that there will be
* 1 reduce. This results in all input keys being placed
* into the same, single, partition, and the final output
* being a single sorted file.
*/
FileOutputFormat.setOutputPath(conf,
MapReduceIntroConfig.getOutputDirectory());
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);


The configuration line FileOutputFormat.setOutputPath(conf, MapReduceIntroConfig.getOutputDirectory()) is analogous to the input example discussed earlier in this chapter. The lines conf.setOutputKeyClass(Text.class) and conf.setOutputValueClass(Text.class) are new. They tell the framework the types of the key/value pairs output by the reduce task. These types are also the default types for the map task's output. Accordingly, you can explicitly set the type of the map output key with conf.setMapOutputKeyClass(Class), and the type of the map output value with conf.setMapOutputValueClass(Class).
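
For example, a job whose map output types differ from its final output types might be configured as follows. This is a hedged sketch rather than part of the chapter's example, and the LongWritable map key is purely illustrative.

/** Sketch: the map tasks emit LongWritable/Text pairs, while the job's
* final (reduce) output is Text/Text.
*/
conf.setMapOutputKeyClass(LongWritable.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);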

Listing 2-5. WritableComparable.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
/**
* A {@link Writable} which is also {@link Comparable}.
*
* <p><code>WritableComparable</code>s can be compared to each other, typically
* via <code>Comparator</code>s. Any type which is to be used as a
* <code>key</code> in the Hadoop Map-Reduce framework should implement this
* interface.</p>
*
* <p>Example:</p>
* <p><blockquote><pre>
* public class MyWritableComparable implements WritableComparable {
* // Some data
* private int counter;
* private long timestamp;
*
* public void write(DataOutput out) throws IOException {
* out.writeInt(counter);
* out.writeLong(timestamp);
* }
*
* public void readFields(DataInput in) throws IOException {
* counter = in.readInt();
* timestamp = in.readLong();
* }
*
* public int compareTo(MyWritableComparable w) {
* int thisValue = this.value;
* int thatValue = ((IntWritable)o).value;
* return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
* }
* }
* </pre></blockquote></p>
*/
public interface WritableComparable extends Writable, Comparable {
}

Listing 2-6. Writable.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
/**
* A serializable object which implements a simple, efficient, serialization
* protocol, based on {@link DataInput} and {@link DataOutput}.
*
* <p>Any <code>key</code> or <code>value</code> type in the Hadoop Map-Reduce
* framework implements this interface.</p>
*
* <p>Implementations typically implement a static <code>read(DataInput)</code>
* method which constructs a new instance, calls {@link #readFields(DataInput)}
* and returns the instance.</p>
*
* <p>Example:</p>
* <p><blockquote><pre>
* public class MyWritable implements Writable {
* // Some data
* private int counter;
* private long timestamp;
*
* public void write(DataOutput out) throws IOException {
* out.writeInt(counter);
* out.writeLong(timestamp);
* }
*
* public void readFields(DataInput in) throws IOException {
* counter = in.readInt();
* timestamp = in.readLong();
* }
*
* public static MyWritable read(DataInput in) throws IOException {
* MyWritable w = new MyWritable();
* w.readFields(in);
* return w;
* }
* }
* </pre></blockquote></p>
*/
public interface Writable {
/**
* Serialize the fields of this object to <code>out</code>.
*
* @param out <code>DataOuput</code> to serialize this object into.
* @throws IOException
*/
void write(DataOutput out) throws IOException;
/**
* Deserialize the fields of this object from <code>in</code>.
*
* <p>For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.</p>
*
* @param in <code>DataInput</code> to deseriablize this object from.
* @throws IOException
*/
void readFields(DataInput in) throws IOException;
}

1.1.3 Configuring the Reduce Phase

To configure the reduce phase, the user must supply the framework with five pieces of information (a consolidated configuration sketch follows the list):

1. The number of reduce tasks; if zero, there is no reduce phase

2. The class supplying the reduce method

3. The input key/value types for the reduce task; by default, these are the same as the reduce task's output types

4. The output key/value types for the reduce task

5. The output file type for the reduce task output
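
The following fragment is a consolidated sketch, not one of the book's listings, that collects these five settings for the identity job used throughout this chapter:

conf.setNumReduceTasks(1); // 1: the number of reduce tasks
conf.setReducerClass(IdentityReducer.class); // 2: the class supplying the reduce method
// 3: the reduce input key/value types default to the output types set below,
// so nothing extra is needed for the identity job
conf.setOutputKeyClass(Text.class); // 4: reduce output key type
conf.setOutputValueClass(Text.class); // 4: reduce output value type
conf.setOutputFormat(TextOutputFormat.class); // 5: the output file type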

The earlier "Setting the Output Parameters" section discussed how to set the output key/value types and the output file type, so here we only discuss setting the number of reduce tasks and the reducer class.

The configured number of reduce tasks determines the number of output files for a job that runs a reduce phase. Changing this number can have a large effect on the overall efficiency of the job, because the time spent sorting the keys for each output file is proportional to the number of keys. In addition, the number of reduce tasks determines the maximum number of reduce tasks that can run in parallel.

The framework normally has a configured default number of reduce tasks, which you can set with the mapred.reduce.tasks parameter; the default is 1. That default causes the framework to produce just one sorted output file, so a single reduce task on a single machine processes every key/value pair.

The number of reduce tasks is usually set during job configuration, as shown in the following code:

conf.setNumReduceTasks(1);

In general, unless you have a specific requirement for a single reduce task, you set this value to the number of simultaneous execution slots available in your cluster. Chapter 9 includes a sample program, DataJoinReduceOutput, that merges the output of multiple reduce tasks into a single file.

Cluster Execution Slots

A typical cluster is made up of M TaskTracker machines, each with C CPUs, where each CPU supports T threads.

This yields M * C * T execution slots in the cluster. In my environment, a machine typically has 8 CPUs that each support one thread, and a small cluster has 10 TaskTracker machines. Such a cluster provides 10 * 8 * 1 = 80 execution slots. If your tasks are not CPU bound, you can tune the number of execution slots to optimize the CPU utilization of your TaskTracker machines. The configuration parameter mapred.tasktracker.map.tasks.maximum controls the maximum number of map tasks that run simultaneously on a TaskTracker node, and mapred.tasktracker.reduce.tasks.maximum controls the maximum number of reduce tasks that run simultaneously on a TaskTracker node. Ideally these would be tunable per job; it is a current shortcoming of Hadoop that these maximums are not per-job settings, and changing them requires a cluster restart to take effect.

You need to set the reducer class only when the number of reduce tasks is not zero. In many cases you will not need to set a reducer class at all, because you often do not need the output sorted, nor the values grouped by key. Setting the reducer class is very simple:

/** Inform the framework that the reducer class will be the
* {@link IdentityReducer}. This class simply writes an output record
* key/value record for each value in the key/value set it receives as
* input. The value ordering is arbitrary.
*/
conf.setReducerClass(IdentityReducer.class);


A Common Error

It is critically important to set the output parameters correctly for the framework. A common mistake is for the reduce task to fail with the following exception:

java.io.IOException: Type mismatch in key from map: expected

org.apache.hadoop.io.LongWritable, recieved org.apache.hadoop.io.Text

This error indicates that the framework used the default output key class, or that the class was set incorrectly during configuration. To correct it, use the following line:

conf.setOutputKeyClass( Text.class )

Or, if your map output differs from your job output, use the following line:

conf.setMapOutputKeyClass( Text.class )

The same error can also occur for the value class:

java.io.IOException: Type mismatch in value from map: expected

org.apache.hadoop.io.LongWritable, recieved org.apache.hadoop.io.Text

In that case, you correct the error with the corresponding setOutputValueClass() or setMapOutputValueClass() method.
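
The value-class fixes follow the same pattern (assuming Text is the intended value type):

conf.setOutputValueClass( Text.class )

or, if your map output value differs from your job output value:

conf.setMapOutputValueClass( Text.class )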