Storm 准备(一)
2016-04-05 03:03
651 查看
hadoop解决了分布式存储与计算问题,但是大部分都是进行的离线计算时间周期比较长。企业里急切的希望解决实时的进行大数据分析storm在这样的环境下诞生了(PPT资料)
如何搭建一个企业的实时数据平台:
1.收集数据 flume工具(分级,安全,压缩)
2.汇总 flume 消息队列KAFKA(基于硬盘但是速度很快) hadoop集群
3.实时处理引擎
4.结果存储 mysql mongodb Redis HBase
5.应用
流式处理,KAFKA消息队列来从硬盘级来解决大数据的实时处理,基于硬盘的为啥数据处理还是那么快:分布式的消息队列partition分区是队列形式从队列头部取数据打打减少了寻址过程。
kafka在storm中用来收集汇总数据:zookeeper在kafak中的作用如下图:(协调kafka)
kafka基本组建:
tipic话题:是指特定类型的消息流。
producer 生产者:能够发送消息到话题的任何对象
Broker代理:或者Kafka集群,保存已发布的消息到一组服务器中
consumer消费者:可以订阅多个话题,并从Broker拉取数据,从而消费这些已发送的消息。
kafka的特点:
1.消息保存在磁盘,O(1)时间复杂度
2.不使用内存?使用磁盘缓存
3.消费状态保存在消费客户端
4.可以保存足够大的未处理消息
在kafka中,一个partition中的消息只会背group中的一个consumer消费;每个group中的consumer消息消费相互独立;我们可以认为一个group是一个订阅者,一个tpic中的每个partions,只会被一个订阅者中的consumer消费,不过一个consumer可以消费多个partitions中的消息。kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的。从topic角度来说,消息仍然不是有序的。
部署zookeeper的步骤:
第一步解压zookeeper配置conf/zoo.cfg文件
第二步:启动服务,用自带的客户端连接确认是否安装成功
第三步:可选 安装zk的ui软件 zkui
安装部署Kafka:
用shell程序类来模拟一个生产者
shelllogger.sh
启动looger
如何搭建一个企业的实时数据平台:
1.收集数据 flume工具(分级,安全,压缩)
2.汇总 flume 消息队列KAFKA(基于硬盘但是速度很快) hadoop集群
3.实时处理引擎
4.结果存储 mysql mongodb Redis HBase
5.应用
流式处理,KAFKA消息队列来从硬盘级来解决大数据的实时处理,基于硬盘的为啥数据处理还是那么快:分布式的消息队列partition分区是队列形式从队列头部取数据打打减少了寻址过程。
kafka在storm中用来收集汇总数据:zookeeper在kafak中的作用如下图:(协调kafka)
kafka基本组建:
tipic话题:是指特定类型的消息流。
producer 生产者:能够发送消息到话题的任何对象
Broker代理:或者Kafka集群,保存已发布的消息到一组服务器中
consumer消费者:可以订阅多个话题,并从Broker拉取数据,从而消费这些已发送的消息。
kafka的特点:
1.消息保存在磁盘,O(1)时间复杂度
2.不使用内存?使用磁盘缓存
3.消费状态保存在消费客户端
4.可以保存足够大的未处理消息
在kafka中,一个partition中的消息只会背group中的一个consumer消费;每个group中的consumer消息消费相互独立;我们可以认为一个group是一个订阅者,一个tpic中的每个partions,只会被一个订阅者中的consumer消费,不过一个consumer可以消费多个partitions中的消息。kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的。从topic角度来说,消息仍然不是有序的。
部署zookeeper的步骤:
第一步解压zookeeper配置conf/zoo.cfg文件
对应代码修改成如下: # set server server.1=192.168.145.129:4887:5887 #server.2=cluster-node-02:4887:5887 #server.3=cluster-node-03:4887:5887 #server.4=cluster-node-04:4887:5887
第二步:启动服务,用自带的客户端连接确认是否安装成功
bin/zkServer.sh start zk客户端:bin/zkCli.sh[root@localhost zookeeper-3.4.6]# vim conf/zoo.cfg [root@localhost zookeeper-3.4.6]# bin/zkServer.sh start JMX enabled by default Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg Starting zookeeper ... STARTED 修改conf/zoo.cfg配置文件中的dataDir、dataLogDir、server.1 # The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. dataDir=/home/shaka/dep/zookeeper-3.3.6/data (改成自己的目录) # the port at which the clients will connect clientPort=2181 # set logs dataLogDir=/home/shaka/dep/zookeeper-3.3.6/logs (改成自己的目录) # set server server.1=hostname:4887:5887 (改成自己的hostname或IP) #server.2=10.162.219.52:4887:5887 #server.3=10.163.15.119:4887:5887 # add by shaka # set max client connects maxClientCnxns=300
[root@localhost zookeeper-3.4.6]# bin/zkCli.sh Connecting to localhost:2181 Welcome to ZooKeeper! JLine support is enabled WATCHER:: WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTED) 0] ls / [consumers, config, admin, brokers, zookeeper, controller_epoch]
命令行工具的一些简单操作如下: 1. 显示根目录下、文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容 2. 创建文件,并设置初始内容: create /zk "test" 创建一个新的 znode节点“ zk ”以及与它关联的字符串 3. 获取文件内容: get /zk 确认 znode 是否包含我们所创建的字符串 4. 修改文件内容: set /zk "zkbak" 对 zk 所关联的字符串进行设置 5. 删除文件: delete /zk 将刚才创建的 znode 删除 6. 退出客户端: quit 7. 帮助命令: help
第三步:可选 安装zk的ui软件 zkui
首先安装编译工具mvn 配置环境变量: export MVN_HOME=/usr/local/apache-maven-3.2.1 PATH=$PATH:$HOME/bin:$MVN_HOME/bin #set java environment export PATH 到解压的zkui目录下执行:(需要联网下载jar包) mvn clean install 在zkui下面建立连接文件: ln -s target/zkui-2.0......jar zkui-2.0....jar 启动zkui vim config.cfg zkServer=192.168.145.129:2181 启动:nohup java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar & 查看:http://123.56.76.213:9090 或者 jps -l
安装部署Kafka:
step1:拷贝软件包 step2:解压 tar -xvf kafka_2.9.2-0.8.1.1.tgz 或者用 unzip kafka_2.9.2-0.8.1.1.zip ,修改配置文件conf/server.properties: broker.id=0; host.name=192.169.145.129; zookeeper.connect=192.168.145.129:2181可逗号分隔配置多个 step3: 确保有执行权限 chmod +x sbin/* step 修改配置文件vim log4j.properties log4j.appender.D.File = /data1/home/shaka/kafka/kafka_2.9.2-0.8.1.1/logs/debug.log log4j.appender.E.File = /data1/home/shaka/kafka/kafka_2.9.2-0.8.1.1/logs/error.log Step5:启动kafka服务 sbin/start-kafka.sh 查看是否启动: jsp -l step6.创建topic主题: bin/kafka-create-topic.sh --zookeeper 192.168.145.129:2181 --replica 1 --partition 1 --topic mykafka 测试例子: 创建topic bin/kafka-topics.sh --create --zookeeper 192.168.145.129:2181 --replication-factor 1 --partitions 2 --topic topic-ydcun-name (IP不能是localhost)</span> 启动consumer bin/kafka-console-consumer.sh --zookeeper 192.168.145.129:2181 --topic topic-ydcun (IP不能localhost) 启动productor bin/kafka-console-producer.sh --broker-list 192.168.145.129:9092 --topic topic-ydcun 在productor端输入,看consumer端的输出。
用shell程序类来模拟一个生产者
修改logger: 启动 :nohup sh shelllogger.sh >> shelllogger.log 2>&1 & 查看启动: ps -axu|grep shell kill : 查看: tail -f access.log
shelllogger.sh
#!/bin/sh # start cmd: # nohup sh shellcrawler.sh >> shellcrawler.log 2>&1 & # set timer g_getTime="" function getTime { g_getTime=`date '+%Y%m%d %H:%M:%S'` } #getTime && echo "[$g_getTime] [$0:$LINENO:$FUNCNAME] - " # set function function crawler { int=1 while(( $int<=1000000000 )) do log="insert into test(name,content) values('ydcun','hello word');" let "int++" echo $log >> access.log sleep 1s #usleep 1000 done } # main crawler
启动looger
nohup tail -f ../logger/access.log | bin/kafka-console-producer.sh --broker-list 192.168.145.129:9092 --topic topic-ydcun > logs/producer.log 2>&1 &
相关文章推荐
- LeetCode 225. Implement Stack using Queues(使用队列来实现栈)
- C#实现的ReplaceFirst和ReplaceLast
- Android 获取内存、内部存储、外部存储空间大小
- lintcode-medium-Reorder List
- android的Environment类
- 用OpenInventor实现的NeHe OpenGL教程-第四十七课
- 用OpenInventor实现的NeHe OpenGL教程-第四十八课
- 用OpenInventor实现的NeHe OpenGL教程-第四十六课
- 用OpenInventor实现的NeHe OpenGL教程-第四十五课
- 用OpenInventor实现的NeHe OpenGL教程-第四十四课
- Swift学习笔记之变量与常量声明
- LeetCode 224. Basic Calculator(基本计算器)
- Java动态代理机制详解(JDK 和CGLIB,Javassist,ASM)
- django之创建第7-5-第二种传值方式(time/1232/xiaodneg)
- 静电场分析
- 软考中高项学员:2016年4月4日作业
- Android Volley完全解析(一),初识Volley的基本用法
- 用OpenInventor实现的NeHe OpenGL教程-第四十三课
- LeetCode 191. Number of 1 Bits
- 用OpenInventor实现的NeHe OpenGL教程-第四十二课