在漫长的数仓建设过程中,实时数仓与离线数仓分别由不同的团队进行独立建设,有大而广的离线数仓体系,也有需要追求业务时效,需要建设实时数仓。然而,业务数据需求和数据产品需求中,往往需要把实时数据与离线数据结合在一起进行比对和分析,但是这两个天然不一样的数据存储和计算结构,需要同时开发两套数据模型。在数据处理过程中,实时数仓需要使用Blink/Flink 处理,离线需要写ODPS SQL处理,还有在线计算模型,需要开发java代码处理。
创新互联建站凭借在网站建设、网站推广领域领先的技术能力和多年的行业经验,为客户提供超值的营销型网站建设服务,我们始终认为:好的营销型网站就是好的业务员。我们已成功为企业单位、个人等客户提供了成都网站制作、网站设计、外贸网站建设服务,以良好的商业信誉,完善的服务及深厚的技术力量处于同行领先地位。
如上图所示,实时数据与离线数据在存储层、计算层、服务层,都是割裂分离、独立建设的。实时数据,有着增量式计算的特性,需要快速的流转与计算,它主要以DataHub、Flink、Hbase等异构系统做为支撑,串联形成一个完整实时计算全链路。而离线数据,是定时、批量计算的特性,由存储计算统一的ODPS系统做为支撑。它们除了计算链路的差异,还有着数据处理的逻辑差异:
面对三种计算模式,较低的研发效率、不可控的数据质量,以及臃肿数据接口服务的困境,三端(流计算、批计算、在线计算)一体计算的想法自然油然而生,在20年做价格力业务时候,我就一直思考有什么解法,其实,这个也是大数据架构经常面临的问题,业界达成共识,可以归纳两种方案:
同一个引擎承载流、批两种计算模式,在流计算模式下进行实时数据计算,在批计算模式下进行离线数据计算。
流批一体计算的典型架构是:Flink + Kappa架构。Flink可以实现基于SQL的流批一体的计算表达,复杂计算通过Java应用承接,价格力计算架构就是典型这种架构,但是这种架构存在以下问题:
Flink解决了流计算、批计算一体的能力,这两种都是异步处理,只是时效不同而已,但没有解决在线计算的能力,如果要提供在线计算能力,不得不在以下两个方案选择:
Flink实现批处理,其实是有点一厢情愿,为啥这么说,因为其吞吐规模,跟MR批计算(ODPS)完全不是一个量级,如果Flink真能实现和ODPS完全对等的吞吐规模和资源成本,那完全不需要ODPS什么事了,但现实是,对于一些只有批量处理场景的(比如特征预处理、统计计算),ODPS仍然是第一优先选择,只有当面临流批同时存在的场景时候,并且对批处理规模要求不大时候,Flink的确提供非常不错的一体化解决方案。
同一SQL代码通过自动化转义,翻译到流计算引擎和批计算引擎上进行流、批计算,也包括翻译到HSF接口代码,提供在线交互计算能力。
Flink的流批一体架构非常优秀,能解决90%的流批一体问题,但不幸的是,我们有些业务场景(典型的价格计算场景),远不是Flink写写SQL可以解决的:
淘系价格计算引擎,以Flink + Kappa为核心的数据架构,关于这种数据架构演进,可以参考我其他文章,三种计算模式的叠加是价格服务计算引擎的常态模式,他们都在各自核心计算发挥自己最大的优势:
那么如何整合这3个不同计算架构,Flink提出一个引擎承接所有计算模式,也就是Flink的流批一体引擎,但这带来的问题就是,不同计算模式,底层的引擎本身就很难完全周全到,与其去统一计算引擎,为何不统一表达和调度,而把真正的计算下放到各自计算引擎,这就是Unify Engine的核心思想。
在实现三端一体化时候,有个核心技术难点,就是SQL引擎,很多数据产品都自带自己的SQL引擎,Flink内部有SQL引擎,ODPS内部有C++实现的SQL引擎,Hive也有,Mysql内部也有SQL解析引擎,这些SQL引擎都高度集成到各自的存储和计算里,如果你说要找个独立的可用在Java环境的SQL引擎,市面上有是有,不过要么是非常复杂的calcite sql引擎,要么是非常简单的select * 简易sql引擎,能做的事情非常少,开箱即用的几乎没有。但Unify SQL引擎又是实现三端一体化的核心组件,没有它,其他什么事情都无从谈起。从无设计一个SQL引擎成本是非常高的,其中不说复杂的语法解析,生成AST语法树,就单单SQL逻辑计划优化,就是非常复杂,幸运的是,业界是存在一个可以二次开发的SQL引擎,就是calcite SQL引擎,其实,很多SQL引擎都是基于calcite二次开发的,比如Flink、Spark内部的SQL解析引擎就是基于calcite二次开发的,我们设计的SQL引擎也是基于calcite的。Calcite 使用了基于关系代数的查询引擎,聚焦在关系代数的语法分析和查询逻辑的规划,通过calcite提供的SQL API(解析、验证等)将它们转换成关系代数的抽象语法树,并根据一定的规则或成本估计对AST关系进行优化,最后进一步生成ODPS/Flink/Java环境可以理解的执行代码。calcite的主要功能:
至于calcite的比较详细的原理,可以详解:Apache Calcite 处理流程详解(地址:https://xie.infoq.cn/article/1df5a39bb071817e8b4cb4b29),这里不详解了。有了calcite,解决了SQL->逻辑树,但是真正执行SQL计算的,还需要进一步将逻辑数转换成物理执行树(Physical Exec DAG),在这个DAG,是包含可执行的Java代码(JavaCode)片段,最后下发到不同执行环境,会被进一步串联可被环境执行的链路,比如在ODPS环境,会生成MR代码,在Flink环境,会被转换成Stream Operator,在Java环境,会被转换成Collector Chain,在Spring环境,会被转换成Bean组件。
PS:如果你们看过Flink源码,对上面流程会非常眼熟,是的,Unify SQL Engine不是从头设计的,是基于Flink 1.12源码魔改的,其中Parse和下面要说的Codegen技术都是直接参考了Flink设计,当然说是魔改的,就是还有大量代码需要基于上面做二次开发,比如从执行DAG到各个环境真正可执行的MR/Bean/Stream。
在SQL解析后,经过逻辑优化器和物理优化器,产生的PhyscialRel物理计划树,包含大量的复杂数据逻辑处理,比如SQL常见的CASE WHEN语句,常见的做法是给所有符号运算定义个父类(比如ExecNode),实际运行时,委派给真实的子类运行,这涉及到大量虚拟函数表的搜寻,最终这种分支指令一定程度阻止指令的管道化和并行执行,导致这种搜寻成本比函数本身执行成本还高。
Codegen技术就是专门针对这样的场景孕育而生,行业做的比较出色的Codegen技术,有LLVM和Janino,LLVM主要针对编译器,而Java的代码codegen通常使用Janino,Janino做为一种小巧快速的Java编译器,不仅能像Javac将一组java文件编译成Class文件,也可以将Java表达式、语句块、类定义块或者Java文件进行编译,直接加载成ByteCode,并在同一个JVM里进行运行。
Unify SQL Engine也使用Janino用来做CodeGen技术,并有效地提升代码的执行效率。关于Janino更多内容,可以参考这篇文章:Java CodeGen编译器Janino(地址:https://zhuanlan.zhihu.com/p/407857568)。这里有采用Codegen和不采用Codegen的技术性能对比:
表达式 |
100*x+20/2 |
(x+y)(xx+y)/(x-y)100/(xy) |
Node树遍历执行 |
10ms |
88ms |
Janino生成代码执行 |
6ms |
9ms |
可以看出当表达式越复杂时,使用Janino的效果就会体现越明显。
通常计算分为无状态计算和有状态计算,无状态计算一般是过滤、project映射,其每次计算依赖当前数据上下文,相互独立的,不依赖前后数据,因此,不需要有额外的存储保存中间计算结果或者缓存数据,但还有一类是有状态计算,除了当前数据上下文,还需要依赖之前计算的中间态数据,典型的比如:
可见,当需要进行有状态计算,需要有后背存储来承载中间状态结果,Unify SQL Engine是支持3种后背存储:内存、Redis和Hbase:
Flink是可以支持双流Join,但是Flink的双流Join的语义完全照搬了SQL的JOIN语义,就是一边的数据会和另一边的所有数据JOIN,这个对于离线分析没有任何问题,但是对于实时计算是会存在重复计算,在有些场景还有损业务逻辑,比如:当订单流去双流JOIN优惠表的时候,就会出现这个问题,优惠表的数据是会不停变化的,但是我们希望以快照数据做为JOIN的依据,而不是把优惠变更的数据都复现一遍,Unify SQL Engine是做到后者语义的,也就是SNAPSHOT JOIN,也是业务场景常见的语义:
Unify SQL Engine现在已经可以做到将SQL翻译成不同执行环境可运行的任务,通过Unify SQL统一表达了不同环境的逻辑计算,但是离最终我们期望的还很远,其中一点就是要做到统一调度和分配,现在不同环境的协调是需要开发者自己去分配和调度,比如哪些计算需要下发到ODPS MR计算,哪些是在Java环境运行,未来我们希望这些分配也是可以做到统一调度和运行,包括全量和增量计算的自动协同,离线和在线数据协同等
通过Unify SQL Engine,开发者可以自己选择底层的计算引擎,对于数据量较大但对时效要求不高的场景,可以选择在ODPS计算,对于时效有要求同时数据规模可接受内,可以选择在Flink调度,对于计算逻辑复杂,需要大量依赖HSF接口,可以选择在Java环境启动,选择自己最容易接受的资源和成本,承接其计算语义。
同时,也是希望通过Unify SQL Engine最大化的利用计算资源,比如Java应用,很多情况下是空闲状态的,CPU利用率是比较低下的,比如一些流计算可以下发到这些空闲的应用,并占用非常小的CPU(比如5%以内),整体的资源利用率就提升了,还比如,Flink计算资源是比较难申请,那么可以选择在Java环境里计算(Java相比Flink环境缺乏一些特性,比如Exactly once语义)等等。
我们是大淘宝技术营销工具团队,承担天猫各种大促活动,开发面向消费者端/商家端营销产品,负责双11核心玩法、价格管控、权益发放的核心业务团队。这里覆盖全域营销产品,拥有亿级用户规模、千万商家规模,这里充满技术挑战,有复杂的业务场景、T级大规模的数据处理,千万级的QPS、秒级亿数据实时处理等。
当前文章:三端一体计算方案:UnifySQLEngine
网页路径:http://www.csdahua.cn/qtweb/news38/21388.html
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网