百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

Flink SQL 知其所以然(六)| 维表 join 的性能优化之路(上)

wptr33 2025-02-26 14:05 26 浏览

废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:

  1. 背景及应用场景介绍:博主期望你能了解到,flink sql 提供了轻松访问外部存储的 lookup join(与上节不同,上节说的是流与流的 join)。lookup join 可以简单理解为使用 flatmap 访问外部存储数据然后将维度字段拼接到当前这条数据上面
  2. 来一个实战案例:博主以曝光用户日志流关联用户画像(年龄、性别)维表为例介绍 lookup join 应该达到的关联的预期效果。
  3. flink sql lookup join 的解决方案以及原理的介绍:主要介绍 lookup join 的在上述实战案例的 sql 写法,博主期望你能了解到,lookup join 是基于处理时间的,并且 lookup join 经常会由于访问外部存储的 qps 过高而导致背压,产出延迟等性能问题。我们可以借鉴在 DataStream api 中的维表 join 优化思路在 flink sql 使用 local cache异步访问维表批量访问维表三种方式去解决性能问题。
  4. 总结及展望:官方并没有提供 批量访问维表 的能力,因此博主自己实现了一套,具体使用方式和原理实现敬请期待下篇文章。

2.背景及应用场景介绍

维表作为 sql 任务中一种常见表的类型,其本质就是关联表数据的额外数据属性,通常在 join 语句中进行使用。比如源数据有人的 id,你现在想要得到人的性别、年龄,那么可以通过用户 id 去关联人的性别、年龄,就可以得到更全的数据。

维表 join 在离线数仓中是最常见的一种数据处理方式了,在实时数仓的场景中,flink sql 目前也支持了维表的 join,即 lookup join,生产环境可以用 mysql,redis,hbase 来作为高速维表存储引擎。

Notes:在实时数仓中,常用实时维表有两种更新频率

实时的更新:维度信息是实时新建的,实时写入到高速存储引擎中。然后其他实时任务在做处理时实时的关联这些维度信息。

周期性的更新:对于一些缓慢变化维度,比如年龄、性别的用户画像等,几万年都不变化一次的东西,实时维表的更新可以是小时级别,天级别的。

3.来一个实战案例

来看看在具体场景下,对应输入值的输出值应该长啥样。

需求指标:使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的维度之后,提供给下游计算分性别,年龄段的曝光用户数使用。此处我们只关心关联维表这一部分的输入输出数据。

来一波输入数据:



注意:redis 中的数据结构存储是按照 key,value 去存储的。其中 key 为 user_id,value 为 age,sex 的 json。如下图所示:

预期输出数据如下:


flink sql lookup join 登场。下面是官网的链接。

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#lookup-join

4.flink sql lookup join

4.1.lookup join 定义

以上述案例来说,lookup join 其实简单理解来,就是每来一条数据去 redis 里面搂一次数据。然后把关联到的维度数据给拼接到当前数据中。

熟悉 DataStream api 的小伙伴萌,简单来理解,就是 lookup join 的算子就是 DataStream api 中的 flatmap 算子中处理每一条来的数据,针对每一条数据去访问用户画像的 redis。(实际上,flink sql api 中也确实是这样实现的!sql 生成的 lookup join 代码就是继承了 flatmap)

4.2.上述案例解决方案

来看看上述案例的 flink sql lookup join sql 怎么写:

CREATE TABLE show_log (
    log_id BIGINT,
    `timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),
    user_id STRING,
    proctime AS PROCTIME()
)
WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.user_id.length' = '1',
  'fields.log_id.min' = '1',
  'fields.log_id.max' = '10'
);

CREATE TABLE user_profile (
    user_id STRING,
    age STRING,
    sex STRING
    ) WITH (
  'connector' = 'redis',
  'hostname' = '127.0.0.1',
  'port' = '6379',
  'format' = 'json',
  'lookup.cache.max-rows' = '500',
  'lookup.cache.ttl' = '3600',
  'lookup.max-retries' = '1'
);

CREATE TABLE sink_table (
    log_id BIGINT,
    `timestamp` TIMESTAMP(3),
    user_id STRING,
    proctime TIMESTAMP(3),
    age STRING,
    sex STRING
) WITH (
  'connector' = 'print'
);

-- lookup join 的 query 逻辑
INSERT INTO sink_table
SELECT 
    s.log_id as log_id
    , s.`timestamp` as `timestamp`
    , s.user_id as user_id
    , s.proctime as proctime
    , u.sex as sex
    , u.age as age
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id

这里使用了 for SYSTEM_TIME as of 时态表的语法来作为维表关联的标识语法。

Notes:实时的 lookup 维表关联能使用处理时间去做关联。

运行结果如下:


flink web ui 算子图如下:

但是!!!但是!!!但是!!!

flink 官方并没有提供 redis 的维表 connector 实现。

没错,博主自己实现了一套。关于 redis 维表的 connector 实现,直接参考下面的文章。都是可以从 github 上找到源码拿来用的!

4.3.关于维表使用的一些注意事项

  1. 同一条数据关联到的维度数据可能不同:实时数仓中常用的实时维表都是在不断的变化中的,当前流表数据关联完维表数据后,如果同一个 key 的维表的数据发生了变化,已关联到的维表的结果数据不会再同步更新。举个例子,维表中 user_id 为 1 的数据在 08:00 时 age 由 12-18 变为了 18-24,那么当我们的任务在 08:01 failover 之后从 07:59 开始回溯数据时,原本应该关联到 12-18 的数据会关联到 18-24 的 age 数据。这是有可能会影响数据质量的。所以小伙伴萌在评估你们的实时任务时要考虑到这一点。
  2. 会发生实时的新建及更新的维表博主建议小伙伴萌应该建立起数据延迟的监控机制,防止出现流表数据先于维表数据到达,导致关联不到维表数据

4.4.再说说维表常见的性能问题及优化思路

所有的维表性能问题都可以总结为:高 qps 下访问维表存储引擎产生的任务背压,数据产出延迟问题。

举个例子:

  • 在没有使用维表的情况下:一条数据从输入 flink 任务到输出 flink 任务的时延假如为 0.1 ms,那么并行度为 1 的任务的吞吐可以达到 1 query / 0.1 ms = 1w qps
  • 在使用维表之后:每条数据访问维表的外部存储的时长为 2 ms,那么一条数据从输入 flink 任务到输出 flink 任务的时延就会变成 2.1 ms,那么同样并行度为 1 的任务的吞吐只能达到 1 query / 2.1 ms = 476 qps。两者的吞吐量相差 21 倍

这就是为什么维表 join 的算子会产生背压,任务产出会延迟。

那么当然,解决方案也是有很多的。抛开 flink sql 想一下,如果我们使用 DataStream api,甚至是在做一个后端应用,需要访问外部存储时,常用的优化方案有哪些?这里列举一下:

  1. 按照 redis 维表的 key 分桶 + local cache:通过按照 key 分桶的方式,让大多数据的维表关联的数据访问走之前访问过得 local cache 即可。这样就可以把访问外部存储 2.1 ms 处理一个 query 变为访问内存的 0.1 ms 处理一个 query 的时长。
  2. 异步访问外存:DataStream api 有异步算子,可以利用线程池去同时多次请求维表外部存储。这样就可以把 2.1 ms 处理 1 个 query 变为 2.1 ms 处理 10 个 query。吞吐可变优化到 10 / 2.1 ms = 4761 qps
  3. 批量访问外存:除了异步访问之外,我们还可以批量访问外部存储。举一个例子:在访问 redis 维表的 1 query 占用 2.1 ms 时长中,其中可能有 2 ms 都是在网络请求上面的耗时 ,其中只有 0.1 ms 是 redis server 处理请求的时长。那么我们就可以使用 redis 提供的 pipeline 能力,在客户端(也就是 flink 任务 lookup join 算子中),攒一批数据,使用 pipeline 去同时访问 redis sever。这样就可以把 2.1 ms 处理 1 个 query 变为 7ms(2ms + 50 * 0.1ms) 处理 50 个 query。吞吐可变为 50 query / 7 ms = 7143 qps。博主这里测试了下使用 redis pipeline 和未使用的时长消耗对比。如下图所示。

博主认为上述优化效果中,最好用的是 1 + 3,2 相比 3 还是一条一条发请求,性能会差一些。

既然 DataStream 可以这样做,flink sql 必须必的也可以借鉴上面的这些优化方案。具体怎么操作呢?看下文骚操作

4.5.lookup join 的具体性能优化方案

  1. 按照 redis 维表的 key 分桶 + local cache:sql 中如果要做分桶,得先做 group by,但是如果做了 group by 的聚合,就只能在 udaf 中做访问 redis 处理,并且 udaf 产出的结果只能是一条,所以这种实现起来非常复杂。我们选择不做 keyby 分桶。但是我们可以直接使用 local cache 去做本地缓存,虽然【直接缓存】的效果比【先按照 key 分桶再做缓存】的效果差,但是也能一定程度上减少访问 redis 压力。在博主实现的 redis connector 中,内置了 local cache 的实现,小伙伴萌可以参考下面这部篇文章进行配置。
  2. 异步访问外存:目前博主实现的 redis connector 不支持异步访问,但是官方实现的 hbase connector 支持这个功能,参考下面链接文章的,点开之后搜索 lookup.async。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/
  3. 批量访问外存:这玩意官方必然没有实现啊,但是,但是,但是,经过博主周末两天的疯狂 debug,改了改源码,搞定了基于 redis 的批量访问外存优化的功能。

4.6.基于 redis connector 的批量访问机制优化

先描述一下大概是个什么东西,具体怎么用。

你只需要在 StreamTableEnvironment 中的 table config 配置上 is.dim.batch.modetrue,sql 不用做任何改动的情况下,flink lookup join 算子会自动优化,优化效果如下:

lookup join 算子的每个 task 上,每攒够 30 条数据 or 每隔五秒(处理时间) 去触发一次批量访问 redis 的请求,使用的是 jedis client 的 pipeline 功能访问 redis server。实测性能有很大提升。关于这个批量访问机制的优化介绍和使用方式介绍,小伙伴们先别急,下篇文章会详细介绍到。

相关推荐

MySQL进阶五之自动读写分离mysql-proxy

自动读写分离目前,大量现网用户的业务场景中存在读多写少、业务负载无法预测等情况,在有大量读请求的应用场景下,单个实例可能无法承受读取压力,甚至会对业务产生影响。为了实现读取能力的弹性扩展,分担数据库压...

Postgres vs MySQL_vs2022连接mysql数据库

...

3分钟短文 | Laravel SQL筛选两个日期之间的记录,怎么写?

引言今天说一个细分的需求,在模型中,或者使用laravel提供的EloquentORM功能,构造查询语句时,返回位于两个指定的日期之间的条目。应该怎么写?本文通过几个例子,为大家梳理一下。学习时...

一文由浅入深带你完全掌握MySQL的锁机制原理与应用

本文将跟大家聊聊InnoDB的锁。本文比较长,包括一条SQL是如何加锁的,一些加锁规则、如何分析和解决死锁问题等内容,建议耐心读完,肯定对大家有帮助的。为什么需要加锁呢?...

验证Mysql中联合索引的最左匹配原则

后端面试中一定是必问mysql的,在以往的面试中好几个面试官都反馈我Mysql基础不行,今天来着重复习一下自己的弱点知识。在Mysql调优中索引优化又是非常重要的方法,不管公司的大小只要后端项目中用到...

MySQL索引解析(联合索引/最左前缀/覆盖索引/索引下推)

目录1.索引基础...

你会看 MySQL 的执行计划(EXPLAIN)吗?

SQL执行太慢怎么办?我们通常会使用EXPLAIN命令来查看SQL的执行计划,然后根据执行计划找出问题所在并进行优化。用法简介...

MySQL 从入门到精通(四)之索引结构

索引概述索引(index),是帮助MySQL高效获取数据的数据结构(有序),在数据之外,数据库系统还维护者满足特定查询算法的数据结构,这些数据结构以某种方式引用(指向)数据,这样就可以在这些数据结构...

mysql总结——面试中最常问到的知识点

mysql作为开源数据库中的榜一大哥,一直是面试官们考察的重中之重。今天,我们来总结一下mysql的知识点,供大家复习参照,看完这些知识点,再加上一些边角细节,基本上能够应付大多mysql相关面试了(...

mysql总结——面试中最常问到的知识点(2)

首先我们回顾一下上篇内容,主要复习了索引,事务,锁,以及SQL优化的工具。本篇文章接着写后面的内容。性能优化索引优化,SQL中索引的相关优化主要有以下几个方面:最好是全匹配。如果是联合索引的话,遵循最...

MySQL基础全知全解!超详细无废话!轻松上手~

本期内容提醒:全篇2300+字,篇幅较长,可搭配饭菜一同“食”用,全篇无废话(除了这句),干货满满,可收藏供后期反复观看。注:MySQL中语法不区分大小写,本篇中...

深入剖析 MySQL 中的锁机制原理_mysql 锁详解

在互联网软件开发领域,MySQL作为一款广泛应用的关系型数据库管理系统,其锁机制在保障数据一致性和实现并发控制方面扮演着举足轻重的角色。对于互联网软件开发人员而言,深入理解MySQL的锁机制原理...

Java 与 MySQL 性能优化:MySQL分区表设计与性能优化全解析

引言在数据库管理领域,随着数据量的不断增长,如何高效地管理和操作数据成为了一个关键问题。MySQL分区表作为一种有效的数据管理技术,能够将大型表划分为多个更小、更易管理的分区,从而提升数据库的性能和可...

MySQL基础篇:DQL数据查询操作_mysql 查

一、基础查询DQL基础查询语法SELECT字段列表FROM表名列表WHERE条件列表GROUPBY分组字段列表HAVING分组后条件列表ORDERBY排序字段列表LIMIT...

MySql:索引的基本使用_mysql索引的使用和原理

一、索引基础概念1.什么是索引?索引是数据库表的特殊数据结构(通常是B+树),用于...