您的位置:首页 > 其它

MapReduce 自定义类型的空指针异常(NullPointerException)之坑

2017-10-19 14:29 766 查看
大数据小白一个。在使用mapreduce处理公司实际业务的过程中,有个mapreduce需要用到自定义类型,打包运行时,却遇到空指针NullPointerException异常,耽误了好长时间才找出问题的根源,特以此博客记录,留作学习使用。

场景:从hbase的一张表(activity_statistics)读取数据, 进行处理后, 写入另一张hbase表(activity_scores),mapper阶段的输出使用自定义类型UserActivityScore。

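先说解决方法:自定义类型需实现 Writable 接口(作为 key 使用时需实现 WritableComparable 接口),除了要重写 write、readFields 方法外,还必须提供无参的构造方法,并在其中对自定义类型的所有成员变量进行初始化。原因是 write 方法中调用的 DataOutput.writeUTF 不能写入 null:mapper 只在字段有值时才调用对应的 setter,未赋值的字段会保持为 null,序列化 map 输出时就会抛出空指针异常(见文末报错信息中的 writeUTF 调用栈)。本人就是定义了变量但是没有进行初始化,从而导致了空指针异常,耽误了好长时间。下面直接上代码。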

GradeRunner.java

package com.dxyun.dxdp.activity.scores.runner;

import com.dxyun.dxdp.activity.scores.entity.UserActivityScore;
import com.dxyun.dxdp.activity.scores.mapper.GradeMapper;
import com.dxyun.dxdp.activity.scores.reducer.GradeReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GradeRunner implements Tool {
private Configuration conf;

@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(conf, "activity_grade");
job.setJarByClass(GradeRunner.class);

//args[0]:要读取的hbase表   mapper输出的key、value类型分别为Text 和 自定义类型UserActivityScore
TableMapReduceUtil.initTableMapperJob(args[0], new Scan(), GradeMapper.class, Text.class, UserActivityScore.class, job);
//args[1]:要写入的hbase表
TableMapReduceUtil.initTableReducerJob(args[1], GradeReducer.class, job);

boolean flag = job.waitForCompletion(true);

return flag ? 0 : 1;
}

@Override
public void setConf(Configuration conf) {
......此处省略mapreduce参数配置
this.conf = conf;
}

@Override
public Configuration getConf() {
return conf;
}

public static void main(String[] args) {
try {
int flag = ToolRunner.run(new GradeRunner(), args);
System.exit(flag ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
}


UserActivityScore.java
package com.dxyun.dxdp.activity.scores.entity;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class UserActivityScore implements WritableComparable<UserActivityScore> {
    private String userID;
    private String gmf_sj;
    private String gmf_email;
    private String scorePerYear;
    private String scorePerMonth;
    private String scorePerHourPeriod;
    private String scoreLast7days;
    private String scoreLast30days;
    private String scoreLast1year;

//无参构造方法
    public UserActivityScore() {
    	  //变量初始化,一定要有,不然空指针异常,本人就吃了没有初始化的亏。
        this.userID = "";
        this.gmf_sj = "";
        this.gmf_email = "";
        this.scorePerYear = "";
        this.scorePerMonth = "";
        this.scorePerHourPeriod = "";
        this.scoreLast7days = "";
        this.scoreLast30days = "";
        this.scoreLast1year = "";
    }

    public String getUserID() {
        return userID;
    }

    public String getGmf_sj() {
        return gmf_sj;
    }

    public String getGmf_email() {
        return gmf_email;
    }

    public String getScorePerYear() {
        return scorePerYear;
    }

    public String getScorePerMonth() {
        return scorePerMonth;
    }

    public String getScorePerHourPeriod() {
        return scorePerHourPeriod;
    }

    public String getScoreLast7days() {
        return scoreLast7days;
    }

    public String getScoreLast30days() {
        return scoreLast30days;
    }

    public String getScoreLast1year() {
        return scoreLast1year;
    }

    public void setUserID(String userID) {
        this.userID = userID;
    }

    public void setGmf_sj(String gmf_sj) {
        this.gmf_sj = gmf_sj;
    }

    public void setGmf_email(String gmf_email) {
        this.gmf_email = gmf_email;
    }

    public void setScorePerYear(String scorePerYear) {
        this.scorePerYear = scorePerYear;
    }

    public void setScorePerMonth(String scorePerMonth) {
        this.scorePerMonth = scorePerMonth;
    }

    public void setScorePerHourPeriod(String scorePerHourPeriod) {
        this.scorePerHourPeriod = scorePerHourPeriod;
    }

    public void setScoreLast7days(String scoreLast7days) {
        this.scoreLast7days = scoreLast7days;
    }

    public void setScoreLast30days(String scoreLast30days) {
        this.scoreLast30days = scoreLast30days;
    }

    public void setScoreLast1year(String scoreLast1year) {
        this.scoreLast1year = scoreLast1year;
    }

    @Override
    public int compareTo(UserActivityScore o) {
        return 0;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userID);
        out.writeUTF(gmf_sj);
        out.writeUTF(gmf_email);
        out.writeUTF(scorePerYear);
        out.writeUTF(scorePerMonth);
        out.writeUTF(scorePerHourPeriod);
        out.writeUTF(scoreLast7days);
        out.writeUTF(scoreLast30days);
        out.writeUTF(scoreLast1year);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.userID = in.readUTF();
        this.gmf_sj = in.readUTF();
        this.gmf_email = in.readUTF();
        this.scorePerYear = in.readUTF();
        this.scorePerMonth = in.readUTF();
        this.scorePerHourPeriod = in.readUTF();
        this.scoreLast7days = in.readUTF();
        this.scoreLast30days = in.readUTF();
        this.scoreLast1year = in.readUTF();
    }
}


GradeMapper.java

package com.dxyun.dxdp.activity.scores.mapper;

import com.dxyun.dxdp.activity.scores.entity.UserActivityScore;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

/**
 * Reads per-user invoice statistics from the "user_info" column family and converts them into
 * scores.
 *
 * <p>For each row it emits (row key as Text, {@link UserActivityScore}).
 */
public class GradeMapper extends TableMapper<Text, UserActivityScore> {
    /** Column family read by this mapper. */
    private static final byte[] FAMILY = Bytes.toBytes("user_info");

    @Override
    protected void map(ImmutableBytesWritable rowkey, Result value, Context context) throws IOException, InterruptedException {
        // Missing cells yield null (Bytes.toString(null) returns null).
        String userID = getCell(value, "userID");
        String gmf_sj = getCell(value, "gmf_sj");
        String gmf_email = getCell(value, "gmf_email");
        String invoiceNumPerYear = getCell(value, "invoiceNumPerYear");
        String invoiceNumPerMonth = getCell(value, "invoiceNumPerMonth");
        String invoiceNumPerHourPeriod = getCell(value, "invoiceNumPerHourPeriod");
        String invoiceNumLast7days = getCell(value, "invoiceNumLast7days");
        String invoiceNumLast30days = getCell(value, "invoiceNumLast30days");
        String invoiceNumLast1year = getCell(value, "invoiceNumLast1year");

        // Compute the scores.
        String scorePerYear = getScores(invoiceNumPerYear);
        String scorePerMonth = getScores(invoiceNumPerMonth);
        String scorePerHourPeriod = getScores(invoiceNumPerHourPeriod);
        String scoreLast7days = getScoreByNum(invoiceNumLast7days);
        String scoreLast30days = getScoreByNum(invoiceNumLast30days);
        String scoreLast1year = getScoreByNum(invoiceNumLast1year);

        // Only real values replace the constructor's "" defaults, so no field is null when the
        // writable is serialised.
        UserActivityScore userActivityScore = new UserActivityScore();
        if (hasText(userID)) {
            userActivityScore.setUserID(userID);
        }
        if (hasText(gmf_sj)) {
            userActivityScore.setGmf_sj(gmf_sj);
        }
        if (hasText(gmf_email)) {
            userActivityScore.setGmf_email(gmf_email);
        }
        if (hasText(scorePerYear)) {
            userActivityScore.setScorePerYear(scorePerYear);
        }
        if (hasText(scorePerMonth)) {
            userActivityScore.setScorePerMonth(scorePerMonth);
        }
        if (hasText(scorePerHourPeriod)) {
            userActivityScore.setScorePerHourPeriod(scorePerHourPeriod);
        }
        if (hasText(scoreLast7days)) {
            userActivityScore.setScoreLast7days(scoreLast7days);
        }
        if (hasText(scoreLast30days)) {
            userActivityScore.setScoreLast30days(scoreLast30days);
        }
        if (hasText(scoreLast1year)) {
            userActivityScore.setScoreLast1year(scoreLast1year);
        }

        // Emit the row key as the key and the custom writable as the value. get() exposes the
        // whole backing array, so honour the writable's offset/length.
        String rowKeyString = Bytes.toString(rowkey.get(), rowkey.getOffset(), rowkey.getLength());
        context.write(new Text(rowKeyString), userActivityScore);
    }

    /** Returns the "user_info:qualifier" cell of {@code result} as a string, or null if absent. */
    private static String getCell(Result result, String qualifier) {
        return Bytes.toString(result.getValue(FAMILY, Bytes.toBytes(qualifier)));
    }

    /** True when {@code s} is neither null nor empty. */
    private static boolean hasText(String s) {
        return s != null && !s.isEmpty();
    }

    /** Business-specific scoring; the implementation was omitted in the original post. */
    public String getScores(String str) {
    ......跟业务相关,省略
    return result;
    }

    /** Business-specific scoring; the implementation was omitted in the original post. */
    public String getScoreByNum(String invoiceNum) {
    ......跟业务相关,省略
    return invoiceNum;
    }
}
GradeReducer.java 
package com.dxyun.dxdp.activity.scores.reducer;

import com.dxyun.dxdp.activity.scores.entity.UserActivityScore;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class GradeReducer extends TableReducer<Text, UserActivityScore, ImmutableBytesWritable> {
@Override
protected void reduce(Text key, Iterable<UserActivityScore> values, Context context) throws IOException, InterruptedException {
int index = 0;
String userID = "";
String gmf_sj = "";
String gmf_email = "";
String scorePerYear = "";
String scorePerMonth = "";
String scorePerHourPeriod = "";
String scoreLast7days = "";
String scoreLast30days = "";
String scoreLast1year = "";

....业务代码,用于获取mapper阶段的计算结果,省略。

Put put = new Put(key.toString().getBytes());
if (null != userID && !(("").equals(userID))) {
put.addColumn("user_info".getBytes(), "userID".getBytes(), userID.getBytes());
}
if (null != gmf_sj && !(("").equals(gmf_sj))) {
put.addColumn("user_info".getBytes(), "gmf_sj".getBytes(), gmf_sj.getBytes());
}
if (null != gmf_email && !(("").equals(gmf_email))) {
put.addColumn("user_info".getBytes(), "gmf_email".getBytes(), gmf_email.getBytes());
}
if (null != scorePerYear && !(("").equals(scorePerYear))) {
put.addColumn("user_info".getBytes(), "scorePerYear".getBytes(), scorePerYear.getBytes());
}
if (null != scorePerMonth && !(("").equals(scorePerMonth))) {
put.addColumn("user_info".getBytes(), "scorePerMonth".getBytes(), scorePerMonth.getBytes());
}
if (null != scorePerHourPeriod && !(("").equals(scorePerHourPeriod))) {
put.addColumn("user_info".getBytes(), "scorePerHourPeriod".getBytes(), scorePerHourPeriod.getBytes());
}
if (null != scoreLast7days && !(("").equals(scoreLast7days))) {
put.addColumn("user_info".getBytes(), "scoreLast7days".getBytes(), scoreLast7days.getBytes());
}
if (null != scoreLast30days && !(("").equals(scoreLast30days))) {
put.addColumn("user_info".getBytes(), "socreLast30days".getBytes(), scoreLast30days.getBytes());
}
if (null != scoreLast1year && !(("").equals(scoreLast1year))) {
put.addColumn("user_info".getBytes(), "scoreLast1year".getBytes(), scoreLast1year.getBytes());
}

context.write(null, put);
}
}

附:不进行变量初始化时的报错信息:
17/10/19 14:10:52 INFO mapreduce.Job:  map 0% reduce 0%
17/10/19 14:11:00 INFO mapreduce.Job: Task Id : attempt_1507929502667_0059_m_000000_0, Status : FAILED
Error: java.lang.NullPointerException
at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
at com.dxyun.dxdp.activity.scores.entity.UserActivityScore.write(UserActivityScore.java:108)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:98)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:82)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1164)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:721)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at com.dxyun.dxdp.activity.scores.mapper.GradeMapper.map(GradeMapper.java:66)
at com.dxyun.dxdp.activity.scores.mapper.GradeMapper.map(GradeMapper.java:17)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:793)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)







                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