电子商务网站建设服务外包,梵克雅宝手链,国家重点学科建设网站,seo软文外包公司1.什么是Akka Cluster#xff1f;
Akka Cluster将多个JVM连接整合在一起#xff0c;实现消息地址的透明化和统一化使用管理#xff0c;集成一体化的消息驱动系统。最终目的是将一个大型程序分割成若干子程序#xff0c;部署到很多JVM上去实现程序的分布式并行运算#xf…1.什么是Akka Cluster
Akka Cluster将多个JVM连接整合在一起实现消息地址的透明化和统一化使用管理集成一体化的消息驱动系统。最终目的是将一个大型程序分割成若干子程序部署到很多JVM上去实现程序的分布式并行运算单机也可以起很多节点构成集群。更重要的是, Akka Cluster集群构建与Actor编程没有直接的联系集群构建是在ActorSystem层面上实现了Actor消息地址的透明化无需考虑目标运行环节是否分布式可以按照正常的Actor编程模式进行开发。 我们知道分布式集群是由若干节点组成的那么节点的发现及状态管理是分布式系统一个比较重要的任务。Akka Cluster中将节点的生命周期划分为 joining - 当尝试加入集群时的初始状态up - 加入集群后的正常状态leaving / exiting - 节点退出集群时的中间状态down - 集群无法感知某节点后将其标记为downremoved - 从集群中被删除以后也无法再加入集群
其实当参数akka.cluster.allow-weakly-up-members启用时(默认是启用的)还有个weakly up它是用于集群出现分裂时集群无法收敛则leader无法将状态置为up的临时状态。这个后面再解释。 图中还有两个特殊的名词
fd* - 这个表示akka的错误检测机制Faiulre Detector被触发后将节点标记为unreachableunreachable* - unreachable不是一个真正的节点状态更多的像是一个flag用来描述集群无法与该节点进行通讯。当错误检测机制侦测到这个节点又能正常通讯时会移除这个flag。
市面上大多数产品的分布式管理一般用的是注册中心机制例如zk、consul或etcd。其实是节点把自己的信息注册到所使用的注册中心里而master通过接受注册中心的通知得知新节点信息。显然本质上是一种master/slave的架构。这种架构有两个问题
master节点一般是单一的一旦挂了影响就比较大所以很多master都采用了HA机制也就是所谓的系统单点故障通常节点的地址发现是要走master去获取的当系统并发大时master节点就可能成为性能瓶颈即单点性能瓶颈。
Akka可能就是考虑这两点采用了P2P的模式这样任何一个节点都可以作为”master”任何的节点都可以用来寻找其他节点地址。那它是怎么做到的呢答案是Gossip协议和CRDT。这里不做过多解释感兴趣的话可以自己去翻阅相关介绍
2.代码工程
实验目的
搭建一个简单akka custer集群 pom.xml
!-- Akka Cluster dependency --
dependencygroupIdcom.typesafe.akka/groupIdartifactIdakka-cluster-typed_2.13/artifactIdversion2.6.0/version
/dependency
cluster
node1.conf
akka {actor {provider cluster }remote {artery {canonical.hostname 127.0.0.1canonical.port 2551 }}cluster {seed-nodes [akka://ClusterSystem127.0.0.1:2551,akka://ClusterSystem127.0.0.1:2552]}
}
node2.conf
akka {actor {provider cluster}remote {artery {canonical.hostname 127.0.0.1canonical.port 2552 }}cluster {seed-nodes [akka://ClusterSystem127.0.0.1:2551,akka://ClusterSystem127.0.0.1:2552]}
}
集群监听器
package com.et.akka.cluster;import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Subscribe;
import akka.cluster.ClusterEvent;public class ClusterListener extends AbstractBehaviorClusterEvent.ClusterDomainEvent {public ClusterListener(ActorContextClusterEvent.ClusterDomainEvent context) {super(context);Cluster cluster Cluster.get(context.getSystem());cluster.subscriptions().tell(Subscribe.create(getContext().getSelf(), ClusterEvent.ClusterDomainEvent.class));}Overridepublic ReceiveClusterEvent.ClusterDomainEvent createReceive() {return newReceiveBuilder().onMessage(ClusterEvent.MemberUp.class, this::onMemberUp).onMessage(ClusterEvent.MemberRemoved.class, this::onMemberRemoved).onAnyMessage(event - {System.out.println(Received cluster event: event);return this;}).build();}private BehaviorClusterEvent.ClusterDomainEvent onMemberUp(ClusterEvent.MemberUp memberUp) {System.out.println(Member is Up: memberUp.member());return this;}private BehaviorClusterEvent.ClusterDomainEvent onMemberRemoved(ClusterEvent.MemberRemoved memberRemoved) {System.out.println(Member is Removed: memberRemoved.member());return this;}public static BehaviorClusterEvent.ClusterDomainEvent create() {return Behaviors.setup(ClusterListener::new);}
}
启动集群
package com.et.akka.cluster;import akka.actor.typed.ActorSystem;
import akka.cluster.ClusterEvent;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;import java.io.File;public class ClusterApp {public static void main(String[] args) {Config configNode1 ConfigFactory.parseFile(new File(D:/IdeaProjects/ETFramework/akka/src/main/resources/node1.conf)).withFallback(ConfigFactory.load());ActorSystemClusterEvent.ClusterDomainEvent systemNode1 ActorSystem.create(ClusterListener.create(), ClusterSystem, configNode1);System.out.println(Node 1 started with config from node1.conf);Config configNode2 ConfigFactory.parseFile(new File(D:/IdeaProjects/ETFramework/akka/src/main/resources/node2.conf)).withFallback(ConfigFactory.load());ActorSystemClusterEvent.ClusterDomainEvent systemNode2 ActorSystem.create(ClusterListener.create(), ClusterSystem, configNode2);System.out.println(Node 2 started with config from node2.conf);}
}
以上只是一些关键代码所有代码请参见下面代码仓库
代码仓库
GitHub - Harries/springboot-demo: a simple springboot demo with some components for example: redis,solr,rockmq and so on.(akka)
3.测试
启动集群执行ClusterApp里面的main方法查看日志可以看到2个节点都起来了
23:00:19.201 [ClusterSystem-akka.actor.default-dispatcher-6] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem127.0.0.1:2552] - Welcome from [akka://ClusterSystem127.0.0.1:2551]
Member is Up: Member(address akka://ClusterSystem127.0.0.1:2551, status Up)
Received cluster event: MemberJoined(Member(address akka://ClusterSystem127.0.0.1:2552, status Joining))
Received cluster event: LeaderChanged(Some(akka://ClusterSystem127.0.0.1:2551))
Received cluster event: RoleLeaderChanged(dc-default,Some(akka://ClusterSystem127.0.0.1:2551))
Received cluster event: SeenChanged(true,Set(akka://ClusterSystem127.0.0.1:2551, akka://ClusterSystem127.0.0.1:2552))
Received cluster event: ReachabilityChanged()
Received cluster event: SeenChanged(true,Set(akka://ClusterSystem127.0.0.1:2551, akka://ClusterSystem127.0.0.1:2552))
Received cluster event: ReachabilityChanged()
23:00:19.645 [ClusterSystem-akka.actor.default-dispatcher-5] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem127.0.0.1:2552] to [Up]
Member is Up: Member(address akka://ClusterSystem127.0.0.1:2552, status Up)
Received cluster event: SeenChanged(false,Set(akka://ClusterSystem127.0.0.1:2551))
Member is Up: Member(address akka://ClusterSystem127.0.0.1:2552, status Up)
Received cluster event: ReachabilityChanged()
Received cluster event: SeenChanged(true,Set(akka://ClusterSystem127.0.0.1:2551, akka://ClusterSystem127.0.0.1:2552))
Received cluster event: ReachabilityChanged()
4.引用
Cluster Specification • Akka Documentation