百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

流式数据库 KSQL 概念详解 (二)

wptr33 2024-12-08 19:12 26 浏览

联接 Join 集合

可以使用具有SQL连接语法的JOIN语句,使用ksqlDB实时合并事件流。ksqlDB连接和关系数据库连接的相似之处在于,它们都基于通用值组合了来自两个或多个源的数据。ksqlDB连接的结果是一个新的流或表,其中填充了您在SELECT语句中指定的列值。

使用ksqlDB,无需围绕连接流和表编写低级逻辑,因此可以专注于用于组合流数据的业务逻辑。

可以通过以下方式联接流和表:

    • 连接多个流以创建新的流。
    • 连接多个表以创建一个新表。
    • 连接多个流和表以创建一个新的流。

Join 语句

ksqlDB JOIN子句具有SQL JOIN子句的熟悉语法。以下示例创建一个pageviews_enriched流,该流是pageviews流和users表的组合:

CREATE STREAM pageviews_enriched AS
  SELECT 
     users.userid AS userid, 
     pageid, 
     regionid, 
     gender 
  FROM pageviews
    LEFT JOIN users ON pageviews.userid = users.userid
  EMIT CHANGES;

当连接两个流时,必须指定WITHIN子句以匹配在指定时间间隔内都出现的记录。有关有效时间单位,请参见时间单位。

这里有一个例子流,信息流加入结合orders,payments与shipments流。结果shipped_orders流包含下订单后1小时内支付的所有订单,并在收到付款后2小时内发货。

CREATE STREAM shipped_orders AS
     SELECT 
        o.id as orderId 
        o.itemid as itemId,
        s.id as shipmentId,
        p.id as paymentId
     FROM orders o
        INNER JOIN payments p WITHIN 1 HOURS ON p.id = o.id
        INNER JOIN shipments s WITHIN 2 HOURS ON s.id = o.id;

Join和Windows

ksqlDB允许将具有相同键的记录进行分组,以便将有状态操作(例如联接)组合到windows中。您为窗口指定保留期,此保留期控制ksqlDB等待无序记录的时间。如果记录在窗口的保留期过后到达,则该记录将被丢弃,并且不会在该窗口中进行处理。

注意:仅对流连接流支持窗口。

每个记录键都跟踪Windows。在联接操作中,ksqlDB使用窗口状态存储区将到目前为止收到的所有记录存储在定义的窗口边界内。在指定的窗口保留期之后,将清除状态存储中的旧记录。

Join 需求

ksqlDB应用程序必须满足特定的需求,Join才能成功。

共分区的数据

连接时必须对输入数据进行共分区。这样可以确保在处理过程中,从连接的两侧将具有相同键的输入记录传递到同一流任务。联接时,用户有责任确保数据进行分区。

Join功能

ksqlDB支持大量的流和表联接操作,包括INNER,LEFT OUTER和FULL OUTER。通常,LEFT OUTER缩短为LEFT JOIN,而FULL OUTER缩短为OUTER JOIN。

不支持RIGHT OUTER JOIN。而是交换操作数并使用LEFT JOIN。

下表显示了支持的组合。


类型

INNER

LEFT OUTER

FULL OUTER

流-流

窗口式的

支持的

支持的

支持的

表-表

非窗口式

支持的

支持的

支持的

流-表

非窗口式

支持的

支持的

不支持

流流联接

ksqlDB支持流之间的INNER,LEFT OUTER和FULL OUTER连接。

所有这些操作都支持乱序记录。

要加入两个流,必须使用WITHIN子句指定一个窗口方案。一侧的新输入记录为另一侧的每个匹配记录生成联接输出,并且联接窗口中可以有多个此类匹配记录。

连接仅在将流标记为要重新分区时才对流进行数据重新分区。如果两个流都被标记,则都将被重新分区。

重要的提示

Kafka保证来自一个源分区的任何两个消息的相对顺序,前提是它们在重新分区后也都位于同一个分区中。否则,Kafka可能会交错插入事件。用例将确定这些订购保证是否可接受。

LEFT OUTER联接将在结果流中包含leftRecord-NULL记录,这意味着该联接包含从没有进行匹配的右侧流中选择的字段的NULL值。

