kafka动态认证 自定义认证 安全认证-亲测成功

kafka动态认证 自定义认证 安全认证-亲测成功

背景

Kafka默认是没有安全机制的,一直在裸奔。用户认证功能,是一个成熟组件不可或缺的功能。在0.9版本以前kafka是没有用户认证模块的(或者说只有SSL),好在kafka0.9版本以后逐渐发布了多种用户认证功能,弥补了这一缺陷(这里仅介绍SASL),认证机制是SASL/PLAIN。

kafka下载安装

我这里用windows做的测试,部署到Linux上也是一样

官方下载地址:https://kafka.apache.org/downloads

我这里下载的kafka版本是:kafka_2.12-3.5.0.tgz

直接解压,如下图

在这里插入图片描述

启动zookeeper

这里的zookeeper配置其实没有做任何修改,zookeeper这里不做认证控制。

zookeeper配置文件在kafka_2.12-3.5.0\config\zookeeper.properties下,不用做任何修改

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=D:\kafka_2.12-3.5.0\zookeeper# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080#authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
#requireClientAuthScheme=sasl
#jaasLoginRenew=3600000

进入kafka主目录,打开cmd

#启动zookeeper
bin\windows\zookeeper-server-start.bat  config\zookeeper.properties

在这里插入图片描述

zookeeper-server-start.bat 启动脚本

@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements.  See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License.  You may obtain a copy of the License at
rem
rem     http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.IF [%1] EQU [] (echo USAGE: %0 zookeeper.propertiesEXIT /B 1
)rem set KAFKA_OPTS=-Djava.security.auth.login.config=D:\kafka_2.12-3.5.0\config\kafka_zookeeper_jaas.conf
SetLocal
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties)
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
)
"%~dp0kafka-run-class.bat" org.apache.zookeeper.server.quorum.QuorumPeerMain %*
EndLocal

在这里插入图片描述

kafka自定义认证配置

kafka的用户认证,是基于java的jaas。所以我们需要先添加jaas服务端的配置文件。

在kafka_2.12-3.5.0\config目录下新建kafka_jaas.conf 配置信息如下:

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin-liang"user_admin="admin-123456"user_liang="liang-123456";
};

注意最后一个属性后面需要加分号!配置是不难理解的,第一行指定PlainLoginModule,算是声明这是一个SASL/PLAIN的认证类型,如果是其他的,那么就需要reqired其他的类。username和password则是用于集群内部broker的认证用的。

这里会让人疑惑的,应该是user_admin和user_liang这两个属性了。这个其实是用来定义用户名和密码的,形式是这样:user_userName=password。所以这里其实是定义了用户admin和用户liang对应的密码。

这一点可以在源码的PlainServerCallbackHandler类中找到对应的信息,kafka源码中显示,对用户认证的时候,就会到jaas配置文件中,通过user_username属性获取对应username用户的密码,再进行校验。当然这样也导致了该配置文件只有重启才会生效,即无法动态添加用户。

写完配置后,需要在kafka的配置中添加jaas文件的路径。在kafka_2.12-3.5.0/bin/kafka-run-class.sh中,找到下面的配置,修改KAFKA_OPTS到配置信息。如下:

rem Generic jvm settings you want to add
IF ["%KAFKA_OPTS%"] EQU [""] (set KAFKA_OPTS=""
)

将上述到KAFKA_OPTS修改为:

rem Generic jvm settings you want to add
IF ["%KAFKA_OPTS%"] EQU [""] (set KAFKA_OPTS="-Djava.security.auth.login.config=D:\kafka_2.12-3.5.0\config\kafka_jaas.conf"
)

修改Kafka配置文件

配置文件在kafka_2.12-3.5.0\config\server.properties 主要增加如下配置

sasl.enabled.mechanisms = PLAIN
sasl.mechanism.inter.broker.protocol = PLAIN
security.inter.broker.protocol = SASL_PLAINTEXT
listeners = SASL_PLAINTEXT://localhost:9092

其中SASL_PLAINTEXT的意思,是明文传输的意思,如果是SSL,那么应该是SASL_SSL。

