您的位置:首页 > 其它

storm trident实战 filter,function的使用

2017-01-10 13:25 357 查看
一、storm trident filter

      Filter通过返回true,和false。来判断是否对信息过滤

     1.1 代码

 

Java代码  


public static void main(String[] args) throws InterruptedException,  

        AlreadyAliveException, InvalidTopologyException,  

        AuthorizationException {  

    FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b"),  

            1, new Values(1, 2), new Values(4, 1),  

            new Values(3, 0));  

    spout.setCycle(false);  

    TridentTopology topology = new TridentTopology();  

    topology.newStream("spout", spout)  

            .each(new Fields("a"), new MyFilter())  

            .each(new Fields("a", "b"), new PrintFilterBolt(),new Fields(""));  

    Config config = new Config();  

    config.setNumWorkers(2);  

    config.setNumAckers(1);  

    config.setDebug(false);  

    StormSubmitter.submitTopology("trident_filter", config,  

            topology.build());  

}  

    MyFilter:

Java代码  


import org.apache.storm.trident.operation.BaseFilter;  

import org.apache.storm.trident.tuple.TridentTuple;  

  

public class MyFilter extends BaseFilter {  

  

    /** 

     *  

     */  

    private static final long serialVersionUID = 1L;  

  

    @Override  

    public boolean isKeep(TridentTuple tuple) {  

        return tuple.getInteger(0) == 1;  

    }  

      

}  

    PrintFilterBolt:

Java代码  


public class PrintFilterBolt extends BaseFunction {  

    /** 

     *  

     */  

    private static final long serialVersionUID = 1L;  

  

    @Override  

    public void execute(TridentTuple tuple, TridentCollector collector) {  

        int firstIndex = tuple.getInteger(0);  

        int secondIndex = tuple.getInteger(1);  

        List<Integer> list = new ArrayList<Integer>();  

        list.add(firstIndex);  

        list.add(secondIndex);  

        System.out.println("after storm filter opertition change is : "  

                + list.toString());  

    }  

  

}  

   运行结果:

Java代码  


2016-12-22 13:16:09.079 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED  

2016-12-22 13:16:09.088 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2)  

2016-12-22 13:16:09.736 STDIO [INFO] after storm filter opertition change is : [1, 2]  

二、Storm trident function

       函数的作用是接收一个tuple(需指定接收tuple的哪个字段),输出0个或多个tuples。输出的新字段值会被追加到原始输入tuple的后面, 如果一个function不输出tuple,那就意味这这个tuple被过滤掉了。

    2.1 代码 

Java代码  


public static void main(String[] args) throws InterruptedException,  

            AlreadyAliveException, InvalidTopologyException,  

            AuthorizationException {  

  

        FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b", "c"),  

                1, new Values(1, 2, 3), new Values(4, 1, 6),  

                new Values(3, 0, 8));  

        spout.setCycle(false);  

        TridentTopology topology = new TridentTopology();  

        topology.newStream("spout", spout)  

                .each(new Fields("b"), new MyFunction(), new Fields("d"))  

                .each(new Fields("a", "b", "c", "d"), new PrintFunctionBolt(),  

                        new Fields(""));  

        Config config = new Config();  

        config.setNumWorkers(2);  

        config.setNumAckers(1);  

        config.setDebug(false);  

        StormSubmitter.submitTopology("trident_function", config,  

                topology.build());  

    }  

      

   MyFunction:

  

Java代码  


public class MyFunction extends BaseFunction {  

  

    /** 

     *  

     */  

    private static final long serialVersionUID = 1L;  

      

    public void execute(TridentTuple tuple, TridentCollector collector) {  

        for(int i=0; i < tuple.getInteger(0); i++) {  

            collector.emit(new Values(i));  

        }  

    }  

  

}  

   PrintFunctionBolt:

  伦理片 http://www.dotdy.com/  

Java代码  


public class PrintFunctionBolt extends BaseFunction {  

    /** 

     *  

     */  

    private static final long serialVersionUID = 1L;  

  

    @Override  

    public void execute(TridentTuple tuple, TridentCollector collector) {  

        int firstIndex = tuple.getInteger(0);  

        int secondIndex = tuple.getInteger(1);  

        int threeIndex = tuple.getInteger(2);  

        int fourIndex = tuple.getInteger(3);  

        List<Integer> list = new ArrayList<Integer>();  

        list.add(firstIndex);  

        list.add(secondIndex);  

        list.add(threeIndex);  

        list.add(fourIndex);  

        System.out.println("after storm function opertition change is : " +list.toString());  

    }  

  

}  

    运行效果:

Java代码  


2016-12-22 13:22:34.365 o.a.s.s.o.a.z.ClientCnxn [INFO] Session establishment complete on server 192.168.80.130/192.168.80.130:2181, sessionid = 0x159285f1109000c, negotiated timeout = 20000  

2016-12-22 13:22:34.366 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED  

2016-12-22 13:22:34.374 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2)  

2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 0]  

2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 1]  

2016-12-22 13:22:34.442 STDIO [INFO] after storm function opertition change is : [4, 1, 6, 0]  

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: