您的位置:首页 > 大数据

大数据企业学习篇02_2------hadoop深入

2017-12-11 11:41 295 查看

一.HDFS架构及启动过程



namenode:

<1>单一节点,负责管理文件系统的命名空间以及客户端对文件的访问。(注意:文件的元数据操作包括打开,关闭,重命名文件及文件夹等)

<2>文件操作,namenode负责文件的元数据操作,datanode负责文件的读写请求。与文件内容相关的数据流不经过namenode,只会询问它与哪个datanode联系,否则namenode会成为系统的瓶颈

<3>副本存放在哪些datanode上由namenode决定,根据全局情况做出块放置决定,读取文件时namenode尽量让用户先读取最近的副本,降低快消耗和读取时延

<4>namenode全权管理块的复制,它周期性的从集群每个datanode接受心跳信号和块报告,接收到心跳信号意味着该datanode节点正常工作,块报告包含该datanode节点上所有数据块的列表

datanode:

<1>一个数据块在datanode以文件存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据校验和以及时间戳

<2>datanode启动后向namenode注册,通过后,周期性(一小时)向namenode上报所有的块信息。

<3>心跳是每三秒一次,心跳的返回结果带有namenode给datanode的命令,如复制块数据到另一台机器或删除某个数据块,如果超过10分钟没有收到某个datanode的心跳,则认为该节点不可用

<4>集群运行中可以加入和推迟一些机器,可以使用hdfs dfs balancer去平衡集群资源

文件:

<1>文件被切分成块(默认大小128M),以块为单位,每个有多个副本存储在不同机器的机器上,副本数可在文件生成时指定(默认是3),通过(hadoop fs -setrep xxx指定即可)

注意:副本的放置策略:在本地机架一个节点放置一个副本,本地机架另一个节点存放一个,另一个放在不同机架不同节点上

<2>可以创建、删除、移动、重命名文件,当文件创建、写入、关闭之后不能修改文件内容

数据损坏处理:

<1>当datanode读取block的时候,它会计算校验和checksum

<2>如果计算后的校验和checksum,与block创建时不一致,则表示块已损坏

<3>namenode标记该块已经损坏,然后复制block达到预期设置的文件备份数

<4>datanode在其文件创建后三周验证其checksum

二.NameNode的启动过程详解

<1>第一次启动NameNode时:

(1)format文件系统,其目的就是生成镜像文件fsimage

(2)start namenode,读取fsimage

(3)start datanode,向NameNode注册,获取心跳及块报告

(4)create dir /user/xiaojiangshi/ —> write edits

(5)put file /user/xiaojiangshi/ —> write edits

(6)rm -rf /user/xiaojiangshi/ —> write edits

<2>第二次启动NameNode时:

(1)start namenode

*读取fsimage、edits

*生成新的fsimage

*生成空的edits

(2)start datanode,向NameNode注册,获取心跳及块报告

(3)create dir /user/xiaojiangshi/ —> write edits

(4)put file /user/xiaojiangshi/ —> write edits

(6)rm -rf /user/xiaojiangshi/ —> write edits

三、secondary NameNode辅助功能详解



SecondaryNameNode的主要作用是用于合并fsimage和edits文件。在没有SecondaryNameNode守护进程的情况下,从namenode启动开始至namenode关闭期间所有的HDFS更改操作都将记录到edits文件,这样会造成巨大的edits文件,所带来的直接危害就是下次启动namenode过程会非常漫长。

在启动SecondaryNameNode守护进程后,每当满足一定的触发条件(每3600s、文件数量增加100w等),SecondaryNameNode都会拷贝namenode的fsimage和edits文件到自己的目录下,首先将fsimage加载到内存中,然后加载edits文件到内存中合并fsimage和edits文件为一个新的fsimage文件,然后将新的fsimage文件拷贝回namenode目录下。并且声称新的edits文件用于记录DFS的更改。

四.安全模式safemode

<1>作用:

等待datanode向它发送快报告

<2>注意:

*全新的HDFS集群,NameNode启动时不会进入安全模式,因为还没有block信息

*datanodes blocks/total blocks(可通过fsimage获得) = 99.999% ,此时安全模式才会退出

*可查看文件系统的文件,但是不等保证文件的访问,因为可能这个文件的block信息NameNode还不一定知道

*不能改变文件系统的命名空间(如创建文件夹、上传文件、删除文件)

<3>操作:

$>hdfs dfsadmin -safemode enter //进入安全模式
$>hdfs dfsadmin -safemode leave //离开安全模式
$>hdfs dfsadmin -safemode get   //查看安全模式的状态