这样就算是配置好kafka broker了,接下来启动kafka,观察输出日志,没有错误一般就没问题了。

进入kafka主目录,另外打开一个cmd

#启动kafka
bin\windows\kafka-server-start.bat config\server.properties

在这里插入图片描述
在这里插入图片描述

使用Kafka客户端工具Kafka Tool连接

此时就可以根据上面配置的用户admin和用户liang和相应的密码去连接了

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

其他用户或错误的密码连接就会提示没有权限,用户或密码错误
在这里插入图片描述

动态认证

以上的配置方案除了没有使用SSL加密之外,还存在一个严重的缺陷:用户信息是通过静态配置文件的方式存储的,当对用户信息进行添加、删除和修改的时候都需要重启Kafka集群,而我们知道,作为消息中间件,Kafka的上下游与众多组件相连,重启可能造成数据丢失或重复,Kafka应当尽量避免重启。

如果要动态增加一个用户,得修改kafka_jaas.conf的配置,新增加一个用户,而且还得重启Kafka,这样显然不合适。

解决方案

还好,Kafka允许用户为SASL/PLAIN认证机制提供了自定义的回调函数,如果不希望采用静态配置文件存储用户认证信息的话,只需要编写一个实现了 AuthenticateCallbackHandler 接口的类,然后在配置文件中指明这个类即可,指明的方法为在Kafka配置文件中添加如下内容

listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.liang.kafka.auth.handler.MyPlainServerCallbackHandler

引入相关的maven依赖包,pom添加如下依赖包

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>2.8.1</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-cache</artifactId><version>5.7.21</version></dependency>

动态认证的完整代码如下

