Storm Development Series, Part 2: The Clojure Version
2015-10-17 19:37
This post implements the first simple topology again, this time in Clojure, so that the two versions can be compared.
Creating the project
Create the project with lein's app template:
demo$ lein new app hello_storm_clj
Generating a project called hello_storm_clj based on the 'app' template.
Note that app is the name of a template that ships with lein. The full list of templates is:
Subtasks available:
default     A general project template for libraries.
plugin      A leiningen plugin project template.
app         An application project template.
template    A meta-template for 'lein new' templates.
CIDER support
Add one line to project.clj so that the project can be developed with Emacs CIDER:
:plugins [[cider/cider-nrepl "0.10.0-SNAPSHOT"]]
Running
hello_storm_clj$ lein run
Hello, World!
Everything works.
Writing the program
All of the code goes in core.clj.
Writing hello-spout
(defspout hello-spout ["word"]
  [conf context collector]
  (let [values ["china" "usa" "japan" "russia" "england"]]
    (spout
      (nextTuple []
        ;; Pause briefly, then emit one randomly chosen word.
        (Thread/sleep 100)
        (emit-spout! collector [(rand-nth values)])))))
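Notes
1. defspout is the macro that defines a spout.
The first argument is the spout name. The second is the output declaration; it is written as a vector so that it can list more than one key, for example ["key1" "key2"]. The third argument is also a vector, holding the topology config, the topology context and the output collector.
2. The variable values is a vector holding a few prepared strings.
3. The (spout ...) form implements nextTuple, the same method as in the Java version.
4. The emit-spout! call uses rand-nth to pick a random element of values and emits it.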
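To make the point about the output declaration concrete, here is a small sketch in plain Clojure, with no Storm calls, of how the vector of values passed to emit-spout! lines up positionally with the declared field names. The second field "length" and the helpers next-values and as-fields are hypothetical names used only for illustration; hello-spout itself declares a single field.

```clojure
;; Plain Clojure sketch; nothing here calls Storm.
;; "length", next-values and as-fields are hypothetical, for illustration only.
(def values ["china" "usa" "japan" "russia" "england"])

;; Field names as they would appear in an output declaration.
(def output-fields ["word" "length"])

(defn next-values
  "Builds one vector of values, one entry per declared field."
  []
  (let [v (rand-nth values)]
    [v (count v)]))

(defn as-fields
  "Pairs each declared field name with the value in the same position."
  [tuple-values]
  (zipmap output-fields tuple-values))
```

At a REPL, (as-fields (next-values)) returns a map such as {"word" "usa", "length" 3}; which country is picked varies from call to call.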
Writing hello-bolt
(defbolt hello-bolt ["word"] [tuple collector]
  (let [v (.getString tuple 0)]
    ;; Emit the greeting anchored to the input tuple, then ack the input.
    (emit-bolt! collector [(str "hello," v)] :anchor tuple)
    (ack! collector tuple)))
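Notes
1. (defbolt ...) defines a bolt.
The first argument is the bolt name and the second is the output declaration, as with the spout above. The third argument is again a vector, holding the tuple passed in from upstream and the collector used for output.
2. emit-bolt!
Note that its second argument is a vector, so that several values can be emitted at once.
The last two arguments, :anchor tuple, are there for tracking: they link the emitted tuple to the received one, which is how Storm tracks the tuple and guarantees that the message gets processed. If guaranteed processing is not required, these two arguments can be dropped, and the call becomes
(emit-bolt! collector [(str "hello," v)])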
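Because the only real work in hello-bolt is building the greeting string, one option is to pull that step into a plain function that can be tried at the REPL without starting a cluster. This is a sketch rather than part of the original code; greet and greeting-values are hypothetical helper names.

```clojure
;; Hypothetical helper: the same string-building step hello-bolt performs.
(defn greet
  [v]
  (str "hello," v))

;; The vector a bolt would pass to emit-bolt! for one input word.
(defn greeting-values
  [v]
  [(greet v)])
```

(greeting-values "china") evaluates to ["hello,china"], which is the vector shape emit-bolt! takes as its second argument.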
Assembling the topology
(defn mk-topology []
  (topology
    {"a" (spout-spec hello-spout :p 10)}
    {"b" (bolt-spec {"a" :shuffle} hello-bolt :p 5)}))
The values 10 and 5 are the parallelism of the spout and of the bolt, respectively.
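Adding dependencies
Add the dependencies to project.clj. They mirror the Maven dependency settings in pom.xml: org.apache.storm is the groupId and storm-core is the artifactId.
:dependencies [[org.clojure/clojure "1.5.1"] [org.apache.storm/storm-core "0.9.5"]]
Note that the Clojure jar has to be downgraded here. With a newer version the run fails with the error below, most likely because Clojure 1.6 added its own clojure.core/some?, which clashes with the some? defined in backtype.storm.util.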
291 [Thread-8] ERROR backtype.storm.event - Error when processing event
java.lang.IllegalStateException: Attempting to call unbound fn: #'backtype.storm.util/some?
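Putting the :plugins line and the :dependencies line together, project.clj might look roughly like the sketch below. The version string and the :main namespace are assumptions about what the app template generated for this project name, so adjust them to match the actual file.

```clojure
;; Sketch of project.clj; the version string and :main namespace are
;; assumed to be what the app template generated, so adjust as needed.
(defproject hello_storm_clj "0.1.0-SNAPSHOT"
  ;; Clojure is pinned to 1.5.1 because newer versions hit the error above.
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [org.apache.storm/storm-core "0.9.5"]]
  ;; Lets Emacs CIDER connect to the project's nREPL.
  :plugins [[cider/cider-nrepl "0.10.0-SNAPSHOT"]]
  :main hello-storm-clj.core)
```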
Running locally
(defn run-local! []
  (let [cluster (LocalCluster.)]
    (.submitTopology cluster "hello_clj" {TOPOLOGY-DEBUG true} (mk-topology))
    ;; Let the topology run for 10 seconds, then stop the local cluster.
    (Thread/sleep 10000)
    (.shutdown cluster)))
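The snippets in this post never show the namespace declaration, but defspout, defbolt, topology, TOPOLOGY-DEBUG and LocalCluster all have to be in scope at the top of core.clj. The following is a sketch for the 0.9.x backtype.storm packages, modeled on the word-count example in storm-starter; the namespace name hello-storm-clj.core is an assumption based on the project name.

```clojure
(ns hello-storm-clj.core
  ;; LocalCluster runs a topology in-process.
  (:import [backtype.storm LocalCluster])
  ;; backtype.storm.clojure provides the DSL (defspout, defbolt, topology, ...);
  ;; backtype.storm.config provides constants such as TOPOLOGY-DEBUG.
  (:use [backtype.storm clojure config])
  (:gen-class))
```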
Then start CIDER with M-x cider-jack-in, evaluate (run-local!), and watch the output in the *nrepl-server* buffer. There is a lot of it; the following is only a part:
35197 [Thread-28-a] INFO backtype.storm.daemon.task - Emitting: a default ["china"]
35197 [Thread-44-b] INFO backtype.storm.daemon.executor - Processing received message source: a:7, stream: default, id: {}, ["china"]
35223 [Thread-30-a] INFO backtype.storm.daemon.task - Emitting: a default ["japan"]
35223 [Thread-32-a] INFO backtype.storm.daemon.task - Emitting: a default ["china"]
35223 [Thread-46-b] INFO backtype.storm.daemon.executor - Processing received message source: a:8, stream: default, id: {}, ["japan"]
35223 [Thread-46-b] INFO backtype.storm.daemon.executor - Processing received message source: a:9, stream: default, id: {}, ["china"]
35234 [Thread-34-a] INFO backtype.storm.daemon.task - Emitting: a default ["japan"]
35235 [Thread-42-b] INFO backtype.storm.daemon.executor - Processing received message source: a:10, stream: default, id: {}, ["japan"]
35240 [Thread-36-a] INFO backtype.storm.daemon.task - Emitting: a default ["china"]
The run succeeds.
References
https://storm.apache.org/documentation/Clojure-DSL.html