FULL OUTER联接将在结果流中包含leftRecord-NULL或NULL-rightRecord记录,这意味着联接中包含来自未进行匹配的流的字段的NULL值。

流流连接的语义

下表显示了各种流-流连接变体的语义。在表中,每一行代表一个新的传入记录。适用以下假设:

  • 所有记录具有相同的键。
  • 所有记录都属于一个连接窗口。
  • 所有记录均按时间戳顺序处理。

收到新输入后,将在表中列出的条件下触发联接。具有NULL键或NULL值的输入记录将被忽略,并且不会触发联接。

时间戳记

左流

右流

内部联接 Inner

左联接 left

右联接 right

1个

Null





2个


Null




3

A



[A,Null]

[A,Null]

4


一个

[A,a]

[A,a]

[A,a]

5

B


[B,a]

[B,a]

[B,a]

6


b

[A,b],[B,b]

[A,b],[B,b]

[A,b],[B,b]

7

Null





8


Null




9

C


[C,a],[C,b]

[C,a],[C,b]

[C,a],[C,b]

10


C

[A,c],[B,c],[C,c]

[A,c],[B,c],[C,c]

[A,c],[B,c],[C,c]

11


Null




12

Null





13


Null




14


d

[A,d],[B,d],[C,d]

[A,d],[B,d],[C,d]

[A,d],[B,d],[C,d]

15

D


[D,a],[D,b],[D,c],[D,d]

[D,a],[D,b],[D,c],[D,d]

[D,a],[D,b],[D,c],[D,d]

流表联接

ksqlDB仅支持流和表之间的INNER和LEFT连接。

流表联接始终是非窗口联接。当新记录到达流上时,您可以针对表执行表查找。只有到达流侧的事件才触发下游更新并产生联接输出。表端的更新不会产生更新的联接输出。

流表联接仅在将流标记为要重新分区时才对流进行数据重新分区。

重要提示

ksqlDB当前在时间同步方面提供了最大的努力,但是没有保证,这可能会导致结果丢失或left Record-NULL结果。

流表联接的语义

下表显示了各种流表连接变体的语义。在表中,每一行代表一个新的传入记录。适用以下假设:

  • 所有记录具有相同的键。
  • 所有记录均按时间戳顺序处理。

仅左侧流的输入记录会触发联接。右侧表的输入记录仅更新内部右侧连接状态。

具有NULL值的表的输入记录被解释为对应键的逻辑删除,表示从表中删除了键。逻辑删除不会触发联接。

时间戳记

左流

右表

内部联接

左联接

1

Null




2


Null(墓碑)



3

A



[A,Null]

4


一个



5

B


[B,a]

[B,a]

6


b



7

Null




8


Null(墓碑)



9

C



[C,Null]

10


C



11


Null



12

Null




13


Null



14


d



15

D


[D,d]

[D,d]

请注意,即使表端稍后已填充,如果表端尚未包含键的值,INNER JOIN也不会产生任何输出。对于LEFT JOIN,相同的场景将导致leftRecord-NULL的输出。因此,重要的是接收流事件之前加载表数据。

ksqlDB尝试按事件时间顺序处理连接的两端,但是不能提供有力的保证,尤其是在存在乱序行的情况下。

为了最大程度地提高联接的可预测性,请确保源主题中提供了历史表数据,查询正在运行,并且ksqlDB开始生成流之前有足够的时间来处理表数据。

表表联接

ksqlDB支持表之间的INNER,LEFT OUTER和FULL OUTER连接。不支持与多个记录匹配的联接(一对多)。

表-表联接始终是非窗口联接。

表-表联接最终是一致的。

重要提示

ksqlDB当前在时间同步方面提供了最大的努力,但是没有保证,这可能会导致结果丢失或leftRecord-NULL结果。

表-表联接只能在其PRIMARY KEY字段上联接,并且不支持一对多(1:N)联接。

表-表联接的语义

下表显示了各种表-表连接变体的语义。在表中,每一行代表一个新的传入记录。适用以下假设:

  • 所有记录具有相同的键。
  • 所有记录均按时间戳顺序处理。

