您的位置:首页 > 其它

基于ELK Stack和Spark Streaming的日志处理平台设计与实现

2015-10-13 15:55 681 查看
大数据时代,随着数据量不断增长,存储与计算集群的规模也逐渐扩大,几百上千台的云计算环境已不鲜见。现在的集群所需要解决的问题不仅仅是高性能、高可靠性、高可扩展性,还需要面对易维护性以及数据平台内部的数据共享性等诸多挑战。优秀的系统运维平台既能实现数据平台各组件的集中式管理、方便系统运维人员日常监测、提升运维效率,又能反馈系统运行状态给系统开发人员。例如采集数据仓库的日志可以按照时间序列查看各数据库实例各种级别的日志数量与占比,采集DB2表空间数据分析可得到数据库集群健康状态,分析应用服务器的日志可以查看出错最多的模块、下载最多的文件、使用最多的功能等。大数据时代的业务与运维将紧密的结合在一起。

一、日志

1.什么是日志

日志是带时间戳的基于时间序列的机器数据,包括IT系统信息(服务器、网络设备、操作系统、应用软件)、物联网各种传感器信息。日志可以反映用户行为,是真实数据。

2.日志处理方案演进

图 1. 日志处理方案经历的版本迭代

   


 

l 日志处理v1.0:日志没有集中式处理;只做事后追查,黑客入侵后删除日志无法察觉;使用数据库存储日志,无法胜任复杂事务处理。

l 日志处理v2.0:使用Hadoop平台实现日志离线批处理,缺点是实时性差;使用Storm流处理框架、Spark内存计算框架处理日志,但Hadoop/Storm/Spark都是编程框架,并不是拿来即用的平台。

l 日志处理v3.0:使用日志实时搜索引擎分析日志,特点:第一是快,日志从产生到搜索分析出结果只有数秒延时;第二是大,每天处理TB日志量;第三是灵活,可搜索分析任何日志。作为代表的解决方案有Splunk、ELK、SILK。

图 2. 深度整合ELK、Spark、Hadoop构建日志分析系统

 


二、ELK Stack

