您的位置:首页 > 其它

实时日志分析平台搭建笔记(二)

2017-07-04 00:00 302 查看
接上一篇。ES采用两台服务器的集群,创建一个indices,名称为access-log,jstorm和spark处理后的日志写入其中。





注意:创建完access-log后,在写入数据前,需要运行以下命令,"time"为时间字段,"vulType"为_type

curl -XPUT '192.168.32.32:9200/access-log?pretty' -H 'Content-Type: application/json' -d'
{
"mappings": {
"vulType": {
"properties": {
"time": {
"type":   "date",
"format": "date_time_no_millis"
}
}
}
}
}
'

伪造一些日志,可以在kibana中看到jstorm执行后的结果







以上是jstorm实时检测的流程。还有一路是spark通过MR分析多条日志,进行统计规则的检测。

spark首先去hbase中读取数据,随后进行Map/Reduce,统计出触发风险规则的日志。例如测试统计三分钟内访问次数超过10次的IP。

获取hbase中的accesslog

public List<String> getTableByDate(String startRow,String endRow){
Scan s = new Scan();
List<String> logList = new ArrayList<String>();
s.setStartRow(Bytes.toBytes(startRow));
s.setStopRow(Bytes.toBytes(endRow));
try {
ResultScanner resultScanner = table.getScanner(s);
for(Result rs:resultScanner){
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> navigableMap = rs.getMap();
for(Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entry:navigableMap.entrySet()){
NavigableMap<byte[], NavigableMap<Long, byte[]>> map =entry.getValue();
for(Map.Entry<byte[], NavigableMap<Long, byte[]>> en:map.entrySet()){
NavigableMap<Long, byte[]> ma = en.getValue();
for(Map.Entry<Long, byte[]>e: ma.entrySet()){
logList.add(Bytes.toString(e.getValue()));
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
return null;
}
return logList;

随后将accesslog映射到类AccessLog中

public void analysesLog(String startKey,String endKey){
logList = hbaseOperator.getTableByDate(startKey, endKey);
listLength = logList.size();
accesslogList = new ArrayList<AccessLog>(listLength);
String patternstr = "((\\d+\\.){3}\\d+)\\s(\\S+)\\s\\[(.+)\\]\\s\"((\\S+)\\s(.*)\\s(\\S+))\"\\s\"(.*)\"\\s(\\d+)\\s(\\d+)\\s(\\S+)\\s\"(.*)\"\\s\"(.*)\"";
pattern = Pattern.compile(patternstr);
for(int i=0;i<listLength;i++){
m = pattern.matcher(logList.get(i));
if(m.find()){
AccessLog accessLog = new AccessLog();
accessLog.setLogType("access-log");
accessLog.setClientIp(m.group(1));
accessLog.setClientUser(m.group(3));
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZZ");
SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss ZZ",Locale.ENGLISH);
Date time = null;
try {
time = sdf.parse(m.group(4));
accessLog.setTime(sdf2.format(time));
} catch (ParseException e) {
accessLog.setTime(sdf2.format(new Date()));
//logger.error("[LogSplit.execute]:" + e.getMessage());
}
accessLog.setMethod(m.group(6));
accessLog.setUrl(m.group(7));
accessLog.setVersion(m.group(8));
accessLog.setRequestBody(m.group(9));
accessLog.setStatus(m.group(10));
accessLog.setHttpBytes(m.group(11));
accessLog.setRequestTime(m.group(12));
accessLog.setReferer(m.group(13));
accessLog.setUserAgent(m.group(14));
accesslogList.add(accessLog);
}
}
analysesIp();

}

映射完成后调用analysesIp()进行Map/Reduce操作,并将命中的IP写入到ES中

public void analysesIp(){
if(!accesslogList.isEmpty()){
List<String> ipList = new ArrayList<String>(listLength);
Iterator<AccessLog> iterator = accesslogList.iterator();
while (iterator.hasNext()) {
ipList.add(iterator.next().getClientIp());
}
JavaRDD<String> ipRdd = sparkContext.parallelize(ipList);
JavaPairRDD<String, Integer> clientIpRdd = ipRdd.mapToPair(initCount);
JavaPairRDD<String, Integer> sumRdd = clientIpRdd.reduceByKey(sum);
Map<String, Integer> ipMap = sumRdd.collectAsMap();
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZZ");
AccessLog accessLog = new AccessLog();
accessLog.setLogType("accesslog");
accessLog.setVulType("IP访问异常");
accessLog.setVulTypeId(Integer.toString(RuleLength.START_RULE_LENGTH));
for(Entry<String, Integer> entry:ipMap.entrySet()){
if(entry.getValue() > 30){
accessLog.setTime(sdf2.format(new Date()));
accessLog.setClientIp(entry.getKey());
accessLog.setMsg("源IP在3分钟内共访问了" + entry.getValue() + "次");
elasticSearch.inputData(accessLog);
}
}

}
}

用到两个简单的map/reduce函数,一个进行初始化,将单个IP初始化一个元组(IP,1)

第二个将相同的IP进行累加,并记录出现的次数,累加后的效果为(IP1,10) (IP2,3)

private static PairFunction<String, String, Integer> initCount = new PairFunction<String, String, Integer>() {

/**
*
*/
private static final long serialVersionUID = -6290488020645730311L;

public Tuple2<String, Integer> call(String x){
return new Tuple2<String, Integer>(x, 1);
}
};

private static Function2<Integer, Integer, Integer> sum = new Function2<Integer, Integer, Integer>() {

/**
*
*/
private static final long serialVersionUID = 391813718009018019L;

@Override
public Integer call(Integer x, Integer y) throws Exception {

return x + y;
}
};

写入ES后,可看到的效果如下

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  日志分析 安全