storm trident实战 filter,function的使用
2017-01-10 13:25
357 查看
一、storm trident filter
Filter通过返回true,和false。来判断是否对信息过滤
1.1 代码
Java代码
public static void main(String[] args) throws InterruptedException,
AlreadyAliveException, InvalidTopologyException,
AuthorizationException {
FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b"),
1, new Values(1, 2), new Values(4, 1),
new Values(3, 0));
spout.setCycle(false);
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a"), new MyFilter())
.each(new Fields("a", "b"), new PrintFilterBolt(),new Fields(""));
Config config = new Config();
config.setNumWorkers(2);
config.setNumAckers(1);
config.setDebug(false);
StormSubmitter.submitTopology("trident_filter", config,
topology.build());
}
MyFilter:
Java代码
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;
public class MyFilter extends BaseFilter {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public boolean isKeep(TridentTuple tuple) {
return tuple.getInteger(0) == 1;
}
}
PrintFilterBolt:
Java代码
public class PrintFilterBolt extends BaseFunction {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
int firstIndex = tuple.getInteger(0);
int secondIndex = tuple.getInteger(1);
List<Integer> list = new ArrayList<Integer>();
list.add(firstIndex);
list.add(secondIndex);
System.out.println("after storm filter opertition change is : "
+ list.toString());
}
}
运行结果:
Java代码
2016-12-22 13:16:09.079 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2016-12-22 13:16:09.088 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2)
2016-12-22 13:16:09.736 STDIO [INFO] after storm filter opertition change is : [1, 2]
二、Storm trident function
函数的作用是接收一个tuple(需指定接收tuple的哪个字段),输出0个或多个tuples。输出的新字段值会被追加到原始输入tuple的后面, 如果一个function不输出tuple,那就意味这这个tuple被过滤掉了。
2.1 代码
Java代码
public static void main(String[] args) throws InterruptedException,
AlreadyAliveException, InvalidTopologyException,
AuthorizationException {
FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b", "c"),
1, new Values(1, 2, 3), new Values(4, 1, 6),
new Values(3, 0, 8));
spout.setCycle(false);
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("b"), new MyFunction(), new Fields("d"))
.each(new Fields("a", "b", "c", "d"), new PrintFunctionBolt(),
new Fields(""));
Config config = new Config();
config.setNumWorkers(2);
config.setNumAckers(1);
config.setDebug(false);
StormSubmitter.submitTopology("trident_function", config,
topology.build());
}
MyFunction:
Java代码
public class MyFunction extends BaseFunction {
/**
*
*/
private static final long serialVersionUID = 1L;
public void execute(TridentTuple tuple, TridentCollector collector) {
for(int i=0; i < tuple.getInteger(0); i++) {
collector.emit(new Values(i));
}
}
}
PrintFunctionBolt:
伦理片 http://www.dotdy.com/
Java代码
public class PrintFunctionBolt extends BaseFunction {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
int firstIndex = tuple.getInteger(0);
int secondIndex = tuple.getInteger(1);
int threeIndex = tuple.getInteger(2);
int fourIndex = tuple.getInteger(3);
List<Integer> list = new ArrayList<Integer>();
list.add(firstIndex);
list.add(secondIndex);
list.add(threeIndex);
list.add(fourIndex);
System.out.println("after storm function opertition change is : " +list.toString());
}
}
运行效果:
Java代码
2016-12-22 13:22:34.365 o.a.s.s.o.a.z.ClientCnxn [INFO] Session establishment complete on server 192.168.80.130/192.168.80.130:2181, sessionid = 0x159285f1109000c, negotiated timeout = 20000
2016-12-22 13:22:34.366 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2016-12-22 13:22:34.374 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2)
2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 0]
2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 1]
2016-12-22 13:22:34.442 STDIO [INFO] after storm function opertition change is : [4, 1, 6, 0]
Filter通过返回true,和false。来判断是否对信息过滤
1.1 代码
Java代码
public static void main(String[] args) throws InterruptedException,
AlreadyAliveException, InvalidTopologyException,
AuthorizationException {
FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b"),
1, new Values(1, 2), new Values(4, 1),
new Values(3, 0));
spout.setCycle(false);
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a"), new MyFilter())
.each(new Fields("a", "b"), new PrintFilterBolt(),new Fields(""));
Config config = new Config();
config.setNumWorkers(2);
config.setNumAckers(1);
config.setDebug(false);
StormSubmitter.submitTopology("trident_filter", config,
topology.build());
}
MyFilter:
Java代码
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;
public class MyFilter extends BaseFilter {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public boolean isKeep(TridentTuple tuple) {
return tuple.getInteger(0) == 1;
}
}
PrintFilterBolt:
Java代码
public class PrintFilterBolt extends BaseFunction {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
int firstIndex = tuple.getInteger(0);
int secondIndex = tuple.getInteger(1);
List<Integer> list = new ArrayList<Integer>();
list.add(firstIndex);
list.add(secondIndex);
System.out.println("after storm filter opertition change is : "
+ list.toString());
}
}
运行结果:
Java代码
2016-12-22 13:16:09.079 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2016-12-22 13:16:09.088 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2)
2016-12-22 13:16:09.736 STDIO [INFO] after storm filter opertition change is : [1, 2]
二、Storm trident function
函数的作用是接收一个tuple(需指定接收tuple的哪个字段),输出0个或多个tuples。输出的新字段值会被追加到原始输入tuple的后面, 如果一个function不输出tuple,那就意味这这个tuple被过滤掉了。
2.1 代码
Java代码
public static void main(String[] args) throws InterruptedException,
AlreadyAliveException, InvalidTopologyException,
AuthorizationException {
FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b", "c"),
1, new Values(1, 2, 3), new Values(4, 1, 6),
new Values(3, 0, 8));
spout.setCycle(false);
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("b"), new MyFunction(), new Fields("d"))
.each(new Fields("a", "b", "c", "d"), new PrintFunctionBolt(),
new Fields(""));
Config config = new Config();
config.setNumWorkers(2);
config.setNumAckers(1);
config.setDebug(false);
StormSubmitter.submitTopology("trident_function", config,
topology.build());
}
MyFunction:
Java代码
public class MyFunction extends BaseFunction {
/**
*
*/
private static final long serialVersionUID = 1L;
public void execute(TridentTuple tuple, TridentCollector collector) {
for(int i=0; i < tuple.getInteger(0); i++) {
collector.emit(new Values(i));
}
}
}
PrintFunctionBolt:
伦理片 http://www.dotdy.com/
Java代码
public class PrintFunctionBolt extends BaseFunction {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
int firstIndex = tuple.getInteger(0);
int secondIndex = tuple.getInteger(1);
int threeIndex = tuple.getInteger(2);
int fourIndex = tuple.getInteger(3);
List<Integer> list = new ArrayList<Integer>();
list.add(firstIndex);
list.add(secondIndex);
list.add(threeIndex);
list.add(fourIndex);
System.out.println("after storm function opertition change is : " +list.toString());
}
}
运行效果:
Java代码
2016-12-22 13:22:34.365 o.a.s.s.o.a.z.ClientCnxn [INFO] Session establishment complete on server 192.168.80.130/192.168.80.130:2181, sessionid = 0x159285f1109000c, negotiated timeout = 20000
2016-12-22 13:22:34.366 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2016-12-22 13:22:34.374 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2)
2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 0]
2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 1]
2016-12-22 13:22:34.442 STDIO [INFO] after storm function opertition change is : [4, 1, 6, 0]
相关文章推荐
- Flex 使用ArrayCollection的FilterFunction进行数据过滤
- Flex使用ArrayCollection的filterFunction属性过滤DataGrid
- scala-04For与Function进阶实战、Lazy的使用
- Go实战--go中函数(function)和方法(method)的使用(The way to go)
- Spring Boot实战之Filter实现使用JWT进行接口认证
- Flex 使用ArrayCollection的FilterFunction进行数据过滤
- Spring Boot实战之Filter实现使用JWT进行接口认证 jwt(json web token) 用户发送按照约定,向服务端发送 Header、Payload 和 Signature,
- For与Function进阶实战、Lazy的使用笔记总结
- Dt大数据梦工厂王家林老师 Scala实战详解之第4讲 For与Function进阶实战、Lazy的使用
- angularjs实战之filter使用一
- Flex 使用ArrayCollection的FilterFunction进行数据过滤
- Scala学习第四天:For与Function进阶实战、Lazy的使用
- Scala学习回顾(四)---- For与Function进阶实战、Lazy的使用
- for与function进阶实战、lazy关键字的使用
- Scala深入浅出实战经典《第88讲:Scala中使用For表达式实现map、flatMap、filter》笔记
- flex datagrid使用arraycollection中的filterFunction属性进行过滤
- 王家林亲传《DT大数据梦工厂》第四讲For与Function进阶实战、Lazy的使用
- 第4讲:For与Function进阶实战、Lazy的使用
- JavaEE开发实战--Filter的使用
- AngularJS实战之filter的使用二