ELK Stack是开源日志处理平台解决方案,背后的商业公司是elastic(https://www.elastic.co/)。它由日志采集解析工具Logstash、基于Lucene的全文搜索引擎Elasticsearch、分析可视化平台Kibana组成。目前ELK的用户有Adobe、Microsoft、Mozilla、Facebook、Stackoverflow、Cisco、ebay、Uber等诸多厂商。

1.Logstash

Logstash是一种功能强大的信息采集工具,类似于Hadoop生态圈里的Flume,任何类型的事件流都可以被input、filter、output这三个Logstash中的插件处理和转换。另外还可以在配置文件中添加codec插件通过简单编码来简化处理过程。下面以DB2的一条日志为例。

图3.DB2数据库产生的半结构化日志样例

 


在使用logstash进行解析时,input组件中引入codec插件中的multiline,用来将多行文本封装为一个事件流,并使用正则表达式指定分离事件流的标志,在此用例中,该标志为以时间戳开始的行。在实际应用中,有时DB2的一条日志会超过500行,这超出了multiline组件默认的事件封装的最大行数,这需要我们在multiline中设置max_lines属性。通过脚本:

awk '/Event Id/{if(t++)if(c>m)m=c;c=0;next}{c++}END{print "Event ID times="t",max="m}' JobAllDetailLog.ods_dev.Batch\:\:Calculate_CISF_Tables_count.log JobAllDetailLog.*

查看两个Event Id之间最大的行数。

经过input读入预处理后的数据流入filter,filter中使用grok、mutate等插件来过滤文本和匹配字段,并且我们自己可以为事件流添加额外的字段信息:

filter {

  mutate{

    gsub => ['message', "\n", " "]

  }

  grok {

    match => { "message" => 

    "(?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY}-%{HOUR}\.%{MINUTE}\.%{SECOND})%{INT:timezone}(?:%{SPACE}%{WORD:recordid}%{SPACE})(?:LEVEL%{SPACE}:%{SPACE}%{DATA:level}%{SPACE})(?:PID%{SPACE}:%{SPACE}%{INT:processid}%{SPACE})(?:TID%{SPACE}:%{SPACE}%{INT:threadid}%{SPACE})(?:PROC%{SPACE}:%{SPACE}%{DATA:process}%{SPACE})?(?:INSTANCE%{SPACE}:%{SPACE}%{WORD:instance}%{SPACE})?(?:NODE%{SPACE}:%{SPACE}%{WORD:node}%{SPACE})?(?:DB%{SPACE}:%{SPACE}%{WORD:dbname}%{SPACE})?(?:APPHDL%{SPACE}:%{SPACE}%{NOTSPACE:apphdl}%{SPACE})?(?:APPID%{SPACE}:%{SPACE}%{NOTSPACE:appid}%{SPACE})?(?:AUTHID%{SPACE}:%{SPACE}%{WORD:authid}%{SPACE})?(?:HOSTNAME%{SPACE}:%{SPACE}%{HOSTNAME:hostname}%{SPACE})?(?:EDUID%{SPACE}:%{SPACE}%{INT:eduid}%{SPACE})?(?:EDUNAME%{SPACE}:%{SPACE}%{DATA:eduname}%{SPACE})?(?:FUNCTION%{SPACE}:%{SPACE}%{DATA:function}%{SPACE})(?:probe:%{SPACE}%{INT:probe}%{SPACE})%{GREEDYDATA:functionlog}"

    }

  }

  date {

    match => [ "timestamp", "YYYY-MM-dd-HH.mm.ss.SSSSSS" ]

  }

 }

经如上filter插件处理后的事件流便就都处理成了json格式。

Output插件用于指定事件流的去向,可以是消息队列、全文搜索引擎、TCP Socket、Email等几十种目标端。

2.Elasticsearch

Elasticsearch是基于Lucene的近实时搜索平台,它能在一秒内返回你要查找的且已经在Elasticsearch做了索引的文档。它默认基于Gossip路由算法的自动发现机制构建配置有相同cluster name的集群,但是有的时候这种机制并不可靠,会发生脑裂现象。鉴于主动发现机制的不稳定性,用户可以选择在每一个节点上配置集群其他节点的主机名,在启动集群时进行被动发现。

Elasticsearch中的Index是一组具有相似特征的文档集合,类似于关系数据库模型中的数据库实例,Index中可以指定Type区分不同的文档,类似于数据库实例中的关系表,Document是存储的基本单位,都是JSON格式,类似于关系表中行级对象。我们处理后的JSON文档格式的日志都要在Elasticsearch中做索引,相应的Logstash有Elasticsearch output插件,对于用户是透明的。

3.Kibana

Kibana是专门设计用来与Elasticsearch协作的,可以自定义多种表格、柱状图、饼状图、折线图对存储在Elasticsearch中的数据进行深入挖掘分析与可视化。下图定制的仪表盘可以动态监测数据库集群中每个数据库实例产生的各种级别的日志。

图4. 实时监测DB2实例运行状态的动态仪表盘

 


三、Kafka

Kafka是LinkedIn开源的分布式消息队列,它采用了独特的消费者-生产者架构实现数据平台各组件间的数据共享。集群概念中的server在Kafka中称之为broker,它使用主题管理不同类别的数据,比如DB2日志归为一个主题,tomcat日志归为一个主题。我们使用Logstash作为Kafka消息的生产者时,output插件就需要配置好Kafka broker的列表,也就是Kafka集群主机的列表;相应的,用作Kafka消费者角色的Logstash的input插件就要配置好需要订阅的Kafka中的主题名称和ZooKeeper主机列表。Kafka通过将数据持久化到硬盘的Write Ahead Log(WAL)保证数据可靠性与顺序性,但这并不会影响实时数据的传输速度,实时数
4000
据仍是通过内存传输的。Kafka是依赖于ZooKeeper的,它将每组消费者消费的相应topic的偏移量保存在ZooKeeper中。据称LinkedIn内部的Kafka集群每天已能处理超过1万亿条消息。

图5. 基于消息订阅机制的Kafka架构

 


除了可靠性和独特的push&pull架构外,相较于其他消息队列,Kafka还拥有更大的吞吐量:

图6. 基于消息持久化机制的消息队列吞吐量比较

 


四、Spark Streaming

Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。它将批处理、流处理、即席查询融为一体。Spark社区也是相当火爆,平均每三个月迭代一次版本更是体现了它在大数据处理领域的地位。

Spark Streaming不同于Storm,Storm是基于事件级别的流处理,Spark Streaming是mini-batch形式的近似流处理的微型批处理。Spark Streaming提供了两种从Kafka中获取消息的方式:

第一种是利用Kafka消费者高级API在Spark的工作节点上创建消费者线程,订阅Kafka中的消息,数据会传输到Spark工作节点的执行器中,但是默认配置下这种方法在Spark Job出错时会导致数据丢失,如果要保证数据可靠性,需要在Spark Streaming中开启Write Ahead Logs(WAL),也就是上文提到的Kafka用来保证数据可靠性和一致性的数据保存方式。可以选择让Spark程序把WAL保存在分布式文件系统(比如HDFS)中。

第二种方式不需要建立消费者线程,使用createDirectStream接口直接去读取Kafka的WAL,将Kafka分区与RDD分区做一对一映射,相较于第一种方法,不需再维护一份WAL数据,提高了性能。读取数据的偏移量由Spark Streaming程序通过检查点机制自身处理,避免在程序出错的情况下重现第一种方法重复读取数据的情况,消除了Spark Streaming与ZooKeeper/Kafka数据不一致的风险。保证每条消息只会被Spark Streaming处理一次。以下代码片通过第二种方式读取Kafka中的数据:

// Create direct kafka stream with brokers and topics

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(

               jssc,

               String.class,

               String.class,

               StringDecoder.class,

               StringDecoder.class,

               kafkaParams,

               topicsSet);

messages.foreachRDD(new Function<JavaPairRDD<String,String>,Void>(){

public Void call(JavaPairRDD<String, String> v1)

           throws Exception {

v1.foreach(new VoidFunction<Tuple2<String, String>>(){

                     public void call(Tuple2<String, String> tuple2) {

                          try{

                            JSONObject a = new JSONObject(tuple2._2);

                            ...

Spark Streaming获取到消息后便可以通过Tuple对象自定义操作消息,如下图是针对DB2数据库日志的邮件告警,生成告警邮件发送到Notes邮箱:

图6. 基于Spark Streaming对DB2异常日志实现Notes邮件告警

 


 

五、互联网行业日志处理方案举例介绍与应用

1.新浪

新浪采用的技术架构是常见的Kafka整合ELK Stack方案。Kafka作为消息队列用来缓存用户日志;使用Logstash做日志解析,统一成JSON格式输出给Elasticsearch;使用Elasticsearch提供实时日志分析与强大的搜索和统计服务;Kibana用作数据可视化组件。该技术架构目前服务的用户包括微博、微盘、云存储、弹性计算平台等十多个部门的多个产品的日志搜索分析业务,每天处理约32亿条(2TB)日志。

新浪的日志处理平台团队对Elasticsearch做了大量优化(比如调整max open files等),并且开发了一个独立的Elasticsearch Index管理系统,负责索引日常维护任务(比如索引的创建、优化、删除、与分布式文件系统的数据交换等)的调度及执行。为Elasticsearch安装了国内中文分词插件elasticsearch-analysis-ik,满足微盘搜索对中文分词的需求。

2.腾讯

腾讯蓝鲸数据平台告警系统的技术架构同样基于分布式消息队列和全文搜索引擎。但腾讯的告警平台不仅限于此,它的复杂的指标数据统计任务通过使用Storm自定义流式计算任务的方法实现,异常检测的实现利用了曲线的时间周期性和相关曲线之间的相关性去定义动态的阈值,并且基于机器学习算法实现了复杂的日志自动分类(比如summo logic)。

告警平台把拨测(定时curl一下某个url,有问题就告警)、日志集中检索、日志告警(5分钟Error大于X次告警)、指标告警(cpu使用率大于X告警)整合进同一个数据管线,简化了整体的架构。

3.七牛

七牛采用的技术架构为Flume+Kafka+Spark,混部在8台高配机器。根据七牛技术博客提供的数据,该日志处理平台每天处理500亿条数据,峰值80万TPS。 

Flume相较于Logstash有更大的吞吐量,而且与HDFS整合的性能比Logstash强很多。七牛技术架构选型显然考虑了这一点,七牛云平台的日志数据到Kafka后,一路同步到HDFS,用于离线统计,另一路用于使用Spark Streaming进行实时计算,计算结果保存在Mongodb集群中。

 

任何解决方案都不是十全十美的,具体采用哪些技术要深入了解自己的应用场景。就目前日志处理领域的开源组件来说,在以下几个方面还比较欠缺:

l Logstash的内部状态获取不到,目前没有好的成熟的监控方案。

l Elasticsearch具有海量存储海量聚合的能力,但是同Mongodb一样,并不适合于写入数据非常多(1万TPS以上)的场景。

l 缺乏真正实用的异常检测方法;实时统计方面缺乏成熟的解决方案,Storm就是一个底层的执行引擎,而Spark还缺少时间窗口等抽象。

l 对于日志自动分类,还没有开源工具可以做到summo logic那样的效果。

 

结束语:

大数据时代的运维管理意义重大,好的日志处理平台可以事半功倍的提升开发人员和运维人员的效率。本文通过简单用例介绍了ELK Stack、Kafka和Spark Streaming在日志处理平台中各自在系统架构中的功能。现实中应用场景繁多复杂、数据形式多种多样,日志处理工作不是一蹴而就的,分析处理过程还需要在实践中不断挖掘和优化,笔者也将致力于DB2数据库运行状态更细节数据的收集和更全面细致的监控。

参考资源 (resources)

· 参考 “日志: 每个软件工程师都应该知晓的实时数据集成提取的那点事”,详细了解日志处理。

· 查看文章“新浪是如何分析处理32亿条实时日志的?”,了解新浪的日志处理方案设计。

· 查看文章“腾讯蓝鲸数据平台之告警系统”,了解腾讯在大型系统运维工程上的实践。

· 查看文章“七牛是如何搞定每天500亿条日志的”,了解七牛海量日志处理解决方案。

· 查看文章“Mesos在去哪儿网的应用”,了解去哪儿网利用日志在集群管理中的实践。

· 参考网站“https://softwaremill.com/mqperf/”,了解基于消息持久化的各消息队列各种负载下的性能分析。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: