您的位置:首页 > 数据库

数据仓库(六)---分布式SQL查询引擎---presto介绍

2018-02-23 17:38 597 查看
我们在之前的文章中已经学习了数据仓库hive,如果要对数据仓库进行交互查询,则需要交互查询的引擎用于提高查询效率。本章介绍presto。

简介

Presto是一个开源的分布式SQL查询引擎,适用于实时交互式分析查询,支持海量的数据;主要是为了解决商业数据仓库的交互分析,和处理速度低下的问题。它支持标准的ANSI SQL,包括复杂查询、聚合(aggregation)、连接(join)和窗口函数(window functions)。

Presto支持在线数据查询,包括Hive, Cassandra, 关系数据库以及专有数据存储。 一条Presto查询可以将多个数据源的数据进行合并,可以跨越整个组织进行分析。

我们都知道hive等数据仓库是使用mapreduce来进行查询的,mapreduce本身的运行就要花一定的时间,所以查询效率会低些,不能满足web端实时的交互式分析查询。只能用来做离线的海量数据分析。

但是有时候我们需要查某些详细的数据时又需要交互的少量的进行查询,这种场景下Presto应运而生。

Preston 官网:http://prestodb.io/

Preston Github 主页:https://github.com/facebook/presto

https://github.com/prestodb/presto

http://prestodb-china.com/ (京东版本)

https://teradata.github.io/presto/docs/current/overview.html (teradata版本)

Facebook数据基础设施的Presto团队由以下成员组成, Martin Traverso, Dain Sundstrom, David Phillips, Eric Hwang, Nileema Shingte 以及Ravi Murthy.

背景

Presto是facebook搭建开源的项目。

Facebook是一家数据驱动的公司。 数据处理和分析是Facebook为10亿多活跃用户开发和交付产品的核心所在。 拥有世界上最大的数据仓库之一,存储了大约 300PB 以上的数据。 这些数据被一系列不同种类的程序所使用, 包括传统的数据批处理程序、基于图论的数据分析、机器学习、和实时性的数据分析。

分析人员、数据科学家和工程师需要处理数据、分析数据、不断地改善我们的产品, 对于这些人来说, 提高数据仓库的查询性能是非常重要的。在一定时间内能够运行更多的查询并且能够更快地获得查询结果能够提高他们的工作效率。

Facebook数据仓库中的数据存储在几个大型的Hadoop HDFS的集群上。 Hadoop MapReduce和Hive被设计为用于进行大规模、高可靠性的计算,而且这些技术都被优化为用来提高整体系统的吞吐量。但是当数据仓库增长到PB级别,并且需求进一步提升的时候, 就非常需要一个在数据仓库上工作的,能够提供低延迟的交互式查询系统。

在2012年秋天,Facebook 数据基础设施(Data Infrastructure)部门的一支团队开始为数据仓库的用户解决这个问题。他们评估了一些外部项目, 发现这些项目或者是太不成熟,或者就是不能满足我们在灵活性和规模性上的要求。 所以他们决定开始搭建Presto,一个崭新的能够在PB级别的数据上进行交互式查询的系统。

现状

Presto的开发是从2012年的秋天开始的。 在2013年早期的时候Facebook的第一个生产系统开始运行。 在2013年春天的时候这个系统推广到了Facebook的整个公司。从那是起, Presto成为了公司内在数据仓库上进行交互式分析的主要系统。 它被部署到了多个不同的地区,而且我们成功地将一个集群扩展到了1000个节点。 超过1000名以上的员工在日常工作中使用这个系统, 他们每天在一个PB的数据上会运行超过30,000个查询。

Presto在CPU的性能和主要的查询性能上比Hive/MapReduce要好10倍以上。它目前支持ANSI SQL的大部分操作, 包括连接、 左/右外连接、 子查询、以及通用的聚合和标量函数, 同时也包含了一些近似的去重(使用了 HyperLogLog)和近似的百分数(基于quantile digest算法,)计算。目前阶段的主要限制是在表连接时候的大小限制以及唯一键值和群组的基数(cardinality of unique keys/groups)。目前系统没有能力将查询结果回写到特定的表中(目前查询结果会直接通过流输出的方式返回给客户端)。

