
Storm Project: Stream Data Monitoring 2, Code Walkthrough…

2014-11-25 10:52
Blog notice: (1) All articles on this blog have moved to 博客虫 (Blogchong): http://www.blogchong.com/

(2) For the source code that accompanies each article, follow the "代码GIT" (Code GIT) link on the Blogchong homepage;

(3) For more related articles, updates, and code, follow the Blogchong site, which lists a technical QQ group and code-sharing links.

 

 

This document is original work; if you repost it, please credit the source:

http://blog.sina.com.cn/s/blog_8c243ea30101ir4t.html

 

 
Contents

Storm Project: Stream Data Monitoring <2>

--- Stream Data Monitoring: Code Walkthrough

1 Project Overview

1.1 Data Flow

1.2 Code Tree

2 Code Walkthrough

2.1 Package storm

2.2 Package storm.xml

2.3 Package storm.spout

2.4 Package storm.base

2.5 Package storm.bolt

3 Summary

 

1 Project Overview

 

1.1 Data Flow

 

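Stream data monitoring is a simulated Storm project. The simulated data source reads records from a log file and emits them one at a time to the monitoring bolt. MonitorBolt reads its matching rules from the configuration file MonitorBolt.xml. Four match types are supported: regex match, range match, routine fuzzy match, and routine exact match. A rule may contain several conditions, each using its own match type, and the conditions are combined with one of two logical relations (AND or OR). After filtering out the records that satisfy the rules, MonitorBolt emits them to the persistence bolt. MysqlBolt reads the MySQL settings from MysqlBolt.xml (host and port, username and password, database, and target table "from") and finally inserts the records into MySQL.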

 

1.2 Code Tree





 
Figure 1.2 Code tree
Brief description of the source:

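Package storm holds the Topology that wires the whole project together; storm.base currently contains a single MySQL helper class; storm.bolt contains the bolt classes, including MonitorBolt, MysqlBolt and PrintBolt; storm.spout provides the spout that supplies the source data; storm.source contains a class that generates the source data (it can be ignored); storm.xml contains the configuration-file readers. domain.log is the source data, and MonitorBolt.xml and MysqlBolt.xml are the two configuration files.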

 

2 Code Walkthrough

2.1 Package storm

 

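Topology.java:

// Storm 0.9.x package name; Storm 1.0+ uses org.apache.storm.topology.TopologyBuilder
import backtype.storm.topology.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();

// Set the spout and its parallelism hint, which controls how many threads (executors) run it in the cluster.
builder.setSpout("readlog", new ReadLogSpout(), 1);

// Create the "monitor" filtering bolt; it subscribes to the spout with shuffle grouping (random distribution).
builder.setBolt("monitor", new MonitorBolt("MonitorBolt.xml"), 3)
        .shuffleGrouping("readlog");

// Create the "mysql" storage bolt and pass in its configuration file.
builder.setBolt("mysql", new MysqlBolt("MysqlBolt.xml"), 3)
        .shuffleGrouping("monitor");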

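Note: this code defines the structure of the whole topology: how the components relate to one another (publish and subscribe) and which grouping each component uses to receive tuples.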
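To make the shuffleGrouping(...) calls above more concrete, here is a toy model in plain Java; it is not Storm's own implementation. The Storm documentation describes shuffle grouping as distributing tuples randomly across a bolt's tasks so that each task receives an equal number of tuples. One simple way to get that behavior, assumed here, is to shuffle the task list, hand each task out once, and then reshuffle. The class ShuffleModel and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Toy model (not Storm code) of shuffle grouping: the task list is shuffled,
// each task is used once, then the list is reshuffled for the next round.
class ShuffleModel {
    private final List<Integer> tasks = new ArrayList<>();
    private final Random rnd;
    private int pos;

    ShuffleModel(int numTasks, long seed) {
        for (int i = 0; i < numTasks; i++) {
            tasks.add(i);
        }
        rnd = new Random(seed);
        pos = tasks.size(); // forces a shuffle on the first call
    }

    // Returns the index of the task that receives the next tuple.
    int chooseTask() {
        if (pos == tasks.size()) {
            Collections.shuffle(tasks, rnd);
            pos = 0;
        }
        return tasks.get(pos++);
    }

    public static void main(String[] args) {
        // 3 mirrors the parallelism hint of the "monitor" bolt
        ShuffleModel grouping = new ShuffleModel(3, 42L);
        int[] counts = new int[3];
        for (int i = 0; i < 300; i++) {
            counts[grouping.chooseTask()]++;
        }
        System.out.println(Arrays.toString(counts)); // prints [100, 100, 100]
    }
}
```

With three tasks, every round of three tuples reaches each task exactly once, so 300 tuples split evenly while the order inside each round stays random.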

 

2.2 Package storm.xml

 

MonitorXml.java:

import java.io.File;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// fd is the path of the configuration file (e.g. MonitorBolt.xml)
File file = new File(fd);
// Create the DOM parser and load the file
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder db = dbf.newDocumentBuilder();
Document doc = db.parse(file);
// Collect the <Parameter> elements into a NodeList
NodeList nl = doc.getElementsByTagName("Parameter");
// Read each parameter value from the first <Parameter> element
Element e = (Element) nl.item(0);
MatchLogic = e.getElementsByTagName("MatchLogic").item(0)
        .getFirstChild().getNodeValue();
MatchType = e.getElementsByTagName("MatchType").item(0)
        .getFirstChild().getNodeValue();
MatchField = e.getElementsByTagName("MatchField").item(0)
        .getFirstChild().getNodeValue();
FieldValue = e.getElementsByTagName("FieldValue").item(0)
        .getFirstChild().getNodeValue();

Note: the core code of MysqlXml.java is similar to MonitorXml.java. Both rely on Java's built-in XML parsing classes, listed in the imports above.
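The same DOM calls can be tried without a file by parsing from a string. The sketch below assumes a MonitorBolt.xml layout in which a root element (called Config here, a hypothetical name) contains one <Parameter> element with the four child tags read above; the sample values and the class name MonitorConfigDemo are illustrative only. Unlike the getFirstChild().getNodeValue() chain, it returns null for a missing or empty tag instead of throwing a NullPointerException.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.xml.sax.InputSource;

class MonitorConfigDemo {
    // Hypothetical config layout, inferred from the tag names MonitorXml reads
    static final String SAMPLE =
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
            + "<Config><Parameter>"
            + "<MatchLogic>AND</MatchLogic>"
            + "<MatchType>regular::range::routine0</MatchType>"
            + "<MatchField>1::2::3</MatchField>"
            + "<FieldValue>.*baidu.*::100,300::index</FieldValue>"
            + "</Parameter></Config>";

    // Text of the first <tag> below parent, or null if the tag is absent or empty.
    static String text(Element parent, String tag) {
        Node n = parent.getElementsByTagName(tag).item(0);
        if (n == null || n.getFirstChild() == null) {
            return null;
        }
        return n.getFirstChild().getNodeValue();
    }

    static Map<String, String> parse(String xml) {
        Document doc;
        try {
            DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            doc = db.parse(new InputSource(new StringReader(xml)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Config is not well-formed XML", e);
        }
        Element p = (Element) doc.getElementsByTagName("Parameter").item(0);
        if (p == null) {
            throw new IllegalArgumentException("No <Parameter> element");
        }
        Map<String, String> conf = new LinkedHashMap<>();
        for (String tag : new String[] {"MatchLogic", "MatchType", "MatchField", "FieldValue"}) {
            conf.put(tag, text(p, tag));
        }
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(parse(SAMPLE));
    }
}
```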

 

2.3 Package storm.spout

 

ReadLogSpout.java

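import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Values;

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
    try {
        // Open the log file and decode it explicitly as UTF-8
        this.fis = new FileInputStream("domain.log");
        this.isr = new InputStreamReader(fis, "UTF-8");
        this.br = new BufferedReader(isr);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public void nextTuple() {
    try {
        // Emit at most one line per call: Storm calls nextTuple() repeatedly,
        // and looping over the whole file here would block the spout thread.
        String str = this.br.readLine();
        if (str != null) {
            this.collector.emit(new Values(str));
        }
        // Throttle emission, and back off once the end of the file is reached
        Thread.sleep(100);
    } catch (Exception e) {
        e.printStackTrace();
    }
}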

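Note: this class produces the source data. It reads the log file line by line and emits each line to the downstream bolt. Make sure the file is decoded with the correct charset (UTF-8 here) when it is read.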
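The encoding point can be illustrated outside Storm. The sketch below wraps an InputStream in an InputStreamReader with an explicit UTF-8 charset and hands out one line per call, mirroring how nextTuple() should emit at most one tuple per invocation. LineSource is a hypothetical helper, not part of the project, and the in-memory bytes stand in for domain.log.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: returns one line per call, like a non-blocking nextTuple().
class LineSource {
    private final BufferedReader br;

    LineSource(InputStream in) {
        // Decode explicitly as UTF-8 rather than relying on the platform default charset
        this.br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    }

    // Next line without its terminator, or null once the input is exhausted.
    String next() {
        try {
            return br.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // "caf\u00e9" contains a two-byte UTF-8 character
        byte[] data = "caf\u00e9\t200\nwww.baidu.com\t404\n".getBytes(StandardCharsets.UTF_8);
        LineSource src = new LineSource(new ByteArrayInputStream(data));
        String line;
        while ((line = src.next()) != null) {
            System.out.println(line.split("\t")[0]);
        }
    }
}
```

Decoding the same bytes with a non-UTF-8 default charset would garble "caf\u00e9", which is the kind of problem the note above warns about.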
 

2.4 Package storm.base

 

MysqlOpt.java

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MysqlOpt implements Serializable {

    // JDBC objects are not serializable; they are created by connSQL() inside the worker
    public transient Connection conn = null;
    transient PreparedStatement statement = null;

    // Connect to the database
    public boolean connSQL(String host_p, String database, String username,
            String password) {
        String url = "jdbc:mysql://" + host_p + "/" + database
                + "?characterEncoding=UTF-8";
        try {
            // Load the MySQL JDBC driver (Connector/J 5.x class name; 8.x uses com.mysql.cj.jdbc.Driver)
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection(url, username, password);
            return true;
        } catch (ClassNotFoundException cnfex) {
            System.out.println("MysqlBolt-- Error: Loading JDBC/ODBC driver failed!");
            cnfex.printStackTrace();
        } catch (SQLException sqlex) {
            System.out.println("MysqlBolt-- Error: Connect database failed!");
            sqlex.printStackTrace();
        }
        return false;
    }

    // Insert data
    public boolean insertSQL(String sql) {
        try {
            statement = conn.prepareStatement(sql);
            statement.executeUpdate();
            return true;
        } catch (SQLException e) {
            System.out.println("MysqlBolt-- Error: Insert database failed!");
            e.printStackTrace();
        } catch (Exception e) {
            System.out.println("MysqlBolt-- Error: Insert failed!");
            e.printStackTrace();
        } finally {
            // Close the statement after every insert so statements do not pile up on the connection
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException ignored) {
                    // nothing useful to do if closing fails
                }
                statement = null;
            }
        }
        return false;
    }

    // Close the connection
    public void deconnSQL() {
        try {
            if (conn != null)
                conn.close();
        } catch (Exception e) {
            System.out.println("MysqlBolt-- Error: Disconnect database failed!");
            e.printStackTrace();
        }
    }
}

Note: this is the MySQL helper class. It handles connecting to MySQL, inserting data, and closing the connection, and it is called by MysqlBolt.
 

2.5 Package storm.bolt

 

MonitorBolt.java:

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public void prepare(Map stormConf, TopologyContext context,
        OutputCollector collector) {
    System.out.println("MonitorBolt -- Start!");
    this.collector = collector;
    // Load the matching parameters from the configuration file
    new MonitorXml(this.monitorXml).read();
    this.MatchLogic = MonitorXml.MatchLogic;
    this.MatchType = MonitorXml.MatchType;
    this.MatchField = MonitorXml.MatchField;
    this.FieldValue = MonitorXml.FieldValue;
}

public void execute(Tuple input) {
    // Each tuple carries one log line
    String str = input.getString(0);
    if (this.flag_par == false) {
        System.out.println("MonitorBolt-- Error: can't get the path of Monitor.xml!");
    } else {
        // Check the conditions; apart from str, the arguments are the lists read from the configuration file
        boolean moni = Monitor(str, this.MatchLogic, this.MatchType,
                this.MatchField, this.FieldValue);
        if (moni == true) {
            this.collector.emit(new Values(str));
        }
    }
}

private boolean Monitor(String str, String logic, String type,
        String field, String value) {
    // Split the "::"-separated condition lists
    String[] types = type.split("::");
    String[] fields = field.split("::");
    String[] values = value.split("::");
    int flag_init = types.length;
    int flag = 0; // number of conditions that matched
    if (logic.equals("AND")) { // logical AND
        for (int i = 0; i < flag_init; i++) {
            if (types[i].equals("regular")) {
                // regex match
                if (regular(str, fields[i], values[i])) flag++;
            } else if (types[i].equals("range")) {
                // range match
                if (range(str, fields[i], values[i])) flag++;
            } else if (types[i].equals("routine0")) {
                // routine fuzzy match
                if (routine0(str, fields[i], values[i])) flag++;
            } else if (types[i].equals("routine1")) {
                // routine exact match
                if (routine1(str, fields[i], values[i])) flag++;
            }
        }
        // Match only when every condition is satisfied
        return flag == flag_init;
    } else if (logic.equals("OR")) { // logical OR
        for (int i = 0; i < flag_init; i++) {
            if (types[i].equals("regular")) {
                if (regular(str, fields[i], values[i])) flag++;
            } else if (types[i].equals("range")) {
                if (range(str, fields[i], values[i])) flag++;
            } else if (types[i].equals("routine0")) {
                if (routine0(str, fields[i], values[i])) flag++;
            } else if (types[i].equals("routine1")) {
                if (routine1(str, fields[i], values[i])) flag++;
            }
        }
        // Match when at least one condition is satisfied
        return flag != 0;
    }
    return false;
}

// Regex match: the whole field (1-based index) must match the pattern
private boolean regular(String str, String field, String value) {
    String[] strs = str.split("\t");
    Pattern p = Pattern.compile(value);
    Matcher m = p.matcher(strs[Integer.parseInt(field) - 1]);
    return m.matches();
}

// Range match: "a" means x > a, ",b" means x < b, "a,b" means a < x < b (bounds are exclusive)
private boolean range(String str, String field, String value) {
    String[] strs = str.split("\t");
    String[] values = value.split(",");
    int strss = Integer.parseInt(strs[Integer.parseInt(field) - 1]);
    if (values.length == 1) {
        return strss > Integer.parseInt(values[0]);
    } else if (values.length == 2 && values[0].length() == 0) {
        return strss < Integer.parseInt(values[1]);
    } else if (values.length == 2 && values[0].length() != 0) {
        return strss > Integer.parseInt(values[0])
                && strss < Integer.parseInt(values[1]);
    } else {
        return false;
    }
}

// Routine fuzzy match: the field contains the value but is not identical to it
private boolean routine0(String str, String field, String value) {
    String[] strs = str.split("\t");
    String strss = strs[Integer.parseInt(field) - 1];
    return strss.contains(value) && !strss.equals(value);
}

// Routine exact match: the field equals the value
private boolean routine1(String str, String field, String value) {
    String[] strs = str.split("\t");
    String strss = strs[Integer.parseInt(field) - 1];
    return strss.equals(value);
}

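Note 1: this class implements the matching rules. It supports several match types (regex, range, routine fuzzy, and routine exact match) and two logical relations for combining conditions.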
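The four match types and the AND/OR combination can be condensed into one self-contained class for experimentation. This is a simplified sketch, not the author's code: RuleMatcher, matchOne, inRange and monitor are hypothetical names, and the sketch assumes the same conventions as MonitorBolt (tab-separated log lines, 1-based field indexes, "::" between conditions, exclusive range bounds). Instead of two duplicated loops, it counts matches once and applies the logic afterwards.

```java
import java.util.regex.Pattern;

// Simplified sketch of MonitorBolt-style rule evaluation (hypothetical names).
class RuleMatcher {

    // Evaluates one condition of the given type against a single field value.
    static boolean matchOne(String type, String fieldValue, String ruleValue) {
        switch (type) {
            case "regular":  // the regex must match the whole field
                return Pattern.compile(ruleValue).matcher(fieldValue).matches();
            case "range":    // numeric range with exclusive bounds
                return inRange(Integer.parseInt(fieldValue), ruleValue);
            case "routine0": // fuzzy: contains the value but is not equal to it
                return fieldValue.contains(ruleValue) && !fieldValue.equals(ruleValue);
            case "routine1": // exact match
                return fieldValue.equals(ruleValue);
            default:         // unknown types never match
                return false;
        }
    }

    // "a" => x > a, ",b" => x < b, "a,b" => a < x < b
    static boolean inRange(int x, String spec) {
        String[] b = spec.split(",");
        if (b.length == 1) return x > Integer.parseInt(b[0]);
        if (b.length == 2 && b[0].isEmpty()) return x < Integer.parseInt(b[1]);
        if (b.length == 2) return x > Integer.parseInt(b[0]) && x < Integer.parseInt(b[1]);
        return false;
    }

    // logic is "AND" or "OR"; types, fields and values are "::"-separated lists of equal length.
    static boolean monitor(String line, String logic, String types, String fields, String values) {
        String[] cols = line.split("\t");
        String[] t = types.split("::"), f = fields.split("::"), v = values.split("::");
        int hits = 0;
        for (int i = 0; i < t.length; i++) {
            String col = cols[Integer.parseInt(f[i]) - 1]; // 1-based field index
            if (matchOne(t[i], col, v[i])) hits++;
        }
        if (logic.equals("AND")) return hits == t.length;
        if (logic.equals("OR")) return hits > 0;
        return false;
    }

    public static void main(String[] args) {
        String line = "www.baidu.com\t250\t/index.html";
        // field 1 must match the regex AND field 2 must lie strictly between 100 and 300
        System.out.println(monitor(line, "AND", "regular::range", "1::2", ".*baidu.*::100,300")); // prints true
    }
}
```

Switching the range to ",200" makes the AND rule fail while the OR rule still passes, because the regex condition alone is enough for OR.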
MysqlBolt.java:

import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;

public void prepare(Map stormConf, TopologyContext context,
        OutputCollector collector) {
    System.out.println("MysqlBolt -- Start!");
    this.collector = collector;
    // Initialize the MySQL connection
    Loading();
}

// Parameter initialization
public void Loading() {
    new MysqlXml(this.mysqlXml).read();
    String host_port = MysqlXml.Host_port; // MySQL host and port
    String database = MysqlXml.Database;   // database name
    String username = MysqlXml.Username;   // user name
    String password = MysqlXml.Password;   // password
    this.from = MysqlXml.From;             // table name
    if (this.mysql.connSQL(host_port, database, username, password) == false) {
        System.out.println("MysqlBolt-- Config error, please check Mysql-conf: "
                + this.mysqlXml);
        flag_xml = false;
    } else {
        System.out.println("MysqlBolt-- Conf Loaded: " + this.mysqlXml);
    }
}

public void execute(Tuple input) {
    String str = input.getString(0);
    if (this.flag_par == false) {
        System.out.println("MysqlBolt-- Error: can't get the path of Mysql.xml!");
    } else if (this.flag_xml == true) {
        String insert = send_str(str);
        if (this.mysql.insertSQL(insert) == false) {
            System.out.println("MysqlBolt-- Error: can't insert tuple into database!");
            System.out.println("MysqlBolt-- Error Tuple: " + str);
        }
    }
}

// Build the INSERT statement for one record: each tab-separated field becomes one quoted value.
// Values are concatenated into the SQL text, so a field containing a single quote breaks the
// statement and allows SQL injection; binding PreparedStatement parameters avoids this.
public String send_str(String str) {
    StringBuilder send_tmp = new StringBuilder();
    String[] field = str.split("\t");
    for (int i = 0; i < field.length; i++) {
        if (i > 0) {
            send_tmp.append(", ");
        }
        send_tmp.append("'").append(field[i]).append("'");
    }
    // e.g. insert into <table> values ('f1', 'f2', 'f3');
    return "insert into " + this.from + " values (" + send_tmp + ");";
}

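Note 2: this class handles data storage. It calls several methods of the MysqlOpt class in the storm.base package to connect to MySQL, insert data, and close the connection.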
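Because send_str() splices field values directly into the SQL text, a common hardening step is to bind the values through PreparedStatement parameters instead. The sketch below is a hypothetical alternative (ParamInsert, insertSql and insertLine are illustrative names) that uses only java.sql; it assumes, like the original, that every tab-separated field maps to one column of the target table, and the Connection could be, for example, the conn field of MysqlOpt. The table name cannot be a bind parameter, so it must still come from trusted configuration such as MysqlBolt.xml.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;

// Hypothetical alternative to send_str(): placeholders instead of string concatenation.
class ParamInsert {

    // Builds e.g. "insert into monitor_log values (?, ?, ?)" for three columns.
    static String insertSql(String table, int columns) {
        return "insert into " + table + " values ("
                + String.join(", ", Collections.nCopies(columns, "?")) + ")";
    }

    // Inserts one tab-separated record; try-with-resources closes the statement.
    static int insertLine(Connection conn, String table, String line) throws SQLException {
        String[] fields = line.split("\t");
        try (PreparedStatement ps = conn.prepareStatement(insertSql(table, fields.length))) {
            for (int i = 0; i < fields.length; i++) {
                ps.setString(i + 1, fields[i]); // JDBC parameter indexes are 1-based
            }
            return ps.executeUpdate(); // number of rows inserted
        }
    }

    public static void main(String[] args) {
        // "monitor_log" is a made-up table name for illustration
        System.out.println(insertSql("monitor_log", 3)); // prints insert into monitor_log values (?, ?, ?)
    }
}
```

The driver sends the values separately from the statement text, so quotes or backslashes inside log fields no longer change the meaning of the SQL.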
 

3 Summary

 

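This document covers the code of the stream-data-monitoring simulation project. For reasons of space, only part of the core code is explained. If you need the complete source code, leave a message on the blog http://blog.sina.com.cn/huangchongyuan or join QQ group 191321336. Questions are welcome in the comments; let's discuss and learn together.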