Java版Flink使用指南——将消息写入到RabbitMQ的队列中

大纲

  • 新建工程
    • 新增依赖
  • 编码
    • 自动产生数据
    • 写入RabbitMQ
  • 测试

在 《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们介绍了如何使用Java在Flink中读取RabbitMQ中的数据,并将其写入日志中。本文将通过代码产生一些数据,然后将它们写入到另外一个RabbitMQ队列中。

新建工程

我们在IntelliJ中新建一个工程SinkToRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>

编码

自动产生数据

这段代码将产生两个字符串数据,后续这些数据会被写入到RabbitMQ的队列中。

List<String> data = new ArrayList<>();
data.add("Hello, World!");
data.add("Hello, Flink!");
DataStream<String> stream = env.fromCollection(data);

写入RabbitMQ

不同于《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》中创建RMQSource用来接收RabbitMQ队列中数据,这次我们创建RMQSink用来发布数据。

String sinkQueueName = "data.to.rbtmq"; // name of the queue to send data to
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSink<String> stringRMQSink = new RMQSink<>(rmqConnectionConfig, sinkQueueName, new SimpleStringSchema());
stream.addSink(stringRMQSink).name(username + "'s sink to " + sinkQueueName).setParallelism(parallelism);	

测试

打包、提交并运行任务
在这里插入图片描述
然后在RabbitMQ的后台可以看到收到两条消息
在这里插入图片描述
其内容也是我们之前在代码中生成的内容
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1474892.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Vine: 一种全新定义 Vue 函数式组件的解决方案

7月6日的 vue confg 大会上 ShenQingchuan 大佬介绍了他的 Vue Vine 项目&#xff0c; 一种全新定义 Vue 函数式组件的解决方案。 和 React 的函数式组件有异曲同工之妙&#xff0c;写起来直接起飞了。 让我们来快速体验一下 vine&#xff0c; 看看到底给我们带来了哪些惊喜吧…

分别通过LS和RML进行模型参数辨识matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 4.1 最小二乘法(LS)参数辨识 4.2 递归最大似然估计(RML)参数辨识 5.完整程序 1.程序功能描述 分别通过LS和RML进行模型参数辨识matlab仿真&#xff0c;仿真输出参数辨识的误差&#xff0c…

网络基础:BGP协议

BGP&#xff08;边界网关协议&#xff0c;Border Gateway Protocol&#xff09;是一种用于在不同自治系统&#xff08;Autonomous Systems&#xff0c;AS&#xff09;之间交换路由信息的路径向量协议。BGP是互联网的核心路由协议之一&#xff0c;负责管理和维护互联网范围内的路…

MySQL安全加固及等保测评

登录后复制 Mysql基础命令 create USER new_userlocalhost IDENTIFIED BY password; //创建用户 alter user root% identified with mysql_native_password by ********; //修改密码 rename user root% to root192.168.1.1; //重命名 flush privileges; …

Java面试题--JVM大厂篇之深入解析G1 GC——革新Java垃圾回收机制

目录 引言: 正文&#xff1a; 一、G1 GC的区域划分及其作用 1. 伊甸园区&#xff08;Eden Region&#xff09; 2. 幸存者区&#xff08;Survivor Region&#xff09; 3. 老年代区&#xff08;Old Generation Region&#xff09; 二、区域划分的优势: 三、图片解析: 结…

昇思25天学习打卡营第20天|LSTM+CRF序列标注

学AI还能赢奖品&#xff1f;每天30分钟&#xff0c;25天打通AI任督二脉 (qq.com) LSTMCRF序列标注 概述 序列标注指给定输入序列&#xff0c;给序列中每个Token进行标注标签的过程。序列标注问题通常用于从文本中进行信息抽取&#xff0c;包括分词(Word Segmentation)、词性标…

Python | Leetcode Python题解之第220题存在重复元素III

题目&#xff1a; 题解&#xff1a; class Solution(object):def containsNearbyAlmostDuplicate(self, nums, k, t):from sortedcontainers import SortedSetst SortedSet()left, right 0, 0res 0while right < len(nums):if right - left > k:st.remove(nums[left]…

D - Go Stone Puzzle(abc361)

分析&#xff1a;因为n很小&#xff0c;可以逐一搜索&#xff0c;用一个队列将每种情况列出来&#xff0c;用bfs寻找从s到t的最短路径 #include <bits/stdc.h> using namespace std; int n; string s, t; map<string, int> dis; void bfs() { dis[s] 0; …

RocketMQ NettyRemotingServer、NettyRemotingClient 实例化、初始化、启动源码解析

