大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 通过两篇来完成 集群模式配置、集群模式启动

在这里插入图片描述

基本介绍

Apache Druid 从 Kafka 中获取数据并进行分析的流程通常分为以下几个步骤:

  • Kafka 数据流的接入: Druid 通过 Kafka Indexing Service 直接从 Kafka 中摄取实时流数据。Kafka 是一个高吞吐量的消息队列,适合处理大量实时数据。Druid 会订阅 Kafka 的 topic,每当新数据到达时,它会自动从 Kafka 中读取数据。

  • 数据解析与转换: 数据从 Kafka 进入 Druid 后,首先会进行数据解析,通常采用 JSON、Avro 或 CSV 格式。解析的过程中,Druid 可以根据预定义的 schema 进行字段映射、过滤和数据转换,比如将字符串转为数值类型、提取时间戳等。这一步允许对数据进行初步处理,比如数据清洗或格式化。

  • 实时数据摄取与索引: Druid 将解析后的数据放入一个实时索引中,同时也将数据存储在内存中。Druid 的一个核心特点是,它会为每条记录生成倒排索引和 bitmap 索引,这样可以大大加快查询速度。实时摄取的数据在内存中保存一段时间,直到满足一定条件(比如时间或数据量),然后会以段的形式写入深度存储(如 HDFS 或 S3)。

  • 批处理与历史数据合并: Druid 支持实时和批处理的混合模式。当实时摄取的数据段被持久化到深度存储后,Druid 可以自动将这些段与批处理数据合并。这种设计确保了在数据分析时,既能查询到最新的实时数据,也能访问历史数据。批处理数据可以通过 Hadoop 或 Spark 等框架预先批量加载到 Druid 中。

  • 数据分片与副本管理: Druid 支持水平扩展,通过分片将数据分布在多个节点上。每个分片可以有多个副本,这样可以保证系统的高可用性和容错性。通过负载均衡,Druid 可以有效处理大规模查询请求,尤其是在数据量非常大的情况下。

  • 查询与分析: Druid 的查询系统基于 HTTP/JSON API,支持多种类型的查询,如时间序列查询、分组聚合查询、过滤查询等。Druid 的查询引擎设计非常高效,可以处理大规模的 OLAP(在线分析处理)查询。由于 Kafka 中的数据是实时流式的,Druid 的查询结果通常可以反映出最新的业务指标和分析结果。

  • 可视化与监控: Druid 的数据可以与 BI 工具(如 Superset、Tableau)集成,生成实时的报表和仪表盘。用户可以通过这些可视化工具,实时监控业务指标,做出数据驱动的决策。

整个流程中,Druid 负责将 Kafka 中的数据转化为高效的、可查询的 OLAP 格式,并且通过索引和分布式架构实现高效查询。这个系统可以被广泛应用于实时监控、用户行为分析、金融交易分析等场景。

从Kafka中加载数据

典型架构

在这里插入图片描述

日志业务中,我们不会在Druid中处理复杂的数据转换清晰工作

案例测试

假设有以下网络流量数据:

  • ts:时间戳
  • srcip:发送端IP地址
  • srcport:发送端端口号
  • dstip:接收端IP地址
  • dstport:接收端端口号
  • protocol:协议
  • packets:传输包
  • bytes:传输的字节数
  • cost: 传输耗费的时间

数据是JSON格式,通过Kafka传输
每行数据包含:

  • 时间戳
  • 维度列
  • 指标列

需要计算的指标:

  • 记录的条数:count
  • packets:max
  • bytes:min
  • cost:sum

数据汇总粒度:分钟

测试数据

