
Understanding fieldsGrouping in Storm

2015-09-09 16:37
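While studying Storm's stream groupings I came across fieldsGrouping, i.e. grouping by field. After a whole morning of reading I still only half understood it, so I wrote a small example and ran it several times to get a better feel for it. Let's start with that example.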

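This is the main method of the Toplogy class. The bolt receives the spout's tuples grouped by the spout's field0 field, and we run 3 bolt threads (executors) to receive them.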

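// Storm 1.0+ package names; Storm 0.9.x (current in 2015) used backtype.storm.* instead
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

public class Toplogy {

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new Spot(), 1);
        // 3 bolt executors; each tuple is routed by the value of the spout's "field0"
        builder.setBolt("bolt", new Bolt(), 3).fieldsGrouping("spout", new Fields("field0"));

        Config conf = new Config();
        conf.setDebug(false);

        // run in-process for 10 seconds, then stop
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("toplogy", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.shutdown();
    }
}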


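Next is the spout: its nextTuple and declareOutputFields methods (plus open, which stores the collector), and the data we are going to send, two arrays array0 and array1 with 12 elements each (the tenth is an empty string). We walk through array0 and emit each of its elements as a tuple. The output field is declared as "field0", which is also the field the grouping uses.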

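// Storm 1.0+ package names (Storm 0.9.x used backtype.storm.*)
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class Spot extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private int count = 0;

    String[] array0 = {"aa","bb","cc","aa","bb","bb","bb","cc","dd","","jj","mm"};
    String[] array1 = {"11","22","33","11","55","22","66","11","33","","44","22"};

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector; // keep the collector for nextTuple()
    }

    @Override
    public void nextTuple() {
        // everything has been sent: idle instead of spinning
        if (count >= array0.length) {
            Utils.sleep(10000);
            return;
        }
        // emit one value of array0; the value also serves as the message id
        collector.emit(new Values(array0[count]), array0[count]);
        count++;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("field0"));
    }
}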


This is the bolt's execute method. It prints each received tuple's value together with the name of the thread that printed it.

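import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

// BaseBasicBolt acks every input tuple automatically after execute() returns
public class Bolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // print the tuple's value and the name of the executor thread that received it
        System.out.println("tuple0->" + input.getString(0) + " " + Thread.currentThread().getName());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) { } // last bolt: emits nothing
}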

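Below is the output after running the topology. I reordered it so that lines printed by the same thread sit together. Tuples with the same value are always printed by the same bolt thread: every bb, for instance, comes from Thread-13-bolt. A single bolt thread, however, can receive several different values. That is the point of fields grouping: tuples whose grouping field has the same value are always handled by the same bolt thread.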
tuple0->bb Thread-13-bolt
tuple0->bb Thread-13-bolt
tuple0->bb Thread-13-bolt
tuple0->bb Thread-13-bolt
tuple0->aa Thread-9-bolt
tuple0->aa Thread-9-bolt
tuple0->dd Thread-9-bolt
tuple0->jj Thread-9-bolt
tuple0->mm Thread-9-bolt
tuple0->cc Thread-11-bolt
tuple0->cc Thread-11-bolt
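To see why equal values always meet in one thread, here is a minimal Java model of the routing rule. It assumes the rule works the way Storm's fields grouper does: hash the values of the grouping fields and take the result modulo the number of bolt tasks. FieldsGroupingModel and chooseTask are hypothetical names for this sketch, not Storm APIs, and the exact hash function may differ between Storm versions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of fields grouping, not Storm's own class:
// task = hash(values of the grouping fields) mod number of bolt tasks.
class FieldsGroupingModel {

    // Task index in [0, numTasks) for the given grouping values.
    static int chooseTask(List<?> groupingValues, int numTasks) {
        // List.hashCode() depends only on the elements; floorMod avoids negative indices
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        String[] array0 = {"aa", "bb", "cc", "aa", "bb", "bb", "bb", "cc", "dd", "", "jj", "mm"};
        int numTasks = 3; // the bolt's parallelism in the topology

        // task index -> values routed to it, in emission order
        Map<Integer, List<String>> routed = new TreeMap<>();
        for (String value : array0) {
            int task = chooseTask(Arrays.asList(value), numTasks);
            routed.computeIfAbsent(task, k -> new ArrayList<>()).add(value);
        }
        routed.forEach((task, values) -> System.out.println("task " + task + " <- " + values));
    }
}
```

With three tasks this model puts aa, dd, jj and mm in one group, all bb in a second and cc (with the empty string) in a third, which for the values shown is the same split as the output above. Which group lands on which thread depends on how Storm assigns tasks to executors, which the model does not cover.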

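You may have noticed that array1 in the spout has not been used yet, because we are not done. So far the tuple and the declared output fields (declareOutputFields) held a single value; next let's see what happens with two values.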

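First, one rule: the number of values in an emitted tuple must equal the number of fields declared in declareOutputFields, otherwise Storm reports an error. All the examples below therefore use two values.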
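The rule can be pictured as a size check between the declared fields and the emitted values. The helper below, ArityCheck.checkArity, is hypothetical and only mimics that check; in Storm the error comes from the framework itself, and its exact exception type and message depend on the version.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not a Storm API) mimicking the rule that a tuple must
// carry exactly as many values as declareOutputFields declared fields.
class ArityCheck {

    static void checkArity(List<String> declaredFields, List<Object> values) {
        if (declaredFields.size() != values.size()) {
            throw new IllegalArgumentException("declared " + declaredFields.size()
                    + " fields " + declaredFields + " but emitted " + values.size() + " values");
        }
    }

    public static void main(String[] args) {
        List<String> declared = Arrays.asList("field0", "field1");
        checkArity(declared, Arrays.<Object>asList("bb", "55")); // OK: 2 fields, 2 values
        try {
            checkArity(declared, Arrays.<Object>asList("bb"));   // only 1 value
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```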

Here is the spout code; each emitted tuple now carries two values.

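// Storm 1.0+ package names (Storm 0.9.x used backtype.storm.*)
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class Spot extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private int count = 0;

    String[] array0 = {"aa","bb","cc","aa","bb","bb","bb","cc","dd","","jj","mm"};
    String[] array1 = {"11","22","33","11","55","22","66","11","33","","44","22"};

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector; // keep the collector for nextTuple()
    }

    @Override
    public void nextTuple() {
        // everything has been sent: idle instead of spinning
        if (count >= array0.length) {
            Utils.sleep(10000);
            return;
        }
        // two values per tuple, matching the two declared fields; array0's value is the message id
        collector.emit(new Values(array0[count], array1[count]), array0[count]);
        count++;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("field0", "field1"));
    }
}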
The bolt prints both values of the received tuple plus the thread name.

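import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

// BaseBasicBolt acks every input tuple automatically after execute() returns
public class Bolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // print both values and the name of the executor thread that received the tuple
        System.out.println("tuple0->" + input.getString(0) + "  " + "tuple1->" + input.getString(1) + "  " + Thread.currentThread().getName());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) { } // last bolt: emits nothing
}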
I won't repeat the full Toplogy code, because the only thing that changes is the fields grouping, which can be set up in three ways:

1. builder.setBolt("bolt", new Bolt(), 3).fieldsGrouping("spout", new Fields("field0")); // group by the first field, i.e. the values from array0
2. builder.setBolt("bolt", new Bolt(), 3).fieldsGrouping("spout", new Fields("field1")); // group by the second field, i.e. the values from array1
3. builder.setBolt("bolt", new Bolt(), 3).fieldsGrouping("spout", new Fields("field0", "field1")); // group by both fields together
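The three variants differ only in which values form the grouping key. The sketch below picks the grouping fields out of a two-field tuple, roughly the job Storm's Fields.select does inside the grouper; GroupingKey and its select method are hypothetical stand-ins, not that API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: build the grouping key by picking the values of the
// grouping fields out of the declared output fields.
class GroupingKey {

    static List<Object> select(List<String> outputFields, List<String> groupFields, List<Object> values) {
        List<Object> key = new ArrayList<>();
        for (String field : groupFields) {
            int i = outputFields.indexOf(field);
            if (i < 0) throw new IllegalArgumentException("unknown field: " + field);
            key.add(values.get(i));
        }
        return key;
    }

    public static void main(String[] args) {
        List<String> out = Arrays.asList("field0", "field1");
        List<Object> tuple = Arrays.<Object>asList("bb", "55");
        System.out.println(select(out, Arrays.asList("field0"), tuple));           // case 1: prints [bb]
        System.out.println(select(out, Arrays.asList("field1"), tuple));           // case 2: prints [55]
        System.out.println(select(out, Arrays.asList("field0", "field1"), tuple)); // case 3: prints [bb, 55]
    }
}
```

In case 1 the tuples (bb, 22) and (bb, 55) share the key [bb] and therefore a thread; in case 3 their keys differ, so nothing forces them together.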

Results for case 1: tuples whose first value is the same are sent to the same bolt thread.

tuple0->aa  tuple1->11  Thread-9-bolt
tuple0->aa  tuple1->11  Thread-9-bolt
tuple0->dd  tuple1->33  Thread-9-bolt
tuple0->jj  tuple1->44  Thread-9-bolt
tuple0->mm  tuple1->22  Thread-9-bolt
tuple0->cc  tuple1->33  Thread-11-bolt
tuple0->cc  tuple1->11  Thread-11-bolt
tuple0->bb  tuple1->22  Thread-13-bolt
tuple0->bb  tuple1->55  Thread-13-bolt
tuple0->bb  tuple1->22  Thread-13-bolt
tuple0->bb  tuple1->66  Thread-13-bolt

Results for case 2: tuples whose second value is the same are sent to the same bolt thread.

tuple0->aa  tuple1->11  Thread-9-bolt
tuple0->aa  tuple1->11  Thread-9-bolt
tuple0->cc  tuple1->11  Thread-9-bolt
tuple0->bb  tuple1->22  Thread-13-bolt
tuple0->bb  tuple1->22  Thread-13-bolt
tuple0->mm  tuple1->22  Thread-13-bolt
tuple0->dd  tuple1->33  Thread-11-bolt
tuple0->cc  tuple1->33  Thread-11-bolt
tuple0->jj  tuple1->44  Thread-9-bolt
tuple0->bb  tuple1->66  Thread-11-bolt
tuple0->bb  tuple1->55  Thread-13-bolt

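Results for case 3: tuples whose two values are both the same are sent to the same bolt thread. Tuples that differ in either value have different grouping keys, but they are not spread randomly: each (field0, field1) combination is always routed to the same thread, and different combinations may share a thread. For example, bb/22 and bb/55 both went to Thread-9-bolt.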

tuple0->aa  tuple1->11  Thread-13-bolt
tuple0->aa  tuple1->11  Thread-13-bolt
tuple0->bb  tuple1->22  Thread-9-bolt
tuple0->bb  tuple1->22  Thread-9-bolt
tuple0->bb  tuple1->55  Thread-9-bolt
tuple0->bb  tuple1->66  Thread-13-bolt
tuple0->jj  tuple1->44  Thread-13-bolt
tuple0->cc  tuple1->33  Thread-11-bolt
tuple0->cc  tuple1->11  Thread-9-bolt
tuple0->dd  tuple1->33  Thread-9-bolt
tuple0->mm  tuple1->22  Thread-11-bolt
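That case 3 is deterministic rather than random can be checked with a small model that hashes the (field0, field1) pair and reduces it modulo the number of bolt tasks. This is a sketch under the assumption of hash-modulo-tasks routing; PairRouting is a hypothetical name, not a Storm class. The data contains 10 distinct pairs (counting the empty one) but only 3 bolt tasks, so several different pairs must share a task.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model, not Storm's code: route on the (field0, field1) pair by
// hashing the pair and reducing it modulo the number of bolt tasks.
class PairRouting {

    static int chooseTask(List<?> key, int numTasks) {
        // same pair -> same hashCode -> same task, every time
        return Math.floorMod(key.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        String[] array0 = {"aa", "bb", "cc", "aa", "bb", "bb", "bb", "cc", "dd", "", "jj", "mm"};
        String[] array1 = {"11", "22", "33", "11", "55", "22", "66", "11", "33", "", "44", "22"};
        int numTasks = 3;

        // task index -> distinct pairs routed to it (a Set, so repeated pairs collapse)
        Map<Integer, Set<List<String>>> routed = new TreeMap<>();
        for (int i = 0; i < array0.length; i++) {
            List<String> key = Arrays.asList(array0[i], array1[i]);
            routed.computeIfAbsent(chooseTask(key, numTasks), k -> new LinkedHashSet<>()).add(key);
        }
        routed.forEach((task, keys) -> System.out.println("task " + task + " <- " + keys));
    }
}
```

For these inputs the model yields the same grouping as the output above: bb/22, bb/55, cc/11 and dd/33 together; cc/33 and mm/22 together; aa/11, bb/66 and jj/44 together.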