用Storm轻松实时大数据分析【翻译】
2015-11-02 17:20
288 查看
简单易用,Storm让大数据分析变得轻而易举。
如今,公司在日常运作中经常会产生TB(terabytes)级的数据。数据来源包括从网络传感器捕获的,到Web,社交媒体,交易型业务数据,以及其他业务环境中创建的数据。考虑到数据的生成量,实时计算(real-timecomputation)已成为很多组织面临的一个巨大挑战。我们已经有效地使用了一个可扩展的实时计算系统——开源的
本文介绍了如何使用Storm。示例项目称之为“超速警报系统(SpeedingAlertSystem),”分析实时数据,当车速超过一个预定义的阈值(threshold)时,触发一个trigger,相关数据就会保存到数据库中。
[title2]什么是Storm[/title2]
Hadoop依靠批量处理(batchprocessing),而Storm是一个实时的(real-time),分布式的(distributed),容错的(fault-tolerant),计算系统。像Hadoop,它可以保证可靠性处理大量的数据,但不能实时;也就是说,每个消息都将被处理。Storm也提供这些特性,如容错,分布式计算,这些使它适合在不同机器上处理大规模数据。它还具有如下特性:简扩展。若想扩展,你只需添加设备和改变topology的并行性设置。用于集群协调的HadoopZookeeper用在Storm使得它非常容易扩展。
保证每个消息都被处理。
Storm集群(cluster)很容易管理。
容错性:一旦topology被提交,Storm运行topology,直到它被杀掉或集群被关闭。此外,如果执行期间发生错误,那么重新分配的任务由Storm处理。
Storm的topology可以用任何语言定义,但通常还是用Java。
文章接下来的部分,你首先需要安装和建立Storm。步骤如下:
Storm官方站点下载
解压,将bin/添加到你的环境变量PATH,保证bin/storm脚本可执行。
Hadoop与Storm的概念对比,如下表所示:
Hadoop | Storm |
JobTracker | Nimbus |
TaskTracker | Supervisor |
Child | Worker |
Job | Topology |
Mapper/Reducer | Spout/Bolt |
1.信息流处理(Streamprocessing)。Storm可用来实时处理新数据和更新数据库,兼具容错性和可扩展性。即Storm可以用来处理源源不断流进来的消息,处理之后将结果写入到某个存储中去。
2.连续计算(Continuouscomputation)。Storm可进行连续查询并把结果即时反馈给客户端。比如把Twitter上的热门话题发送到浏览器中。
3.分布式远程程序调用(DistributedRPC)。Storm可用来并行处理密集查询。Storm的拓扑结构是一个等待调用信息的分布函数,当它收到一条调用信息后,会对查询进行计算,并返回查询结果。举个例子DistributedRPC可以做并行搜索或者处理大集合的数据。
通过配置drpc服务器,将storm的topology发布为drpc服务。客户端程序可以调用drpc服务将数据发送到storm集群中,并接收处理结果的反馈。这种方式需要drpc服务器进行转发,其中drpc服务器底层通过thrift实现。适合的业务场景主要是实时计算。并且扩展性良好,可以增加每个节点的工作worker数量来动态扩展。
4.项目实施,构建Topology。
Storm组件
Storm集群主要由主节点(master)和工作节点(workernode)组成,它们通过Zookeeper进行协调。Nimbus类似Hadoop里面的JobTracker。Nimbus负责在集群里面分发代码,分配计算任务给机器,并且监控状态。主节点(master)——Nimbus
主节点运行一个守护进程(daemon),Nimbus,它负责在集群中分布代码,分配任务(Task)并监测故障。它类似于Hadoop的JobTracker。工作者节点(workernode)——Supervisor
工作节点同样会运行一个守护进程,Supervisor,它监听已分配的工作,并按要求运行工作进程。每个工作节点都执行一个topology的子集。Nimbus和Supervisor之间的协调是由Zookeeper或其集群来管理。Zookeeper
Zookeeper负责Supervisor和Nimbus之间的协调。一个实时应用程序的逻辑被封装到一个Storm的“topology”中。一个topology是由一组spouts(数据源)和bolts(数据操作)组成,通过StreamGroupings连接(协调)。下面更进一步说明这些术语。Spout
简单来说,一个spout在topology中从一个源中读取数据。spout可以是可靠的,也可以是不可靠的。如果Storm处理失败,那么一个可靠的spout可以确保重新发送元组(它是一个数据项的有序列表)。一个不可靠的spout,元组一旦发送,它不会跟踪。spout中的主要方法是nextTuple()。该种方法或者向topology发出一个新元组,或是直接返回,如果没有什么可发。Bolt
bolt负责所有处理处理topology发生的一切。bolt可做从过滤到连接,聚合,写文件/数据库等等任何事。bolt从一个spout接收数据来处理,在复杂流转换中,它可以进一步发出元组到另一个bolt。bolt中主要方法是execute(),它接受一个元组作为输入。在spout和bolt,发动元组到更多的流,可以在declareStream()中声明和指定流。StreamGrouping
streamgrouping定义流在bolt任务之间如何被划分。Storm提供了随机分组(Shufflegrouping):随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple。
字段分组(Fieldsgrouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。
全部分组(Allgrouping):tuple被复制到bolt的所有任务。这种类型需要谨慎使用。
全局分组(Globalgrouping):全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。
无分组(Nonegrouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。
直接分组(Directgrouping):这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。
另外,还涉及其他概念。
Task
worker中每一个spout/bolt的线程称为一个task。在storm0.8之后,task不再与物理线程对应,同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。Tuple
一次消息传递的基本单元。本来应该是一个key-value的map,但是由于各个组件间传递的tuple的字段名称已经事先定义好,所以,tuple中只要按序填入各个value就行了,所以就是一个valuelist。Topology
storm中运行的一个实时应用程序,因为各个组件间的消息流动形成逻辑上的一个拓扑结构。一个topology是spouts和bolts组成的图,通过streamgroupings将图中的spouts和bolts连接起来,如下图:一个topology会一直运行,直到你kill掉它,Storm自动地重新分配执行失败的任务,并且Storm可以保证你不会有数据丢失(如果开启了高可靠性的话)。如果一些机器意外停机它上面的所有任务会被转移到其他机器上。
运行一个topology很简单。首先,把你所有的代码以及所依赖的jar打进一个jar包。然后运行类似下面的这个命令:
[code]stormjarall-my-code.jarbacktype.storm.MyTopologyarg1arg2
[/code]
这个命令会运行主类:backtype.strom.MyTopology,参数是arg1,arg2。这个类的main函数定义这个topology并且把它提交给Nimbus。stormjar负责连接到Nimbus并且上传jar包。
Topology的定义是一个Thrift结构,并且Nimbus就是一个Thrift服务,你可以提交由任何语言创建的topology。上面的方面是用JVM-based语言提交的最简单的方法。
Stream
源源不断传递的tuple就组成了stream。消息流stream是storm里的关键抽象。一个消息流是一个没有边界的tuple序列,而这些tuple序列会以一种分布式的方式并行地创建和处理。通过对stream中tuple序列中每个字段命名来定义stream。在默认的情况下,tuple的字段类型可以是:integer,long,short,byte,string,double,float,boolean和bytearray。你也可以自定义类型(只要实现相应的序列化器)。每个消息流在定义的时候会被分配给一个id,因为单向消息流使用的相当普遍,OutputFieldsDeclarer定义了一些方法让你可以定义一个stream而不用指定这个id。在这种情况下这个stream会分配个值为‘default’默认的id。
Storm提供的最基本的处理stream的原语是spout和bolt。你可以实现spout和bolt提供的接口来处理你的业务逻辑。
实现
对于我们的示例中,我们设计了一个spout和bolt的topology,可以处理大量规模数据(日志文件),设计触发一个报警,当一个特定值超过预设阈值时。使用Storm的topology,日志文件按行读取,topology监控到来的数据。在Storm组件,spout读取到来的数据。它不仅从现存的文件中读取数据,也监控新文件。一旦文件被修改,spout读取新条目,转换为元组(一个可以被bolt读取的格式)后,把元组发出到bolt执行阈值分析,查找任何超过阈值的记录。阈值分析(ThresholdAnalysis)
本节主要集中两种类型的阈值(threshold)分析:瞬时阈值(instantthershold)和时间序列阈值(timeseriesthreshold)。瞬时阈值监测:一个字段的值在那个瞬间超过了预设的临界值,如果条件符合的话则触发一个trigger。举个例子当车辆超越80公里每小时,则触发trigger。
时间序列阈值监测:字段的值在一个给定的时间段内超过了预设的临界值,如果条件符合则触发一个触发器。比如:在5分钟内,时速超过80公里每小时两次及以上的车辆。
清单1显示一个我们使用的日志文件,它包含车辆数据信息,例如车辆号码,速度,位置。
清单1:日志文件,通过检查点的车辆信息
[code]AB123,60,Northcity
BC123,70,Southcity
CD234,40,Southcity
DE123,40,Eastcity
EF123,90,Southcity
GH123,50,Westcity
[/code]
创建相应的XML文件,它由到来的数据格式组成。用于解析日志文件。架构XML及其相应的描述如下表所示。
XML文件和日志文件都被spout随时监测,本例使用的topology如下图所示。
图1:Storm中创建的topology,以处理实时数据
如图1所示,FilelistenerSpout接收输入日志,并逐行读取,把数据发送给ThresoldCalculatorBolt进一步的阈值处理。一旦处理完成,根据阈值计算的行被发动到DBWriterBolt,持久化到数据库(或发出报警)。这个过程的具体实现将在下面介绍。
Spout实现
spout把日志文件和XML描述符文件作为输入。该XML文件指定了日志文件的格式。现在考虑一个例子的日志文件,它包含车辆信息,如车辆号码,速度,位置等三个信息。如图2所示。图2:数据从日志文件到spout的流程图
列表2显示了tuple对应的XML,其中指定了字段、将日志文件切割成字段的定界符以及字段的类型。XML文件以及数据都被保存到Spout指定的路径。
列表2:用以描述日志文件的XML文件
[code]<TUPLEINFO>
<FIELDLIST>
<FIELD>
<COLUMNNAME>vehicle_number</COLUMNNAME>
<COLUMNTYPE>string</COLUMNTYPE>
</FIELD>
<FIELD>
<COLUMNNAME>speed</COLUMNNAME>
<COLUMNTYPE>int</COLUMNTYPE>
</FIELD>
<FIELD>
<COLUMNNAME>location</COLUMNNAME>
<COLUMNTYPE>string</COLUMNTYPE>
</FIELD>
</FIELDLIST>
<DELIMITER>,</DELIMITER>
</TUPLEINFO>
[/code]
构造函数用参数Directory、PathSpout和TupleInfo对象创建Spout对象。TupleInfo储存与日志文件相关的必要信息,如字段、分隔符、字段类型等。该对象通过
Spout实现步骤:
监听一个单独日志文件的变化。监控目录是否添加新的日志文件。
声明字段后,把spout读取行转换成tuple。
声明Spout和Bolt之间的分组,并决定tuple发送给Bolt的方式。
Spout代码如下列表3所示。
列表3:Spout中open、nextTuple和delcareOutputFields方法
[code]publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector)
{
_collector=collector;
try
{
fileReader=newBufferedReader(newFileReader(newFile(file)));
}
catch(FileNotFoundExceptione)
{
System.exit(1);
}
}
publicvoidnextTuple()
{
protectedvoidListenFile(Filefile)
{
Utils.sleep(2000);
RandomAccessFileaccess=null;
Stringline=null;
try
{
while((line=access.readLine())!=null)
{
if(line!=null)
{
String[]fields=null;
if(tupleInfo.getDelimiter().equals("|"))
fields=line.split("\\"+tupleInfo.getDelimiter());
elsefields=line.split(tupleInfo.getDelimiter());
if(tupleInfo.getFieldList().size()==fields.length)
_collector.emit(newValues(fields));
}
}
}
catch(IOExceptionex){}
}
}
publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer)
{
String[]fieldsArr=newString[tupleInfo.getFieldList().size()];
for(inti=0;i<tupleInfo.getFieldList().size();i++)
{
fieldsArr[i]=tupleInfo.getFieldList().get(i).getColumnName();
}
declarer.declare(newFields(fieldsArr));
}
[/code]
declareOutputFileds()决定tuple发送的格式,这样,Bolt就能用类似的方式编码tuple。Spout持续监听添加到日志文件的数据,一旦有数据添加,它就读取并把数据发送给bolt处理。
Bolt实现
Spout输出结果将给予Bolt进行更深一步的处理。经过对用例的思考,我们的topology中需要如图3中的两个Bolt。图3:Spout到Bolt的数据流程
ThresholdCalculatorBolt
Spout将tuple发出,由ThresholdCalculatorBolt接收并进行临界值处理。在这里,它将接收好几项输入进行检查;分别是:临界值检查
阈值栏数检查(拆分成字段的数目)
阈值数据类型(拆分后字段的类型)
阈值出现的频数
阈值时间段检查
列表4中的类,定义用来保存这些值。
[code]publicclassThresholdInfoimplementsSerializable
{
privateStringaction;
privateStringrule;
privateObjectthresholdValue;
privateintthresholdColNumber;
privateIntegertimeWindow;
privateintfrequencyOfOccurence;
}
[/code]
基于字段中提供的值,阈值检查将被在execute()方法执行,如列表5所示。代码大部分的功能是解析和检测到来的值。
列表5:阈值检测代码段
[code]publicvoidexecute(Tupletuple,BasicOutputCollectorcollector)
{
if(tuple!=null)
{
List<Object>inputTupleList=(List<Object>)tuple.getValues();
intthresholdColNum=thresholdInfo.getThresholdColNumber();
ObjectthresholdValue=thresholdInfo.getThresholdValue();
StringthresholdDataType=
tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();
IntegertimeWindow=thresholdInfo.getTimeWindow();
intfrequency=thresholdInfo.getFrequencyOfOccurence();
if(thresholdDataType.equalsIgnoreCase("string"))
{
StringvalueToCheck=inputTupleList.get(thresholdColNum-1).toString();
StringfrequencyChkOp=thresholdInfo.getAction();
if(timeWindow!=null)
{
longcurTime=System.currentTimeMillis();
longdiffInMinutes=(curTime-startTime)/(1000);
if(diffInMinutes>=timeWindow)
{
if(frequencyChkOp.equals("=="))
{
if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
{
count.incrementAndGet();
if(count.get()>frequency)
splitAndEmit(inputTupleList,collector);
}
}
elseif(frequencyChkOp.equals("!="))
{
if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
{
count.incrementAndGet();
if(count.get()>frequency)
splitAndEmit(inputTupleList,collector);
}
}
else
System.out.println("Operatornotsupported");
}
}
else
{
if(frequencyChkOp.equals("=="))
{
if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
{
count.incrementAndGet();
if(count.get()>frequency)
splitAndEmit(inputTupleList,collector);
}
}
elseif(frequencyChkOp.equals("!="))
{
if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
{
count.incrementAndGet();
if(count.get()>frequency)
splitAndEmit(inputTupleList,collector);
}
}
}
}
elseif(thresholdDataType.equalsIgnoreCase("int")||
thresholdDataType.equalsIgnoreCase("double")||
thresholdDataType.equalsIgnoreCase("float")||
thresholdDataType.equalsIgnoreCase("long")||
thresholdDataType.equalsIgnoreCase("short"))
{
StringfrequencyChkOp=thresholdInfo.getAction();
if(timeWindow!=null)
{
longvalueToCheck=
Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());
longcurTime=System.currentTimeMillis();
longdiffInMinutes=(curTime-startTime)/(1000);
System.out.println("Differenceinminutes="+diffInMinutes);
if(diffInMinutes>=timeWindow)
{
if(frequencyChkOp.equals("<"))
{
if(valueToCheck<Double.parseDouble(thresholdValue.toString()))
{
count.incrementAndGet();
if(count.get()>frequency)
splitAndEmit(inputTupleList,collector);
}
}
elseif(frequencyChkOp.equals(">"))
{
if(valueToCheck>Double.parseDouble(thresholdValue.toString()))
{
count.incrementAndGet();
if(count.get()>frequency)
splitAndEmit(inputTupleList,collector);
}
}
elseif(frequencyChkOp.equals("=="))
{
if(valueToCheck==Double.parseDouble(thresholdValue.toString()))
{
count.incrementAndGet();
if(count.get()>frequency)
splitAndEmit(inputTupleList,collector);
}
}
elseif(frequencyChkOp.equals("!="))
{
...
}
}
}
else
splitAndEmit(null,collector);
}
else
{
System.err.println("Emittingnullinbolt");
splitAndEmit(null,collector);
}
}
[/code]
根据阈值bolt发送的tuple被发送到下一个相应的Bolt,在我们的用例中是DBWriterBolt。
DBWriterBolt
已经处理的tuple必须被持久化,以便于触发tigger或者将来使用。DBWiterBolt完成的工作是将tuple持久化到数据库。表的建立是由prepare()完成,这也是topology调用的第一个方法。该方法的代码如列表6所示。列表6:创建表的代码
[code]publicvoidprepare(MapStormConf,TopologyContextcontext)
{
try
{
Class.forName(dbClass);
}
catch(ClassNotFoundExceptione)
{
System.out.println("Drivernotfound");
e.printStackTrace();
}
try
{
connectiondriverManager.getConnection(
"jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName,userName,pwd);
connection.prepareStatement("DROPTABLEIFEXISTS"+tableName).execute();
StringBuildercreateQuery=newStringBuilder(
"CREATETABLEIFNOTEXISTS"+tableName+"(");
for(Fieldfields:tupleInfo.getFieldList())
{
if(fields.getColumnType().equalsIgnoreCase("String"))
createQuery.append(fields.getColumnName()+"VARCHAR(500),");
else
createQuery.append(fields.getColumnName()+""+fields.getColumnType()+",");
}
createQuery.append("thresholdTimeStamptimestamp)");
connection.prepareStatement(createQuery.toString()).execute();
//InsertQuery
StringBuilderinsertQuery=newStringBuilder("INSERTINTO"+tableName+"(");
StringtempCreateQuery=newString();
for(Fieldfields:tupleInfo.getFieldList())
{
insertQuery.append(fields.getColumnName()+",");
}
insertQuery.append("thresholdTimeStamp").append(")values(");
for(Fieldfields:tupleInfo.getFieldList())
{
insertQuery.append("?,");
}
insertQuery.append("?)");
prepStatement=connection.prepareStatement(insertQuery.toString());
}
catch(SQLExceptione)
{
e.printStackTrace();
}
}
[/code]
数据的插入是分批次完成的。插入的逻辑由execute()方法提供,如列表7所示。大部分代码是解析各种不同输入类型。
列表7:数据插入的代码部分
[code]publicvoidexecute(Tupletuple,BasicOutputCollectorcollector)
{
batchExecuted=false;
if(tuple!=null)
{
List<Object>inputTupleList=(List<Object>)tuple.getValues();
intdbIndex=0;
for(inti=0;i<tupleInfo.getFieldList().size();i++)
{
Fieldfield=tupleInfo.getFieldList().get(i);
try{
dbIndex=i+1;
if(field.getColumnType().equalsIgnoreCase("String"))
prepStatement.setString(dbIndex,inputTupleList.get(i).toString());
elseif(field.getColumnType().equalsIgnoreCase("int"))
prepStatement.setInt(dbIndex,
Integer.parseInt(inputTupleList.get(i).toString()));
elseif(field.getColumnType().equalsIgnoreCase("long"))
prepStatement.setLong(dbIndex,
Long.parseLong(inputTupleList.get(i).toString()));
elseif(field.getColumnType().equalsIgnoreCase("float"))
prepStatement.setFloat(dbIndex,
Float.parseFloat(inputTupleList.get(i).toString()));
elseif(field.getColumnType().equalsIgnoreCase("double"))
prepStatement.setDouble(dbIndex,
Double.parseDouble(inputTupleList.get(i).toString()));
elseif(field.getColumnType().equalsIgnoreCase("short"))
prepStatement.setShort(dbIndex,
Short.parseShort(inputTupleList.get(i).toString()));
elseif(field.getColumnType().equalsIgnoreCase("boolean"))
prepStatement.setBoolean(dbIndex,
Boolean.parseBoolean(inputTupleList.get(i).toString()));
elseif(field.getColumnType().equalsIgnoreCase("byte"))
prepStatement.setByte(dbIndex,
Byte.parseByte(inputTupleList.get(i).toString()));
elseif(field.getColumnType().equalsIgnoreCase("Date"))
{
DatedateToAdd=null;
if(!(inputTupleList.get(i)instanceofDate))
{
DateFormatdf=newSimpleDateFormat("yyyy-MM-ddhh:mm:ss");
try
{
dateToAdd=df.parse(inputTupleList.get(i).toString());
}
catch(ParseExceptione)
{
System.err.println("Datatypenotvalid");
}
}
else
{
dateToAdd=(Date)inputTupleList.get(i);
java.sql.DatesqlDate=newjava.sql.Date(dateToAdd.getTime());
prepStatement.setDate(dbIndex,sqlDate);
}
}
catch(SQLExceptione)
{
e.printStackTrace();
}
}
Datenow=newDate();
try
{
prepStatement.setTimestamp(dbIndex+1,newjava.sql.Timestamp(now.getTime()));
prepStatement.addBatch();
counter.incrementAndGet();
if(counter.get()==batchSize)
executeBatch();
}
catch(SQLExceptione1)
{
e1.printStackTrace();
}
}
else
{
longcurTime=System.currentTimeMillis();
longdiffInSeconds=(curTime-startTime)/(60*1000);
if(counter.get()<batchSize&&diffInSeconds>batchTimeWindowInSeconds)
{
try{
executeBatch();
startTime=System.currentTimeMillis();
}
catch(SQLExceptione){
e.printStackTrace();
}
}
}
}
publicvoidexecuteBatch()throwsSQLException
{
batchExecuted=true;
prepStatement.executeBatch();
counter=newAtomicInteger(0);
}
[/code]
一旦Spout和Bolt准备就绪(等待被执行),topology生成器将会建立topology并执行。下面就来看一下执行步骤。
在本地集群上运行和测试topology
通过TopologyBuilder建立topology。使用StormSubmitter,将topology递交给集群。以topology的名字、配置和topology的对象作为参数。
提交topology。
列表8:建立和执行topology
[code]publicclassStormMain
{
publicstaticvoidmain(String[]args)throwsAlreadyAliveException,
InvalidTopologyException,
InterruptedException
{
ParallelFileSpoutparallelFileSpout=newParallelFileSpout();
ThresholdBoltthresholdBolt=newThresholdBolt();
DBWriterBoltdbWriterBolt=newDBWriterBolt();
TopologyBuilderbuilder=newTopologyBuilder();
builder.setSpout("spout",parallelFileSpout,1);
builder.setBolt("thresholdBolt",thresholdBolt,1).shuffleGrouping("spout");
builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt");
if(this.argsMain!=null&&this.argsMain.length>0)
{
conf.setNumWorkers(1);
StormSubmitter.submitTopology(
this.argsMain[0],conf,builder.createTopology());
}
else
{
Configconf=newConfig();
conf.setDebug(true);
conf.setMaxTaskParallelism(3);
LocalClustercluster=newLocalCluster();
cluster.submitTopology(
"Threshold_Test",conf,builder.createTopology());
}
}
}
[/code]
创建topology后,提交到本地集群。一旦topology被提交,除非被kill或者因为修改而关闭集群,否则它将一直运行。这也是Storm一大优势之一。
本例展示建立和使用Storm,一旦你理解topology、spout和bolt这些基本概念,将会很容易。如果你处理大数据,又不想用Hadoop,那么使用Storm是一个很好的选择。
[title2]Storm常见问题解答[/title2]
一、我有一个数据文件,或者我有一个系统里面有数据,怎么导入storm做计算?你需要实现一个Spout,Spout负责将数据emit到storm系统里,交给bolts计算。怎么实现spout可以参考官方的kestrelspout实现:
如果你的数据源不支持事务性消费,那么就无法得到storm提供的可靠处理的保证,也没必要实现ISpout接口中的ack和fail方法。
二、Storm为了保证tuple的可靠处理,需要保存tuple信息,这会不会导致内存OOM?
Storm为了保证tuple的可靠处理,acker会保存该节点创建的tupleid的xor值,这称为ackvalue,那么每ack一次,就将tupleid和ackvalue做异或(xor)。当所有产生的tuple都被ack的时候,ackvalue一定为0。这是个很简单的策略,对于每一个tuple也只要占用约20个字节的内存。对于100万tuple,也才20M左右。关于可靠处理看这个:
三、Storm计算后的结果保存在哪里?可以保存在外部存储吗?
Storm不处理计算结果的保存,这是应用代码需要负责的事情,如果数据不大,你可以简单地保存在内存里,也可以每次都更新数据库,也可以采用NoSQL存储。storm并没有像s4那样提供一个PersistAPI,根据时间或者容量来做存储输出。这部分事情完全交给用户。
数据存储之后的展现,也是你需要自己处理的,stormUI只提供对topology的监控和统计。
四、Storm怎么处理重复的tuple?
因为Storm要保证tuple的可靠处理,当tuple处理失败或者超时的时候,spout会fail并重新发送该tuple,那么就会有tuple重复计算的问题。这个问题是很难解决的,storm也没有提供机制帮助你解决。一些可行的策略:
(1)不处理,这也算是种策略。因为实时计算通常并不要求很高的精确度,后续的批处理计算会更正实时计算的误差。
(2)使用第三方集中存储来过滤,比如利用mysql,memcached或者redis根据逻辑主键来去重。
(3)使用bloomfilter做过滤,简单高效。
五、Storm的动态增删节点
我在storm和s4里比较里谈到的动态增删节点,是指storm可以动态地添加和减少supervisor节点。对于减少节点来说,被移除的supervisor上的worker会被nimbus重新负载均衡到其他supervisor节点上。在storm0.6.1以前的版本,增加supervisor节点不会影响现有的topology,也就是现有的topology不会重新负载均衡到新的节点上,在扩展集群的时候很不方便,需要重新提交topology。因此我在storm的邮件列表里提了这个问题,storm的开发者nathanmarz创建了一个issue54并在0.6.1提供了rebalance命令来让正在运行的topology重新负载均衡,具体见:
和0.6.1的变更:
storm并不提供机制来动态调整worker和task数目。
六、StormUI里spout统计的completelatency的具体含义是什么?为什么emit的数目会是acked的两倍?
这个事实上是storm邮件列表里的一个问题。Storm作者marz的解答:
Thecompletelatencyisthetime fromthespoutemittingatupletothattuplebeingackedonthespout.Soittracksthetime forthewholetupletreetobeprocessed.
IfyoudiveintothespoutcomponentintheUI,you'llseethatalotoftheemitted/transferredisonthe__ack*stream. Thisisthespoutcommunicatingwiththeackerswhichtakecareoftrackingthetupletrees.
简单地说,completelatency表示了tuple从emit到被acked经过的时间,可以认为是tuple以及该tuple的后续子孙(形成一棵树)整个处理时间。其次spout的emit和transfered还统计了spout和acker之间内部的通信信息,比如对于可靠处理的spout来说,会在emit的时候同时发送一个_ack_init给acker,记录tupleid到taskid的映射,以便ack的时候能找到正确的ackertask。
[title2]其他开源的大数据解决方案[/title2]
自Google在2004年推出MapReduce范式以来,已诞生了多个使用原始MapReduce范式(或拥有该范式的质量)的解决方案。Google对MapReduce的最初应用是建立万维网的索引。尽管此应用程序仍然很流行,但这个简单模型解决的问题也正在增多。表1提供了一个可用开源大数据解决方案的列表,包括传统的批处理和流式处理应用程序。在将Storm引入开源之前将近一年的时间里,Yahoo!的S4分布式流计算平台已向Apache开源。S4于2010年10月发布,它提供了一个高性能计算(HPC)平台,向应用程序开发人员隐藏了并行处理的复杂性。S4实现了一个可扩展的、分散化的集群架构,并纳入了部分容错功能。
表1.开源大数据解决方案
解决方案 | 开发商 | 类型 | 描述 |
Storm | Twitter | 流式处理 | Twitter的新流式大数据分析解决方案 |
S4 | Yahoo! | 流式处理 | 来自Yahoo!的分布式流计算平台 |
Hadoop | Apache | 批处理 | MapReduce范式的第一个开源实现 |
Spark | UCBerkeleyAMPLab | 批处理 | 支持内存中数据集和恢复能力的最新分析平台 |
Disco | Nokia | 批处理 | Nokia的分布式MapReduce框架 |
HPCC | LexisNexis | 批处理 | HPC大数据集群 |