&#x1f52d; 嗨&#xff0c;您好 &#x1f44b; 我是 vnjohn&#xff0c;在互联网企业担任后端开发&#xff0c;CSDN 优质创作者 &#x1f4d6; 推荐专栏&#xff1a;Spring、MySQL、Nacos、Java&#xff0c;后续其他专栏会持续优化更新迭代 &#x1f332;文章所在专栏&#…

【5G VoNR】VoNR流程简述

博主未授权任何人或组织机构转载博主任何原创文章&#xff0c;感谢各位对原创的支持&#xff01; 博主链接 本人就职于国际知名终端厂商&#xff0c;负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作&#xff0c;目前牵头6G技术研究。 博客内容主要围绕…

go-redis源码解析:如何实现sentinel高可用

go-redis里&#xff0c;sentinel只用来获取master和从节点的ip地址&#xff0c;在获取master和replica节点ip时&#xff0c;如果sentinel不可用&#xff0c;那么会换其他的sentinel重试&#xff0c;并将可用的sentinel换到第一个 1. 用于获取master节点 先通过读锁获取c.senti…

模板进阶:非类型模板参数,类模板特化,模板的编译分离

1. 非类型模板参数 模板参数分类类型形参与非类型形参。 类型形参即&#xff1a;出现在模板参数列表中&#xff0c;跟在class或者typename之类的参数类型名称。 非类型形参&#xff0c;就是用一个常量作为类(函数)模板的一个参数&#xff0c;在类(函数)模板中可将该参数当成常…

【Unity数据交互】如何Unity中读取Ecxel中的数据

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 专栏交流&#x1f9e7;&…

2024/7/7周报

文章目录 摘要Abstract文献阅读题目问题本文贡献问题描述图神经网络Framework实验数据集实验结果 深度学习MAGNN模型相关代码GNN为什么要用GNN&#xff1f;GNN面临挑战 总结 摘要 本周阅读了一篇用于多变量时间序列预测的多尺度自适应图神经网络的文章&#xff0c;多变量时间序…

Vulkan 学习(1)---- Vulkan 基本概念和发展历史

目录 Vulkan及其演化史Vulkan 基本概念基本术语 Vulkan 的原理Vulkan应用程序Vulkan的编程模型硬件初始化窗口展示表面资源设置流水线设置描述符和描述符缓冲池基于SPIR-V的着色器流水线管理指令的记录队列的提交 Vulkan及其演化史 目前主流的图形渲染API有OpenGL、OpenGL ES、…

ROS——多个海龟追踪一个海龟实验

目标 通过键盘控制一个海龟&#xff08;领航龟&#xff09;的移动&#xff0c;其余生成的海龟通过监听实现追踪定期获取领航龟和其余龟的坐标信息&#xff0c;通过广播告知其余龟&#xff0c;进行相应移动其余龟负责监听 疑惑点&#xff08;已解决&#xff09; int main(int…

【感谢告知】本账号内容调整,聚焦于Google账号和产品的使用经验和问题案例分析

亲爱的各位朋友&#xff1a; 感谢您对本账号的关注和支持&#xff01; 基于对朋友们需求的分析和个人兴趣的转变&#xff0c;该账号从今天将对内容做一些调整&#xff0c;有原来的内容改为Google&#xff08;谷歌&#xff09;账号和产品的使用经验&#xff0c;以及相关问题的…

判断是否为完全二叉树

目录 分析 分析 1.完全二叉树的概念&#xff1a;对于深度为K的&#xff0c;有n个结点的二叉树&#xff0c;当且仅当其每一个结点都与深度为K的满二叉树中编号从1至n的结点一一对应时称之为完全二叉树。 要注意的是满二叉树是一种特殊的完全二叉树。 2.思路&#xff1a;可以采…

Redis源码整体结构

一 前言 Redis源码研究为什么先介绍整体结构呢?其实也很简单,作为程序员的,要想对一个项目有快速的认知,对项目整体目录结构有一个清晰认识,有助于我们更好的了解这个系统。 二 目录结构 Redis源码download到本地之后,对应结构如下: 从上面的截图可以看出,Redis源码一…

pdf怎么转换成图片格式文件,pdf文档怎么转换成图片格式

在数字化时代&#xff0c;pdf文件转换成图片格式是一种常见的操作&#xff0c;无论是在工作还是日常生活中&#xff0c;我们总会遇到需要将pdf文件转换为图片的需求。这可能是因为图片格式更易于分享、展示或编辑。那么&#xff0c;如何高效地将pdf转换成图片呢&#xff1f;本文…