当前位置: 首页 > news >正文

昭通网站建设公司海南网络广播电视台官网

昭通网站建设公司,海南网络广播电视台官网,单页面的网站模板,如何做好seo一、Raft前置简介 Raft目前是最著名的分布式共识性算法#xff0c;被广泛的应用在各种分布式框架、组件中#xff0c;如Redis、RocketMq、Kafka、Nacos#xff08;CP#xff09;等 根据Raft论文#xff0c;可将Raft拆分为如下4个功能模块#xff1a; 领导者选举日志同…一、Raft前置简介 Raft目前是最著名的分布式共识性算法被广泛的应用在各种分布式框架、组件中如Redis、RocketMq、Kafka、NacosCP等 根据Raft论文可将Raft拆分为如下4个功能模块 领导者选举日志同步、心跳持久化日志压缩快照本文未实现 这4个模块彼此并不完全独立如日志的同步情况左右着领导者选举快照也影响着日志同步等等为了前后的递进性对于一些功能的实现可能会出现改动和优化比如日志同步实现后在数据持久化部分又会对同步做一些优化提高主、从节点日志冲突解决的性能。 这里就不再过多的介绍了看本文之前请先简单了解一下Raft算法提供如下资料 Raft算法在线动画演示图解分布式共识算法Paxos协议浅谈分布式一致性Raft 与 SOFAJRaft深入剖析共识性算法 Raft深入解读Raft算法与etcd工程实现Raft一致性算法论文-译文SOFA-JRaft蚂蚁金服的Raft算法实现库JAVA版 本文实现不完全和Raft论文一致做了不少改动核心思想不变请悉知 二、功能流程简介 你看完上述资料应该对Raft有一个基本了解了本文我们实现了一个Raft算法下的简易版的KV存储我将它拆分成一下几个角色 RPC模块复制各节点间的信息传递如心跳、日志、选举等等 节点模块节点有三种状态leader、follow、candidate每种状态下所要做的事是不一样的 状态机负责节点状态的变更日志持久化一致性处理投票一致性处理 定时任务leader需要定时发送心跳follow需要定时检测leader是否存活等 日志模块日志需要持久化在本地文件还需要给其他节点同步 以上几个角色相互配合实现以下几个主要功能流程 1.选举流程 实现细节下面深究这里暂不过多介绍简单了解一下大致流程大体就是 Follow节点发现Leader节点挂了则升级为Candidate节点发起投票其他Follow节点收到投票请求后根据条件判断是否投票给它True或者FalseCandidate一旦收到的投票通过请求过半则升级为Leader升级Leader后发送心跳阻止其他Follow变成Candidate 2.心跳流程 注意这里和原文有区别我将心跳和日志做了拆分不再耦合了因为我觉得在没有客户端请求的情况下记录这些心跳日志没有意义在没有数据日志或者说数据日志水平都是一样的情况下谁做Leader我觉得都OK 实现细节下面深究这里暂不过多介绍简单了解一下大致流程大体就是 Leader会定时发送心跳请求给Follow告诉它我还活着防止它篡位Follow收到心跳后返回一个心跳响应Leader收到的心跳响应没有过半则自动降级成为Follow停止对外服务 为什么要心跳响应还要自动降级后面咱们细说 3.KV客户端请求流程 因为我们要做的是一个简易版KV嘛那肯定有客户端发送命令嘛对不对 客户端发送SET或者GET命令集群返回成功或者数据发送SET命令只有Leader会处理同步给其他Follow然后根据结果返回成功还是失败发送GET命令目前也只有Leader会处理返回对应数据没有就nullGET没有日志产生 节点间日志的同步持久化后面细说这里也看的出来为什么分布式体系下CAP不能共存了你想要高可用性能好就必须在请求leader刷盘成功后返回甚至异步刷盘这就必然导致可能存在数据丢失或者主从数据不一致的情况如果你想要一致性就必须在节点日志都同步完成后才返回(下面我们将日志同步流程 4.日志同步流程 上面说过了我们将心跳和日志做了拆分只有客户端请求SET命令才会产生日志 Leader收到客户端请求后先预提交到内存中后发送预提交命令给所有FollowFollow收到Leader的预提交命令同样先提交到内存然后响应LeaderLeader一旦收到超过半数的Follow响应则执行刷盘持久化否则给客户端响应失败Leader刷盘成功后给所有Follow发送刷盘请求然后给客户端响应成功无需关心Follow刷盘结果 这就是很典型的CP流程保持了一致性和数据不丢失但大大降低了性能发现没有尽管这样做依旧可能存在Follow数据丢失的情况比如我是新加入的Follow节点、Follow节点刷盘失败等等情况那该怎么办呢我们下面接着来补充 5.日志校验流程 正如上所说日志依旧存在丢失的风险我们需要做一个日志校验定时任务定时校验日志是否丢失由于这个和日志的设计息息相关所以我们后面在细说这里简单过一遍流程 follow会有一个定时任务定时Check日志文件寻找缺失的日志如果有则拿到缺失的日志发送拉取请求到Leader获取对应的日志然后填充进日志文件这样就一定保持了和Leader日志数据对齐了 难道每次都要从头到尾扫描一次文件吗当然不是扫描过的不需要扫描有checkPoint每次只是从checkPoint扫描到lastLogIndex 三、模块简介 1.RPC模块 这里我们采用Netty框架来做每个节点即是Client又是Server 按原Raft算法来说一共有以下几种RPC类型的通信 RequestVote RPC - 请求投票 RPC由 Candidate 在选举期间发起。 AppendEntries RPC - 附加条目 RPC由 Leader 发起用来复制日志和提供一种心跳机制。 但是我将它进行了一个拆分拆分的更细了 RequestVoteRPC-请求投票 RPC由 Candidate 在选举期间发起。RequestVoteResult-投票响应RPC由follow投票HeartBeatRequest-心跳RPC由leader定时不间断发起HeartBeatResult-心跳响应RPC由follow响应AppendEntriesPreCommit-日志预提交RPC由leader发起预提交AppendEntriesPreCommitResult-日志预提交响应RPC由follow响应AppendEntriesCommit-日志提交RPC预提交成功后leader会发起真正提交的命令LogIndexPull-日志拉取RPCfollow定时检测发现自身存在日志丢失向leader主动拉取日志LogIndexPullResult-日志拉取响应RPCleader发现follow存在日志缺失把日志发给followClientRequest-客户端请求RPCKV存储的客户端向集群发出的命令ClientResponse-客户端请求响应RPC对客户端的响应 分别对应着一个实体类 RPC整体的编解码设计序列化等等都和我之前写的RPC框架差不多这里就不在过多的介绍了有兴趣可以看看我的如何从0-1手写一个RPC框架 这里只介绍一下相比原来做出的调整原来RPC框架传输的实体是固定的而现在多了很多而且大量涉及到同步请求返回所以相比原来新增了泛型的处理如下示例两行代码就搞定了一次请求 RpcSessionClientResponse, ClientRequest rpcSession RpcSessionFactory.ClientResponse, ClientRequestopenSession(serverConfig, clientRequest); ClientResponse clientResponse rpcSession.ClientResponsesyncSend(4000L);同时支持同步等待、超时等待、异步三种请求方式 public interface RpcSessionR,T{R R syncSend();R R syncSend(long timeout);void asyncSend(); } 感兴趣的建议自己看看RPC所在目录和Netty所有Handler如下 2.节点模块 节点有三种类型leader、candidate、follow所以我这抽象出一个节点接口三种实现一个统一对外服务一个全局节点信息类 一个节点接口 public interface RaftNode {/** 客户端的请求,会产生日志 : 只有leader才会处理follow返回leader地址candidate拒绝 */ClientResponse clientRequestHandler(ClientRequest command,ListServerConfig serverConfigs) throws ExecutionException, InterruptedException;/** leader发来的log预处理:会先缓存 */AppendEntriesPreCommitResult logPreCommitHandler(AppendEntriesPreCommit appendEntriesPreCommit);/** leader发来的log提交请求 */void logCommitHandler(AppendEntriesCommit appendEntriesCommit);/** follow发来的log拉取请求 */LogIndexPullResult sendLogPullRequest(ListLong pullLogIndex);/** leader要处理follow的拉取请求 */LogIndexPullResult logPullRequestHandler(LogIndexPull logIndexPull);/** 发起投票 : 只有候选者 才会发起 */boolean callVoteRequest(ListServerConfig serverConfigs) throws ExecutionException, InterruptedException;/** 投票请求处理 */RequestVoteResult voteRequestHandler(RequestVoteRPC voteRPC);/** 发起心跳 : 只有领导才会发起心跳 阻止其他节点成为候选人*/boolean callHeartBeatRequest(ListServerConfigserverConfigs) throws ExecutionException, InterruptedException;/** 心跳请求处理 : 只有追随者/候选人才会处理*/HeartBeatResult heartBeatHandler(HeartBeatRequest heartBeatRequest);}三种实现 一个对外服务 public class RaftNodeService {private static final Logger log LoggerFactory.getLogger(RaftNodeService.class);// 心跳间隔时间private final static long INTERVAL_TIME 1500L;private static MapNodeStatusEnums, RaftNode raftNodeMap new ConcurrentHashMap(8);static {raftNodeMap.put(NodeStatusEnums.LEADER, new LeaderRaftNode());raftNodeMap.put(NodeStatusEnums.CANDIDATE, new CandidateRaftNode());raftNodeMap.put(NodeStatusEnums.FOLLOW, new FollowRaftNode());}/*** 节点信息初始化*/public static void raftNodeInit(ServerConfig self, ListServerConfig clusterConfig) {RaftNodeInfo.getInstance().setSelf(self);RaftNodeInfo.getInstance().setClusterConfig(clusterConfig);RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.FOLLOW);createElectionTask();}/*** 发送心跳*/public synchronized static void sendHeartBeat() {RaftNode raftNode raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());boolean result false;try {result raftNode.callHeartBeatRequest(RaftNodeInfo.getInstance().getClusterConfig());} catch (ExecutionException e) {log.debug( {}: 完了作为leader发送心跳失败了:{}, RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);} catch (InterruptedException e) {log.debug( {}: 完了作为leader发送心跳失败了:{}, RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);}if (!result) {// 代表心跳失败了状态已经变更了// 需要停止心跳开启心跳检测heartBeatTestDestroy();createElectionTask();}}/*** 心跳处理*/public static void heartBeatHandler(HeartBeatRequest request, Channel channel) {ThreadPoolUtils.nettyServerAsyncPool.execute(() - {RaftNode raftNode raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());channel.writeAndFlush(new RpcRemoteMsgHeartBeatResult(raftNode.heartBeatHandler(request)));// 收到了心跳所以就要停止当前心跳的检测然后重新开启一个检测任务createElectionTask();});}// 命令合规性校验 目前就get set 随便校验一下public static boolean commandCheck(String command) {if (command null || !SET_GET.contains(command.split( )[0]) || command.split( ).length 2) {return false;}return true;}/*** 客户端的请求以KV为例 就是set命令 , 这里请求返回就简陋一点*/public static void clientRequestHandler(ClientRequest request, Channel channel) {ThreadPoolUtils.nettyServerAsyncPool.execute(() - {ClientResponse clientResponse ClientResponse.builder().build();clientResponse.setRequestId(request.getRequestId());if (!commandCheck(request.getCommand())) {clientResponse.setCode(401);clientResponse.setMsg(命令格式不正确);channel.writeAndFlush(new RpcRemoteMsgClientResponse(clientResponse));return;}// 只有set命令才需要发送日志get命令直接取数据就行了String[] command request.getCommand().split( );if (command[0].equals(SET)) {RaftNode raftNode raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());try {channel.writeAndFlush(new RpcRemoteMsgClientResponse(raftNode.clientRequestHandler(request, RaftNodeInfo.getInstance().getClusterConfig())));return;} catch (ExecutionException e) {log.debug( {}: 日志提交失败了:{}, request.getCommand(), e.getMessage(), e);clientResponse.setCode(500);clientResponse.setMsg(e.getMessage());channel.writeAndFlush(new RpcRemoteMsgClientResponse(clientResponse));} catch (InterruptedException e) {log.debug( {}: 日志提交失败了:{}, RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);clientResponse.setCode(500);clientResponse.setMsg(e.getMessage());channel.writeAndFlush(new RpcRemoteMsgClientResponse(clientResponse));}} else {// get命令直接取值clientResponse.setCode(200);clientResponse.setData(RaftNodeInfo.getInstance().getLogManage().getDataByKey(command[1]));channel.writeAndFlush(new RpcRemoteMsgClientResponse(clientResponse));}});}/*** Log预提交请求*/public static void logPreCommitHandler(AppendEntriesPreCommit request, Channel channel) {ThreadPoolUtils.nettyServerAsyncPool.execute(() - {RaftNode raftNode raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());channel.writeAndFlush(new RpcRemoteMsgAppendEntriesPreCommitResult(raftNode.logPreCommitHandler(request)));});}/*** Log提交请求*/public static void logCommitHandler(AppendEntriesCommit request) {ThreadPoolUtils.nettyServerAsyncPool.execute(() - {RaftNode raftNode raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());raftNode.logCommitHandler(request);// 收到了日志所以就要停止当前心跳的检测然后重新开启一个检测任务createElectionTask();});}/*** 发起投票*/public synchronized static void sendCallVote() {RaftNode raftNode raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());boolean result false;try {result raftNode.callVoteRequest(RaftNodeInfo.getInstance().getClusterConfig());} catch (ExecutionException e) {StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);log.debug( {}: 完了作为candidate发起投票失败了:{}, RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);} catch (InterruptedException e) {StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);log.debug( {}: 完了作为candidate发起投票失败了:{}, RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);}if (!result) {// 代表发起投票失败了状态已经变更了// 需要重新开启一个检测任务createElectionTask();return;}// 投票成功了 需要开启心跳任务createHearBeatTask();}/*** 发起投票请求处理*/public synchronized static void callVoteHandler(RequestVoteRPC requestVoteRPC, Channel channel) {ThreadPoolUtils.nettyServerAsyncPool.execute(() - {RaftNode raftNode raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());channel.writeAndFlush(new RpcRemoteMsgRequestVoteResult(raftNode.voteRequestHandler(requestVoteRPC)));});}/*** 发起Log拉取请求*/public synchronized static LogIndexPullResult sendLogPullRequest(ListLong pullLogIndex) {if (CollectionUtil.isNotEmpty(pullLogIndex)) {RaftNode raftNode raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());return raftNode.sendLogPullRequest(pullLogIndex);}return null;}/*** 发起Log拉取请求处理*/public synchronized static void logPullRequestHandler(LogIndexPull logIndexPull, Channel channel) {ThreadPoolUtils.nettyServerAsyncPool.execute(() - {RaftNode raftNode raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());log.debug({}:拉取日志{},channel.remoteAddress(),JSONObject.toJSON(logIndexPull));channel.writeAndFlush(new RpcRemoteMsgLogIndexPullResult(raftNode.logPullRequestHandler(logIndexPull)));});}/*** 销毁并创建心跳检测任务*/public static void createElectionTask() {long randomTime getRandomTime();final long intervalTime INTERVAL_TIME randomTime;// 先销毁之前的electionTaskDestroy();//开启新的ScheduledFuture? schedule ThreadPoolUtils.hearBeatAsyncPool.schedule(new ElectionTask(intervalTime), intervalTime, TimeUnit.MILLISECONDS);RaftNodeInfo.getInstance().setElectionTask(schedule);}/*** 销毁并创建心跳任务*/public static void createHearBeatTask() {// 先销毁之前的heartBeatTestDestroy();//开启新的ScheduledFuture? schedule ThreadPoolUtils.hearBeatAsyncPool.scheduleAtFixedRate(new HeartBeatTask(), 0L, INTERVAL_TIME, TimeUnit.MILLISECONDS);RaftNodeInfo.getInstance().setElectionTask(schedule);}public static long getRandomTime() {// 要比心跳慢一点return RandomUtil.randomLong(250L, 1000L);}/*** 销毁心跳检测任务*/public static void electionTaskDestroy() {if (null ! RaftNodeInfo.getInstance().getElectionTask()) {RaftNodeInfo.getInstance().getElectionTask().cancel(true);RaftNodeInfo.getInstance().setElectionTask(null);}}/*** 销毁心跳任务*/public static void heartBeatTestDestroy() {if (null ! RaftNodeInfo.getInstance().getHeartBeatTask()) {RaftNodeInfo.getInstance().getHeartBeatTask().cancel(true);RaftNodeInfo.getInstance().setHeartBeatTask(null);}}一个全局节点信息类 public class RaftNodeInfo {/*** 自己*/private ServerConfig self;/*** 集群其他节点信息*/private ListServerConfig clusterConfig;/*** 当前节点状态 默认FOLLOW*/private volatile NodeStatusEnums currentNodeStatus NodeStatusEnums.FOLLOW;/*** 当前节点任期*/private volatile long currentTerm 0L;/*** 当前leader*/private volatile String currentLeaderId;/*** 最后日志索引 已提交的*/private volatile long lastLogIndex 0L;/*** 最后的日志任期 这我这没用到*/private volatile long lastLogTerm 0L;/*** 当前任期给谁投过票*/private volatile String voteFor;/*** 最近更新时间 心跳或者日志更新**/private volatile long lastUpdateTime 0L;/*** 心跳任务**/private ScheduledFuture heartBeatTask;/*** 心跳检测任务**/private ScheduledFuture electionTask;/*** 日志管理**/private LogManage logManage;/*** 日志文件**/private String logPath; }3.状态机 提供节点状态变更、心跳结果处理、投票结果处理、日志一致性处理 public class StateMachines {private static final Logger log LoggerFactory.getLogger(StateMachines.class);/** 候选人-》leader */public static void becomeLeader(){// 变为leaderRaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.LEADER);// leader设置为自己RaftNodeInfo.getInstance().setCurrentLeader(RaftNodeInfo.getInstance().getSelf().toString());// 票清了RaftNodeInfo.getInstance().setVoteFor(null);}/** follow-》候选人 */public static void becomeCandidate(){// 变为候选人RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.CANDIDATE);// 任期1RaftNodeInfo.getInstance().setCallVoteTerm();// 给自己投一票RaftNodeInfo.getInstance().setVoteFor(RaftNodeInfo.getInstance().getSelf().toString());}/** 候选人、leader-follow */public static void becomeFollow(long term,String leaderId,String voteFor){RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.FOLLOW);RaftNodeInfo.getInstance().setCurrentLeader(leaderId);RaftNodeInfo.getInstance().setCurrentTerm(term);RaftNodeInfo.getInstance().setVoteFor(voteFor);RaftNodeInfo.getInstance().setLastUpdateTime(System.currentTimeMillis());}/** 投票结果一致性处理 */public static boolean voteResultHandler(ListFutureRequestVoteResult taskList,Integer nodeNum) throws ExecutionException, InterruptedException {int voteNum 0;for (FutureRequestVoteResult future : taskList) {RequestVoteResult voteResult future.get();// 判断leader是否还存活 存活的话肯定要把我给否了呀if (leaderIsLive(voteResult)) {return false;}if(voteResult!null){log.debug(投票结果,我的term{} 结果{},RaftNodeInfo.getInstance().getCurrentTerm(), JSONObject.toJSON(voteResult));}if (null ! voteResult voteResult.isVoteGranted()) {voteNum;}}if (voteNum ! 0 voteNum (nodeNum / 2)) {// 投票通过 升级为leaderStateMachines.becomeLeader();log.debug( {}: 哈哈哈我升级为leader啦, RaftNodeInfo.getInstance().getSelf().toString());return true;} else {// 投票不通过退成follow 继续苟着StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);log.debug( {}: 完了这帮人不支持我等待机会再试, RaftNodeInfo.getInstance().getSelf().toString());return false;}}// 判断leader是否还存活 存活的话肯定要把我给否了呀private static boolean leaderIsLive(RequestVoteResult voteResult) {if (null ! voteResult StrUtil.isNotEmpty(voteResult.getLeaderId())) {// 被leader一票否决退成follow 继续苟着StateMachines.becomeFollow(voteResult.getTerm(), voteResult.getLeaderId(), null);return true;}return false;}/** 心跳结果一致性处理 */public static boolean heartBeatResultHandler(ListFutureHeartBeatResult taskList,Integer nodeNum) throws ExecutionException, InterruptedException {int responseNum 0;for (FutureHeartBeatResult future : taskList) {HeartBeatResult heartBeatResult future.get();if (null ! heartBeatResult) {responseNum;}}if (responseNum ! 0 responseNum (nodeNum / 2)) {log.debug({}: 万众一心我再接再厉, RaftNodeInfo.getInstance().getSelf().toString());return true;} else {// 没有应答或者应答数量小于一半 就退化为候选者并停止对外提供服务// 状态变更StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);log.debug({}: 我找不到追随者了我暂时停止对外服务, RaftNodeInfo.getInstance().getSelf().toString());return false;}}/** 日志预提交结果 */public static boolean logPreCommitHandler(ListFutureAppendEntriesPreCommitResult taskList, Integer nodeNum) throws ExecutionException, InterruptedException {int responseNum 0;for (FutureAppendEntriesPreCommitResult future : taskList) {AppendEntriesPreCommitResult preCommitResult future.get();if (null ! preCommitResult preCommitResult.isSuccess()) {responseNum;}}return responseNum ! 0 responseNum (nodeNum / 2);} }4.日志模块 public interface LogManage extends ResourceLifeCycle{/** leader预提交 */long preCommitLog(LogEntity logEntity);/** follow预提交 */void preCommitLog(long preCommitLogId,LogEntity logEntity);/** 缓存移除 */void cacheLogRemove(long cacheLogId);/** leader日志提交 */long commitLog(long cacheLogId);/** follow日志提交 */void commitLog(long cacheLogId,long logIndex);/** follow日志Check */void logIndexCheck();/** 根据日志索引获取日志内容 */LogEntity getLogEntityByIndex(long logIndex, RandomAccessFile file);/** 命令数据处理 */void dataHandler(String command);/** 根据Key获取数据 */String getDataByKey(String key); }5.定时任务 ElectionTask心跳检测任务不通过则升级为CandidateHeartBeatTask心跳任务不断给Follow发送心跳阻止其成为CandidateLogIndexCheckTaskFollow日志Check定时任务 四、核心流程介绍 其实流程图已经很清楚了这里挑部分来聊聊 1.选举 目前心跳设置的时间为1500ms心跳检测的时间为1750ms0-750ms随机数之前随机数设置的很短算上网络延迟等因素导致两个Candidate同任期的几率非常之高follow收到心跳会更新lastUpdateTIme而心跳检测则会检测这个时间到当前时间是否超过检测时间间隔超过了则会变成candidate发起选举 CandidateRaftNode发起选举RPC 选举RPC实体类 public class RequestVoteRPC extends RpcMsgId implements Serializable {/** 候选人的任期号 */private long term;/** 请求选票的候选人的 Id(ip:selfPort) */private String candidateId;/** 候选人的最后日志条目的索引值 */private long lastLogIndex;/** 候选人最后日志条目的任期号 */private long lastLogTerm;}选举方法 public boolean callVoteRequest(ListServerConfig serverConfigs) throws ExecutionException, InterruptedException {if (CollectionUtil.isEmpty(serverConfigs)) {StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);log.error(只有一个节点还发起什么投票);return false;}// candidate 会发起投票请求RaftNodeInfo instance RaftNodeInfo.getInstance();// 投票过程中 可能又收到了心跳或者日志状态已经变为followif (!NODE_TYPE.equals(RaftNodeInfo.getInstance().getCurrentNodeStatus())) {return false;}log.debug( {}: 哈哈哈我发起了投票, RaftNodeInfo.getInstance().getSelf().toString());ListFutureRequestVoteResult taskList new ArrayList(serverConfigs.size());// 加上自己的一票 需要 大于 n/21// 所以直接 n/2 就算通过了// 但是注意此时如果已经存在leader日志数又不比当前leader大所以leader还是leader 具有一票否决权for (ServerConfig serverConfig : serverConfigs) {FutureRequestVoteResult voteResultFuture ThreadPoolUtils.sendAsyncMsgPool.submit(() - {// 构建投票RequestVoteRPC voteRPC RequestVoteRPC.builder().candidateId(instance.getSelf().toString()).term(instance.getCurrentTerm()) // 成为候选 的时候任期就1了.lastLogIndex(instance.getLastLogIndex()).build();RpcSessionRequestVoteResult, RequestVoteRPC voteRPCRpcSession RpcSessionFactory.RequestVoteResult, RequestVoteRPCopenSession(serverConfig, voteRPC);return voteRPCRpcSession null ? null : voteRPCRpcSession.syncSend(1000L);});taskList.add(voteResultFuture);}// 投票过程中 可能状态又已经变为followif (!NODE_TYPE.equals(RaftNodeInfo.getInstance().getCurrentNodeStatus())) {return false;}return StateMachines.voteResultHandler(taskList, serverConfigs.size());}Follow选举响应 任期比我大我就同意任期跟我一样记录的日志比我多而且我没有投过票我也同意 Follow同一个任期内只能投一票 public RequestVoteResult voteRequestHandler(RequestVoteRPC voteRPC) {// follow 需要处理投票请求RaftNodeInfo instance RaftNodeInfo.getInstance();RequestVoteResult voteResult RequestVoteResult.builder().term(instance.getCurrentTerm()).build();voteResult.setRequestId(voteRPC.getRequestId());// 1.任期比我大我直接就同意if (voteRPC.getTerm() instance.getCurrentTerm()) {return agreeVote(voteResult, voteRPC);}// 2.任期跟我一样记录的日志比我多 而且 我没有投过票// 我只能投一票if ((voteRPC.getTerm() instance.getCurrentTerm() voteRPC.getLastLogIndex() instance.getLastLogIndex()) (instance.getVoteFor() null || instance.getVoteFor().equals(voteRPC.getCandidateId()))) {return agreeVote(voteResult, voteRPC);}voteResult.setTerm(instance.getCurrentTerm());voteResult.setVoteGranted(false);log.info( {}: 我身为现任Follow我不认可你的实力我不能给你投票{}, instance.getSelf().toString(), voteRPC.getCandidateId());return voteResult;}private RequestVoteResult agreeVote(RequestVoteResult voteResult, RequestVoteRPC voteRPC) {voteResult.setTerm(RaftNodeInfo.getInstance().getCurrentTerm());voteResult.setVoteGranted(true);RaftNodeInfo.getInstance().setCurrentTerm(voteRPC.getTerm());RaftNodeInfo.getInstance().setVoteFor(voteRPC.getCandidateId());log.info( {}: 我身为现任Follow我认可你的实力我给你投票{}, RaftNodeInfo.getInstance().getSelf().toString(), voteRPC.getCandidateId());return voteResult;}Leader响应 leader有没有可能收到投票有可能假设某一个Follow延迟收到心跳或者没有收到心跳就会发起那leader就会收到它发起的投票那怎么办判断任期和日志任期和日志都比Leader大则Leader需要退位否则Leader应该具有一票否决权这样就防止了某个follow无限发起投票任期无限1这种情况 一个candidate任期非常大的时候其他follow必然会给他投票那这样就升为leader就导致了同时存在两个leader的情况所以这时候的当期leader应该具有一票否决权 public RequestVoteResult voteRequestHandler(RequestVoteRPC voteRPC) {// leader 有可能收到 候选者的投票申请RaftNodeInfo instance RaftNodeInfo.getInstance();RequestVoteResult requestVoteResult RequestVoteResult.builder().build();requestVoteResult.setRequestId(voteRPC.getRequestId());// 候选人的任期比我大 而且日志还比我大 说明我已经out了我需要退位if (voteRPC.getTerm() instance.getCurrentTerm() voteRPC.getLastLogIndex() instance.getLastLogIndex()) {// 状态变更StateMachines.becomeFollow(voteRPC.getTerm(), voteRPC.getCandidateId(), null);requestVoteResult.setTerm(voteRPC.getTerm());requestVoteResult.setVoteGranted(true);log.info( {}: 我身为现任leader我认可你的实力我下位让贤{}, instance.getSelf().toString(), voteRPC.getCandidateId());return requestVoteResult;}log.info( {}: 我身为现任leader不同你的上任请求{}, instance.getSelf().toString(), voteRPC.getCandidateId());// 否则就不同意而且你还得给我老实点requestVoteResult.setTerm(instance.getCurrentTerm());requestVoteResult.setVoteGranted(false);requestVoteResult.setLeaderId(instance.getSelf().toString());return requestVoteResult;}2.心跳 心跳这里我做了一个响应降级的操作其实正常是不需要的我这的目的是防止网络分区 假设原本是这样 一旦网络分区则会变成这样导致两个leader的出现所以这时候心跳的响应就至关重要一旦响应少于半数则leader应该自动降级 LeaderRaftNode发起心跳 public boolean callHeartBeatRequest(ListServerConfig serverConfigs) throws ExecutionException, InterruptedException {if (CollectionUtil.isEmpty(serverConfigs)) {StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);log.debug( {}: 只有一个leader还发什么心跳, RaftNodeInfo.getInstance().getSelf().toString());return false;}ListFutureHeartBeatResult taskList new ArrayList(serverConfigs.size());// leader 需要发送心跳 防止网络分区一旦心跳返回不足 n/2 则自动降级for (ServerConfig serverConfig : serverConfigs) {FutureHeartBeatResult heartBeatResultFuture ThreadPoolUtils.sendAsyncMsgPool.submit(() - {HeartBeatRequest build HeartBeatRequest.builder().leaderId(RaftNodeInfo.getInstance().getSelf().toString()).leaderLastCommitIndex(RaftNodeInfo.getInstance().getLastLogIndex()).term(RaftNodeInfo.getInstance().getCurrentTerm()).build();RpcSessionHeartBeatResult, HeartBeatRequest heartBeatRequestRpcSession RpcSessionFactory.HeartBeatResult, HeartBeatRequestopenSession(serverConfig, build);return heartBeatRequestRpcSession null ? null : heartBeatRequestRpcSession.syncSend(200L);});taskList.add(heartBeatResultFuture);}// 响应结果处理return StateMachines.heartBeatResultHandler(taskList, serverConfigs.size());}3.日志 日志设计的非常之简陋就不做过多的介绍了本文目的还是以实现Raft为主性能问题暂不考虑不过还是说一下测试结果因为KV存储项目启动需要读取数据放入内存目前读取50m左右文件10w条日志需要8s左右肯定是不合理的目前并没有做日志压缩和快照也没有用零拷贝技术因为不想搞的太过复杂 关于日志check这里放上两种测试常见的结果 1.新的节点加入需要拉取一次所有数据 2.日志中间缺失 两种情况都是没问题的 五、遗留的问题 注意尽管这样还是有几率导致数据丢失的 再次强调本文不完全和Raft论文对标加了不少个人的想法进去所以在这个过程中都是遇到问题、思考问题、解决问题这本就是一个学习的过程目前最大的一个问题就是 新加入的节点已经收到了Leader的数据更新的lastCommitIndex但是还没来得及向Leader同步以前的数据而这时Leader挂了所以这时候这个节点就有几率通过投票成为Leader这时候数据就有几率丢失文章中可能看不太出来具体得看看代码这算是一个很严重的BUG各位想想可以怎么解决而Raft又是怎么解决的 当然可能还有其他问题各位大佬如果知道的也可以提出来 六、总结 只有深入本质才能顺应发展在分布式体系下共识算法是必不可少的光看不实践就容易眼高手低当初我看Raft的时候也感觉挺简单的不就是三种状态做不同的事然后状态变更嘛真正一做起来就发现好多细节都需要考虑这还只是个demo回头想想RocketMq和kafka的存储设计是真的厉害做完这个又收获不少 七、项目\个人博客地址 项目地址 个人博客 : 无八股全干货
http://www.hkea.cn/news/14282887/

