FlinkSQL自定义 redis connector flinksql自定义sink
wptr33 2024-12-17 16:47 19 浏览
导读:一般情况下,我们不需要创建新的 connector,因为 Flink SQL 已经内置了丰富的 connector 供我们使用,但是在实际生产环境中我们的存储是多种多样的,所以原生的 connector 并不能满足所有用户的需求,这个时候就需要我们自定义 connector,这篇文章的重点就是介绍一下如何实现自定义 Flink SQL connector
先来看一下官网的一张 connector 架构图:
Metadata
Table API 和 SQL 都是声明式 API。这包括表的声明。因此,执行 CREATE TABLE 语句会导致目标目录中的元数据更新。对于大多数目录实现,不会为此类操作修改外部系统中的物理数据。特定于连接器的依赖项不必出现在类路径中。WITH 子句中声明的选项既不经过验证也不以其他方式解释。动态表的元数据(通过 DDL 创建或由目录提供)表示为 CatalogTable 的实例。必要时,表名将在内部解析为 CatalogTable。
Planning
在规划和优化表程序时,需要将 CatalogTable 解析为 DynamicTableSource(用于在 SELECT 查询中读取)和 DynamicTableSink(用于在 INSERT INTO 语句中写入)。
DynamicTableSourceFactory 和 DynamicTableSinkFactory 提供特定于连接器的逻辑,用于将 CatalogTable 的元数据转换为 DynamicTableSource 和 DynamicTableSink 的实例。在大多数情况下,工厂的目的是验证选项(例如示例中的 'port' = '5022')、配置编码/解码格式(如果需要)以及创建表连接器的参数化实例。
默认情况下,使用 Java 的服务提供者接口 (SPI) 发现 DynamicTableSourceFactory 和 DynamicTableSinkFactory 的实例。连接器选项(例如示例中的 'connector' = 'custom')必须对应于有效的工厂标识符。
尽管在类命名中可能不明显,但 DynamicTableSource 和 DynamicTableSink 也可以被视为有状态的工厂,它们最终为读取/写入实际数据生成具体的运行时实现。
规划器使用源和接收器实例来执行特定于连接器的双向通信,直到找到最佳逻辑计划。根据可选声明的能力接口(例如 SupportsProjectionPushDown 或 SupportsOverwrite),规划器可能会对实例应用更改,从而改变生成的运行时实现。
Runtime
一旦逻辑规划完成,规划器将从表连接器获取运行时实现。运行时逻辑在 Flink 的核心连接器接口中实现,例如 InputFormat 或 SourceFunction。
这些接口按另一个抽象级别分组为 ScanRuntimeProvider、LookupRuntimeProvider 和 SinkRuntimeProvider 的子类。
例如,OutputFormatProvider(提供 org.apache.flink.api.common.io.OutputFormat)和 SinkFunctionProvider(提供 org.apache.flink.streaming.api.functions.sink.SinkFunction)都是 SinkRuntimeProvider 的具体实例,规划器可以 处理。
自定义 redis sink connector
大概需要下面 4 个过程:
- 自定义 Factory,根据需要实现 DynamicTableSourceFactory, DynamicTableSinkFactory.
- 自定义 TableSink, 实现 DynamicTableSink
- 定义 Options 也就是 connector 相关的属性
- 在 resource 下面添加配置文件 org.apache.flink.table.factories.Factory 里面添加 Factory 的全限定名
Factory
package flink.connector.redis;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
/**
* 自定义 Factory
**/
public class RedisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
ReadableConfig options = helper.getOptions();
return new RedisDynamicTableSink(options);
}
/**
* 这里没有实现 source
* @param context
* @return
*/
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
return null;
}
@Override
public String factoryIdentifier() {
return "redis";
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet();
options.add(RedisOptions.HOST);
options.add(RedisOptions.PORT);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet();
options.add(RedisOptions.EXPIRE);
return options;
}
}
TableSink
package flink.connector.redis;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import static flink.connector.redis.RedisOptions.*;
/**
* 自定义 DynamicTableSink
**/
public class RedisDynamicTableSink implements DynamicTableSink {
private ReadableConfig options;
public RedisDynamicTableSink(ReadableConfig options) {
this.options = options;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.build();
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
// 获取 redis 的 host 和 port
String host = options.getOptional(HOST).get();
Integer port = options.getOptional(PORT).get();
Integer expire = options.getOptional(EXPIRE).get();
MyRedisSink myRedisSink = new MyRedisSink(host, port, expire);
return SinkFunctionProvider.of(myRedisSink);
}
@Override
public DynamicTableSink copy() {
return new RedisDynamicTableSink(this.options);
}
@Override
public String asSummaryString() {
return "redis table sink";
}
}
package flink.connector.redis;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import redis.clients.jedis.Jedis;
/**
* 自定义 sink 写入 redis
**/
public class MyRedisSink extends RichSinkFunction<RowData> {
private final String host;
private final int port;
private int expire;
private Jedis jedis;
public MyRedisSink(String host, int port, int expire) {
this.host = host;
this.port = port;
this.expire = expire;
}
@Override
public void open(Configuration parameters) throws Exception {
this.jedis = new Jedis(host, port);
}
@Override
public void invoke(RowData value, Context context) throws Exception {
this.jedis.set(String.valueOf(value.getString(0)), String.valueOf(value.getInt(1)), "NX", "EX", expire);
}
@Override
public void close() throws Exception {
this.jedis.close();
}
}
这里用的是 Jedis 没有使用 Flink 自带的 redis connector ,因为 Flink 自带的功能有限,很多功能都需要自己扩展,所以就直接使用 Jedis.我这里只是为了演示,只实现了最简单的 set 功能.
Options
package flink.connector.redis;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
/**
* Option utils for redis table source sink.
*/
public class RedisOptions {
private RedisOptions() {}
public static final ConfigOption<String> HOST =
ConfigOptions.key("host")
.stringType()
.noDefaultValue()
.withDescription(
"The Redis table host.");
public static final ConfigOption<Integer> PORT =
ConfigOptions.key("port")
.intType()
.defaultValue(6379)
.withDescription(
"The Redis table port.");
public static final ConfigOption<Integer> EXPIRE =
ConfigOptions.key("expire")
.intType()
.noDefaultValue()
.withDescription(
"The Redis table expire time.");
}
所有 redis 相关的属性都可以在这里添加,比如用户名密码,连接池相关的配置等.
配置文件
最后也是最重要的一点就是在 resource 下面添加配置文件,因为 Flink 是通过 SPI 机制来发现工厂的,注意这个路径一定不要写错. 详解 SPI 机制——ServiceLoader.load
使用&测试
到这里基本就完成了,下面来测试一下自定义的 connector 能否把数据准确的写入到 redis 里面.
// 定义数据源表
tEnv.executeSql("""
|CREATE TABLE datagen (
| f_sequence INT,
| f_random INT,
| f_random_str STRING,
| ts AS localtimestamp,
| WATERMARK FOR ts AS ts
|) WITH (
| 'connector' = 'datagen',
| -- optional options --
| 'rows-per-second'='1',
| 'fields.f_sequence.kind'='sequence',
| 'fields.f_sequence.start'='1',
| 'fields.f_sequence.end'='20',
| 'fields.f_random.min'='1',
| 'fields.f_random.max'='1000',
| 'fields.f_random_str.length'='10'
|)
|""".stripMargin)
// 定义 redis 表
tEnv.executeSql(
"""
|create table redis_sink (
|f1 STRING,
|f2 INT
|) WITH (
|'connector' = 'redis',
|'host' = 'xxx',
|'port' = '6379',
|'expire' = '100'
|)
|""".stripMargin)
// 执行插入 SQL
tEnv.executeSql(
"""
|insert into redis_sink
|select f_random_str,f_random
|from datagen
|""".stripMargin)
上面的 datagen 会产生 20 条数据.执行上面的 SQL 然后查询一下 redis 打印的数据如下:
68652c3a52 : 396
de3044d6d0 : 248
b09690ec10 : 436
dab4bb9ea9 : 821
d57a47d883 : 134
4d3d23767a : 63
9ca712a25f : 527
cb3019326d : 164
4a4af63f89 : 803
3cb960dbf1 : 575
db95bf7590 : 500
4274665b4b : 910
5c27396cb1 : 993
c1d957a2c8 : 951
8b24d7abe2 : 66
817b59d742 : 354
baa51bb58a : 14
db32f9cd53 : 510
3c5db2220b : 44
7c169eaef9 : 160
通过上面的 Demo,相信大家对自定义 Flink SQL connector 已经有所了解,那在生产环境中就可以根据自己的需求去定制各种 connector 了.
最后
感谢您的阅读,如果喜欢本文欢迎关注和转发,转载需注明出处,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。
原文地址:https://mp.weixin.qq.com/s/-h7fZxULS5cz2ZMgEEAncw
相关推荐
- Linux文件系统操作常用命令(linux文件内容操作命令)
-
在Linux系统中,有一些常用的文件系统操作命令,以下是这些命令的介绍和作用:#切换目录,其中./代表当前目录,../代表上一级目录cd#查看当前目录里的文件和文件夹ls#...
- 别小看tail 命令,它难倒了技术总监
-
我把自己以往的文章汇总成为了Github,欢迎各位大佬star...
- lnav:基于 Linux 的高级控制台日志文件查看器
-
lnav是一款开源的控制台日志文件查看器,专为Linux和Unix-like系统设计。它通过自动检测日志文件的格式,提取时间戳、日志级别等关键信息,并将多个日志文件的内容按时间顺序合并显示,...
- 声明式与命令式代码(声明模式和命令模式)
-
编程范式中的术语和差异信不信由你,你可能已经以开发人员的身份使用了多种编程范例。因为没有什么比用编程理论招待朋友更有趣的了,所以这篇文章可以帮助您认识代码中的流行范例。命令式编程命令式编程是我们从As...
- linux中的常用命令(linux常用命令和作用)
-
linux中的常用命令linux中的命令统称shell命令shell是一个命令行解释器,将用户命令解析为操作系统所能理解的指令,实现用户与操作系统的交互shell终端:我们平时输入命令,执行程序的那个...
- 提高工作效率的--Linux常用命令,能够决解95%以上的问题
-
点击上方关注,第一时间接受干货转发,点赞,收藏,不如一次关注评论区第一条注意查看回复:Linux命令获取linux常用命令大全pdf+Linux命令行大全pdf...
- 如何限制他人操作自己的电脑?(如何控制别人的电脑不让发现)
-
这段时间,小猪罗志祥正处于风口浪尖,具体是为啥?还不知道的小伙伴赶紧去补一下最近的娱乐圈八卦~简单来说,就是我们的小罗同事,以自己超强的体力,以及超强的时间管理能力,重新定义了「多人运动」的含义,重新...
- 最通俗易懂的命令模式讲解(命令模式百科)
-
我们先不讲什么是命令模式,先通过一个场景来引出命令模式,看看命令模式能解决什么样的问题。现在有一个渣男张三,他有还几个女朋友,你现在是不是还是单身狗,你就说你气不气?然后他需要每天分别叫几个女朋友起床...
- 互联网大厂后端必看!Spring Boot 中Runtime执行与停止命令?
-
你是否曾在使用SpringBoot开发项目时,遇到需要执行系统命令的场景?比如调用脚本进行文件处理,又或是启动外部程序?很多后端开发人员会使用Processexec=Runtime.get...
- Linux 常用命令(linux常用的20个命令面试)
-
日志排查类操作命令...
- Java字节码指令:if_icmpgt(0xA3)(java字节码使用的汇编语言)
-
if_icmpgt是Java字节码中的一条条件跳转指令,其全称是"IfIntegerCompareGreaterThan"。它用于比较两个整数值的大小。如果栈顶的第一个...
- 外贸干货|如何增加领英的曝光量和询盘
-
#跨境电商#...
- golang执行linux命令(golang调用shell脚本)
-
需求需要通过openssl生成rsa秘钥,然后保存该秘钥。代码实例packagemainimport("io/ioutil""bytes"&...
- LINUX磁盘挂载(linux磁盘挂载到windows)
-
1、使用root用户查看磁盘挂载情况:fdisk-l2、使用df查看当前磁盘挂载情况,根据和fdisk-l的结果进行对比,查看还有那些磁盘未使用3、挂载:mount磁盘挂载路径...
- Linux命令学习——nl命令(linux ln命令的使用)
-
nl命令主要功能为每一个文件添加行号,每一个输入的文件添加行号后发送到标准输出。当没有文件或文件为-时,读取标准输入...
- 一周热门
-
-
C# 13 和 .NET 9 全知道 :13 使用 ASP.NET Core 构建网站 (1)
-
因果推断Matching方式实现代码 因果推断模型
-
git pull命令使用实例 git pull--rebase
-
面试官:git pull是哪两个指令的组合?
-
git 执行pull错误如何撤销 git pull fail
-
git fetch 和git pull 的异同 git中fetch和pull的区别
-
git pull 和git fetch 命令分别有什么作用?二者有什么区别?
-
git pull 之后本地代码被覆盖 解决方案
-
还可以这样玩?Git基本原理及各种骚操作,涨知识了
-
git命令之pull git.pull
-
- 最近发表
- 标签列表
-
- git pull (33)
- git fetch (35)
- mysql insert (35)
- mysql distinct (37)
- concat_ws (36)
- java continue (36)
- jenkins官网 (37)
- mysql 子查询 (37)
- python元组 (33)
- mybatis 分页 (35)
- vba split (37)
- redis watch (34)
- python list sort (37)
- nvarchar2 (34)
- mysql not null (36)
- hmset (35)
- python telnet (35)
- python readlines() 方法 (36)
- munmap (35)
- docker network create (35)
- redis 集合 (37)
- python sftp (37)
- setpriority (34)
- c语言 switch (34)
- git commit (34)