具有NULL值的输入记录被解释为相应键的逻辑删除,表示从表中删除了键。逻辑删除不会触发联接。如果接收到输入逻辑删除,则输出逻辑删除将直接转发到连接结果表(如果连接结果表中已经存在相应的键)。

时间戳记

左表

右表

内部联接

左联接

外连接

1

Null(墓碑)





2


Null(墓碑)




3

A



[A,Null]

[A,Null]

4


一个

[A,a]

[A,a]

[A,a]

5

B


[B,a]

[B,a]

[B,a]

6


b

[B,b]

[B,b]

[B,b]

7

Null(墓碑)


Null(墓碑)

Null(墓碑)

[null,b]

8


Null(墓碑)



Null(墓碑)

9

C



[C,Null]

[C,Null]

10


C

[C,c]

[C,c]

[C,c]

11


Null(墓碑)

Null(墓碑)

[C,Null]

[C,Null]

12

Null(墓碑)



Null(墓碑)

Null(墓碑)

13


Null(墓碑)




14


d



[n,d]

15

D


[D,d]

[D,d]

[D,d]

N向联接 Join

ksqlDB支持在单个语句中连接两个以上的源。这些连接在语义上等效于连续连接N个源,并且连接的顺序由写入连接的顺序控制。

以下面的查询为例,其中A是事件流,B并且C都是表:

CREATE STREAM joined AS 
  SELECT * 
  FROM A
    JOIN B ON A.id = B.product_id
    JOIN C ON A.id = C.purchased_id;

该查询的输出是流,中间连接结果将是stream A ? B。如果C是流而不是表,那么您将通过添加一个WITHIN子句来相应地重写连接,因为A ? Bwith与C是流-流连接:

CREATE STREAM joined AS 
  SELECT * 
  FROM A
    JOIN B ON A.id = B.product_id
    JOIN C WITHIN 10 SECONDS ON A.id = C.purchased_id;

N向联接的局限性

前面各节中对N向联接的每个中间步骤所描述的限制和限制。例如,FULL OUTER不支持流和表之间的联接。这意味着,如果N向联接中的任何阶段都解析为FULL OUTER流和表之间的联接,则整个查询将失败:

--- This JOIN fails with the following exception:
--- Join between invalid operands requested: left type: KTABLE, right type: KSTREAM
CREATE STREAM joined AS 
  SELECT * 
  FROM A
    JOIN B WITHIN 10 SECONDS ON A.id = B.product_id
    FULL OUTER JOIN C ON A.id = C.purchased_id;

分区要求

使用ksqlDB联接流数据时,必须确保流和表是共分区的,这意味着联接两侧的输入记录的分区配置都相同。

要连接两个数据源,流或表,ksqlDB需要根据连接列比较它们的记录。为确保具有相同联接列的记录在同一流任务上共置一处,联接列必须与源分区所在的列重合。

键 Keys

表始终按其分区PRIMARY KEY,而ksqlDB不允许对表进行重新分区,这意味着您只能将表的主键用作连接列。

流没有主键,但是有一个可选的 KEY 列。当存在KEY列时,定义了分区列。

流允许对除键列之外的表达式进行联接。当连接条件与KEY列不同时,ksqlDB在内部对流进行重新分区,这将隐式定义正确的键和分区。

重要提示

Kafka保证来自一个源分区的任何两个消息的相对顺序,前提是它们在重新分区后都位于同一个分区中。否则,Kafka可能会交错插入事件。用例将确定这些订购保证是否可接受。

以下示例显示了一个users表,该表clicks在点击的userId列上与流连接在一起。该users表具有id相同SQL类型的正确主键。该clicks流没有定义的键,因此ksqlDBuserId在执行连接之前在连接列()上对其内部重新分区以分配键。

-- clicks stream, with no or unknown key.
-- the schema of stream clicks is: USERID BIGINT | URL STRING
CREATE STREAM clicks (
    userId BIGINT, 
    url STRING
  ) WITH (
    kafka_topic='clickstream', 
    value_format='json'
  );

-- users table, with userId primary key. 
-- the schema of table users is: USERID BIGINT PRIMARY KEY | FULLNAME STRING
CREATE TABLE users (
    id BIGINT PRIMARY KEY, 
    fullName STRING
  ) WITH (
    kafka_topic='users', 
    value_format='json'
  );