相关文章:

  • 厦门做网页网站的公司武义建设局官方网站
  • 家庭网站建设找人做淘宝网站
  • 二手房网站建设文字生成二维码
  • 深圳专业网站建设企业个人网站可以做app吗
  • 天津个人网站制作一个好的营销型网站模板
  • 网站建设给客户看的ppt模板网站自助建站
  • 怎么做运营网站服装企业网站策划书
  • phpcms套好的网站 放到空间上 后台打开的验证码不能显示wordpress网易音乐播放器
  • 怎样下载模板做网站中国卫生健康网官网
  • 各大网站域名北京专业网站建设
  • 浙江省建设厅查询官方网站做景观素材有哪几个网站
  • 家用电器网站建设网站备案核验单清晰
  • 网站模板与网站定制版的区别房地产怎么做网站推广
  • 网站开发 百度编辑器鹤壁网站推广公司
  • 天津建设合同怎么在网站录入html5国内网站
  • 网站整站下载器 全站克隆页面图片视频下载 仿站专用源码工具软件电子商务网站规划原则
  • win2008r2做网站服务器windows下搭建wordpress
  • 中国工厂网站官方网站软件开发net教程免费
  • 网站建设的一般流程是购销网站建设视频百度云
  • 石狮app网站开发企业网站如何进行seo
  • 做网站 注意自己怎么注册域名
  • 上海个人网站备案wordpress 自定义布局
  • 长长沙网站制作中文网页模板免费
  • 沈阳专业网站制作团队做游戏网站要通过什么审核
  • 重庆网站定制开发一凡招聘 建筑人才网
  • 玉树电子商务网站建设黄页网页的推广网站
  • 题库网站建设的绩效指标wordpress微信域名回调
  • 建筑网站的功能模块企业所得税交多少
  • 门户网站的分类建筑施工模板
  • 网站模板用什么打开福州网站建设方案优化