Flink教程-flink 1.11 使用sql将流式数据写入文件系统
wptr33 2024-12-23 14:05 25 浏览
flink提供了一个file system connector,可以使用DDL创建一个table,然后使用sql的方法写入数据,支持的写入格式包括json、csv、avro、parquet、orc。
一个最简单的DDL如下:
CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
h string,
m string
) PARTITIONED BY (dt,h,m) WITH (
'connector'='filesystem',
'path'='file:///tmp/abc',
'format'='orc'
);
下面我们简单的介绍一下相关的概念和如何使用。
滚动策略
Key Default Type Description sink.rolling-policy.file-size 128MB MemorySize 分区文件的最大值,超过这个大小,将会启动一个新文件。 sink.rolling-policy.rollover-interval 30 m Duration 分区文件滚动的最大时间间隔,超过这个时间,将会新启动一个文件 sink.rolling-policy.check-interval 1 m Duration 一个时间间隔,定期去检查上面那个配置指定的策略下,文件是否应该滚动生成新文件.
- 在写入列格式(比如parquet、orc)的时候,上述的配置和checkpoint的间隔一起来控制滚动策略,也就是说sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval、checkpoint间隔,这三个选项,只要有一个条件达到了,然后就会触发分区文件的滚动,结束上一个文件的写入,生成新文件。
- 对于写入行格式的数据,比如json、csv,主要是靠sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval,也就是文件的大小和时间来控制写入数据的滚动策略.
分区提交
在往一个分区写完了数据之后,我们希望做一些工作来通知下游。比如在分区目录写一个SUCCESS文件,或者是对于hive来说,去更新metastore的数据,自动刷新一下分区等等。 分区的提交主要依赖于触发器和提交的策略:
- 触发器:即什么时候触发分区的提交,
- 提交策略:也就是分区写完之后我们做什么,目前系统提供了两种内置策略:1.往分区目录写一个空SUCCESS文件;2.更新元数据.
分区提交触发器
key default type 解释 sink.partition-commit.trigger process-time String 触发器的类型,目前系统提供了两种:process-time 和 partition-time,如果选择了process-time,则当系统时间大于processtime的时候触发提交,如果选择了partition-time,则需要先从分区字段里面抽取分区时间的开始时间,然后当水印大于这个分区时间的时候触发分区的提交. sink.partition-commit.delay 0 s Duration 提交分区的延迟时间
- process-time. 这种提交方式依赖于系统的时间,一旦遇到数据延迟等情况,会造成分区和分区的数据不一致。
- partition-time :这种情况需要从分区字段里抽取出来相应的pattern,具体可参考下一个段落分区的抽取。
- sink.partition-commit.delay:一旦这个数值设置不为0,则在process-time情况下,当系统时间大于分区创建时间加上delay延迟,会触发分区提交; 如果是在partition-time 情况下,则需要水印大于分区创建时间加上delay时间,会触发分区提交.
第一个参数process-time、partition-time,我们不用做过多的解释,就类似于flink中的processtime和eventtime。
第二个参数sink.partition-commit.delay我们用实际案例解释下: 比如我们配置的是分区是/yyyy-MM-dd/HH/,写入的是ORC列格式,checkpoint配置的间隔是一分钟,也就是默认情况下会每分钟生成一个orc文件,最终会在每个分区(/yyyy-MM-dd/HH/)下面生成60个orc文件。
比如当前系统正在写入/day=2020-07-06/h=10/分区的数据,那么这个分区的创建时间是2020-07-06 10:00:00,如果这个delay配置采用的是默认值,也就是0s,这个时候当写完了一个ORC文件,也就是2020-07-06 10:01:00分钟的时候,就会触发分区提交,比如更新hive的元数据,这个时候我们去查询hive就能查到刚刚写入的文件;如果我们想/day=2020-07-06/h=10/这个分区的60个文件都写完了再更新分区,那么我们可以将这个delay设置成 1h,也就是等到2020-07-06 11:00:00的时候才会触发分区提交,我们才会看到/2020-07-06/10/分区下面的所有数据
分区时间的抽取
从分区值里抽取分区时间,我们可以理解为上面触发器参数配置为partition-time的时候,分区的创建时间,当水印大于这个时间+delay的时候触发分区的提交.
Key Default Type 解释 partition.time-extractor.kind default String 抽取分区的方式,目前有default和custom两种,如果是default,需要配置partition.time-extractor.timestamp-pattern,如果是custom,需要配置自定义class partition.time-extractor.class null String 自定义class partition.time-extractor.timestamp-pattern null String 从分区值中抽取时间戳的模式,需要组织成yyyy-MM-dd HH:mm:ss格式,比如 对于上面我们提到的分区/yyyy-MM-dd/HH/,其中两个分区字段对应的字段名分为是dt和hour,那么我们这个timestamp-pattern 可以配置成'hour:00:00'
自定义抽取分区时间的话,需要实现PartitionTimeExtractor接口:
public interface PartitionTimeExtractor extends Serializable {
String DEFAULT = "default";
String CUSTOM = "custom";
/**
* Extract time from partition keys and values.
*/
LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues);
...................
}
分区提交策略
定义了分区提交的策略,也就是写完分区数据之后做什么事情,目前系统提供了以下行为:
- metastore,只支持hive table,也就是写完数据之后,更新hive的元数据.
- success file: 写完数据,往分区文件写一个success file.
- 自定义
key Default Type 描述 sink.partition-commit.policy.kind null string 可选:metastore,success-file,custom,这个可以写一个或者多个,比如可以这样,'metastore,success-file' sink.partition-commit.policy.class null string 如果上述选择custom的话,这里指定相应的class sink.partition-commit.success-file.name null string 如果上述选择的是success-file,这里可以指定写入的文件名,默认是 _SUCCESS
完整示例
定义实体类
public static class UserInfo implements java.io.Serializable{
private String userId;
private Double amount;
private Timestamp ts;
public String getUserId(){
return userId;
}
public void setUserId(String userId){
this.userId = userId;
}
public Double getAmount(){
return amount;
}
public void setAmount(Double amount){
this.amount = amount;
}
public Timestamp getTs(){
return ts;
}
public void setTs(Timestamp ts){
this.ts = ts;
}
}
自定义source
public static class MySource implements SourceFunction<UserInfo>{
String userids[] = {
"4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
"72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
"aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
"3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
"e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
};
@Override
public void run(SourceContext<UserInfo> sourceContext) throws Exception{
while (true){
String userid = userids[(int) (Math.random() * (userids.length - 1))];
UserInfo userInfo = new UserInfo();
userInfo.setUserId(userid);
userInfo.setAmount(Math.random() * 100);
userInfo.setTs(new Timestamp(new Date().getTime()));
sourceContext.collect(userInfo);
Thread.sleep(100);
}
}
@Override
public void cancel(){
}
}
写入file
通过sql的ddl创建一个最简单的基于process time的table,然后写入数据.
在这个实例中,我们开启了checkpoint的时间间隔是10s,所以会每隔10s写入一个orc文件.
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(10000);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource());
String sql = "CREATE TABLE fs_table (\n" +
" user_id STRING,\n" +
" order_amount DOUBLE,\n" +
" dt STRING," +
" h string," +
" m string \n" +
") PARTITIONED BY (dt,h,m) WITH (\n" +
" 'connector'='filesystem',\n" +
" 'path'='file:///tmp/abc',\n" +
" 'format'='orc'\n" +
")";
tEnv.executeSql(sql);
tEnv.createTemporaryView("users", dataStream);
String insertSql = "insert into fs_table SELECT userId, amount, " +
" DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
tEnv.executeSql(insertSql);
完整的代码请参考 https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/connectors/sql/StreamingWriteFile.java
更多精彩内容,欢迎关注我的公众号【大数据技术与应用实战】
相关推荐
- VPS主机搭建Ghost环境:Nginx Node.js MariaDB
-
Ghost是一款个人博客系统,它是使用Node.js语言和MySQL数据库开发的,同时支持MySQL、MariaDB、SQLite和PostgreSQL。用户可以在支持Node.js的服务器上使用自己...
- centos7飞速搭建zabbix5.0并添加windows、linux监控
-
一、环境zabbix所在服务器系统为centos7,监控的服务器为windows2016和centos7。二、安装zabbix官方安装帮助页面...
- Zabbix5.0安装部署
-
全盘展示运行状态,减轻运维人员的重复性工作量,提高系统排错速度,加速运维知识学习积累。1.png...
- MariaDB10在CentOS7系统下,迁移数据存储位置
-
背景在CentOS7下如果没有默认安装MySQL数据库,可以选择安装MariaDB,最新的版本现在是10可以选择直接yum默认安装的方式yum-yinstallmariadbyum-yi...
- frappe项目安装过程
-
1,准备一台虚拟机,debian12或者ubuntusever22.04.3可以用virtualbox/qemu,或者你的超融合服务器安装一些常用工具和依赖库我这里选择server模式安装,用tab...
- 最新zabbix一键安装脚本(基于centos8)
-
一、环境准备注意:操作系统必须是centos8及以上的,因为我配的安装源是centos8的。并且必须连接互联网,脚本是基于yum安装的!!!...
- ip地址管理之phpIPAM保姆级安装教程 (原创)
-
本教程基于Ubuntu24.04LTS,安装phpIPAM(最新稳定版1.7),使用Apache、PHP8.3和MariaDB,遵循最佳实践,确保安全性和稳定性。一、环境准备1....
- centos7傻瓜式安装搭建zabbix5.0监控服务器教程
-
zabbix([`zaebiks])是一个基于WEB界面的提供分布式系统监视...
- zabbix7.0LTS 保姆级安装教程 小白也能轻松上手安装
-
系统环境:rockylinux9.4(yumupdate升级到最新版本)数据库:mariadb10.5.22第一步:关闭防火墙和selinux使用脚本关闭...
- ubuntu通过下载安装包安装mariadb10.4
-
要在Ubuntu18.04上安装MariaDB10.4.34,用的是那个tar.gz的安装包。步骤大概是:...
- 从0到1:基于 Linux 快速搭建高可用 MariaDB Galera 集群(实战指南)
-
在企业生产环境中,数据库的高可用性至关重要。今天带你从0到1,手把手在Linux系统上快速搭建一个高可用MariaDBGaleraCluster,实现数据库同步复制、故障自动恢复,保障业务...
- Windows 中安装 MariaDB 数据库
-
mariadb在Windows下的安装非常简单,下载程序双击运行就可以了。需要注意:mariadb和MySQL数据库在Windows下默认是不区分大小写的,但是在Linux下是区分...
- SQL执行顺序(SqlServer)
-
学习SQL这么久,如果突然有人问你SQL的执行顺是怎么样的?是不是很多人会觉得C#、JavaScript都是根据编程顺序来处理的,那么SQL也是根据编程顺序来执行的吗?...
- C# - StreamWriter与StreamReader 读写文件 101
-
读写文本文件的方式:1)File静态类的File.ReadAllLines();与File.WriteAllLines();方法进行读写...
- C#中的数组探究与学习
-
C#中的数组一般分为:...
- 一周热门
-
-
C# 13 和 .NET 9 全知道 :13 使用 ASP.NET Core 构建网站 (1)
-
因果推断Matching方式实现代码 因果推断模型
-
git pull命令使用实例 git pull--rebase
-
git pull 和git fetch 命令分别有什么作用?二者有什么区别?
-
面试官:git pull是哪两个指令的组合?
-
git 执行pull错误如何撤销 git pull fail
-
git fetch 和git pull 的异同 git中fetch和pull的区别
-
git pull 之后本地代码被覆盖 解决方案
-
还可以这样玩?Git基本原理及各种骚操作,涨知识了
-
git命令之pull git.pull
-
- 最近发表
- 标签列表
-
- git pull (33)
- git fetch (35)
- mysql insert (35)
- mysql distinct (37)
- concat_ws (36)
- java continue (36)
- jenkins官网 (37)
- mysql 子查询 (37)
- python元组 (33)
- mysql max (33)
- vba instr (33)
- mybatis 分页 (35)
- vba split (37)
- redis watch (34)
- python list sort (37)
- nvarchar2 (34)
- mysql not null (36)
- hmset (35)
- python telnet (35)
- python readlines() 方法 (36)
- munmap (35)
- docker network create (35)
- redis 集合 (37)
- python sftp (37)
- setpriority (34)