扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这篇文章给大家介绍如何进行数据湖deltalake流表的读写,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
我们提供的服务有:网站制作、成都网站设计、微信公众号开发、网站优化、网站认证、勃利ssl等。为成百上千家企事业单位解决了网站和推广的问题。提供周到的售前咨询和贴心的售后服务,是有科学管理、有技术的勃利网站制作公司
delta lake和 spark structured streaming可以深度整合。delta lake克服了很多常见的与流系统和文件整合带来的相关限制,如下:
保证了多个流(或并发批处理作业)的仅一次处理。
当使用文件作为流源时,可以有效地发现哪些文件是新文件。
1. 作为stream source
1.1 案例讲解
当你的structured streaming使用delta lake作为stream source的时候,应用会处理delta 表中已有的数据,以及delta 表新增的数据。
spark.readStream.format("delta").load("/delta/events")
也可以做一些优化,如下:
a.通过maxFilesPerTrigger配置控制structured streaming从delta lake加载的微批文件数。要知道Structured streaming也是微批的概念。该参数就是控制每次trigger计算的最大新增文件数,默认是1000,实际情况要根据数据量和资源数量进行控制。
b.通过maxBytesPerTrigger控制每次trigger处理的最大数据量。这是设置一个“ soft max”,这意味着一个批处理大约可以处理此数量的数据,并且可能处理的数量超出这个限制。如果使用的是Trigger.Once,则 此配置无效。如果将此配置与maxFilesPerTrigger结合使用,两个参数任意一个达到临届条件,都会生效。
1.2 忽略更新和删除
structured streaming不处理不是追加的输入数据,并且如果对作为source的delta table的表进行了任何修改,则structured streaming会抛出异常。 对于变更常见的企业场景,提供了两种策略,来处理对delta 表变更给structured streaming 任务造成的影响:
可以删除输出和checkpoint,并重新启动structured streaming对数据计算,也即是重新计算一次。
可以设置以下两个选项之一:
ignoreDeletes:忽略在分区表中删除数据的事务。
ignoreChanges:如果由于诸如UPDATE,MERGE INTO,DELETE(在分区内)或OVERWRITE之类的数据更改操作而不得不在源表中重写文件,则重新处理更新的文件。因此未更改的行仍可能会处理并向下游传输,因此structured streaming的下游应该能够处理重复数据。删除不会传输到下游。ignoreChanges包含ignoreDeletes。因此,如果使用ignoreChanges,则流不会因源表的删除或更新而中断。
1.3 案例
假设有一张表叫做user_events,有三个字段:date,user_email,action,而且该表以date字段进行分区。structured streaming区处理这张表,且还有其程序会对该delta 表进行插入和删除操作。
假设仅仅是删除操作,可以这么配置stream:
events.readStream .format("delta") .option("ignoreDeletes", "true") .load("/delta/user_events")
假设对delta表修改操作,可以这么配置stream:
events.readStream .format("delta") .option("ignoreChanges", "true") .load("/delta/user_events")
如果使用UPDATE语句更新了user_email字段某个值,则包含相关user_email的文件将被重写,这个是delta lake更改操作实现机制后面会讲。使用ignoreChanges时,新记录将与同一文件中的所有其他未更改记录一起向下游传输。 所以下游程序应该能够处理这些传入的重复记录。
2.delta 表作为sink
delta table可以作为Structured Streaming的sink使用。delta lake的事务日志确保了其能实现仅一次处理。
2.1 append mode
默认是append 模式,仅仅是追加数据到delta 表:
events.writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json") .start("/delta/events") // as a path
2.2 complete mode
也可以使用Structured Streaming每个批次覆盖一次整张表。在某些聚合场景下会用到该模式:
.format("delta") .load("/delta/events") .groupBy("customerId") .count() .writeStream .format("delta") .outputMode("complete") .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg") .start("/delta/eventsByCustomer")
对于延迟要求更宽松的应用程序,可以使用Trigger.Once来节省计算资源。once trigger每次处理从开始到最新的数据,典型的kappa模型,很适合这种场景了。
关于如何进行数据湖deltalake流表的读写就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流