您的位置:首页 > 产品设计 > UI/UE

storm 开发系列二 Clojue版本

2015-10-17 19:37 531 查看
对于第一个简单的topology,现在用clojure实现一遍。正好可以对比一下。

创建工程

用lein app模板创建工程

demo$ lein new app hello_storm_clj
Generating a project called hello_storm_clj based on the 'app' template.
注意,app是lein提供的模板名称,以下是所有模板
Subtasks available:
default A general project template for libraries.
plugin A leiningen plugin project template.
app An application project template.
template A meta-template for 'lein new' templates.

支持cider

在project.clj中添加一行配置,使得可以使用Emacs cider进行开发
:plugins [[cider/cider-nrepl "0.10.0-SNAPSHOT"]]

运行

hello_storm_clj$ lein run
Hello, World!一切正常。

编写程序

程序都写在core.clj文件中

编写hello-spout

(defspout hello-spout ["word"]
[conf context collector]
(let [values ["china" "usa" "japan" "russia" "england"]]
(spout
(nextTuple []
(Thread/sleep 100)
(emit-spout! collector [(rand-nth values)])
)
)))
说明

1. defspout是定义spout的函数
第一个参数是spout名称,第二个是output declaration,,所以用vector [ ....]的形式表示多个keys,可以有多个,比如["key1" "key2], 第三个参数是个vector, 里面包含了topology config, topology context 和 output collector。
2. 变量values是个vector, 里面准备好了几个字符串
3. (spout ...) 里面运行的是nextTuple动作,和Java一样

4. emit-spout! 里面使用了rand-nth随机选取values的元素,然后发射出去

编写hello-bolt

(defbolt hello-bolt ["word"] [tuple collector]
(let [v (.getString tuple 0)]
(emit-bolt! collector [(str "hello," v)] :anchor tuple)
(ack! collector tuple)
))

说明

1. (defbolt ...) 定义一个bolt
第一个参数是bolt名称,第二个是output declaration, 和前面spout一样,第三个参数也是vector, 包含了上游传递进来的tuple和输出需要的collector。

2. emit-bolt!

注意,第二个参数是vector, 用来发射多个values

最后两个参数是用来跟踪tuple, 将收到的tuple和发出的tuple通过:anchor链接起来,起到跟踪和保证消息被处理的机制。如果不需要保证消息一定被处理,这两个参数可以不用。而变成
(emit-bolt! collector (str "hello, " value))

组装topology

(defn mk-topology []
(topology
{"a" (spout-spec hello-spout :p 10)}
{"b" (bolt-spec {"a" :shuffle} hello-bolt :p 5)}))
10 和 5 分别是parralism。

添加依赖

在project.clj中添加依赖,依赖参考pom.xml的maven依赖设置,org.apache.storm是groupId, storm-core是artifactId

:dependencies [[org.clojure/clojure "1.5.1"] [org.apache.storm/storm-core "0.9.5"]]注意,这里clojure jar的版本必须降级使用,高版本会报bug

291 [Thread-8] ERROR backtype.storm.event - Error when processing event
java.lang.IllegalStateException: Attempting to call unbound fn: #'backtype.storm.util/some?

本地运行

(defn run-local! []
(let [cluster (LocalCluster.)]
(.submitTopology cluster "hello_clj" {TOPOLOGY-DEBUG true} (mk-topology))
(Thread/sleep 10000)
(.shutdown cluster)
))
然后启动M-x cider-jack-in, 运行(run-local!),观察buffer . * *nrepl-server l的输出,内容很多,下面只是一部分
35197 [Thread-28-a] INFO backtype.storm.daemon.task - Emitting: a default ["china"]
35197 [Thread-44-b] INFO backtype.storm.daemon.executor - Processing received message source: a:7, stream: default, id: {}, ["china"]
35223 [Thread-30-a] INFO backtype.storm.daemon.task - Emitting: a default ["japan"]
35223 [Thread-32-a] INFO backtype.storm.daemon.task - Emitting: a default ["china"]
35223 [Thread-46-b] INFO backtype.storm.daemon.executor - Processing received message source: a:8, stream: default, id: {}, ["japan"]
35223 [Thread-46-b] INFO backtype.storm.daemon.executor - Processing received message source: a:9, stream: default, id: {}, ["china"]
35234 [Thread-34-a] INFO backtype.storm.daemon.task - Emitting: a default ["japan"]
35235 [Thread-42-b] INFO backtype.storm.daemon.executor - Processing received message source: a:10, stream: default, id: {}, ["japan"]
35240 [Thread-36-a] INFO backtype.storm.daemon.task - Emitting: a default ["china"]
执行成功。

参考文档

https://storm.apache.org/documentation/Clojure-DSL.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  storm clojure