Facebook积极努力地扩展Presto的功能以及提供性能。比如去除查询中连接和聚合的大小限制,同时将提供将查询结果写入输出表的功能。 同时开发一个查询加速器。主要是设计一种为查询处理优化的新的数据格式来避免不必要的数据转换。 这些新的特性会将后台数据仓库中经常使用的数据集合缓存起来, 系统会有效地使用这些缓存数据来加速查询的速度,而不需要让用户知道缓存机制的存在。还有开发一个高性能的HBase连接器(HBase connector)。

架构和工作原理

Presto是一个分布式SQL查询引擎, 它被设计为用来专门进行高速、实时的数据分析。它支持标准的ANSI SQL,包括复杂查询、聚合(aggregation)、连接(join)和窗口函数(window functions)。

下面的架构图中展现了简化的Presto系统架构。客户端(client)将SQL查询发送到Presto的协调员(coordinator)。协调员会进行语法检查、分析和规划查询计划。计划员(scheduler)将执行的管道组合在一起, 将任务分配给那些里数据最近的节点,然后监控执行过程。 客户端从输出段中将数据取出, 这些数据是从更底层的处理段中依次取出的。

Presto的运行模型和Hive或MapReduce有着本质的区别。Hive将查询翻译成多阶段的MapReduce任务, 一个接着一个地运行。 每一个任务从磁盘上读取输入数据并且将中间结果输出到磁盘上。 然而Presto引擎没有使用MapReduce。它使用了一个定制的查询和执行引擎和响应的操作符来支持SQL的语法。除了改进的调度算法之外, 所有的数据处理都是在内存中进行的。 不同的处理端通过网络组成处理的流水线。 这样会避免不必要的磁盘读写和额外的延迟。 这种流水线式的执行模型会在同一时间运行多个数据处理段, 一旦数据可用的时候就会将数据从一个处理段传入到下一个处理段。 这样的方式会大大的减少各种查询的端到端响应时间。



Presto系统是用Java来实现的, 主要原因是Java的开发效率高,且拥有非常好的生态环境, 并且很容易同Facebook数据基础设施的其他Java应用进行集成。Presto会将查询计划中的一部分动态地编译成JVM字节代码,并让JVM优化和生成原生的机器代码。 通过谨慎地使用内存和数据结构,Presto避免了通常Java程序会碰到的内存分配和垃圾收集(Java garbage collection)的问题。

存储插件(连接器)

扩展性是在设计Presto时的另一个要点。在项目的早期阶段, 意识到除了HDFS之外,大量数据会被存储在很多其他类型的系统中。 其中一些是像HBase一类的为人熟知的系统,另一类则是象Facebook New Feed一样的定制的后台。Presto设计了一个简单的数据存储的抽象层, 来满足在不同数据存储系统之上都可以使用SQL进行查询。存储插件(连接器,connector)只需要提供实现以下操作的接口, 包括对元数据(metadata)的提取,获得数据存储的位置,获取数据本身的操作等。除了Hive/HDFS后台系统之外, Presto项目也开发了一些连接其他系统的Presto 连接器,包括HBase,Scribe和定制开发的系统。