package com.liang.kafka.auth.handler;import com.alibaba.druid.pool.DruidDataSource;
import com.liang.kafka.auth.util.DataSourceUtil;
import com.liang.kafka.auth.util.PasswordUtil;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;import static com.liang.kafka.auth.constants.Constants.*;/***  kafka自定义认证 sasl/plain二次开发*  liang*/
public class MyPlainServerCallbackHandler implements AuthenticateCallbackHandler  {private static final Logger logger = LoggerFactory.getLogger(MyPlainServerCallbackHandler.class);/*** 数据源*/private DruidDataSource dataSource = null;/*** 是否开启数据库验证开关*/private boolean enableDbAuth;private static final String JAAS_USER_PREFIX = "user_";private List<AppConfigurationEntry> jaasConfigEntries;@Overridepublic void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {//jaas配置信息,初始化一次,这就是为什么plain无法添加用户this.jaasConfigEntries = jaasConfigEntries;logger.info("==============configs:{}", JSON.toJSONString(configs));Object endbAuthObject = configs.get(ENABLE_DB_AUTH);if (Objects.isNull(endbAuthObject)) {logger.error("==============缺少开关配置 enable_db_auth!");enableDbAuth = Boolean.FALSE;return;}enableDbAuth = TRUE.equalsIgnoreCase(endbAuthObject.toString());if (!enableDbAuth) {return;}dataSource = DataSourceUtil.getInstance(configs);}//核心类,获取用户密码后,调用authenticate方法@Overridepublic void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {String username = null;for (Callback callback: callbacks) {if (callback instanceof NameCallback)username = ((NameCallback) callback).getDefaultName();else if (callback instanceof PlainAuthenticateCallback) {PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;boolean authenticated = authenticate(username, plainCallback.password());plainCallback.authenticated(authenticated);logger.info("===============认证 username:{},result:{}", username, authenticated);} elsethrow new UnsupportedCallbackException(callback);}}//用户密码是通过获取jaas文件的属性,属性名就是JAAS_USER_PREFIX变量当前缀+usernameprotected boolean authenticate(String username, char[] password) throws IOException {if (username == null || password == null) {logger.error("===========用户名或密码为空!");return false;} else {//先读取配置文件里的用户验证String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries,JAAS_USER_PREFIX + username,PlainLoginModule.class.getName());logger.info("===============读取密码 username:{},pwd:{}", username, expectedPassword);boolean jaasUserBool = expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray());if (jaasUserBool) {return true;}//是否开启数据库验证if (enableDbAuth) {return dbAuthenticate(username, password);}return false;}}protected boolean dbAuthenticate(String usernameInfo, char[] passwordCharArray) throws IOException {String password = new String(passwordCharArray);logger.info("=====================begin dbAuthenticate usernameInfo:{},password:{}", usernameInfo, password);String username = usernameInfo;String userQuery = "select\n" +" u.username, u.password\n" +" from u_user u \n" +" where u.state='1' and u.username=?";Connection conn = null;try {conn = dataSource.getConnection();PreparedStatement statement = conn.prepareStatement(userQuery);statement.setString(1, username);ResultSet resultSet = statement.executeQuery();if (resultSet.next()) {String dbPassword = resultSet.getString("password");Boolean bl = PasswordUtil.matches(password, dbPassword);if (Boolean.TRUE.equals(bl)) {logger.info("=====================密码验证成功username:{}", username);} else {logger.error("=====================密码验证失败username:{}", usernameInfo);}return bl;} else {logger.error("=====================认证失败,username:{} 没有找到", usernameInfo);return false;}} catch (Exception e) {logger.error("=====================数据库查询用户异常{}", e);throw new RuntimeException(e);} finally {if (conn != null) {try {conn.close();} catch (SQLException e) {throw new RuntimeException(e);}}}}@Overridepublic void close() throws KafkaException {if (dataSource != null) {dataSource.close();}}}

获取数据源代码

package com.liang.kafka.auth.util;import com.alibaba.druid.pool.DruidDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Map;
import java.util.Properties;/*** @author liang* @desc 获取数据源*/
public class DataSourceUtil {private static final Logger LOG = LoggerFactory.getLogger(DataSourceUtil.class);/*** 保证 instance 在所有线程中同步*/private static volatile DruidDataSource dataSource = null;public static synchronized DruidDataSource getInstance(Map<String, ?> configs) {if (dataSource == null || dataSource.isClosed()) {dataSource = initDataSource(configs);}return dataSource;}private static final DruidDataSource initDataSource(final Map<String, ?> configs) {Properties properties = new Properties();for (Map.Entry<String, ?> entry : configs.entrySet()) {if (entry.getKey().startsWith("druid.")) {String key = entry.getKey();String value = (String) entry.getValue();LOG.info("datasource connection config: {}:{}", key, value);properties.setProperty(key, value);}}dataSource = new DruidDataSource();dataSource.configFromPropety(properties);return dataSource;}}

Kafka配置文件中添加数据源的相关配置

enable_db_auth = true
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.liang.kafka.auth.handler.MyPlainServerCallbackHandler
druid.name = mysql_db
druid.type = com.alibaba.druid.pool.DruidDataSource
druid.url = jdbc:mysql://127.0.0.1:3306/test?useSSL=FALSE&useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai
druid.username = root
druid.password = root
druid.filters = stat
druid.driverClassName = com.mysql.cj.jdbc.Driver
druid.initialSize = 5
druid.minIdle = 2
druid.maxActive = 50
druid.maxWait = 60000
druid.timeBetweenEvictionRunsMillis = 60000
druid.minEvictableIdleTimeMillis = 300000
druid.validationQuery = SELECT 'x'
druid.testWhileIdle = true
druid.testOnBorrow = false
druid.poolPreparedStatements = false
druid.maxPoolPreparedStatementPerConnectionSize = 20

其中:enable_db_auth来控制是否开启动态认证。

编译打成jar包后,需要放到kafka_2.12-3.5.0\libs目录,还使用了相关的依赖包也要放入
在这里插入图片描述

重启Kafka后生效,Kafka的连接认证就会从数据库去查询,想增加,修改,删除用户,直接在数据库表里操作。

参考链接:
https://www.top8488.top/kafka/458.html/
https://zhuanlan.zhihu.com/p/301343840?utm_medium=social&utm_oi=886243404000944128&utm_id=0
https://www.jianshu.com/p/e4c50e4affb8

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1540994.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

【全网首发】2024华为杯数学建模ABCDEF选题方向+完整思路代码+数据集处理+可视化结果

2024华为杯研究生数学建模比赛ABCDEF选题分析 建议选哪道题&#xff1f; 点击链接加入群聊【2024华为杯数学建模助攻资料】&#xff1a;http://qm.qq.com/cgi-bin/qm/qr?_wv1027&kxtS4vwn3gcv8oCYYyrqd0BvFc7tNfhV7&authKeyedQFZne%2BzvEfLEVg2v8FOm%2BWNg1V%2Fiv3H…

Apifox Mock使用教程

Apifox是一个功能强大的可视化接口文档管理工具&#xff0c;使用Apifox可以让接口Mock变得更简单容易。Apifox具有出色的Mock功能&#xff0c;不仅兼容Mock.js语法&#xff0c;同时提供Nunjucks 和自定义脚本支持&#xff0c;能够满足不同场景需求&#xff0c;为前端接口调试提…

Element 表格相关操作

数据和页面展示分离操作 <script setup> // 从Element Plus中导入需要的图标组件 import {Check,Delete,Edit,Message,Search,Star, } from element-plus/icons-vue // 导入Vue的ref和onMounted函数 import {ref,onMounted} from vue;// 使用ref创建一个响应式的use…

vue-使用refs取值,打印出来是个数组??

背景&#xff1a; 经常使用$refs去获取组件实例&#xff0c;一般都是拿到实例对象&#xff0c;这次去取值的时候发现&#xff0c;拿到的竟然是个数组。 原因&#xff1a; 这是vue的特性,自动把v-for里面的ref展开成数组的形式&#xff0c;哪怕你的ref名字是唯一的&#xff01…

Java集合(List篇)

List a.使用List i.最基础的一种集合&#xff0c;是一种有序列表&#xff0c;内部按照放入元素的先后顺序存放&#xff0c;每个元素都可以通过索引确定自己的位置。 ii.数组的删除和新增 iii.ArrayList集合的新增和删除。 iv.LinkedList&#xff08;链表式集合&#x…

Ceph 基本架构(一)

Ceph架构图 Ceph整体组成 Ceph 是一个开源的分布式存储系统&#xff0c;设计用于提供优秀的性能、可靠性和可扩展性。Ceph 的架构主要由几个核心组件构成&#xff0c;每个组件都有特定的功能&#xff0c;共同协作以实现高可用性和数据的一致性。 以下是 Ceph 的整体架构及其…

Tomcat CVE-2017-12615 靶场攻略

漏洞描述 当 Tomcat运⾏在Windows操作系统时&#xff0c;且启⽤了HTTP PUT请求⽅法&#xff08;例如&#xff0c;将 readonly初始化参数由默认值设置为false&#xff09;&#xff0c;攻击者将有可能可通过精⼼构造的攻击请求数据包向服务器上传包含任意代 的 JSP ⽂件&#xf…

队列基础概念

文章目录 &#x1f34a;自我介绍&#x1f34a;现实生活中的例子&#x1f34a;队列的介绍&#x1f34a;循环队列&#x1f34a;小结 你的点赞评论就是对博主最大的鼓励 当然喜欢的小伙伴可以&#xff1a;点赞关注评论收藏&#xff08;一键四连&#xff09;哦~ &#x1f34a;自我介…

LeetCode[中等] 54.螺旋矩阵

给你一个 m 行 n 列的矩阵 matrix &#xff0c;请按照 顺时针螺旋顺序 &#xff0c;返回矩阵中的所有元素。 思路&#xff1a;定义方向数组&#xff0c;按照顺时针顺序&#xff1a;右(0,1)&#xff0c;下(1,0)&#xff0c;左(0,-1)&#xff0c;上(0,-1) 从矩阵的左上角开始遍历…

了解深度学习,张量,线性代数,激活函数的概念

在人工智能领域&#xff0c;尤其是深度学习中&#xff0c;张量和线性代数是不可或缺的数学工具。这些数学知识的应用主要体现在以下几个方面&#xff1a; 数据表示与运算&#xff1a;张量是多维数组&#xff0c;用于表示和存储数据。在深度学习中&#xff0c;大部分的数据和权重…

常见项目场景题1(数据量很大时如何去重,实现超时处理)

数据很多&#xff0c;限制内存&#xff0c;如何去重 对于大数据量去重的场景&#xff0c;我们可以考虑使用位图(Bitmap) Bitmap 是使用二进制来表示某个元素是否存在的数组。用0和1来表示存在与不存在 使用Bitmap的话&#xff0c;一个数字占用1bit&#xff0c;大大减少内存消耗…

Unity自我实现响应式属性

其实只是写着玩,响应式编程建议使用UniRx插件(一套成熟的响应式编程解决方案),我写的主要是借鉴一下这个思想,实现的也不够优雅,不过逻辑也算严密可以正常使用.你可以查看我写的理解响应式属性的思想. 借鉴UniRx的ReactiveProperty类,且UniRx不仅有响应式属性. using System; …

CertiK协助修复Solana大整数模幂运算中的DOS漏洞

导读&#xff1a; 本文深入探讨了区块链交易费⽤模型的重要性及其在确保网络安全和有效运行中的关键作用。通过对以太坊和Solana区块链网络的交易费⽤模型进行比较分析&#xff0c;揭示了不安全的交易计费可能引发的网络安全风险。特别关注了CertiK团队发现并协助修复的Solana…

【学术会议征稿】第四届计算机、信息工程与电子材料国际学术会议 (CTIEEM 2024)

第四届计算机、信息工程与电子材料国际学术会议 (CTIEEM 2024) The 4th International Conference on Computer Technology, Information Engineering and Electron Materials 随着信息技术的迅猛发展&#xff0c;计算机技术、信息工程以及电子材料领域的研究与创新成为推动现…

光伏设计软件的基本功能

一、屋顶绘制 光伏设计软件的首要功能是屋顶绘制。通过直观易用的界面&#xff0c;可以轻松绘制出建筑物的屋顶轮廓、结构细节等基本信息。软件支持多种屋顶类型的绘制&#xff0c;并允许用户自定义屋顶尺寸和形状。 二、参照物、障碍物放置 在光伏系统设计中&#xff0c;参照…

linux如何对c++进行内存分析

linux如何对c进行内存分析 背景分析方法以及原理原理分析结果以及重点关注 背景 在工作中&#xff0c;我遇到一个问题&#xff0c;需要将c写的进程部署到MCU上。由于MCU上可用的RAM 非常有限&#xff0c;所以在部署时就需要考虑到使用内存大小。所以为了搞清楚&#xff0c;内存…

go注册中心Eureka,注册到线上和线下,都可以访问

go注册中心Eureka&#xff0c;注册到线上和线下&#xff0c;都可以访问 本地通过127访问&#xff0c; 线上通过内网ip访问 package mainimport ("github.com/SimonWang00/goeureka""github.com/gin-gonic/gin""wbGo/controller""wbGo/task…

论文阅读 - MDFEND: Multi-domain Fake News Detection

https://arxiv.org/pdf/2201.00987 目录 ABSTRACT INTRODUCTION 2 RELATED WORK 3 WEIBO21: A NEW DATASET FOR MFND 3.1 Data Collection 3.2 Domain Annotation 4 MDFEND: MULTI-DOMAIN FAKE NEWS DETECTION MODEL 4.1 Representation Extraction 4.2 Domain Gate 4.…

机房动力环境监控系统组成

机房动力环境监控系统已经广泛应用于各种类型的机房,尤其稍微重要的机房,都需要做环境监控系统,因此我们要熟知这个系统,如果你还不懂的话,可以看看这篇文章。 一、动环系统简介 计算机系统数量与日俱增,其配套的环境设备也日益增多,计算机房已成为各大单位的重要组成…

线性规划中可行域为什么一定是凸的--证明

线性规划中的凸性证明 线性规划中可行域是凸的&#xff0c;这是自然能够想到和容易理解的道理。直观上&#xff0c;线性约束定义的可行域是由半平面的交集构成的&#xff0c;这些半平面的交集总是形成凸区域。 这么一个自然想到、容易理解的道理&#xff0c;怎么从数学上完备…