Kettle实现rabbitMQ的生产与消费_rabbitmq不支持顺序消费
wptr33 2025-10-14 06:17 2 浏览
文章目录
- 一、Kettle为什么可以读取流数据?
- 二、rabbitMQ中启动MQTT插件并创建队列和路由键
- 三、Kettle实现rabbitMQ的生产与消费
Kettle是一款非常强大的ETL工具,不仅可以使用图形化界面,还可以处理各种数据,今天记录一下本人使用Kettle中MQTT组件来实现从rabbitMQ中读取流数据,并进行解析和处理。
提示:以下是本篇文章正文内容,下面案例可供参考
一、Kettle为什么可以读取流数据?
首先,本人使用的是Kettle8.2,里面关于流处理的组件有以下几种 (注意Kettle版本,我现在使用的是8.x版本,这里面只有MQTT组件,可以连接rabbitMQ,但之前使用的7.x版本是没有MQTT流处理的,也就是不能处理rabbitMQ中的数据,而9.x版本中已经有rabbitMQ组件了):
从流中获取数据信息的第一步就是第一个组件“Get records fromstream”,之后会写到,这些流处理包括JMS、Kafka、MQTT。
然后,Kettle其实是不可以直接连接rabbitMQ的,rabbitMQ默认使用amqp协议,但也可以启用MQTT插件,来使用MQTT协议。因此,我们使用Kettle通过MQTT协议步骤来生产和消费rabbitMQ。
二、rabbitMQ中启动MQTT插件并创建队列和路由键
首先使用rabbitMQ自带的控制台输入命令,也可以用windows cd到rabbit目录输入命令。
输入以下命令:
rabbitmq-plugins enable rabbitmq_mqtt 开启 rabbitmq_mqtt 对应端口 1883
rabbitmq-plugins enable rabbitmq_web_mqtt 开启 rabbitmq_web_mqtt 对应端口 15675
因为我们是使用Kettle来连接rabbitMQ,所以使用的是1883端口,切记,只能使用端口1883,开启之后,可以在 http://rabbitMQ的ip地址:15672/#/ web页面查看端口是否开启:
确定端口开启之后,我们在Exchanges模块下面找到amq.topic交换器,点击进去之后,再绑定队列和路由键:
MQTT官方文档中有涉及到MQTT的系统配置,可自行尝试是否可以更改默认配置,本文未涉及:
值得注意的是我们使用的交换器只能是amq.topic,原因是rabbitMQ中的MQTT插件默认配置中只有一个交换器就是amq.topic,然后队列名称也只能是“mqtt-subscription-”开头,路由键名称可以随便设置。但要便于记忆,后续Kettle中使用的就是这个路由键。
三、Kettle实现rabbitMQ的生产与消费
1、生产数据发送给rabbitMQ
使用Kettle组件:生成记录、MQTT producer
值得注意的是,端口号只能是1883,还有就是下面的topic name是填写路由键,不是topic名称,本次绑定在amq.topic交换器下面队列的路由键是routing.update.username,所以这里填写的就是routing.update.username,其他设置默认就好,如果想要知道其他配置的作用,可参考Kettle的 官方文档 。
2、从rabbitMQ消费数据
使用Kettle组件:MQTT consumer
需要注意的还是端口和路由键,还有就是后续处理步骤最好使用英文命名,使用中文有时候会读取失败,或者识别不到XML文件,或者报错不是.ktr文件,重点切记!!!
后续处理步骤使用组件:Get records fromstream、表输出、空操作、写日志、transformation executor
“Get records fromstream”从流中接收信息,“表输出”将接收的信息存储到数据库中,“空操作”插入数据库时如果报错的消极处理,也可以换成“excel输出”,存储报错信息,“写日志”是将接收到信息打印到控制台,“transformation executor”是指定一个子转换步骤来处理数据,如后续没有处理需求,该步骤可省略,可只使用“Get records fromstream”和“写日志”两个步骤就行,进行验证。因为本次处理的数据为Json数据,所以还要对Json数据进行解析和处理,然后再使用解析后的数据去更新相关数据表。
接收到Json数据存储到了Mysql数据库中,所以解析就使用了Mysql自带的函数(JSON_EXTRACT),使用方法可参考文章:
mysql解析json字符串_Mysql解析json字符串/数组
也可参考本人的sql来解析Json数组:
select
x.id,
x.only_id,
x.createby,
x.createtime,
x.platform,
x.shopname,
x.realshopname,
x.username,
x.oldusername,
x.rownum,
y.user_id
from
(
select
a.id,
replace(json_extract(substring_index( substring_index( a.message, ";", b.id ), ";",- 1 ), '$[0].id'),'"','') as only_id,
replace(json_extract(substring_index( substring_index( a.message, ";", b.id ), ";",- 1 ), '$[0].createBy'),'"','') as createby,
from_unixtime(json_extract(substring_index( substring_index( a.message, ";", b.id ), ";",- 1 ), '$[0].createTime')/1000,'%Y-%m-%d %H:%i:%S') as createtime,
replace(json_extract(substring_index( substring_index( a.message, ";", b.id ), ";",- 1 ), '$[0].platform'),'"','') as platform,
replace(json_extract(substring_index( substring_index( a.message, ";", b.id ), ";",- 1 ), '$[0].shopName'),'"','') as shopname,
replace(json_extract(substring_index( substring_index( a.message, ";", b.id ), ";",- 1 ), '$[0].realShopName'),'"','') as realshopname,
replace(json_extract(substring_index( substring_index( a.message, ";", b.id ), ";",- 1 ), '$[0].userName'),'"','') as username,
replace(json_extract(substring_index( substring_index( a.message, ";", b.id ), ";",- 1 ), '$[0].oldUserName'),'"','') as oldusername,
b.id as rownum
from
(select id,replace(replace(replace(message,"},{","};{"),"]",""),"[","") as message,flag from sys_update_shopusername_log) a
join mysql.help_toplic_autonum b on b.id <= ( length( a.message ) - length( replace ( a.message, ";", "" ) ) + 1 )
where a.flag = 0
) x
join sys_user_detail y on x.username = y.username and upper(x.platform) = y.platform
-- “sys_update_shopusername_log”为存储的消费到的Json数据
-- “mysql.help_toplic_autonum”自定义拆分Json数组的辅助表
先启动MQTT消费者,如报错,就检查ip地址、端口、路由键是否正确。启动完成后再启动MQTT生产者,发送消息给rabbitMQ,再自己消费。
消费者:
生产者:
再次查看消费者消费情况:
可以看到是能够生产数据和消费数据,这个之后就可以让上游开发将数据信息发送到我们的默认交换器amq.topic的绑定队列里面,我们就可以消费和处理了。
四、总结
注意细节:是否开启MQTT插件,端口号是否是1883,交换器和队列名称是否符合默认设定,Kettle里面MQTT producer和MQTT consumer组件所涉及到topic name 都是路由键,是在rabbitMQ中创建队列时绑定的路由键,最后就是可以根据接收到消息使用transformation executor组件来进行后续开发,转换命名最好使用英文命名。
提示:如本文有一点点帮助到您,请点赞、转发、收藏、留言,感谢!!!,如需转载、引用敬请注明!!!
相关推荐
- 深度剖析 MySQL 数据库索引失效场景与优化策略
-
在互联网软件开发领域,MySQL数据库凭借其开源、高效等特性被广泛应用。而索引,作为提升MySQL查询性能的关键利器,能大幅加速数据检索。然而,在实际开发中,即便精心创建了索引,却常常遭遇索引失...
- 15分钟,带你了解indexedDB,这个前端存储方案很重要!
-
原文来源于:程序员成长指北;作者:Django强哥如有侵权,联系删除最近在给前端班授课,在这次之前的最后一次课已经是在2年前,2年的时间,前端的变化很大,也是时候要更新课件了。整理客户端存储篇章时模糊...
- MySQL 面试总被问到的那些问题,你都懂了吗?
-
事务的四大特性是什么?首先得提一下ACID,这可是数据库事务的灵魂所在:原子性(Atomicity):要么全部成功,要么全部失败回滚。一致性(Consistency):确保数据在事务前后都处于一致状态...
- Java 字符串常见的操作_java字符串总结
-
在Java当中,为字符串类提供了丰富的操作方法,对于字符串,我们常见的操作就是:字符串的比较、查找、替换、拆分、截取以及其他的一些操作。在Java中,有String,StringBuffer和St...
- java学习分享:Java截取(提取)子字符串(substring())
-
在String中提供了两个截取字符串的方法,一个是从指定位置截取到字符串结尾,另一个是截取指定范围的内容。下面对这两种方法分别进行介绍。1.substring(intbeginIndex)形...
- 你必须知道的 7 个杀手级 JavaScript 单行代码
-
1.如果你需要一个临时的唯一ID,请生成随机字符串。这个例子将为你生成一个随机字符串:constrandomString=Math.random().toString(36).slice(2)...
- MySQL 索引失效:原因、场景与解决方案
-
在互联网软件开发领域,MySQL作为一款广泛使用的关系型数据库,其性能优化至关重要。而索引,作为提升MySQL查询性能的关键手段,一旦失效,会导致查询效率大幅下降,影响整个系统的性能。今天,就来...
- Axure9 教程:可模糊搜索的多选效果
-
一、交互效果说明1.点击话题列表中的话题选项,上方输入框内显示选择的话题标签,最多可选择5个标签,超出将有文字提示。2.点击输入框内已选择的话题标签的删除按钮,可以删除已选择的话题标签,并且该标签返回...
- JavaScript字符串操作方法大全,包含ES6方法
-
一、charAt()返回在指定位置的字符。...
- 为什么MySQL索引不生效?来看看这8个原因
-
在数据库优化中,最让人头疼的事情之一莫过于精心设计的索引没有发挥作用。为什么会出现这种情况?这篇文章带大家一起探讨一些常见原因,方便大家更好地理解MySQL查询优化器是如何选择索引的,以及在出现类...
- Kettle实现rabbitMQ的生产与消费_rabbitmq不支持顺序消费
-
文章目录一、Kettle为什么可以读取流数据?...
- MySQL高频函数Top10!数据分析效率翻倍,拒绝无效加班!
-
引言:为什么你的SQL代码又臭又长?“同事3行代码搞定的事,你写了30行?”“每次处理日期、字符串都抓狂,疯狂百度?”——不是你不努力,而是没掌握这些高频函数!本文精炼8年数据库开发经验,总结出10个...
- mysql的截取函数用法详解_mysql截取指定字符
-
substring()函数测试数据准备:用法:以下语法是mysql自动提示的1:substirng(str,pos):从指定位置开始截取一直到数据完成str:需要截取的字段的pos:开始截取的位置。从...
- MySQL函数:字符串如何截取_mysql 字符串截取函数
-
练习截取字符串函数(五个)mysql索引从1开始...
- 数据集成产品分析(一)_数据集成工具有哪些
-
编辑导语:数据集成产品是数据中台建设的第一环节,在构建数据中台或大数据系统时,首先要将企业内部各个业务系统的数据实现互联互通,从物理上打破数据孤岛。本文作者对数据集成产品进行了分析,一起来看一下吧。数...
- 一周热门
-
-
C# 13 和 .NET 9 全知道 :13 使用 ASP.NET Core 构建网站 (1)
-
程序员的开源月刊《HelloGitHub》第 71 期
-
详细介绍一下Redis的Watch机制,可以利用Watch机制来做什么?
-
如何将AI助手接入微信(打开ai手机助手)
-
SparkSQL——DataFrame的创建与使用
-
假如有100W个用户抢一张票,除了负载均衡办法,怎么支持高并发?
-
Java面试必考问题:什么是乐观锁与悲观锁
-
redission YYDS spring boot redission 使用
-
如何利用Redis进行事务处理呢? 如何利用redis进行事务处理呢英文
-
一文带你了解Redis与Memcached? redis与memcached的区别
-
- 最近发表
- 标签列表
-
- git pull (33)
- git fetch (35)
- mysql insert (35)
- mysql distinct (37)
- concat_ws (36)
- java continue (36)
- jenkins官网 (37)
- mysql 子查询 (37)
- python元组 (33)
- mybatis 分页 (35)
- vba split (37)
- redis watch (34)
- python list sort (37)
- nvarchar2 (34)
- mysql not null (36)
- hmset (35)
- python telnet (35)
- python readlines() 方法 (36)
- munmap (35)
- docker network create (35)
- redis 集合 (37)
- python sftp (37)
- setpriority (34)
- c语言 switch (34)
- git commit (34)