(Scribe是Facebook的开源项目,可以实时的将大量服务器产生的日志文件汇总到文件系统中, 详见:https://github.com/facebook/scribe

Presto 支持可插拔的连接器用于提供数据查询。不同连接器的要求不一样。

HADOOP/HIVE

Presto 支持读以下版本的 hive 数据:

Apache Hadoop 1.x,使用 hive-hadoop1 连接器

Apache Hadoop 2.x,使用 hive-hadoop2 连接器

Cloudera CDH 4,使用 hive-cdh4 连接器

Cloudera CDH 5,使用 hive-cdh5 连接器

支持以下格式:Text、SequenceFile、RCFile、ORC。

另外,还需要一个远程的 Hive metastore 服务。本地的或者嵌入式模式是不支持的。Presto 不使用 MapReduce 并且只需要 HDFS。

CASSANDRA

Cassandra 2.x 是需要的。这个连接器是完全独立于 Hive 连接器的并且仅仅需要一个安装好的 Cassandra 集群。

TPC-H

TPC-H 连接器动态地生成数据用于实验和测试 Presto。这个连接器没有额外的要求。

当然,Presto 还支持一些其他的连接器,包括:

JMX

Kafka

MySQL

PostgreSQL

presto在hive中工作的原理

如图



Presto查询引擎是一个Master-Slave的架构,由下面三部分组成:

一个Coordinator节点

一个Discovery Server节点

多个Worker节点

Coordinator: 负责解析SQL语句,生成执行计划,分发执行任务给Worker节点执行

Discovery Server: 通常内嵌于Coordinator节点中

Worker节点: 负责实际执行查询任务,负责与HDFS交互读取数据

Worker节点启动后向Discovery Server服务注册,Coordinator从Discovery Server获得可以正常工作的Worker节点。如果配置了Hive Connector,需要配置一个Hive MetaStore服务为Presto提供Hive元信息,Worker 节点与 HDFS 交互读取数据。

Presto执行查询过程

整体查询流程

Client使用HTTP协议发送一个query请求。

通过Discovery Server发现可用的Server。

Coordinator构建查询计划(Connector插件提供Metadata)

Coordinator向workers发送任务

Worker通过Connector插件读取数据

Worker在内存里执行任务(Worker是纯内存型计算引擎)

Worker将数据返回给Coordinator,之后再Response Client



SQL执行流程

当Coordinator收到一个Query,其SQL执行流程如图所示。SQL通过Anltr3解析为AST(抽象语法树),然后通过Connector获取原始数据的Metadata信息,这里会有一些优化,比如缓存Metadata信息等,根据Metadata信息生成逻辑计划,然后会依次生成分发计划和执行计划,在执行计划里需要去Discovery里获取可用的node列表,然后根据一定的策略,将这些计划分发到指定的Worker机器上,Worker机器再分别执行。



提交查询

用户使用Presto Cli提交一个查询语句后,Cli使用HTTP协议与Coordinator通信,Coordinator收到查询请求后调用SqlParser解析SQL语句得到Statement对象,并将Statement封装成一个QueryStarter对象放入线程池中等待执行。



SQL编译过程

Presto与Hive一样,使用Antlr编写SQL语法,语法规则定义在Statement.g和StatementBuilder.g两个文件中。

如下图中所示从SQL编译为最终的物理执行计划大概分为5部,最终生成在每个Worker节点上运行的LocalExecutionPlan,这里不详细介绍SQL解析为逻辑执行计划的过程,通过一个SQL语句来理解查询计划生成之后的计算过程。





SQL语句生成的逻辑执行计划Plan如上图所示。那么Presto是如何对上面的逻辑执行计划进行拆分以较高的并行度去执行完这个计划呢,我们来看看物理执行计划。

物理执行计划

逻辑执行计划图中的虚线就是Presto对逻辑执行计划的切分点,逻辑计划Plan生成的SubPlan分为四个部分,每一个SubPlan都会提交到一个或者多个Worker节点上执行。

SubPlan有几个重要的属性planDistribution、outputPartitioning、partitionBy属性。

PlanDistribution表示一个查询Stage的分发方式,逻辑执行计划图中的4个SubPlan共有3种不同的PlanDistribution方式:Source表示这个SubPlan是数据源,Source类型的任务会按照数据源大小确定分配多少个节点进行执行;Fixed表示这个SubPlan会分配固定的节点数进行执行(Config配置中的query.initial-hash-partitions参数配置,默认是8);None表示这个SubPlan只分配到一个节点进行执行。在下面的执行计划中,SubPlan1和SubPlan0 PlanDistribution=Source,这两个SubPlan都是提供数据源的节点,SubPlan1所有节点的读取数据都会发向SubPlan0的每一个节点;SubPlan2分配8个节点执行最终的聚合操作;SubPlan3只负责输出最后计算完成的数据。

OutputPartitioning属性只有两个值HASH和NONE,表示这个SubPlan的输出是否按照partitionBy的key值对数据进行Shuffle。在下面的执行计划中只有SubPlan0的OutputPartitioning=HASH,所以SubPlan2接收到的数据是按照rank字段Partition后的数据。



完全基于内存的并行计算

查询的并行执行流程

Presto SQL的执行流程如下图所示



Cli通过HTTP协议提交SQL查询之后,查询请求封装成一个SqlQueryExecution对象交给Coordinator的SqlQueryManager#queryExecutor线程池去执行

每个SqlQueryExecution线程(图中Q-X线程)启动后对查询请求的SQL进行语法解析和优化并最终生成多个Stage的SqlStageExecution任务,每个SqlStageExecution任务仍然交给同样的线程池去执行

每个SqlStageExecution线程(图中S-X线程)启动后每个Stage的任务按PlanDistribution属性构造一个或者多个RemoteTask通过HTTP协议分配给远端的Worker节点执行

Worker节点接收到RemoteTask请求之后,启动一个SqlTaskExecution线程(图中T-X线程)将这个任务的每个Split包装成一个PrioritizedSplitRunner任务(图中SR-X)交给Worker节点的TaskExecutor#executor线程池去执行

上面的执行计划实际执行效果如下图所示。



Coordinator通过HTTP协议调用Worker节点的 /v1/task 接口将执行计划分配给所有Worker节点(图中蓝色箭头)

SubPlan1的每个节点读取一个Split的数据并过滤后将数据分发给每个SubPlan0节点进行Join操作和Partial Aggr操作

SubPlan1的每个节点计算完成后按GroupBy Key的Hash值将数据分发到不同的SubPlan2节点

所有SubPlan2节点计算完成后将数据分发到SubPlan3节点

SubPlan3节点计算完成后通知Coordinator结束查询,并将数据发送给Coordinator

源数据的并行读取

在上面的执行计划中SubPlan1和SubPlan0都是Source节点,其实它们读取HDFS文件数据的方式就是调用的HDFS InputSplit API,然后每个InputSplit分配一个Worker节点去执行,每个Worker节点分配的InputSplit数目上限是参数可配置的,Config中的query.max-pending-splits-per-node参数配置,默认是100。

分布式的Hash聚合

上面的执行计划在SubPlan0中会进行一次Partial的聚合计算,计算每个Worker节点读取的部分数据的部分聚合结果,然后SubPlan0的输出会按照group by字段的Hash值分配不同的计算节点,最后SubPlan3合并所有结果并输出

流水线

数据模型

Presto中处理的最小数据单元是一个Page对象,Page对象的数据结构如下图所示。一个Page对象包含多个Block对象,每个Block对象是一个字节数组,存储一个字段的若干行。多个Block横切的一行是真实的一行数据。一个Page最大1MB,最多16*1024行数据。



节点内部流水线计算

下图是一个Worker节点内部的计算流程图,左侧是任务的执行流程图。



Worker节点将最细粒度的任务封装成一个PrioritizedSplitRunner对象,放入pending split优先级队列中。每个

Worker节点启动一定数目的线程进行计算,线程数task.shard.max-threads=availableProcessors() * 4,在config中配置。

每个空闲的线程从队列中取出一个PrioritizedSplitRunner对象执行,如果执行完成一个周期,超过最大执行时间1秒钟,判断任务是否执行完成,如果完成,从allSplits队列中删除,如果没有,则放回pendingSplits队列中。

每个任务的执行流程如下图右侧,依次遍历所有Operator,尝试从上一个Operator取一个Page对象,如果取得的Page不为空,交给下一个Operator执行。

节点间流水线计算

下图是ExchangeOperator的执行流程图,ExchangeOperator为每一个Split启动一个HttpPageBufferClient对象,主动向上一个Stage的Worker节点拉数据,数据的最小单位也是一个Page对象,取到数据后放入Pages队列中



本地化计算

Presto在选择Source任务计算节点的时候,对于每一个Split,按下面的策略选择一些minCandidates

优先选择与Split同一个Host的Worker节点

如果节点不够优先选择与Split同一个Rack的Worker节点

如果节点还不够随机选择其他Rack的节点

对于所有Candidate节点,选择assignedSplits最少的节点。

动态编译执行计划

Presto会将执行计划中的ScanFilterAndProjectOperator和FilterAndProjectOperator动态编译为Byte Code,并交给JIT去编译为native代码。Presto也使用了Google Guava提供的LoadingCache缓存生成的Byte Code。





上面的两段代码片段中,第一段为没有动态编译前的代码,第二段代码为动态编译生成的Byte Code反编译之后还原的优化代码,我们看到这里采用了循环展开的优化方法。

循环展开最常用来降低循环开销,为具有多个功能单元的处理器提供指令级并行。也有利于指令流水线的调度。

小心使用内存和数据结构

使用Slice进行内存操作,Slice使用Unsafe#copyMemory实现了高效的内存拷贝,Slice仓库参考:https://github.com/airlift/slice

Facebook工程师在另一篇介绍ORCFile优化的文章中也提到使用Slice将ORCFile的写性能提高了20%~30%,参考:https://code.facebook.com/posts/229861827208629/scaling-the-facebook-data-warehouse-to-300-pb/

类BlinkDB的近似查询

为了加快avg、count distinct、percentile等聚合函数的查询速度,Presto团队与BlinkDB作者之一Sameer Agarwal合作引入了一些近似查询函数approx_avg、approx_distinct、approx_percentile。approx_distinct使用HyperLogLog Counting算法实现。

GC控制

Presto团队在使用hotspot java7时发现了一个JIT的BUG,当代码缓存快要达到上限时,JIT可能会停止工作,从而无法将使用频率高的代码动态编译为native代码。

Presto团队使用了一个比较Hack的方法去解决这个问题,增加一个线程在代码缓存达到70%以上时进行显式GC,使得已经加载的Class从perm中移除,避免JIT无法正常工作的BUG。

presto的优点

Ad-hoc,期望查询时间秒级或几分钟

比Hive快10倍

支持多数据源,如Hive、Kafka、MySQL、MonogoDB、Redis、JMX等,也可自己实现Connector

Client Protocol: HTTP+JSON, support various languages(Python, Ruby, PHP, Node.js Java)

支持JDBC/ODBC连接

ANSI SQL,支持窗口函数,join,聚合,复杂查询等

presto的特点(实现低延迟的原因)

完全基于内存的并行计算

流水线式计算作业

本地化计算

动态编译执行计划

GC控制

与Hive运行对比



上图显示了MapReduce与Presto的执行过程的不同点,MR每个操作要么需要写磁盘,要么需要等待前一个stage全部完成才开始执行,而Presto将SQL转换为多个stage,每个stage又由多个tasks执行,每个tasks又将分为多个split。所有的task是并行的方式进行允许,stage之间数据是以pipeline形式流式的执行,数据之间的传输也是通过网络以Memory-to-Memory的形式进行,没有磁盘io操作。这也是Presto性能比Hive快很多倍的决定性原因。

与hive、SparkSQL结果对比图



使用场景

Presto 支持 SQL 并提供了一个标准数据库的语法特性,但其不是一个通常意义上的关系数据库,他不是关系数据库,如 MySQL、PostgreSQL 或者 Oracle 的替代品。Presto 不是设计用来解决在线事物处理(OLTP)。

Presto 是一个工具,被用来通过分布式查询来有效的查询大量的数据。Presto 是一个可选的工具,可以用来查询 HDFS,通过使用 MapReduce 的作业的流水线,例如 hive,pig,但是又不限于查询 HDFS 数据,它还能查询其他的不同数据源的数据,包括关系数据库以及其他的数据源,比如 cassandra。

Presto 被设计为处理数据仓库和分析:分析数据,聚合大量的数据并产生报表,这些场景通常被定义为 OLAP。

谁在使用presto

国外:

Facebook,Presto 的开发者

国内:

腾讯

美团,Presto实现原理和美团的使用实践

窝窝团,#数据技术选型#即席查询Shib+Presto,集群任务调度HUE+Oozie

京东

presto的缺点

前面介绍了Presto的各种优点,其实其也有一些缺点,主要缺点为以下三条:

No fault tolerance;当一个Query分发到多个Worker去执行时,当有一个Worker因为各种原因查询失败,那么Master会感知到,整个Query也就查询失败了,而Presto并没有重试机制,所以需要用户方实现重试机制。

Memory Limitations for aggregations, huge joins;比如多表join需要很大的内存,由于Presto是纯内存计算,所以当内存不够时,Presto并不会将结果dump到磁盘上,所以查询也就失败了,但最新版本的Presto已支持写磁盘操作,这个待后续测试和调研。

MPP(Massively Parallel Processing )架构;这个并不能说其是一个缺点,因为MPP架构就是解决大量数据分析而产生的,但是其缺点也很明显,假如我们访问的是Hive数据源,如果其中一台Worker由于load问题,数据处理很慢,那么整个查询都会受到影响,因为上游需要等待上游结果。

参考链接:

http://tech.dianwoda.com/2016/10/20/unt/

http://armsword.com/2017/12/05/presto/

https://www.bbsmax.com/A/l1dyyE9gde/

https://tech.meituan.com/presto.html

http://armsword.com/2017/12/05/presto/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: