Spring Boot集成Akka Cluster快速入门Demo

1.什么是Akka Cluster?

Akka Cluster将多个JVM连接整合在一起,实现消息地址的透明化和统一化使用管理,集成一体化的消息驱动系统。最终目的是将一个大型程序分割成若干子程序,部署到很多JVM上去实现程序的分布式并行运算(单机也可以起很多节点构成集群)。更重要的是, Akka Cluster集群构建与Actor编程没有直接的联系,集群构建是在ActorSystem层面上,实现了Actor消息地址的透明化,无需考虑目标运行环节是否分布式,可以按照正常的Actor编程模式进行开发。 我们知道,分布式集群是由若干节点组成的,那么节点的发现及状态管理是分布式系统一个比较重要的任务。Akka Cluster中将节点的生命周期划分为:

member-states

 

  • joining - 当尝试加入集群时的初始状态
  • up - 加入集群后的正常状态
  • leaving / exiting - 节点退出集群时的中间状态
  • down - 集群无法感知某节点后,将其标记为down
  • removed - 从集群中被删除,以后也无法再加入集群

其实当参数akka.cluster.allow-weakly-up-members启用时(默认是启用的),还有个weakly up,它是用于集群出现分裂时,集群无法收敛,则leader无法将状态置为up的临时状态。这个后面再解释。 图中还有两个特殊的名词:

  • fd* - 这个表示akka的错误检测机制Faiulre Detector被触发后,将节点标记为unreachable
  • unreachable* - unreachable不是一个真正的节点状态,更多的像是一个flag,用来描述集群无法与该节点进行通讯。当错误检测机制侦测到这个节点又能正常通讯时,会移除这个flag。

市面上大多数产品的分布式管理一般用的是注册中心机制,例如zk、consul或etcd。其实是节点把自己的信息注册到所使用的注册中心里,而master通过接受注册中心的通知得知新节点信息。显然本质上是一种master/slave的架构。这种架构有两个问题:

  1. master节点一般是单一的,一旦挂了影响就比较大(所以很多master都采用了HA机制),也就是所谓的系统单点故障;
  2. 通常节点的地址发现是要走master去获取的,当系统并发大时,master节点就可能成为性能瓶颈,即单点性能瓶颈。

Akka可能就是考虑这两点,采用了P2P的模式,这样任何一个节点都可以作为”master”,任何的节点都可以用来寻找其他节点地址。那它是怎么做到的呢?答案是Gossip协议和CRDT。这里不做过多解释,感兴趣的话可以自己去翻阅相关介绍

2.代码工程

实验目的

搭建一个简单akka custer集群

pom.xml

<!-- Akka Cluster dependency -->
<dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-cluster-typed_2.13</artifactId><version>2.6.0</version>
</dependency>

cluster

node1.conf

akka {actor {provider = "cluster"  }remote {artery {canonical.hostname = "127.0.0.1"canonical.port = 2551 }}cluster {seed-nodes = ["akka://ClusterSystem@127.0.0.1:2551","akka://ClusterSystem@127.0.0.1:2552"]}
}

node2.conf

akka {actor {provider = "cluster"}remote {artery {canonical.hostname = "127.0.0.1"canonical.port = 2552  }}cluster {seed-nodes = ["akka://ClusterSystem@127.0.0.1:2551","akka://ClusterSystem@127.0.0.1:2552"]}
}

集群监听器

package com.et.akka.cluster;import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Subscribe;
import akka.cluster.ClusterEvent;public class ClusterListener extends AbstractBehavior<ClusterEvent.ClusterDomainEvent> {public ClusterListener(ActorContext<ClusterEvent.ClusterDomainEvent> context) {super(context);Cluster cluster = Cluster.get(context.getSystem());cluster.subscriptions().tell(Subscribe.create(getContext().getSelf(), ClusterEvent.ClusterDomainEvent.class));}@Overridepublic Receive<ClusterEvent.ClusterDomainEvent> createReceive() {return newReceiveBuilder().onMessage(ClusterEvent.MemberUp.class, this::onMemberUp).onMessage(ClusterEvent.MemberRemoved.class, this::onMemberRemoved).onAnyMessage(event -> {System.out.println("Received cluster event: " + event);return this;}).build();}private Behavior<ClusterEvent.ClusterDomainEvent> onMemberUp(ClusterEvent.MemberUp memberUp) {System.out.println("Member is Up: " + memberUp.member());return this;}private Behavior<ClusterEvent.ClusterDomainEvent> onMemberRemoved(ClusterEvent.MemberRemoved memberRemoved) {System.out.println("Member is Removed: " + memberRemoved.member());return this;}public static Behavior<ClusterEvent.ClusterDomainEvent> create() {return Behaviors.setup(ClusterListener::new);}
}

启动集群

