探索ClickHouse——连接Kafka和Clickhouse

安装Kafka

新增用户

sudo adduser kafka
sudo adduser kafka sudo
su -l kafka

安装JDK

sudo apt-get install openjdk-8-jre

下载解压kafka

可以从https://downloads.apache.org/kafka/下找到希望安装的版本。需要注意的是,不要下载路径包含src的包,否则会报“Classpath is empty”之类的错误。

mkdir ~/Downloads
curl "https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz" -o ~/Downloads/kafka.tgz
mkdir ~/kafka && cd ~/kafka
tar -xvzf ~/Downloads/kafka.tgz --strip 1

配置

配置kafka

vim ~/kafka/config/server.properties

将下面这行加入文件的末尾

# ~/kafka/config/server.properties
delete.topic.enable=true

同时修改log的路径

# ~/kafka/config/server.properties
log.dirs=/home/kafka/logs

创建zookeeper service

sudo vim /etc/systemd/system/zookeeper.service

将下面内容填入上述文件中

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal[Install]
WantedBy=multi-user.target

创建kafka service

sudo vim /etc/systemd/system/kafka.service

将下面内容填入上述文件中

[Unit]
Requires=zookeeper.service
After=zookeeper.service[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal[Install]
WantedBy=multi-user.target

启动kafka#

启动服务

sudo systemctl start kafka

查看状态

sudo systemctl status kafka

● kafka.service
Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
Active: active (running) since Thu 2023-09-28 03:09:39 UTC; 4s ago
Main PID: 3561758 (sh)
Tasks: 42 (limit: 2143)
Memory: 292.4M
CPU: 2.768s
CGroup: /system.slice/kafka.service
├─3561758 /bin/sh -c “/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1”
└─3561760 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/>
Sep 28 03:09:39 ubuntua systemd[1]: Started kafka.service.

可以看到kafka已经处于running状态。

测试

创建Topic

~/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

发送消息

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

订阅Topic

新启动一个界面,执行下面命令

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

它会收到上面发的消息

Hello, World

连接

创建表

使用kafka engine将kafka中的流映射到一个表中。我们以《探索ClickHouse——使用Projection加速查询》中的数据为例。

clickhouse-client --stream_like_engine_allow_direct_select 1
CREATE TABLE uk_price_paid_from_kafka (uuid_string String, price_string String, time String, postcode String, a String, b String, c String, addr1 String, addr2 String, street String, locality String, town String, district String, county String, d String, e String) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list='TutorialTopic', kafka_group_name='clickhouse', kafka_format='CSV', kafka_skip_broken_messages=1, kafka_num_consumers=1;

CREATE TABLE uk_price_paid_from_kafka
(
uuid_string String,
price_string String,
time String,
postcode String,
a String,
b String,
c String,
addr1 String,
addr2 String,
street String,
locality String,
town String,
district String,
county String,
d String,
e String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = ‘localhost:9092’, kafka_topic_list = ‘TutorialTopic’, kafka_group_name = ‘clickhouse’, kafka_format = ‘CSV’, kafka_skip_broken_messages = 1, kafka_num_consumers = 1
Query id: 07a063e9-6a61-42c0-8fec-1fe2f119ee28
Ok.
0 rows in set. Elapsed: 0.008 sec.

给kafka发送消息

~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic

进入消息输入模式,发送下面两个消息

"{F887F88E-7D15-4415-804E-52EAC2F10958}","70000","1995-07-07 00:00","MK15 9HP","D","N","F","31","","ALDRICH DRIVE","WILLEN","MILTON KEYNES","MILTON KEYNES","MILTON KEYNES","A","A"
"{40FD4DF2-5362-407C-92BC-566E2CCE89E9}","44500","1995-02-03 00:00","SR6 0AQ","T","N","F","50","","HOWICK PARK","SUNDERLAND","SUNDERLAND","SUNDERLAND","TYNE AND WEAR","A","A"

在这里插入图片描述

Clickhouse收到消息

在clickhouse-client交互终端中执行下面指令:

select * from uk_price_paid_from_kafka;

在这里插入图片描述
可以看到之前发送给kafka Topic的内容在Clickhouse中被收到了。

问题

后面我再在clickhouse-client交互终端中查询不到数据了。即使我们给kafka该主题发消息,也查询不到。后面我们再将《探索ClickHouse——使用MaterializedView存储kafka传递的数据》中讲解使用MaterializedView清洗和固化kafka的数据。

参考资料

  • https://openjdk.org/install/
  • https://kafka.apache.org/quickstart
  • https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04#step-2-mdash-downloading-and-extracting-the-kafka-binaries
  • https://cloud.tencent.com/developer/article/1892086
  • https://sineyuan.github.io/post/clickhouse-kafka/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/147125.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

centos7卸载docker

菜鸟教程-常见命令:https://www.runoob.com/docker/docker-command-manual.html 1. 准备工作: 1.1 杀死docker有关的容器: docker kill $(docker ps -a -q)1.2 删除所有docker容器: docker rm $(docker ps -a -q)1.3 删除所有d…

wordpress搭建自己的博客详细过程以及踩坑

WordPress作为一款开源的内容管理系统(CMS),具有诸多优势。首先,它的易用性使得即使对于没有编程经验的用户来说也能轻松上手,通过直观的用户界面和友好的管理工具,用户可以方便地创建、编辑和发布内容。其…

【python学习第12节 pandas】

文章目录 一,pandas1.1 pd.Series1.2 pd.date_range1.3 pd_DataFrame1.4浏览数据1.5布尔索引1.6设置值1.7操作1.8合并1.8.1concat()函数1.8.2 merge()函数 一,pandas 1.1 pd.Series pd.Series 是 Pandas 库中的一个数据结构&…

爆文采集器-热点爆文章采集工具

当信息在互联网上迅速传播,新闻迅速变化,自媒体创作者和信息追踪者们都希望能够捕捉到瞬息万变的热点话题,以吸引更多的关注和流量。爆文采集器成为了一项关键的工具,有助于他们在信息的海洋中找到并分享最新、最热门的内容。 热点…

AOP:分页参数统一校验

需求说明 为了保证系统的安全性,需要对所有的 查询列表 接口,添加分页参数,并对分页参数进行校验, ,保证参数的合法性。 比如, pageSize(每页显示条数),如果不做校验&a…

SpringCloud Alibaba - 基于 FeignClient 整合 Sentinel,实现“线程隔离”和“熔断降级”

目录 一、FeignClient 整合 Sentinel 1.1、整合原因 1.2、实现步骤 1.2.1、修改 OrderService 中的 application.yml 文件 1.2.2、给 FeignClient 编写失败后的降级逻辑 二、线程隔离 2.1、线程隔离的两种方式 2.1.1、线程池隔离 2.1.2、信号量隔离(Sentin…

国庆10.01

TCPselect 代码 服务器 #include<myhead.h> #include<sqlite3.h> #define PORT 6666 //端口号 #define IP "192.168.0.104" //IP地址//键盘事件 int jp(fd_set tempfds,int maxfd) {char buf[128] ""; //用来接收数据char buf1[128] …

电流流过电阻时会减小吗?

我相信很多人刚接触电路时都会有这个想法&#xff1a;由于电阻会抵抗或阻碍电荷的流动&#xff0c;假如现在电流往一个方向流动&#xff0c;且电路中只有一个电阻器&#xff0c;那么从电流流出的地方到刚接触电阻中间应该有有高电流&#xff0c;从电阻刚流出到最后应该有低电流…

使用WPS自动化转换办公文档: 将Word, PowerPoint和Excel文件转换为PDF

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

Ubuntu配置深度学习环境(TensorFlow和PyTorch)

文章目录 一、CUDA安装1.1 安装显卡驱动1.2 CUDA安装1.3 安装cuDNN 二、Anaconda安装三、安装TensorFlow和pyTorch3.1 安装pyTorch3.2 安装TensorFlow2 四、安装pyCharm4.1 pyCharm的安装4.2 关联anaconda的Python解释器 五、VScode配置anaconda的Python虚拟环境 前言&#xff…

Java下正面解除警告Unchecked cast: ‘java.lang.Object‘ to ‘java.util.ArrayList‘

就是我在反序列化时&#xff0c;遇到这样一个警告&#xff1a; Unchecked cast: java.lang.Object to java.util.ArrayList<com.work1.Student>然后我去网上查&#xff0c;有些人说用SuppressWarnings(“unchecked”)去忽略警告&#xff0c;但是我觉得作为一名合格的程序…

AWS-Lambda之导入自定义包-pip包

参考文档&#xff1a; https://repost.aws/zh-Hans/knowledge-center/lambda-import-module-error-python https://blog.csdn.net/fxtxz2/article/details/112035627 简单来说,以 " alibabacloud_dyvmsapi20170525 " 包为例 ## 创建临时目录 mkdir /tmp cd ./tmp …

深入探讨 Presto 中的缓存

【squids.cn】 全网zui低价RDS&#xff0c;免费的迁移工具DBMotion、数据库备份工具DBTwin、SQL开发工具等 Presto是一种流行的开源分布式SQL引擎&#xff0c;使组织能够在多个数据源上大规模运行交互式分析查询。缓存是一种典型的提高 Presto 查询性能的优化技术。它为 Prest…

循环语句

章节目录&#xff1a; 一、while 循环1.1 句式与基本使用1.2 while...else1.3 单行语句 二、for 循环2.1 句式与基本使用2.2 for...else2.3 range() 函数 三、退出循环3.1 break3.2 continue 四、pass 语句五、结束语 一、while 循环 1.1 句式与基本使用 句式&#xff1a; w…

postgresql-管理表空间

postgresql-管理表空间 基本概念创建表空间用户授权移动表空间 修改表空间移动表空间位置 删除表空间 基本概念 在 PostgreSQL 中&#xff0c;表空间&#xff08;tablespace&#xff09;表示数据文件的存放目录&#xff0c;这些数据文件代表了数 据库的对象&#xff0c;例如表…

buuctf-[RoarCTF 2019]Easy Java

第一次遇到java类的题目 打开环境&#xff0c;很像sql 点击help 以为是文件包含&#xff0c;&#xff0c;但是不对 这里需要了解JAVA WEB目录结构 WEB-INF&#xff1a;Java的web应用安全目录&#xff1b; 此外如果想在页面访问WEB-INF应用里面的文件&#xff0c;必须要通过w…

音乐创作软件:ToneLIB Jam v4.7.8 Crack

从强大的选项卡编辑器到 3D 模式 Tonelib Jam 是一款用于播放和创作音乐的综合软件应用程序。TL Jam专为初学者和经验丰富的吉他手而设计&#xff0c;可以提供一个完美的平台来掌握乐器&#xff0c;让您轻松学习自己喜欢的歌曲或设置高效的日常吉他练习程序。TL Jam 具有功能强…

华为云云耀云服务器L实例评测|使用华为云耀云服务器L实例的CentOS部署Docker并运行Tomcat应用

目录 前言 步骤1&#xff1a;登录到华为云耀云服务器L实例 步骤2&#xff1a;安装Docker 并验证Docker安装 步骤3&#xff1a;拉取Tomcat镜像并运行Tomcat容器 步骤4&#xff1a;放行8080端口 步骤5&#xff1a;访问tomcat 步骤6&#xff1a;管理Tomcat容器 小结 前言 …

27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…

【论文阅读】大语言模型中的文化道德规范知识

&#x1f680;Write In Front&#x1f680; &#x1f4dd;个人主页&#xff1a;令夏二十三 &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐️ 留言&#x1f4dd; &#x1f4e3;系列专栏&#xff1a;论文阅读 &#x1f4ac;总结&#xff1a;希望你看完之后&#xff0c;能对…