【大数据测试spark+kafka-详细教程(附带实例)】

大数据测试:Spark + Kafka 实时数据处理与窗口计算教程

  • 1. 概述
    • 1.1 大数据技术概述
    • 1.2 Apache Kafka 与 Spark 的结合
  • 2. 技术原理与流程
    • 2.1 Kafka 简介
    • 2.2 Spark Streaming 简介
    • 2.3 数据流动与处理流程
  • 3. 环境配置
    • 3.1 安装依赖项
  • 4. 实例:实时数据处理与窗口计算
    • 4.1 Kafka 生产者代码
    • 4.2 Spark Streaming 消费者代码
    • 4.3 解释与操作
  • 5. 运行与测试
    • 5.1 创建 Kafka Topic
    • 5.2 启动 Kafka 生产者
    • 5.3 启动 Spark Streaming 程序
    • 5.4 输出结果
  • 6. 总结

1. 概述

1.1 大数据技术概述

大数据(Big Data)指的是无法用传统数据库技术和工具进行处理和分析的超大规模数据集合。在大数据技术中,实时数据流的处理尤为重要,尤其是如何高效地对海量的实时数据进行采集、存储、处理与分析。

在这方面,Apache KafkaApache Spark 是两个关键技术。Kafka 作为分布式流处理平台,可以高效地进行实时数据流的生产和消费,而 Spark 提供了强大的分布式计算能力,尤其是其扩展的流式计算模块 Spark Streaming,非常适合处理实时数据流。

1.2 Apache Kafka 与 Spark 的结合

  • Kafka 是一个分布式消息队列,可以处理高吞吐量、低延迟的实时数据流。Kafka 被广泛用于日志收集、监控系统、实时数据传输等场景。
  • Spark 是一个统一的分析引擎,支持批量处理、流式处理和图计算。Spark Streaming 是 Spark 的一个流式处理组件,用于实时处理流数据。

通过结合 Kafka 和 Spark,我们可以实现大规模数据的实时处理、聚合和窗口计算。Spark 可以从 Kafka 消费数据流,并进行实时计算与分析,适用于诸如实时日志分析、用户行为分析、实时推荐等场景。


2. 技术原理与流程

2.1 Kafka 简介

Kafka 是一个分布式的消息队列系统,能够实现高吞吐量、可扩展性、容错性。它的基本组成包括:

  • Producer(生产者):负责向 Kafka 发送数据。
  • Consumer(消费者):从 Kafka 中消费数据。
  • Broker(代理):Kafka 的节点,每个节点负责存储消息。
  • Topic(主题):消息被组织在 Topic 中,生产者向 Topic 发送数据,消费者从 Topic 中读取数据。
  • Partition(分区):Kafka 支持水平分区,使得数据可以分布在多个 Broker 上。

2.2 Spark Streaming 简介

Spark Streaming 是 Spark 的流处理模块,它以 DStream(离散流)为基本数据结构,能够实时地处理数据流。DStream 是一个连续的 RDD(弹性分布式数据集),Spark Streaming 将实时流数据划分成一个个小的批次,使用批处理模型对这些小批次进行处理。

2.3 数据流动与处理流程

  1. Kafka Producer:将数据发送到 Kafka Topic。
  2. Kafka Broker:Kafka 集群负责存储和转发数据。
  3. Spark Streaming:通过 Kafka 的消费者接口从 Topic 中消费数据。
  4. 数据处理与计算:在 Spark Streaming 中进行数据聚合、过滤、窗口计算等操作。
  5. 输出结果:将处理后的数据输出到外部系统,如 HDFS、数据库或控制台。

3. 环境配置

3.1 安装依赖项

  1. 安装 Java:确保安装了 Java 8 或更高版本。

    检查版本:

    java -version
    
  2. 安装 Apache Spark:从 Apache Spark 官网 下载并安装 Spark。

  3. 安装 Apache Kafka:从 Kafka 官网 下载并安装 Kafka。

  4. Maven 配置:在 Java 项目中使用 Maven 作为构建工具,添加必要的 Spark 和 Kafka 依赖。

pom.xml 文件中添加 Spark 和 Kafka 的 Maven 依赖:

<dependencies><!-- Spark Core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.1</version></dependency><!-- Spark Streaming --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.1</version></dependency><!-- Spark Streaming Kafka --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.3.1</version></dependency><!-- Kafka Consumer --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>

4. 实例:实时数据处理与窗口计算

4.1 Kafka 生产者代码

以下是一个简单的 Kafka 生产者,用于生成模拟的用户行为日志(如点击事件)并发送到 Kafka Topic logs

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 模拟用户点击日志数据String[] actions = {"click", "view", "scroll"};String[] users = {"user1", "user2", "user3"};// 向 Kafka 发送模拟数据for (int i = 0; i < 100; i++) {String user = users[i % 3];String action = actions[i % 3];String timestamp = String.valueOf(System.currentTimeMillis() / 1000);String value = user + "," + action + "," + timestamp;producer.send(new ProducerRecord<>("logs", null, value));try {Thread.sleep(1000); // 每秒发送一条数据} catch (InterruptedException e) {e.printStackTrace();}}producer.close();}
}

4.2 Spark Streaming 消费者代码

以下是一个 Spark Streaming 程序,它从 Kafka Topic logs 中消费数据并进行窗口计算,统计每个用户在过去 10 秒内的点击次数。

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import org.apache.kafka.clients.consumer.ConsumerRecord;import java.util.HashMap;
import java.util.Map;
import java.util.Arrays;
import java.util.List;public class SparkKafkaWindowExample {public static void main(String[] args) throws InterruptedException {// 初始化 Spark StreamingContextJavaStreamingContext jssc = new JavaStreamingContext("local[2]", "SparkKafkaWindowExample", new Duration(2000));// Kafka 配置参数String brokers = "localhost:9092";String groupId = "spark-consumer-group";String topic = "logs";// Kafka 参数设置Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", brokers);kafkaParams.put("key.deserializer", StringDeserializer.class);kafkaParams.put("value.deserializer", StringDeserializer.class);kafkaParams.put("group.id", groupId);kafkaParams.put("auto.offset.reset", "latest");kafkaParams.put("enable.auto.commit", "false");List<String> topics = Arrays.asList(topic);// 从 Kafka 获取数据流JavaReceiverInputDStream<ConsumerRecord<String, String>> stream =KafkaUtils.createDirectStream(jssc,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams));// 处理每条记录:解析用户、动作和时间戳JavaPairRDD<String, String> userActions = stream.mapToPair(record -> {String[] fields = record.value().split(",");return new Tuple2<>(fields[0], fields[1]); // userId, action});// 定义窗口大小为 10 秒,滑动间隔为 5 秒JavaPairRDD<String, Integer> userClickCounts = userActions.window(new Duration(10000), new Duration(5000)) // 滑动窗口.reduceByKeyAndWindow((Function2<Integer, Integer, Integer>) Integer::sum,new Duration(10000), // 窗口大小:10秒new Duration(5000)   // 滑动间隔5);// 输出每个窗口的用户点击次数userClickCounts.foreachRDD(rdd -> {rdd.collect().forEach(record -> {System.out.println("User: " + record._1() + ", Click Count: " + record._2());});});// 启动流式处理jssc.start();jssc.awaitTermination();}
}

4.3 解释与操作

  • Kafka 配置:配置 Kafka 参数,连接到 Kafka 服务,订阅 Topic logs
  • 数据解析:从 Kafka 消费数据后,解析每条日志(如 user1,click,1609459200)。
  • 窗口计算:使用 window() 定义一个窗口,窗口大小为 10 秒,滑动间隔为 5 秒。使用 reduceByKeyAndWindow() 聚合每个窗口内的用户点击次数。
  • 输出结果:每 5 秒统计一次过去 10 秒内的用户点击次数,输出到控制台。

5. 运行与测试

5.1 创建 Kafka Topic

在 Kafka 中创建 Topic logs

kafka-topics.sh --create --topic logs --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

5.2 启动 Kafka 生产者

运行 Kafka 生产者代码,模拟数据发送到 Kafka:

java KafkaProducerExample

5.3 启动 Spark Streaming 程序

运行 Spark Streaming 程序,消费 Kafka 数据并执行窗口计算:

java SparkKafkaWindowExample

5.4 输出结果

每隔 5 秒输出用户的点击次数,如:

User: user1, Click Count: 3
User: user2, Click Count: 5

6. 总结

通过结合使用 Apache KafkaApache Spark,我们可以高效地处理大规模的实时数据流。Kafka 负责消息的可靠传输,而 Spark Streaming 负责实时计算和分析。使用窗口计算(如 window()reduceByKeyAndWindow()),我们可以在不同时间段内对数据进行聚合,适用于实时监控、推荐系统、用户行为分析等场景。

此架构适用于需要处理大数据、实时响应的应用程序,并能满足高吞吐量、低延迟的要求。


推荐阅读:《大数据 ETL + Flume 数据清洗》,《大数据测试 Elasticsearch》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/9745.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

【linux】再谈网络基础(二)

8. 再谈端口号 &#xff08;一&#xff09;与协议之间的关系 端口号(Port)标识了一个主机上进行通信的不同的应用程序 在TCP/IP协议中, 用 "源IP", "源端口号", "目的IP", "目的端口号", "协议号" 这样一个五元组来标识…

关键词策略的有效运用提升内容价值和搜索排名的关键

内容概要 在当今的数字时代&#xff0c;关键词策略是确保内容创作成功的重要基础。无论是个人博客还是商业网站&#xff0c;合适的关键词不仅能够提升文章的可见性&#xff0c;还能显著影响搜索引擎的排名。合理运用关键词&#xff0c;有助于吸引目标读者&#xff0c;将他们引…

1.62亿元!812个项目立项!上海市2024年度“科技创新行动计划”自然科学基金项目立项

本期精选SCI&EI ●IEEE 1区TOP 计算机类&#xff08;含CCF&#xff09;&#xff1b; ●EI快刊&#xff1a;最快1周录用&#xff01; 知网(CNKI)、谷歌学术期刊 ●7天录用-检索&#xff08;100%录用&#xff09;&#xff0c;1周上线&#xff1b; 免费稿件评估 免费匹配期…

【Ant Design Pro】不想用轻量的hook就喜欢用dva的数据状态管理

就像TS是JS的超集一样&#xff0c;antdpro框架也类似&#xff0c;底层也是用dva来构建的。关于数据管理&#xff0c;官方还是建议我们使用轻量的hooks方法来进行操作使用。 使用dva实现数据状态管理效果 框架中的数据管理模式 简单的数据共享 对于简单的应用&#xff0c;不需…

requestAnimationFrame与setInterval的抉择

&#x1f64c; 如文章有误&#xff0c;恳请评论区指正&#xff0c;谢谢&#xff01; ❤ 写作不易&#xff0c;「点赞」「收藏」「转发」 谢谢支持&#xff01; 背景 在之前的业务中遇到有 JS 动画的实现场景&#xff0c;但当电脑打开太多网页或是同时启动很多应用时&#xff0c…

【C++练习】使用海伦公式计算三角形面积

编写并调试一个计算三角形面积的程序 要求&#xff1a; 使用海伦公式&#xff08;Herons Formula&#xff09;来计算三角形的面积。程序需要从用户那里输入三角形的三边长&#xff08;实数类型&#xff09;。输出计算得到的三角形面积&#xff0c;结果保留默认精度。提示用户…

附件商户,用户签到,uv统计功能(geo,bitmap,hyperloglog结构的使用)

目录 附近商户一&#xff1a;Geo数据结构二&#xff1a;附近商户搜索 用户签到一&#xff1a;BitMap功能演示二&#xff1a;实现签到功能三&#xff1a;统计签到功能 uv统计一&#xff1a;hyperloglog的用法二&#xff1a;测试百万数据的tji二&#xff1a;测试百万数据的tji 附…

【LuatOS】修改LuatOS源码为PC模拟器添加高精度时间戳库timeplus

0x00 缘起 LuatOS以及Lua能够提供微秒或者毫秒的时间戳获取工具&#xff0c;但并没有提供获取纳秒的工具。通过编辑LuatOS源码以及相关BSP源码&#xff0c;添加能够获取纳秒的timeplus库并重新编译&#xff0c;以解决在64位Windows操作系统中LuatOS模拟器获取纳秒的问题&#…

[Python学习日记-64] 组合

[Python学习日记-64] 组合 简介 继承与组合 组合的使用 简介 继承其实就是生活当中的归类&#xff0c;就是把对象之间的共同特征再一次提炼&#xff0c;然后形成一个类&#xff0c;但是在实际的开发当中不单单只有归类这一个动作&#xff0c;对象与对象之间都会有一些关系&a…

关于stm32中IO映射的一些问题

在STM32固件库&#xff08;比如HAL或LL库&#xff09;中&#xff0c;GPIO的寄存器映射已经定义好了&#xff0c;开发者可以通过标准的读写操作访问GPIO引脚的状态。 一、我们可以直接通过位移操作来修改特定值。 二、下面我们提供另一种方法&#xff0c;位带操作 首先要定义一…

Python游戏开发之《人机大战象棋》-附完整源码-python教程

今天给大家带来的是人机大战的象棋 中国象棋 首先绘制一下棋盘&#xff0c;看看样子&#xff1a; 黑白经典款 绘制棋盘&#xff1a; class Board(QLabel):棋盘坐标与屏幕坐标类似&#xff0c;左上角为 (0, 0)&#xff0c;右下角为 (8, 9)BOARD str(dirpath / u"images…

AutoCAD2014

链接: https://pan.baidu.com/s/1Q4fhVmiSYDZ2DbPNi7m4cA 提取码: f3bm

免费送源码:Java+ssm+MySQL 在线购票影城 计算机毕业设计原创定制

摘要 随着互联网趋势的到来&#xff0c;各行各业都在考虑利用互联网将自己推广出去&#xff0c;最好方式就是建立自己的互联网系统&#xff0c;并对其进行维护和管理。在现实运用中&#xff0c;应用软件的工作规则和开发步骤&#xff0c;采用Java技术建设在线购票影城。 本设计…

MYSQL——事务管理

什么是事务 在数据库使用者角度&#xff0c;事务就是完成一个事件。例如一个员工信息数据库&#xff0c;要完成员工离职的事件&#xff0c;可能需要很多操作&#xff0c;比如删除员工基本信息以及员工在公司的表现&#xff0c;薪资水平等。而这一系列的操作就是为了完成员工离…

书生实战营第四期-基础岛第四关-InternLM + LlamaIndex RAG 实践

一、任务要求1 基于 LlamaIndex 构建自己的 RAG 知识库&#xff0c;寻找一个问题 A 在使用 LlamaIndex 之前 浦语 API 不会回答&#xff0c;借助 LlamaIndex 后 浦语 API 具备回答 A 的能力&#xff0c;截图保存。 1、配置开发机系统 镜像&#xff1a;使用 Cuda12.0-conda 镜…

LC:二分查找——杂记

文章目录 268. 丢失的数字162. 寻找峰值 268. 丢失的数字 LC将此题归类为二分查找&#xff0c;并且为简单题&#xff0c;下面记一下自己对这道题目的思考。 题目链接&#xff1a;268.丢失的数字 第一次看到这个题目&#xff0c;虽然标注的为简单&#xff0c;但肯定不能直接排…

推荐一款国产数据库管理工具Chat2DB

什么是 Chat2DB ? Chat2DB 是一款专为现代数据驱动型企业打造的数据库管理、数据开发及数据分析工具。作为一款AI原生的产品&#xff0c;Chat2DB 将人工智能技术与传统数据库管理功能深度融合&#xff0c;旨在提供更为智能、便捷的工作体验&#xff0c;助力用户高效地管理数据…

前端三件套(HTML + CSS + JS)

前言&#xff1a; 前端三件套&#xff0c;会用就行 毕竟在后面学习JavaWeb&#xff0c;以及在学习vue的时候也有帮助 前端三件套&#xff1a; HTML 定义网页的结构和内容。CSS 负责网页的样式和布局。JavaScript 添加动态交互和功能。 使用到的工具是Visual Studio Code 即…

Flutter错误: uses-sdk:minSdkVersion 16 cannot be smaller than version 21 declared

前言 今天要做蓝牙通信的功能&#xff0c;我使用了flutter_reactive_ble这个库&#xff0c;但是在运行的时候发现一下错误 Launching lib/main.dart on AQM AL10 in debug mode... /Users/macbook/Desktop/test/flutter/my_app/android/app/src/debug/AndroidManifest.xml Err…

网络编程示例之网络基础知识

TCP/IP 中有两个具有代表性的传输层协议&#xff0c;分别是 TCP 和 UDP&#xff1a; TCP 是面向连接的、可靠的流协议。流就是指不间断的数据结构&#xff0c;当应用程序采用 TCP 发送消息时&#xff0c;虽然可以保证发送的顺序&#xff0c;但还是犹如没有任何间隔的数据流发送…