package com.et.akka.cluster;import akka.actor.typed.ActorSystem;
import akka.cluster.ClusterEvent;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;import java.io.File;public class ClusterApp {public static void main(String[] args) {Config configNode1 = ConfigFactory.parseFile(new File("D:/IdeaProjects/ETFramework/akka/src/main/resources/node1.conf")).withFallback(ConfigFactory.load());ActorSystem<ClusterEvent.ClusterDomainEvent> systemNode1 = ActorSystem.create(ClusterListener.create(), "ClusterSystem", configNode1);System.out.println("Node 1 started with config from node1.conf");Config configNode2 = ConfigFactory.parseFile(new File("D:/IdeaProjects/ETFramework/akka/src/main/resources/node2.conf")).withFallback(ConfigFactory.load());ActorSystem<ClusterEvent.ClusterDomainEvent> systemNode2 = ActorSystem.create(ClusterListener.create(), "ClusterSystem", configNode2);System.out.println("Node 2 started with config from node2.conf");}
}

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • GitHub - Harries/springboot-demo: a simple springboot demo with some components for example: redis,solr,rockmq and so on.(akka)

3.测试

启动集群(执行ClusterApp里面的main方法),查看日志可以看到2个节点都起来了

23:00:19.201 [ClusterSystem-akka.actor.default-dispatcher-6] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:2552] - Welcome from [akka://ClusterSystem@127.0.0.1:2551]
Member is Up: Member(address = akka://ClusterSystem@127.0.0.1:2551, status = Up)
Received cluster event: MemberJoined(Member(address = akka://ClusterSystem@127.0.0.1:2552, status = Joining))
Received cluster event: LeaderChanged(Some(akka://ClusterSystem@127.0.0.1:2551))
Received cluster event: RoleLeaderChanged(dc-default,Some(akka://ClusterSystem@127.0.0.1:2551))
Received cluster event: SeenChanged(true,Set(akka://ClusterSystem@127.0.0.1:2551, akka://ClusterSystem@127.0.0.1:2552))
Received cluster event: ReachabilityChanged()
Received cluster event: SeenChanged(true,Set(akka://ClusterSystem@127.0.0.1:2551, akka://ClusterSystem@127.0.0.1:2552))
Received cluster event: ReachabilityChanged()
23:00:19.645 [ClusterSystem-akka.actor.default-dispatcher-5] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2552] to [Up]
Member is Up: Member(address = akka://ClusterSystem@127.0.0.1:2552, status = Up)
Received cluster event: SeenChanged(false,Set(akka://ClusterSystem@127.0.0.1:2551))
Member is Up: Member(address = akka://ClusterSystem@127.0.0.1:2552, status = Up)
Received cluster event: ReachabilityChanged()
Received cluster event: SeenChanged(true,Set(akka://ClusterSystem@127.0.0.1:2551, akka://ClusterSystem@127.0.0.1:2552))
Received cluster event: ReachabilityChanged()

4.引用

  • Cluster Specification • Akka Documentation

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/141475.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

编译原理之预处理

目录 生成预处理文件的的命令 预处理做了什么 实验 --------------------------------------------------------------------------------------------------------------------------------- 本篇文章主要是带着大家一起看看预处理阶段编译器都做了些什么 --------------…

十四,在Spring Boot当中对应“ Tomcat 服务器的相关配置”和“服务器的切换”的详细说明

十四&#xff0c;在Spring Boot当中对应“ Tomcat 服务器的相关配置”和“服务器的切换”的详细说明 文章目录 十四&#xff0c;在Spring Boot当中对应“ Tomcat 服务器的相关配置”和“服务器的切换”的详细说明1. 基本介绍2. 准备工作&#xff1a;3. 内置 Tomcat 的配置3.1 第…

Git项目管理工具

分布式版本控制系统

62. 不同路径、64. 最小路径和

思路 dp&#xff1a;代表到达当前位置的总方式 初始化&#xff1a;第一行的位置dp[0][j]&#xff1a;当前位置只能由左边的位置向右移动得到 所以只有1种方式 d[0][j]1, d[0][0]1 第一列的位置 dp[i][0]&#xff1a;当前位置只能由上一个位置向下移动得到 除此之外的位置可以由…

【Python】基本使用

目录 变量的类型 整数 int 浮点数 float 字符串 str 字符串长度 格式化字符串 布尔类型 动态类型 注释 获取输入 浮点数比较 多元赋值 for循环 函数的定义和调用 创建函数/定义函数 调用函数/使用函数 列表 创建列表 切片操作 遍历列表 新增元素 判断元…

2024最全网络安全工程师面试题(附答案),金九银十找工作必看!

《网安面试指南》http://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247484339&idx1&sn356300f169de74e7a778b04bfbbbd0ab&chksmc0e47aeff793f3f9a5f7abcfa57695e8944e52bca2de2c7a3eb1aecb3c1e6b9cb6abe509d51f&scene21#wechat_redirect 《Java代码审…

[matlab]matlab调用python的各种方法

前言 在MATLAB中&#xff0c;可以使用 py 函数来调用Python模块和函数。在此基础上&#xff0c;我们可以很轻易的调用python中的各种模块&#xff0c;方便我们在神经网络上的应用仿真。 以下是使用MATLAB调用Python模块的基本步骤&#xff1a; 确保你的系统已经正确安装了Py…

文心一言 VS 讯飞星火 VS chatgpt (347)-- 算法导论23.2 6题

六、假定一个图中所有的边权重均匀分布在半开区间[0,1)内。Prim算法和Kruskal算法哪一个可以运行得更快&#xff1f;如果要写代码&#xff0c;请用go语言。 文心一言&#xff1a; 在比较Prim算法和Kruskal算法在特定条件下的性能时&#xff0c;我们需要考虑几个因素&#xff…

复杂情感识别系统

复杂情感识别系统&#xff08;CERS&#xff09;是一种先进的技术平台&#xff0c;旨在通过分析情感的组合、相互关系及其动态变化来解读和识别复杂的情感状态。这种系统通常采用以下技术和方法&#xff1a; 机器学习与深度学习&#xff1a; 通过训练算法识别和解释大量情感数据…

[Linux]:进程间通信(上)

✨✨ 欢迎大家来到贝蒂大讲堂✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属专栏&#xff1a;Linux学习 贝蒂的主页&#xff1a;Betty’s blog 1. 进程间通信介绍 1.1 进程间通信的概念 进程间通信简称IPC&#xff08;In…

jdk相关介绍

JDK&#xff0c;全称Java Development Kit&#xff0c;是Java语言开发的基础工具包。它包含了Java运行时环境&#xff08;JRE&#xff09;以及用于开发Java应用程序的各种工具和库。JDK为Java程序员提供了编译、调试和运行Java应用程序所需的全部环境。 JDK的主要组成部分包括&…

离线数仓DWD层

离线数仓DWD层 DWD层设计要点&#xff1a;9.1 交易域加购事务事实表9.2 交易域下单事务事实表9.3 交易域取消订单事务事实表9.4 交易域支付成功事务事实表9.5 交易域退单事务事实表9.6 交易域退款成功事务事实表9.7 交易域购物车周期快照事实表9.8 工具域优惠券领取事务事实表9…

2024/9/15 408“回头看”之应用层小总结(下)

域名系统DNS: 本地域名服务器 本地域名服务器起着代理的作用&#xff0c;会将报文转发到根域名服务器、顶级域名服务器、权限域名服务器。 递归查询&#xff1a; 迭代查询&#xff1a; 文件传送协议FTP: FTP客户和FTP服务器之间使用的是tcp连接。 控制连接使用21端口&…

树莓派5上手

1 安装系统 Raspberry Pi OS 是基于 Debian 的免费操作系统&#xff0c;针对 Raspberry Pi 硬件进行了优化。Raspberry Pi OS 支持超过 35,000 个 Debian 软件包。树莓派 5 可以安装各种系统&#xff0c;但是如果对于系统没有特殊的要求&#xff0c;还是安装 Raspberry Pi OS …

基于SSM的二手车管理系统的设计与实现 (含源码+sql+视频导入教程)

&#x1f449;文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1 、功能描述 基于SSM的二手车管理系统4拥有三种角色 管理员&#xff1a;订单管理、在售车辆管理、下架车辆管理、品牌管理、分类管理、推荐管理、统计等 商家&#xff1a;登录注册、添加/下架/删除车辆…

各类元器件调试记录-E+H

一、EH压力传感器 适用型号为&#xff1a; Cerabar S PMC71, PMP71/75 Deltabar S FMD76/77/78, PMD70/75 Deltapilot S FMB70 调试过程&#xff1a;(后续补上图片) 一、湿标(湿调) 1、前提条件&#xff1a;罐体可以灌满和实际水箱水位高度 2、调试步骤&#xff1a; A、调节语…

网络安全有救了,37所高校新增网络安全空间安全专业

《网安面试指南》http://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247484339&idx1&sn356300f169de74e7a778b04bfbbbd0ab&chksmc0e47aeff793f3f9a5f7abcfa57695e8944e52bca2de2c7a3eb1aecb3c1e6b9cb6abe509d51f&scene21#wechat_redirect 《Java代码审…

凸优化学习(3)——对偶方法、KKT条件、ADMM

&#x1f345; 写在前面 &#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;这里是hyk写算法了吗&#xff0c;一枚致力于学习算法和人工智能领域的小菜鸟。 &#x1f50e;个人主页&#xff1a;主页链接&#xff08;欢迎各位大佬光临指导&#xff09; ⭐️近…

鸿蒙交互事件开发07——手势竞争问题

如果你也对鸿蒙开发感兴趣&#xff0c;加入“Harmony自习室”吧&#xff01;扫描下方名片&#xff0c;关注公众号&#xff0c;公众号更新更快&#xff0c;同时也有更多学习资料和技术讨论群。 1、背景 在文章鸿蒙交互事件开发05——常用的6种手势类型中&#xff0c;有朋友留言…

C语言自定义类型-联合与枚举

在之前的文章中&#xff0c;我们学到了结构体类型&#xff0c;而结构体其实归属于一个大类——自定义类型。那么今天我们就继续讲解关于自定义类型的知识~ 一、类型命名关键字-typedef typedef的作用其实就是标题的意思——为一种类型赋予新的名字。 ① typedef在变量中的应…