-- join of users table with clicks stream, joining on the table's primary key and the stream's userId column:
-- join will automatically repartition clicks stream:
SELECT 
  c.userId,
  c.url, 
  u.fullName 
FROM clicks c
  JOIN users u ON c.userId = u.id;

共分区要求

使用ksqlDB联接流数据时,必须确保流和表是共分区的,这意味着联接两侧的输入记录的分区配置都相同。

  • 联接的输入记录必须具有相同的键架构。
  • 输入记录的两侧必须具有相同数量的分区。
  • 连接的两端必须具有相同的分区策略。

当您对输入进行共同分区时,在连接过程中,来自连接两侧的具有相同键的记录将被传递到同一流任务。

记录具有相同的键Schema

为了使联接生效,两端的键必须具有相同的SQL类型。

例如,您可以STRING将以用户ID为关键字的用户点击流与也以STRING用户ID为键的用户配置文件表结合在一起。双方具有完全相同的用户ID的记录将被合并。

如果您希望加入的列的架构不匹配,则CAST一侧可能会匹配另一侧。例如,如果INT联接的一侧有一个userId列,而另一侧是a LONG,那么您可以选择将INT一侧转换为a LONG:

-- stream with INT userId
CREATE STREAM clicks (
    userId INT KEY, 
    url STRING
  ) WITH (
    kafka_topic='clickstream', 
    value_format='json'
  );

-- table with BIGINT id stored in the key:
CREATE TABLE users (
    id BIGINT PRIMARY KEY, 
    fullName STRING
  ) WITH (
    kafka_topic='users', 
    value_format='json'
  );

-- Join utilising a CAST to convert the left sides join column to match the rights type.
SELECT 
  clicks.url, 
  users.fullName 
FROM clicks 
  JOIN users ON CAST(clicks.userId AS BIGINT) = users.id;

在现有Kafka主题之上创建的表(例如,使用CREATE TABLE语句创建的表)将根据Kafka主题中记录的键中所保存的数据进行键入。ksqlDB在PRIMARY KEY列中显示此数据。

在ksqlDB内部从其他源创建的表(例如,使用CREATE TABLE AS SELECT语句创建的表)将从其源复制键,除非存在显式GROUP BY或JOIN子句,否则该显式或子句可以更改键入该表的内容。

注意

如果联接需要,则ksqlDB会自动对流进行重新分区,但是ksqlDB会拒绝不是键的表列上的任何联接。这是因为ksqlDB不支持外键上的联接,并且对表的主题进行重新分区有可能对事件进行重新排序并错误地解释逻辑删除,这可能导致意外或意外的副作用。

如果在多个联接中使用相同的源,并且需要对数据进行重新分区,则您可能希望手动重新分区,以避免ksqlDB多次重新分区。

要对流进行重新分区,请使用PARTITION BY子句。请注意,只有在重新分区之后,Kafka才能保证来自一个源分区的任何两个消息的相对顺序,它们也都位于同一个分区中。否则,Kafka可能会交错插入邮事件。用例将确定这些订购保证是否可接受。

重要提示

如果PARTITION BY表达式的计算结果为NULL,则结果行将产生一个随机分区。您可能需要使用COALESCE来包装表达式并将所有NULL值转换为默认值,例如PARTITION BY COALESCE(MY_UDF_THAT_MAY_FAIL(Col0), 0)。

例如,如果您需要对要由product_id字段进行键控的流进行重新分区,并且需要将键分布在6个分区上才能进行联接,请使用以下SQL语句:

CREATE STREAM products_rekeyed 
  WITH (PARTITIONS=6) AS 
  SELECT * 
   FROM products
   PARTITION BY product_id;


记录具有相同数量的分区

联接的输入记录两侧必须具有相同数量的分区。

ksqlDB会检查这部分分区需求,并拒绝分区计数不同的任何连接。

使用DESCRIBE EXTENDED <source name>CLI中的命令确定源下的Kafka主题,并使用CLI中的SHOW TOPICS命令列出主题及其分区数。

如果联接的两面具有不同的分区数,则可能要更改源主题的分区数,或重新分配一面以匹配另一面的分区数。