五、Yarn架构

<1>Hadoop 1.x与Hadoop 2.x的区别



Hadoop 2.x中,mapreduce只复制数据处理,集群资源的管理由yarn负责。

<2>Yarn的架构图



Resource Manager:

*整个集群只有一个,负责整个集群的资源管理和任务调度

*功能:

*处理client请求

*启动/监控 Application Master

*监控nodemanager

NodeManager:

*整个集群有多个,负责每个节点上的资源管理和使用

*功能:

*处理来自resource manager的命令

*处理来自的Application Master命令

*每个节点的资源管理和任务管理

*注意:

*定时的向resource manager汇报本节点的资源使用情况和各个container的运行状态

Application Master:

*管理每个应用程序的一个实例

*功能:

*为应用程序申请资源,并进一步分配给内部任务

*任务监控和容错

*注意:

通过nodemanager监控container执行和资源使用(如cpu、内存等)

Container:

*Yarn中资源的抽象,封装某个节点上多维度资源(内存、CPU、磁盘、网络等)。当AM向RM申请资源时,RM向AM返回的资源就是用container表示的

*Yarn会为每个任务分配一个container,且该任务只能使用该container中描述的资源(资源独占性)

<3>Yarn资源管理

*最重
bd47
要和最基础的就是资源调度和资源隔离,资源调度由RM完成,资源隔离由各个NM完成

*资源调度:RM将某个NM上资源分配任务

资源隔离:NM需按照要求为任务提供相应的资源,甚至保证这些资源具有独占性,为任务运行提供基础保证

*yarn同时支持内存和CPU两种资源的调度,内存资源的多少会决定任务的生死,CPU资源只会决定任务运行的快慢

<4>Yarn资源可配置(yarn-site.xml)

<5>以Yarn为核心的生态系统



<6>小插曲:

Apache Slider可以将分布式应用部署到Yarn上

五.MapReduce 编程模型

<1>MapReduce原理

思想:分而治之,整个计算并行计算过程抽象为两个函数

map(分):对一些独立元素组成的列表的每一个元素进行指定操作,可以高度并行

reduce(合):对一个列表的元素进行合并

注意:一个简单的MapReduce程序只需指定map、reduce、input、output,剩下的由框架完成

<2>MapReduce的运行过程

*分为两个阶段:map和reduce

*map阶段由一定数量的map task组成

*输入数据格式解析:inputformat

*输入数据处理:Mapper

*数据分区:partitioner

*reduce阶段由一定数量的reduce task组成

*数据远程拷贝

*数据按照key排序

*数据处理:reduce

*输出数据格式:outputformat

<3>MapReduce编程模板(以Wordcount为例)

package com.Nenu.module.WordCount;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* WordCount!
*
*/
public class WordCount extends Configured implements Tool {
// 1.mapper class
public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text mapoutkey=new Text();
private final IntWritable mapoutvalue=new IntWritable(1);
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//get linevalue
String linevalue=value.toString();

//split
StringTokenizer stringTokenizer=new StringTokenizer(linevalue);

//iterator
while(stringTokenizer.hasMoreTokens()){
//get word value
String outvalue=stringTokenizer.nextToken();
//set word value
mapoutkey.set(outvalue);
//out
context.write(mapoutkey, mapoutvalue);

}

}

}

// 2.reduce class
public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
private IntWritable reduceoutvalue=new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// get values
int sum=0;
for(IntWritable value:values){
sum+=value.get();
}

//set value
reduceoutvalue.set(sum);
//output
context.write(key, reduceoutvalue);

}

}

// driver ,component job
public int run(String[] args) throws Exception {
//1  get configuration
Configuration conf=getConf();
//2  create job
Job job=Job.getInstance(conf, "wordcount");
//run jar
job.setJarByClass(this.getClass());
//3  set job
//3.1 input
Path inpath=new Path(args[0]);
FileInputFormat.addInputPath(job, inpath);

//3.2 map
job.setMapperClass(WordCountMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

//3.3 reduce
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

//3.4 output
Path outpath=new Path(args[1]);
FileOutputFormat.setOutputPath(job, outpath);

//4  submit job
boolean isSuccess=job.waitForCompletion(true);
return isSuccess ? 0 : 1 ;
}
public static void main(String[] args) throws Exception {
Configuration conf=new Configuration();
int status=ToolRunner.run(conf, new WordCount(), args);
System.exit(status);

}
}


<3>wordcount处理过程

*1.将文件分成splits,由于测试文件较小,所以每个文件为1个split,并将文件按行切分成
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: