您的位置:首页 > 其它

流式计算Storm和Kafka知识点

2018-03-06 17:47 363 查看

企业级消息队列(Kafka)

Kafka是什么? 消息队列
为什么要有消息队列? 解耦、异构、并行
Kafka数据生成方式producer-->kafka---->保存到本地
consumer---主动拉取数据

Kafka核心概念producer(生产者)消息不丢失
数据分发策略

kafka brokerstopic(主题):一类消息,比如用户信息、商品信息、订单信息
partition为什么要有partition? 数据太大,单个节点无法存储。
partition的物理形态? 文件夹
partition的数量设计?如果集群很小(10),将partition设置为broker的数量
如果集群大于10台,就要根据数据量来设计

replication为什么要有replication? 保证数据安全,数据容错的考虑。
设置几个副本?数据的越重要,副本数可以设的越高。但是,数据冗余高,ack时间越长,效率越低。
平常设置2个。

segment(段)为什么要分段? 如果文件巨大,删除麻烦、查找麻烦。分段大小,1G。这个可以设置。
删除数据的过期时间,168小时,等于7天。注意,在规划Kafka集群的是,要考虑数据存储几天的。Kafka集群的数量,建议3-5台。 24T * 5 = 120T
segment物理形态:有个log文件和index索引文件。log 存放的原始数据
index 存放的是offset和物理地址值
数据查找中会有二分查找法(掌握)

pagecache现代操作系统提供缓存技术,Kafka将当前生产的数据保存在缓存中。
由于生产和消费时间差很小,所以消费者消费数据的时候,基本上都是从内存中获取数据。

sendfile作为消费者,想消费历史的数据。senfile技术不经过应用,在操作系统层面,读取完数据之后,直接输出到网卡。

partition isr如果partition有多个副本,需要选择一个leader出来,负责数据的读写读写读写操作。
这个leader有可能挂掉,因为压力大。
如果使用投票机制,会有相对来说比较大时间消费,时刻准备好备胎。
满足什么样条件才能isr成员?同步leader的数据,在某个时间阈值和数量阈值内。不满足条件就踢出ISR。

partition leaderleader是针对partition的描述。
负责数据的读写。

consumerconsumerGroup,消费数据都是以消费组的形式 出现的。
消费组中的成员,消费数据是互不干扰的。当有一个消费者挂了之后,会在确定消费者无法重新消费后,触发负载均衡。
两个不同的消费组,消费同一个topic的数据,都是完整。注意:在实际开发过程,要将自己的消费组设计成唯一的。
consumer消费offset管理。0.8版本,offset是有zookeeper进行管理的。
0.8+,可以选择使用kafka的consumer_offset的topic进行管理。

Kafka常见问题?Kafka为什么那么快? pageche、sendfile
Kafka消费不丢失机制? producer、broker、consumer。
Kafka消费数据全局有序? 单个partition是有序,全局有序违背设计的初衷。

流式计算框架(Storm)

流式计算框架的组成:一般flume+kafka+storm+redis,每个组件都可以被替换掉。
Storm是什么? 流式计算框架,一旦启动,永不停止。
Storm架构是什么?client:用来创建一个stormtopology,然后将stormtopology序列化之后,通过rpc的提交到nimbus。
nimbus:发布rpc的服务,接受client的任务提交,对任务进行校验。将任务放入任务队列中(阻塞队列)。后台线程读取队列,获得任务信息,进行任务分配。获取当前集群中,空闲的worker资源。
获取当前任务需要多少个Task。Task数量就是所有component(组件)的并行数量加上每个worker会启动的一个ackerBolt之和。
将任务信息保存到zookeeper。

zookeeper:zookeeper保存任务信息及节点各种其他信息。
supervisor:通过watch机制,得到任务信息,然后启动属于自己的worker。
worker:被supervisor启动,负责具体的任务执行。Task:本质上是个线程,是个executor。分为三种类型 SpoutTask、BoltTask、AckerTask。

Storm的编程模型?Spout extends baseRichSpoutopen :初始化方法
nextTuple :有个while一直调用该方法,调用一次发送一次数据。
field:声明输出的字段名称和数量

bolt1 extends baseRichBolt (手动ack)prepare:初始化方法
execute: 执行方法
field:声明输出的字段名称和数量

bolt2 extends baseBasicBolt(自动ack)execute: 执行方法
field:声明输出的字段名称和数量

驱动类:topologybuilder
运行模式:本地模式、集群模式

Storm组件的并行度怎么设置?Spout是根据上游kafka的topic的分区数量设置。
Bolt1是根据Spout发送的数据/bolt1处理每条数据量(单位时间 1S)
Bolt2是根据Spout发送的数据/bolt1处理每条数据量(单位时间 1S)

Spout的worker的数量怎么设置?根据所有组件的并行度之和,进行设置。
可以一个worker有两个Spout或者多个Spout。
如果Spout下游不同层级的所有的bolt的数量很多情况下,运算压力,可以考虑worker和spout数量保持一致。压力更大,只能修改partition的数量。
如果修改不了partition数量,只能曾加worker数,任由数据充斥在网络中。

Spout上下游衔接策略(StreamGrouping)localorshuffle 分组策略是任何时候的第一选择。
fieldGroup 字段分组。

Storm的原理

任务提交流程用来创建一个stormtopology,然后将stormtopology序列化之后,通过rpc的提交到nimbus。
扩展:RPC框架,动态+反射技术+网络通信技术。

集群启动流程Java系统流程: Java -jar、Java -server、Java -client
手动启动:nimbus、supervisor
自动启动:supervisor根据任务信息启动worker。

任务执行流程与nimbus、supervisor没有半毛钱关系,都在worker。
SpoutTask.open() 一般用来打开外部的数据源
while 方式调用 nextTuple方法。发送数据,需要考虑数据的分组策略。发送数据都是发送Tuple,会携带当前Tuple要发送给哪个taskid。
然后根据task分配信息,得到taskid所在的worker。
通过网络请求将tuple发送给远端的worker。
远端的worker有个接受的线程,根据taskid找到对应的Bolt的输入队列(无锁队列,每秒处理600万订单),将tuple放到bolt的输入中。
每个Task都是一个线程,后台不停的消费输入队列的内容。消费到消息之后,会调用bolt的execute方法,将数据传入给bolt。

Bolt的execute方法接收到了Tuple,经过一顿处理。然后向下游发送数据Tuple。发送数据时,会根据下游的分组策略。比如:localorshuffle。
如果是localorshuffle方式,直接找到当前worker中的对应Task,进行分发。将数据放入响应bolt的输入队列。
每个Task都是一个线程,后台不停的消费输入队列的内容。消费到消息之后,会调用bolt的execute方法,将数据传入给bolt。

如此循环。

消息不丢失机制如何开启消息不丢失机制?spout端发送数据的时候,要加上messageid.
spout要重写ack和fail方法。
在topologybuiler的config文件中设置setNumAckers的数量大于1.默认是1
在下游个每个层级bolt上,需要增加锚点。

现象是什么?当消息处理成功,会调用ack方法。
当消息处理失败超时处理,默认30S,会调用faile方法,并传入messageid。
真的异常了,消息重发。 消息重发,需要手动设置。消息重发,最好是在spout发送tuple的时候,将tuple本身当做messageid传入,失败后,直接发送messageid

实现机制?异或机制。相同为0,不同为1。
需要上游发送时候的时候,发一个状态。并且需要下游处理完数据之后,发送一个状态。这两个状态值是一样的。
每个层级都会产生新的锚点id。64位长整型。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Storm