百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

FlinkSQL自定义 redis connector flinksql自定义sink

wptr33 2024-12-17 16:47 33 浏览

导读:一般情况下,我们不需要创建新的 connector,因为 Flink SQL 已经内置了丰富的 connector 供我们使用,但是在实际生产环境中我们的存储是多种多样的,所以原生的 connector 并不能满足所有用户的需求,这个时候就需要我们自定义 connector,这篇文章的重点就是介绍一下如何实现自定义 Flink SQL connector

先来看一下官网的一张 connector 架构图:

Metadata

Table API 和 SQL 都是声明式 API。这包括表的声明。因此,执行 CREATE TABLE 语句会导致目标目录中的元数据更新。对于大多数目录实现,不会为此类操作修改外部系统中的物理数据。特定于连接器的依赖项不必出现在类路径中。WITH 子句中声明的选项既不经过验证也不以其他方式解释。动态表的元数据(通过 DDL 创建或由目录提供)表示为 CatalogTable 的实例。必要时,表名将在内部解析为 CatalogTable。

Planning

在规划和优化表程序时,需要将 CatalogTable 解析为 DynamicTableSource(用于在 SELECT 查询中读取)和 DynamicTableSink(用于在 INSERT INTO 语句中写入)。

DynamicTableSourceFactory 和 DynamicTableSinkFactory 提供特定于连接器的逻辑,用于将 CatalogTable 的元数据转换为 DynamicTableSource 和 DynamicTableSink 的实例。在大多数情况下,工厂的目的是验证选项(例如示例中的 'port' = '5022')、配置编码/解码格式(如果需要)以及创建表连接器的参数化实例。

默认情况下,使用 Java 的服务提供者接口 (SPI) 发现 DynamicTableSourceFactory 和 DynamicTableSinkFactory 的实例。连接器选项(例如示例中的 'connector' = 'custom')必须对应于有效的工厂标识符。

尽管在类命名中可能不明显,但 DynamicTableSource 和 DynamicTableSink 也可以被视为有状态的工厂,它们最终为读取/写入实际数据生成具体的运行时实现。

规划器使用源和接收器实例来执行特定于连接器的双向通信,直到找到最佳逻辑计划。根据可选声明的能力接口(例如 SupportsProjectionPushDown 或 SupportsOverwrite),规划器可能会对实例应用更改,从而改变生成的运行时实现。

Runtime

一旦逻辑规划完成,规划器将从表连接器获取运行时实现。运行时逻辑在 Flink 的核心连接器接口中实现,例如 InputFormat 或 SourceFunction。

这些接口按另一个抽象级别分组为 ScanRuntimeProvider、LookupRuntimeProvider 和 SinkRuntimeProvider 的子类。

例如,OutputFormatProvider(提供 org.apache.flink.api.common.io.OutputFormat)和 SinkFunctionProvider(提供 org.apache.flink.streaming.api.functions.sink.SinkFunction)都是 SinkRuntimeProvider 的具体实例,规划器可以 处理。

自定义 redis sink connector

大概需要下面 4 个过程:

  1. 自定义 Factory,根据需要实现 DynamicTableSourceFactory, DynamicTableSinkFactory.
  2. 自定义 TableSink, 实现 DynamicTableSink
  3. 定义 Options 也就是 connector 相关的属性
  4. 在 resource 下面添加配置文件 org.apache.flink.table.factories.Factory 里面添加 Factory 的全限定名

Factory

package flink.connector.redis;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;

/**
 * 自定义 Factory 
 **/
public class RedisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        return new RedisDynamicTableSink(options);
    }

    /**
     * 这里没有实现 source 
     * @param context
     * @return
     */
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        return null;
    }

    @Override
    public String factoryIdentifier() {
        return "redis";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet();
        options.add(RedisOptions.HOST);
        options.add(RedisOptions.PORT);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet();
        options.add(RedisOptions.EXPIRE);
        return options;
    }
}

TableSink

package flink.connector.redis;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import static flink.connector.redis.RedisOptions.*;

/**
 * 自定义 DynamicTableSink 
 **/
public class RedisDynamicTableSink implements DynamicTableSink {

    private ReadableConfig options;

    public RedisDynamicTableSink(ReadableConfig options) {
        this.options = options;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // 获取 redis 的 host 和 port
        String host = options.getOptional(HOST).get();
        Integer port = options.getOptional(PORT).get();
        Integer expire = options.getOptional(EXPIRE).get();
        MyRedisSink myRedisSink = new MyRedisSink(host, port, expire);
        return SinkFunctionProvider.of(myRedisSink);
    }

    @Override
    public DynamicTableSink copy() {
        return new RedisDynamicTableSink(this.options);
    }

    @Override
    public String asSummaryString() {
        return "redis table sink";
    }
}
package flink.connector.redis;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import redis.clients.jedis.Jedis;

/**
 * 自定义 sink 写入 redis
 **/
public class MyRedisSink extends RichSinkFunction<RowData> {

    private final String host;
    private final int port;
    private int expire;
    private Jedis jedis;

    public MyRedisSink(String host, int port, int expire) {
        this.host = host;
        this.port = port;
        this.expire = expire;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        this.jedis = new Jedis(host, port);
    }

    @Override
    public void invoke(RowData value, Context context) throws Exception {
        this.jedis.set(String.valueOf(value.getString(0)), String.valueOf(value.getInt(1)), "NX", "EX", expire);
    }

    @Override
    public void close() throws Exception {
        this.jedis.close();
    }
}

这里用的是 Jedis 没有使用 Flink 自带的 redis connector ,因为 Flink 自带的功能有限,很多功能都需要自己扩展,所以就直接使用 Jedis.我这里只是为了演示,只实现了最简单的 set 功能.

Options

package flink.connector.redis;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** 
 * Option utils for redis table source sink. 
 */ 
public class RedisOptions {

    private RedisOptions() {}

    public static final ConfigOption<String> HOST =
            ConfigOptions.key("host")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "The Redis table host.");

    public static final ConfigOption<Integer> PORT =
            ConfigOptions.key("port")
                    .intType()
                    .defaultValue(6379)
                    .withDescription(
                            "The Redis table port.");

    public static final ConfigOption<Integer> EXPIRE =
            ConfigOptions.key("expire")
                    .intType()
                    .noDefaultValue()
                    .withDescription(
                            "The Redis table expire time.");
}

所有 redis 相关的属性都可以在这里添加,比如用户名密码,连接池相关的配置等.

配置文件

最后也是最重要的一点就是在 resource 下面添加配置文件,因为 Flink 是通过 SPI 机制来发现工厂的,注意这个路径一定不要写错. 详解 SPI 机制——ServiceLoader.load

使用&测试

到这里基本就完成了,下面来测试一下自定义的 connector 能否把数据准确的写入到 redis 里面.

// 定义数据源表
tEnv.executeSql("""
                  |CREATE TABLE datagen (
                  | f_sequence INT,
                  | f_random INT,
                  | f_random_str STRING,
                  | ts AS localtimestamp,
                  | WATERMARK FOR ts AS ts
                  |) WITH (
                  | 'connector' = 'datagen',
                  | -- optional options --
                  | 'rows-per-second'='1',
                  | 'fields.f_sequence.kind'='sequence',
                  | 'fields.f_sequence.start'='1',
                  | 'fields.f_sequence.end'='20',
                  | 'fields.f_random.min'='1',
                  | 'fields.f_random.max'='1000',
                  | 'fields.f_random_str.length'='10'
                  |)
                  |""".stripMargin)
                  
// 定义 redis 表
                  tEnv.executeSql(
            """
              |create table redis_sink (
              |f1 STRING,
              |f2 INT
              |) WITH (
              |'connector' = 'redis',
              |'host' = 'xxx',
              |'port' = '6379',
              |'expire' = '100'
              |)
              |""".stripMargin)
              
// 执行插入 SQL
              tEnv.executeSql(
            """
              |insert into redis_sink
              |select f_random_str,f_random
              |from datagen
              |""".stripMargin)

上面的 datagen 会产生 20 条数据.执行上面的 SQL 然后查询一下 redis 打印的数据如下:

68652c3a52 : 396
de3044d6d0 : 248
b09690ec10 : 436
dab4bb9ea9 : 821
d57a47d883 : 134
4d3d23767a : 63
9ca712a25f : 527
cb3019326d : 164
4a4af63f89 : 803
3cb960dbf1 : 575
db95bf7590 : 500
4274665b4b : 910
5c27396cb1 : 993
c1d957a2c8 : 951
8b24d7abe2 : 66
817b59d742 : 354
baa51bb58a : 14
db32f9cd53 : 510
3c5db2220b : 44
7c169eaef9 : 160

通过上面的 Demo,相信大家对自定义 Flink SQL connector 已经有所了解,那在生产环境中就可以根据自己的需求去定制各种 connector 了.

最后

感谢您的阅读,如果喜欢本文欢迎关注和转发,转载需注明出处,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。

原文地址:https://mp.weixin.qq.com/s/-h7fZxULS5cz2ZMgEEAncw

相关推荐

MySQL进阶五之自动读写分离mysql-proxy

自动读写分离目前,大量现网用户的业务场景中存在读多写少、业务负载无法预测等情况,在有大量读请求的应用场景下,单个实例可能无法承受读取压力,甚至会对业务产生影响。为了实现读取能力的弹性扩展,分担数据库压...

Postgres vs MySQL_vs2022连接mysql数据库

...

3分钟短文 | Laravel SQL筛选两个日期之间的记录,怎么写?

引言今天说一个细分的需求,在模型中,或者使用laravel提供的EloquentORM功能,构造查询语句时,返回位于两个指定的日期之间的条目。应该怎么写?本文通过几个例子,为大家梳理一下。学习时...

一文由浅入深带你完全掌握MySQL的锁机制原理与应用

本文将跟大家聊聊InnoDB的锁。本文比较长,包括一条SQL是如何加锁的,一些加锁规则、如何分析和解决死锁问题等内容,建议耐心读完,肯定对大家有帮助的。为什么需要加锁呢?...

验证Mysql中联合索引的最左匹配原则

后端面试中一定是必问mysql的,在以往的面试中好几个面试官都反馈我Mysql基础不行,今天来着重复习一下自己的弱点知识。在Mysql调优中索引优化又是非常重要的方法,不管公司的大小只要后端项目中用到...

MySQL索引解析(联合索引/最左前缀/覆盖索引/索引下推)

目录1.索引基础...

你会看 MySQL 的执行计划(EXPLAIN)吗?

SQL执行太慢怎么办?我们通常会使用EXPLAIN命令来查看SQL的执行计划,然后根据执行计划找出问题所在并进行优化。用法简介...

MySQL 从入门到精通(四)之索引结构

索引概述索引(index),是帮助MySQL高效获取数据的数据结构(有序),在数据之外,数据库系统还维护者满足特定查询算法的数据结构,这些数据结构以某种方式引用(指向)数据,这样就可以在这些数据结构...

mysql总结——面试中最常问到的知识点

mysql作为开源数据库中的榜一大哥,一直是面试官们考察的重中之重。今天,我们来总结一下mysql的知识点,供大家复习参照,看完这些知识点,再加上一些边角细节,基本上能够应付大多mysql相关面试了(...

mysql总结——面试中最常问到的知识点(2)

首先我们回顾一下上篇内容,主要复习了索引,事务,锁,以及SQL优化的工具。本篇文章接着写后面的内容。性能优化索引优化,SQL中索引的相关优化主要有以下几个方面:最好是全匹配。如果是联合索引的话,遵循最...

MySQL基础全知全解!超详细无废话!轻松上手~

本期内容提醒:全篇2300+字,篇幅较长,可搭配饭菜一同“食”用,全篇无废话(除了这句),干货满满,可收藏供后期反复观看。注:MySQL中语法不区分大小写,本篇中...

深入剖析 MySQL 中的锁机制原理_mysql 锁详解

在互联网软件开发领域,MySQL作为一款广泛应用的关系型数据库管理系统,其锁机制在保障数据一致性和实现并发控制方面扮演着举足轻重的角色。对于互联网软件开发人员而言,深入理解MySQL的锁机制原理...

Java 与 MySQL 性能优化:MySQL分区表设计与性能优化全解析

引言在数据库管理领域,随着数据量的不断增长,如何高效地管理和操作数据成为了一个关键问题。MySQL分区表作为一种有效的数据管理技术,能够将大型表划分为多个更小、更易管理的分区,从而提升数据库的性能和可...

MySQL基础篇:DQL数据查询操作_mysql 查

一、基础查询DQL基础查询语法SELECT字段列表FROM表名列表WHERE条件列表GROUPBY分组字段列表HAVING分组后条件列表ORDERBY排序字段列表LIMIT...

MySql:索引的基本使用_mysql索引的使用和原理

一、索引基础概念1.什么是索引?索引是数据库表的特殊数据结构(通常是B+树),用于...