以下示例创建一个重新分区的流,并使用指定数量的分区来维护现有键。

CREATE STREAM products_rekeyed 
  WITH (PARTITIONS=6) AS 
  SELECT * FROM products;

记录具有相同的分区策略

连接两侧的记录必须具有相同的分区策略。如果在所有应用程序中使用默认分区程序设置,并且生产者未指定显式分区,则无需担心分区策略。

但是,如果您的记录的生产者应用程序在配置中指定了自定义分区程序,则必须对联接两侧的记录使用相同的自定义分区程序逻辑。写入联接输入的应用程序必须具有相同的分区策略,以便具有相同键的记录将传递到相同的分区号。

这意味着输入记录必须在连接的两侧都位于同一分区中。例如,在流表联接中,如果具有userId键值的键alice123在流的分区1中,但alice123在表的分区2中,则即使两端都由键键入,联接也不会匹配userId。

ksqlDB无法验证两个连接输入的分区策略是否相同,因此必须确保这一点。

该DefaultPartitioner类实现了以下的分区策略:

  • 如果生产者在记录中指定了分区,请使用它。
  • 如果生产者指定键而不是分区,请根据键的哈希值选择一个分区。
  • 如果生产者未指定分区或键,请以循环方式选择一个分区。

自定义分区程序类实现了Partitioner接口,并在生产者配置属性中分配partitioner.class。


综合键列

某些联接的结果中有一个合成键列。此列并非来自任何来源。这是一个示例,可以帮助解释什么是合成键列以及为什么需要它们:

CREATE TABLE OUTPUT AS
  SELECT * FROM L FULL OUTER JOIN R ON L.ID = R.ID;

前面的语句似乎很简单:创建一个新表,该表是对两个源表进行完全外部联接并在它们的ID列上联接的结果。但是在完全外部联接中,一个L.ID或R.ID可能会丢失(NULL),或者两个都可能具有相同的值。由于生成给ApacheKafka?的数据应始终具有非空消息键,因此ksql选择要使用的第一个非空键:

L.ID

R.ID

Kafka消息键

10

null

10

null

7

7

8

8

8

Kafka消息键中存储的数据可能与两个源ID列都不匹配。相反,它是一个新列:合成列,这意味着该列不属于任何一个源表。

哪些联接导致合成键列?

结果中的键列与任何源列都不匹配的任何联接都称为具有合成键列。

下列类型的联接会导致将合成键列添加到结果模式:

  1. FULL OUTER JOIN,例如:
    sql CREATE TABLE OUTPUT AS SELECT * FROM L FULL OUTER JOIN R ON L.ID = R.ID;
  2. 在联接ON条件中使用的所有表达式都不是简单列引用的任何联接。例如:
    sql -- join on expressions other than column references:
    CREATE TABLE OUTPUT AS SELECT * FROM L JOIN R ON ABS(L.ID) = ABS(R.ID);

合成键列分配了什么名称?

合成键列的默认名称为ROWKEY。但是,如果联接中使用的任何源已经包含名为的列ROWKEY,则合成键列的名称为ROWKEY_1,或者ROWKEY_2存在存在名为的源列ROWKEY_1等。

-- given sources:
CREATE STREAM S1 (ROWKEY INT KEY, V0 STRING) WITH (...);
CREATE STREAM S2 (ID INT KEY, ROWKEY_1 INT) WITH (...);

CREATE STREAM OUTPUT AS
  SELECT * 
  FROM S1 JOIN S2 
  WITHIN 30 SECONDS 
  ON ABS(S1.ROWKEY) = ABS(S2.ID);

-- result in OUTPUT with synthetic key column name: ROWKEY_2

与其他任何键列一样,合成键列必须包含在流查询的投影中。如果投影缺少合成键,则将返回类似以下的错误,指示丢失的键列的名称:

Key missing from projection.
The query used to build `OUTPUT` must include the join expression ROWKEY in its projection.
ROWKEY was added as a synthetic key column because the join criteria did not match any source column. This expression must be included in the projection and may be aliased. 

(可选)可以为投影中的键列提供别名。推荐这样做,因为不能保证系统生成的名称在版本之间保持一致。例如:

CREATE STREAM OUTPUT AS
   SELECT ROWKEY AS ID, S1.C0, S2.C1 FROM S1 FULL OUTER JOIN S2 ON S1.ID = S2.ID;

相关推荐

MySQL进阶五之自动读写分离mysql-proxy

自动读写分离目前,大量现网用户的业务场景中存在读多写少、业务负载无法预测等情况,在有大量读请求的应用场景下,单个实例可能无法承受读取压力,甚至会对业务产生影响。为了实现读取能力的弹性扩展,分担数据库压...

Postgres vs MySQL_vs2022连接mysql数据库

...

3分钟短文 | Laravel SQL筛选两个日期之间的记录,怎么写?

引言今天说一个细分的需求,在模型中,或者使用laravel提供的EloquentORM功能,构造查询语句时,返回位于两个指定的日期之间的条目。应该怎么写?本文通过几个例子,为大家梳理一下。学习时...

一文由浅入深带你完全掌握MySQL的锁机制原理与应用

本文将跟大家聊聊InnoDB的锁。本文比较长,包括一条SQL是如何加锁的,一些加锁规则、如何分析和解决死锁问题等内容,建议耐心读完,肯定对大家有帮助的。为什么需要加锁呢?...

验证Mysql中联合索引的最左匹配原则

后端面试中一定是必问mysql的,在以往的面试中好几个面试官都反馈我Mysql基础不行,今天来着重复习一下自己的弱点知识。在Mysql调优中索引优化又是非常重要的方法,不管公司的大小只要后端项目中用到...

MySQL索引解析(联合索引/最左前缀/覆盖索引/索引下推)

目录1.索引基础...

你会看 MySQL 的执行计划(EXPLAIN)吗?

SQL执行太慢怎么办?我们通常会使用EXPLAIN命令来查看SQL的执行计划,然后根据执行计划找出问题所在并进行优化。用法简介...

MySQL 从入门到精通(四)之索引结构

索引概述索引(index),是帮助MySQL高效获取数据的数据结构(有序),在数据之外,数据库系统还维护者满足特定查询算法的数据结构,这些数据结构以某种方式引用(指向)数据,这样就可以在这些数据结构...

mysql总结——面试中最常问到的知识点

mysql作为开源数据库中的榜一大哥,一直是面试官们考察的重中之重。今天,我们来总结一下mysql的知识点,供大家复习参照,看完这些知识点,再加上一些边角细节,基本上能够应付大多mysql相关面试了(...

mysql总结——面试中最常问到的知识点(2)

首先我们回顾一下上篇内容,主要复习了索引,事务,锁,以及SQL优化的工具。本篇文章接着写后面的内容。性能优化索引优化,SQL中索引的相关优化主要有以下几个方面:最好是全匹配。如果是联合索引的话,遵循最...

MySQL基础全知全解!超详细无废话!轻松上手~

本期内容提醒:全篇2300+字,篇幅较长,可搭配饭菜一同“食”用,全篇无废话(除了这句),干货满满,可收藏供后期反复观看。注:MySQL中语法不区分大小写,本篇中...

深入剖析 MySQL 中的锁机制原理_mysql 锁详解

在互联网软件开发领域,MySQL作为一款广泛应用的关系型数据库管理系统,其锁机制在保障数据一致性和实现并发控制方面扮演着举足轻重的角色。对于互联网软件开发人员而言,深入理解MySQL的锁机制原理...

Java 与 MySQL 性能优化:MySQL分区表设计与性能优化全解析

引言在数据库管理领域,随着数据量的不断增长,如何高效地管理和操作数据成为了一个关键问题。MySQL分区表作为一种有效的数据管理技术,能够将大型表划分为多个更小、更易管理的分区,从而提升数据库的性能和可...

MySQL基础篇:DQL数据查询操作_mysql 查

一、基础查询DQL基础查询语法SELECT字段列表FROM表名列表WHERE条件列表GROUPBY分组字段列表HAVING分组后条件列表ORDERBY排序字段列表LIMIT...

MySql:索引的基本使用_mysql索引的使用和原理

一、索引基础概念1.什么是索引?索引是数据库表的特殊数据结构(通常是B+树),用于...