{"ts":"2020-10-01T00:01:35Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":1, "bytes":1000, "cost": 0.1}{"ts":"2020-10-01T00:01:36Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":2, "bytes":2000, "cost": 0.1}{"ts":"2020-10-01T00:01:37Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":3, "bytes":3000, "cost": 0.1}{"ts":"2020-10-01T00:01:38Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":4, "bytes":4000, "cost": 0.1}{"ts":"2020-10-01T00:02:08Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666,"dstPort":8888, "protocol": "udp", "packets":5, "bytes":5000, "cost": 0.2}{"ts":"2020-10-01T00:02:09Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666,"dstPort":8888, "protocol": "udp", "packets":6, "bytes":6000, "cost": 0.2}{"ts":"2020-10-01T00:02:10Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666,"dstPort":8888, "protocol": "udp", "packets":7, "bytes":7000, "cost": 0.2}{"ts":"2020-10-01T00:02:11Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666,"dstPort":8888, "protocol": "udp", "packets":8, "bytes":8000, "cost": 0.2}{"ts":"2020-10-01T00:02:12Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666,"dstPort":8888, "protocol": "udp", "packets":9, "bytes":9000, "cost": 0.2}

写入的数据如下所示:
在这里插入图片描述

启动Kafka

这里由于资源比较紧张,我就只启动一台Kafka了:
我在 h121 节点上启动

kafka-server-start.sh /opt/servers/kafka_2.12-2.7.2/config/server.properties

创建 Topic

kafka-topics.sh --create --zookeeper h121.wzk.icu:2181 --replication-factor 1 --partitions 1 --topic druid1

推送消息

kafka-console-producer.sh --broker-list h121.wzk.icu:9092 --topic druid1

输出我们刚才的数据,一行一行的写入输入进行(后续要用)。

提取数据

浏览器打开我们之前启动的Druid服务

http://h121.wzk.icu:8888/

LoadData

点击控制台中的 LoadData 模块:
在这里插入图片描述

Streaming

选择 Streaming:
在这里插入图片描述

Kafka

继续选择Kafka,点击 ConnectData,在右侧输入对应的信息,点级Apply:

  • h121.wzk.icu:9092
  • druid1

在这里插入图片描述

ParserData

此时可以看到右下角有:Next: Parse Data:
在这里插入图片描述
数据虽然加载了,但是格式不对,我们在右侧选择:JSON:
在这里插入图片描述

点击之后,可以看到,(如果你解析不顺利,可以用这个尝试)点击 Add column flattening
在这里插入图片描述
如果正常解析,数据应该是这个样子:
在这里插入图片描述

ParserTime

继续点击 Next Parse Time:
在这里插入图片描述

Transform

继续点击 Next Transform:

  • 不建议在Druid中进行复杂的数据变化操作,可考虑将这些操作放在数据预处理的过程中处理
  • 这里没有定义数据转换

在这里插入图片描述

Filter

继续点击 Next Filter:

  • 不建议在Druid中进行复杂的数据过滤操作,可以考虑将这些操作放在数据预处理中
  • 这里没有定义数据过滤

在这里插入图片描述

Configuration Schema

点击 Next Configuration Schema:

  • 定义指标列、维度列
  • 定义如何在维度列上进行计算
  • 定义是否在摄取数据时进行数据的合并(即RollUp),以及RollUp的粒度

在这里插入图片描述
此时点击右侧的:RollUp,会看到数据被聚合成了两条:
在这里插入图片描述
聚合结果:
在这里插入图片描述

Partition

点击 Next Partition:

  • 定义如何进行数据分区
  • Primary partitioning 有两种方式:
  • 方式1:uniform,以一个固定的时间间隔聚合函数数据,建议使用这种方式,这里将每天的数据作为一个分区
  • 方式2:arbitary,尽量保证每个 segements大小一致,时间间隔不固定
  • Secondary Partitioning
  • 参数1:Max rows per segment,每个Segment最大的数据条数
  • 参数2:Max total rows,Segment等待发布的最大数据条数

在这里插入图片描述

Tune

点击 Next Tune:

  • 定义任务执行和优化相关的参数

在这里插入图片描述

Publish

点击 Next Publish:

  • 定义Datasource的名称
  • 定义数据解析失败后采取的动作

在这里插入图片描述

Edit Special

点击 Next Edit spec:

  • JSON串为数据摄取规范,可返回之前的步骤中进行修改,也可以直接编辑规范内容,并在之前的步骤可以看到修改的结果
  • 摄取规范定义完成后,点击Submit会创建一个数据摄取的任务

在这里插入图片描述

Submit

点击 Submit 按钮:

在这里插入图片描述

数据查询

  • 数据摄取规范发布后生成Supervisor
  • Supervisor会启动一个Task,从kafka中摄取数据
    需要等待一段时间,Datasource才会创建完毕,选择 【Datasources】板块:
    在这里插入图片描述

点击末尾的三个小圆点,选择 Query With SQL:
在这里插入图片描述

会出现如下的界面,我们写入SQL,并运行:

SELECT *
FROM "druid1"

执行结果如下图:
在这里插入图片描述

数据摄取规范

{"type":"kafka","spec":{"ioConfig":Object{...},"tuningConfig":Object{...},"dataSchema":Object{...}}
}
  • dataSchema:指定传入数据的Schema
  • ioConfig:指定数据的来源和去向
  • tuningConfig:指定各种摄取参数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1550751.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

【HTML5】html5开篇基础(5)

1.❤️❤️前言~🥳🎉🎉🎉 Hello, Hello~ 亲爱的朋友们👋👋,这里是E绵绵呀✍️✍️。 如果你喜欢这篇文章,请别吝啬你的点赞❤️❤️和收藏📖📖。如果你对我的…

elementUI表格中某个字段(state)使用计算属性进行转换为对应中文显示

代码案例&#xff1a; <template><el-table:data"tableData"style"width: 100%"><el-table-columnprop"date"label"日期"width"180"/><el-table-columnprop"name"label"姓名"wid…

通信工程师笔记

第一章 1.支撑网是使业务网正常运行,增强网络功能,提供全网服务质量以满足用户要求的网络。 2.常见的有线通信线路包括&#xff08;1&#xff09;双绞线&#xff0c;&#xff08;2&#xff09;同轴电缆&#xff0c;&#xff08;3&#xff09;光纤等&#xff0c;无线通信线路是…

哈希-01-数据分类处理

文章目录 1. 题目描述2. 思路3. 代码 1. 题目描述 信息社会&#xff0c;有海量的数据需要分析处理&#xff0c;比如公安局分析身份证号码、 QQ 用户、手机号码、银行帐号等信息及活动记录。 采集输入大数据和分类规则&#xff0c;通过大数据分类处理程序&#xff0c;将大数据…

【Preference Learning】Reasoning with Language Model is Planning with World Model

arxiv: https://arxiv.org/abs/2305.14992 问题背景&#xff1a;当前LLM推理受到几个关键因素的限制&#xff1a; &#xff08;1&#xff09;LLM缺乏世界模型&#xff08;一种人类就有的对环境的心理表征&#xff0c;可以模拟行动以及活动对外部世界状态的影响&#xff09;去…

TVS/ESD管应用问题点

TVS管全称是Transient Voltage Suppressor&#xff0c; 即瞬态抑制器二极管&#xff0c; 其主要为硅材料&#xff0c; 是 二极管的一种引申工艺器件。 根据其应用的定义&#xff0c; 更多为涉及浪涌等大功率方面的测试&#xff0c; 因 此TVS管主要指SMAJ、 SMBJ及其以上封装&a…

9.29学习

1.线上问题rebalance 因集群架构变动导致的消费组内重平衡&#xff0c;如果kafka集内节点较多&#xff0c;比如数百个&#xff0c;那重平衡可能会耗时导致数分钟到数小时&#xff0c;此时kafka基本处于不可用状态&#xff0c;对kafka的TPS影响极大 产生的原因 ①组成员数量发…

数据库软题1-数据模型+数据库三级模式两级映像

一、数据模型 (一)常见的数据模型 题1-二维表-关系模型 二、三级模式两级映像 (一) 外模式/模式/内模式 <>视图/基本表/文件 题1-三级模式与数据库的三对应 题2-三级模式与数据库的三对应 题3-视图是虚拟表 解析&#xff1a;视图是从一个或几个基本表&#xff08;或视…

ZYNQ中 PL 使用 PS 端晶振实现流水灯

ZYNQ中 PL 使用 PS 端晶振实现流水灯 Create Block Design 点击 Create Block Design, 然后点击 OK: 添加 IP 核 点击 Diagram 中的 号: 在 IP 核的搜索框中输入 ZYNQ, 找到 ZYNQ7 Processing System 的 IP 核,双击之: Viavado 会把 ZYNQ 的 IP 加到我们的 Block Design 中…

企业如何提升知识产权管理效率?

随着企业规模的扩大和创新活动的增加&#xff0c;知识产权管理日益复杂。有效的知识产权管理不仅能够保护企业的创新成果&#xff0c;还能为企业带来巨大的商业价值。然而&#xff0c;许多企业在知识产权管理方面面临着效率低下的问题&#xff0c;管理效率的提升成为企业亟需解…

交换机支持的以太网协议大全

在现代网络架构中,以太网协议是基础设施的核心组成部分,而交换机作为以太网网络的关键设备,其支持的协议种类繁多。理解这些协议的功能和应用场景对于网络管理员和工程师而言至关重要,因为这些协议决定了网络的性能、稳定性和安全性。 以太网协议的发展经历了从最初的10Mb…

使用apipost工具导入通过swag生成的golang接口文档步骤

1.通过swag init 生成docs接口文档 见使用swag init --parseDependency生成api文档报错解决办法 2.导入操作 ok,操作完成

NB_IOT类产品的通信功能生产测试方案

01 物联网行业中存在的问题 在产品的生产流程中&#xff0c;NB 产品的 Socket 通信测试是一个至关重要的环节。Socket 通信作为一种常用的网络通信方式&#xff0c;对于 NB 产品的性能和稳定性有着直接的影响。 在这个阶段&#xff0c;测试人员会精心设计一系列的测试用例&a…

基于OpenCV的实时年龄与性别识别(支持CPU和GPU)

关于深度实战社区 我们是一个深度学习领域的独立工作室。团队成员有&#xff1a;中科大硕士、纽约大学硕士、浙江大学硕士、华东理工博士等&#xff0c;曾在腾讯、百度、德勤等担任算法工程师/产品经理。全网20多万粉丝&#xff0c;拥有2篇国家级人工智能发明专利。 社区特色…

NineData云原生智能数据管理平台新功能发布|2024年9月版

本月发布 3 项更新&#xff0c;其中重点发布 3 项。 重点发布 数据复制 - 新增 Oracle 到 Kafka 复制链路 数据复制功能新增支持将 Oracle 中的数据复制到 Kafka&#xff0c;实现数据的实时流转和分发&#xff0c;支持全量和增量。 数据复制 - 新增库表分组复制 创建复制任务…

网上很好看的动态音频效果是怎么做的?只需两个步骤实现动态音乐效果!

网上很好看的动态音频效果是怎么做的&#xff1f;只需两个步骤实现动态音乐效果&#xff01;大家听音乐的时候&#xff0c;有没看到别人桌面上会有一些音频效果&#xff0c;随着音乐的节奏而跳动。 其实这是用了一些桌面插件&#xff0c;来显示音频效果&#xff0c;咱们这期就…

【小程序】微信小程序课程 -4 项目实战

目录 1、 效果图 2、创建项目 2.1 创建小程序端 2.1.1 先创建纯净项目 2.1.2 删除components 2.1.4 删除app.json红色部分 2.1.5 删除index.json红色部分 2.1.6 删除index.wxss全部内容 2.1.7 删除index.wxml全部内容 2.1.8 app.json创建4个页面 2.1.9 app.json添加…

学习大模型新人必看,大语言模型(LLM)入门学习路线图

Github项目上有一个大语言模型学习路线笔记&#xff0c;它全面涵盖了大语言模型的所需的基础知识学习&#xff0c;LLM前沿算法和架构&#xff0c;以及如何将大语言模型进行工程化实践。这份资料是初学者或有一定基础的开发/算法人员入门活深入大型语言模型学习的优秀参考。这份…

淘宝api上货软件)一刻工具箱,一天上几万不出现爬虫违规,更新开放类目错放功能,淘宝电商必备软件!

天猫淘宝抖音上货神器&#xff0c;助力电商快速铺货 在当今这个信息爆炸、电商飞速发展的时代&#xff0c;如何快速有效地将产品铺货到各大电商平台&#xff0c;成为每一位电商从业者都需要面对的问题。 通过电商API接口能为电商从业者打造的综合辅助工具&#xff0c;支持天猫、…