Introduction: A Short Story
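Zhang San works at a small Internet company. Because the company runs a "996" schedule (9 a.m. to 9 p.m., six days a week), colleagues often leave without saying goodbye. To keep work moving, the team reached an unspoken agreement: a rule is used to pick one colleague who, on top of their regular work, checks every day whether anyone has quietly quit. When this colleague finds that Li Si has left, he splits Li Si's work and hands the pieces to other colleagues. The whole process is roughly as shown in the figure below:
As the figure shows, the company runs the whole process with a sign-in sheet and a work-progress sheet. Every colleague signs the sign-in sheet on arrival and, before leaving each day, records the day's progress on the progress sheet. Suppose Li Si slips away today: Zhang San sees that Li Si has not signed in, concludes that he has quit, and splits Li Si's tasks on the progress sheet between Dagou and Ergou…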
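The story raises a few questions:
1. By what rule was Zhang San chosen as the "supervisor"?
2. What if Zhang San also leaves without notice?
3. Why use a sign-in sheet instead of having Zhang San check on everyone one by one…
Keep these questions in mind as you read on. The story is in fact a prototype of a distributed storage component: the problems above are exactly the ones such components run into, and most of them have solutions. So let's look at how BookKeeper, a distributed storage component, solves them.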
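BookKeeper basics
"Hardware is never guaranteed not to fail." Under this premise, every storage component that runs on hardware must do one important thing: data recovery, performed either inside the component or outside it.
BookKeeper (bk) is a fault-tolerant distributed storage component: the same piece of data is stored as multiple replicas on multiple bookies. When a bookie becomes unavailable, all data stored on it loses one replica, and without recovery (re-replication) any further bookie failure can easily cause data loss. BookKeeper therefore provides a built-in data recovery mechanism, and this whole article revolves around it.
Data recovery is generally either manual or automatic, and BookKeeper supports both. Let's look at how each one is used.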
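Manual recovery
Recover the data of a given bookie: bin/bookkeeper shell recover 192.168.1.10:3181
Recover a single ledger of a given bookie: bin/bookkeeper shell recover 192.168.1.10:3181 --ledger ledgerID
A manual recovery goes through the following four steps:
1. The client reads ledger information from ZooKeeper.
2. From that information it determines which ledgers need re-replication (each ledger's metadata records which bookies store it).
3. A recovery process started on the client re-replicates those ledgers.
4. Once every ledger is marked as fully replicated, the recovery is complete.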
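To make step 2 concrete, here is a minimal sketch of how the ledgers that need re-replication could be picked out once the ledger metadata is in hand. It assumes a simplified, hypothetical metadata model in which each ledger maps to a single list of bookies; real BookKeeper ledger metadata keeps one ensemble per segment of the ledger. UnderReplicationScanner and its method are illustrative names, not BookKeeper APIs.

```java
import java.util.*;

/**
 * Sketch (hypothetical names): find ledgers whose replica set includes a lost bookie.
 * Assumes one bookie list per ledger; real ledger metadata holds an ensemble per segment.
 */
class UnderReplicationScanner {

    /** Ledgers that store at least one replica on a lost bookie, sorted by id. */
    static SortedSet<Long> ledgersNeedingReplication(Map<Long, List<String>> ledgerBookies,
                                                     Set<String> lostBookies) {
        SortedSet<Long> result = new TreeSet<>();
        for (Map.Entry<Long, List<String>> e : ledgerBookies.entrySet()) {
            for (String bookie : e.getValue()) {
                if (lostBookies.contains(bookie)) {
                    result.add(e.getKey());
                    break; // one missing replica is enough to flag the ledger
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, List<String>> metadata = new HashMap<>();
        metadata.put(1L, List.of("192.168.1.10:3181", "192.168.1.11:3181"));
        metadata.put(2L, List.of("192.168.1.11:3181", "192.168.1.12:3181"));
        metadata.put(3L, List.of("192.168.1.12:3181", "192.168.1.10:3181"));
        // Recovering bookie 192.168.1.10:3181 touches ledgers 1 and 3
        System.out.println(ledgersNeedingReplication(metadata, Set.of("192.168.1.10:3181"))); // prints [1, 3]
    }
}
```

A sorted set is used only so the output is stable; the actual recovery tool works from ZooKeeper metadata rather than an in-memory map.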
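Automatic recovery
Start automatic recovery for the cluster: bin/bookkeeper autorecovery bookie
Disable automatic recovery: bin/bookkeeper shell autorecovery -disable
Re-enable it after it has been disabled: bin/bookkeeper shell autorecovery -enable
Besides starting it with a command, BookKeeper also supports enabling it through configuration: set autoRecoveryDaemonEnabled to true on a bookie node, and that bookie will also start the autorecovery service when it starts.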
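The disable/enable switch behaves like a gate that recovery code checks before doing any work; the Auditor code later in this article calls waitIfLedgerReplicationDisabled() for this purpose. The sketch below models that gate inside a single JVM with wait/notifyAll. ReplicationToggle is a hypothetical class, and in BookKeeper the flag is kept in ZooKeeper rather than in memory.

```java
/**
 * Sketch (hypothetical class): an in-memory stand-in for the replication on/off switch.
 * In BookKeeper the flag is kept in ZooKeeper; here a monitor with wait/notifyAll models
 * "block while disabled, resume when enabled".
 */
class ReplicationToggle {
    private boolean enabled = true;

    /** Like "bin/bookkeeper shell autorecovery -disable". */
    synchronized void disable() {
        enabled = false;
    }

    /** Like "bin/bookkeeper shell autorecovery -enable": wakes up every blocked worker. */
    synchronized void enable() {
        enabled = true;
        notifyAll();
    }

    synchronized boolean isEnabled() {
        return enabled;
    }

    /** Called before doing recovery work; blocks for as long as replication is disabled. */
    synchronized void waitIfDisabled() throws InterruptedException {
        while (!enabled) { // the loop guards against spurious wake-ups
            wait();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReplicationToggle toggle = new ReplicationToggle();
        toggle.disable();
        Thread worker = new Thread(() -> {
            try {
                toggle.waitIfDisabled();
                System.out.println("worker resumed");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        Thread.sleep(100); // give the worker a moment to block
        toggle.enable();
        worker.join();
    }
}
```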
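The autorecovery mechanism
The previous section showed how to use it; this section explains how the autorecovery mechanism works.
Automatic recovery involves two roles, the Auditor and the replication worker. Once automatic recovery is enabled, both roles are started in every AutoRecovery process, whether it is embedded in a bookie or runs standalone.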
Auditor
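The Auditors in a BookKeeper cluster elect a leader through ZooKeeper. The leader watches the ZooKeeper node /ledgers/available to decide whether data needs recovering: every bookie registers itself there on startup, and because these registrations are ephemeral znodes, ZooKeeper automatically deletes a bookie's entry when that bookie becomes unavailable, so the leader easily notices the failure through its watch. When the Auditor leader detects an unavailable bookie, it adds every ledger stored on that bookie under the ZooKeeper path /ledgers/underreplication/ledgers, which is how it tells the replication workers to start recovering data.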
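The Auditor leader's reaction can be sketched with plain collections. In this minimal sketch, assuming ZooKeeper is replaced by in-memory maps, the watch callback receives the current children of /ledgers/available, computes the lost bookies as a set difference, and records each affected ledger together with the bookies its replicas are missing from. AuditorSketch is a hypothetical name, not the real Auditor class.

```java
import java.util.*;

/**
 * Sketch (hypothetical class): how the Auditor leader reacts when the children of
 * /ledgers/available change. ZooKeeper is replaced by in-memory collections.
 */
class AuditorSketch {
    /** bookie -> ledgers with a replica on that bookie (built from ledger metadata) */
    private final Map<String, Set<Long>> ledgerDetails;
    /** under-replicated ledger -> bookies its replicas are missing from */
    final Map<Long, Set<String>> underReplicated = new TreeMap<>();

    AuditorSketch(Map<String, Set<Long>> ledgerDetails) {
        this.ledgerDetails = ledgerDetails;
    }

    /** Watch callback: receives the bookies still registered and returns the lost ones. */
    Set<String> onAvailableBookiesChanged(Collection<String> availableBookies) {
        Set<String> lost = new TreeSet<>(ledgerDetails.keySet());
        lost.removeAll(availableBookies); // lost = known - available
        for (String bookie : lost) {
            for (long ledgerId : ledgerDetails.getOrDefault(bookie, Set.of())) {
                // Merge instead of overwrite: a ledger may lose replicas on several bookies
                underReplicated.computeIfAbsent(ledgerId, id -> new TreeSet<>()).add(bookie);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        Map<String, Set<Long>> details = new HashMap<>();
        details.put("bk1", Set.of(1L, 2L));
        details.put("bk2", Set.of(2L, 3L));
        details.put("bk3", Set.of(1L, 3L));
        AuditorSketch auditor = new AuditorSketch(details);

        auditor.onAvailableBookiesChanged(List.of("bk2", "bk3")); // bk1's ephemeral znode is gone
        System.out.println(auditor.underReplicated); // prints {1=[bk1], 2=[bk1]}

        auditor.onAvailableBookiesChanged(List.of("bk3"));        // now bk2 is gone too
        System.out.println(auditor.underReplicated); // prints {1=[bk1], 2=[bk1, bk2], 3=[bk2]}
    }
}
```

The merge step matters because the same ledger can be reported twice; the real code handles that case when the ledger's znode already exists, as the source walkthrough below shows.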
replication worker
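Every replication worker watches /ledgers/underreplication/ledgers. When it sees a recovery task, it creates a lock under /ledgers/underreplication/locks to avoid concurrent processing of the same ledger. If, before recovery starts, it finds that a Fragment of the ledger is still being written, the worker first waits for the write to finish; if the write still has not finished after a while, it fences the ledger (the Fence mechanism) so that no more entries can be added to it, and then copies the data.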
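The locking step can be illustrated with a minimal in-memory sketch. Here a ConcurrentHashMap stands in for the lock directory in ZooKeeper: putIfAbsent succeeds for exactly one caller, much like creating a lock znode. In ZooKeeper the lock node is ephemeral, so a crashed worker's lock disappears on its own; the sketch models that with an explicit release(). LockingWorkQueue and its methods are hypothetical names.

```java
import java.util.*;
import java.util.concurrent.*;

/**
 * Sketch (hypothetical class): several replication workers sharing one list of
 * under-replicated ledgers. A ConcurrentHashMap stands in for the ZooKeeper lock directory;
 * putIfAbsent succeeds for exactly one caller, like creating a lock znode.
 */
class LockingWorkQueue {
    private final ConcurrentSkipListSet<Long> underReplicated = new ConcurrentSkipListSet<>();
    private final ConcurrentHashMap<Long, String> locks = new ConcurrentHashMap<>();

    void mark(long ledgerId) {
        underReplicated.add(ledgerId);
    }

    /** Returns a ledger now owned by workerId, or empty if every pending ledger is locked. */
    Optional<Long> claim(String workerId) {
        for (Long ledgerId : underReplicated) {
            if (locks.putIfAbsent(ledgerId, workerId) == null) {
                if (underReplicated.contains(ledgerId)) {
                    return Optional.of(ledgerId);
                }
                locks.remove(ledgerId, workerId); // finished by another worker meanwhile
            }
        }
        return Optional.empty();
    }

    /** Success: the ledger is fully replicated, so drop the task first, then the lock. */
    void markReplicated(long ledgerId) {
        underReplicated.remove(ledgerId);
        locks.remove(ledgerId);
    }

    /** Failure: drop only the lock so that another worker, or a later retry, can take it. */
    void release(long ledgerId) {
        locks.remove(ledgerId);
    }

    public static void main(String[] args) {
        LockingWorkQueue queue = new LockingWorkQueue();
        queue.mark(7);
        queue.mark(8);
        System.out.println(queue.claim("worker-1")); // prints Optional[7]
        System.out.println(queue.claim("worker-2")); // prints Optional[8]
        System.out.println(queue.claim("worker-3")); // prints Optional.empty
        queue.release(7);                            // worker-1 failed and gave up its lock
        System.out.println(queue.claim("worker-3")); // prints Optional[7]
    }
}
```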
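Startup workflow
Referring to the figure above, running bin/bookkeeper autorecovery bookie on a server node triggers the following steps:
1. The shell's exec command asks the operating system to launch AutoRecoveryMain as a Java process.
2. When the AutoRecoveryMain process starts, it starts an Auditor thread and a ReplicationWorker thread.
3. Because several AutoRecoveryMain processes may run in the environment for high availability (HA), the Auditor threads elect an Auditor Leader through ZooKeeper.
4. Because the bookie cluster uses ZooKeeper for membership, the Auditor Leader only needs to watch the ZooKeeper path where bookies register to know whether any bookie has become unavailable.
5. An unavailable bookie usually stops sending heartbeats to ZooKeeper; ZooKeeper then deletes the ephemeral node the bookie created and notifies the Auditor Leader that set the watch.
6. On receiving the notification, the Auditor Leader looks up in ZooKeeper the ledgers stored on the unavailable bookie. In principle all of them need recovery, so it puts them under /ledgers/underreplication/ledgers in ZooKeeper to notify the ReplicationWorkers.
7. When a ReplicationWorker's watch sees ledgers to recover under that path, it first takes a lock in ZooKeeper and then runs the recovery logic: it splits the ledger into Fragments and recovers them one after another, reading the ledger's data from healthy bookies and writing it to other bookies that do not have it, so that every piece of data again has multiple replicas.
8. Once every ledger under /ledgers/underreplication/ledgers has been replicated, this round of autorecovery is complete. The Auditor and ReplicationWorker threads keep watching ZooKeeper until the next bookie becomes unavailable.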
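This mechanism makes BookKeeper more stable and more highly available: when individual nodes go down, the data is still restored to full replication automatically, with nothing lost. This is a capability any mature component should have.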
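The "split into Fragments and copy them one after another" step can be sketched as follows, assuming copying a range of entries is an asynchronous operation supplied by the caller. The code cuts an entry range into batches of at most batchSize entries and starts each batch only after the previous one has succeeded, stopping at the first failure. The source walkthrough later shows BookKeeper doing this with callbacks (splitIntoSubFragments and replicateNextBatch); this version uses CompletableFuture instead, and BatchedFragmentReplicator, split and replicateAll are hypothetical names. Records require Java 16 or later.

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
 * Sketch (hypothetical class): copy the entry range of a fragment in batches, one batch
 * after another, stopping at the first failure. The actual copy (read from a healthy bookie,
 * write to the new one) is abstracted as an asynchronous function supplied by the caller.
 */
class BatchedFragmentReplicator {
    /** Inclusive range of entry ids. */
    record Range(long first, long last) {}

    /** Cuts [firstEntry, lastEntry] into consecutive batches of at most batchSize entries. */
    static List<Range> split(long firstEntry, long lastEntry, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Range> batches = new ArrayList<>();
        for (long start = firstEntry; start <= lastEntry; start += batchSize) {
            batches.add(new Range(start, Math.min(start + batchSize - 1, lastEntry)));
        }
        return batches;
    }

    /** Starts the next batch only when the previous one has completed successfully. */
    static CompletableFuture<Void> replicateAll(Iterator<Range> batches,
                                                Function<Range, CompletableFuture<Void>> copyBatch) {
        if (!batches.hasNext()) {
            return CompletableFuture.completedFuture(null);
        }
        Range batch = batches.next();
        // thenCompose is skipped when copyBatch fails, so a failure ends the chain
        return copyBatch.apply(batch).thenCompose(ignored -> replicateAll(batches, copyBatch));
    }

    public static void main(String[] args) {
        List<Range> batches = split(0, 9, 4);
        System.out.println(batches);
        // prints [Range[first=0, last=3], Range[first=4, last=7], Range[first=8, last=9]]

        List<Range> copied = new ArrayList<>();
        replicateAll(batches.iterator(), range -> {
            copied.add(range); // a real implementation would copy entries range.first()..range.last()
            return CompletableFuture.completedFuture(null);
        }).join();
        System.out.println(copied.size()); // prints 3
    }
}
```

Chaining batches keeps the number of in-flight entries bounded by the batch size, which is the point of a setting like rereplicationEntryBatchSize.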
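AutoRecovery startup source code
The source walkthrough is split into the startup flow and the working flow. For readers who want to read along, here is a "glossary" that may come in handy:
AutoRecoveryMain: core class, mainly responsible for starting the AutoRecovery service
AutoRecoveryService: core class, mainly responsible for the AutoRecovery-related services
LedgerManager: exposes an API for managing ledgers and internally decides how ledger metadata is stored in a key-value store. It provides six core operations: create, delete, read, write, register, and unregister
AbstractZkLedgerManager: abstract class
LedgerIdGenerator: generates globally unique, increasing ledger IDs on top of ZooKeeper
ZkLedgerUnderreplicationManager: manages ledgers whose replication is incomplete
ZkLedgerAuditorManager: manages the Auditor
ReplicationWorker: takes under-replicated ledgers from ZkLedgerUnderreplicationManager and re-replicates them; after a failed attempt it backs off for rwRereplicateBackoffMs before trying again
LedgerFragment: the unit a ledger is made of, and also the unit of re-replication
EmbeddedServer: the node that starts a bk instance
From here on we trace the startup source code. After bin/bookkeeper autorecovery bookie is run, execution reaches the following line of the bookkeeper/bin/bookkeeper script: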
```shell
if [ ${COMMAND} == "autorecovery" ]; then
    exec ${JAVA} ${OPTS} ${JMX_ARGS} org.apache.bookkeeper.replication.AutoRecoveryMain --conf ${BOOKIE_CONF} $@
fi
```
The logic is very clear: the shell simply starts AutoRecovery as an independent Java process dedicated to recovering data after failures. The JVM bootstraps from the main method of the startup class, so let's use AutoRecoveryMain's main method as the entry point and see what happens next:
```java
public static void main(String[] args) {
    // Call the method that does the real work; open-source projects often prefix such methods with "do"
    int retCode = doMain(args);
    // ....
}

static int doMain(String[] args) {
    ServerConfiguration conf;
    try {
        // Load the config file given on the shell command line into a configuration object
        conf = parseArgs(args);
    } catch (IllegalArgumentException iae) {
        // ....
    }

    LifecycleComponent server;
    try {
        // Build the AutoRecoveryServer object; an important method
        server = buildAutoRecoveryServer(new BookieConfiguration(conf));
    } catch (Exception e) {
        // ....
    }

    try {
        // Start the AutoRecoveryServer object
        ComponentStarter.startComponent(server).get();
    } catch (InterruptedException ie) {
        // ....
    }
    return ExitCode.OK;
}
```
So AutoRecoveryMain's main method only bootstraps; what finally gets started is the AutoRecoveryServer object. Let's look more closely at how this service is built and started:
```java
public static LifecycleComponentStack buildAutoRecoveryServer(BookieConfiguration conf) throws Exception {
    LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder()
            .withName("autorecovery-server");

    // 1. Create the StatsProviderService, which records the AutoRecovery service's metrics
    StatsProviderService statsProviderService = new StatsProviderService(conf);
    // ....

    // 2. Create the AutoRecoveryService through its constructor; this is the core code
    AutoRecoveryService autoRecoveryService = new AutoRecoveryService(conf, rootStatsLogger);
    // ....

    // 3. Create the BKHttpServiceProvider, which exposes an HTTP service so that internal
    //    state can be read over HTTP
    if (conf.getServerConf().isHttpServerEnabled()) {
        BKHttpServiceProvider provider = new BKHttpServiceProvider.Builder()
                .setAutoRecovery(autoRecoveryService.getAutoRecoveryServer())
                .setServerConfiguration(conf.getServerConf())
                .setStatsProvider(statsProviderService.getStatsProvider())
                .build();
        HttpService httpService = new HttpService(provider, conf, rootStatsLogger);
        // ....
    }

    return serverBuilder.build();
}
```
Next, the AutoRecoveryService constructor:
```java
public AutoRecoveryService(BookieConfiguration conf, StatsLogger statsLogger) throws Exception {
    super(NAME, conf, statsLogger);
    // Create AutoRecoveryMain through its constructor; AutoRecoveryMain is a field of
    // AutoRecoveryService. Let's step into its implementation
    this.main = new AutoRecoveryMain(
            conf.getServerConf(),
            statsLogger);
}

public AutoRecoveryMain(ServerConfiguration conf, StatsLogger statsLogger)
        throws IOException, InterruptedException, KeeperException, UnavailableException,
        CompatibilityException {
    // ....
    // Create the AuditorElector, which elects the Auditor Leader
    auditorElector = new AuditorElector(
            BookieImpl.getBookieId(conf).toString(),
            conf,
            bkc,
            statsLogger.scope(AUDITOR_SCOPE),
            false);
    // Create the ReplicationWorker, which copies the data
    replicationWorker = new ReplicationWorker(
            conf,
            bkc,
            false,
            statsLogger.scope(REPLICATION_WORKER_SCOPE));
    deathWatcher = new AutoRecoveryDeathWatcher(this);
}
```
That is about as far as we need to follow the construction logic: it is enough to know that it ultimately creates two objects, AuditorElector and ReplicationWorker. For startup, we trace from the ComponentStarter.startComponent(server).get(); call above:
```java
public static CompletableFuture<Void> startComponent(LifecycleComponent component) {
    // ....
    // Call start(); this uses the template method pattern plus a closure. In essence it calls
    // the doStart method of each of the three services created earlier:
    // StatsProviderService, AutoRecoveryService and HttpService
    component.start();
    // ....
}

protected void doStart() {
    // Still delegates to AutoRecoveryMain's start method
    this.main.start();
}

public void start() {
    // Start the AuditorElector service
    auditorElector.start();
    // Start the ReplicationWorker service
    replicationWorker.start();
    // ....
    deathWatcher.start();
}
```
Putting this together, starting AutoRecovery essentially means starting the AuditorElector and ReplicationWorker services, so let's look at how each of them starts. First, AuditorElector:
```java
public Future<?> start() {
    running.set(true);
    // Submit the election task
    return submitElectionTask();
}

Future<?> submitElectionTask() {
    Runnable r = new Runnable() {
        @Override
        public void run() {
            // ....
            // Create an Auditor object and start it
            auditor = new Auditor(bookieId, conf, bkc, false, statsLogger);
            auditor.start();
        }
    };
    try {
        // Run the logic above asynchronously
        return executor.submit(r);
    } catch (RejectedExecutionException e) {
        // ....
    }
}
```
Here the Auditor object is simply initialized and started. Tracing further:
```java
public Auditor(final String bookieIdentifier,
               ServerConfiguration conf,
               BookKeeper bkc,
               boolean ownBkc,
               BookKeeperAdmin admin,
               boolean ownAdmin,
               StatsLogger statsLogger)
        throws UnavailableException {
    // ....
    // Initialize the Auditor
    initialize(conf, bkc);
    // ....
}

private void initialize(ServerConfiguration conf, BookKeeper bkc)
        throws UnavailableException {
    try {
        LedgerManagerFactory ledgerManagerFactory = bkc.getLedgerManagerFactory();
        ledgerManager = ledgerManagerFactory.newLedgerManager();
        this.bookieLedgerIndexer = new BookieLedgerIndexer(ledgerManager);

        this.ledgerUnderreplicationManager = ledgerManagerFactory
                .newLedgerUnderreplicationManager();
        // ....
        lostBookieRecoveryDelayBeforeChange = this.ledgerUnderreplicationManager.getLostBookieRecoveryDelay();
    } catch (CompatibilityException ce) {
        // ....
    }
}
```
With the initialization covered, let's continue with the Auditor's startup logic:
```java
public void start() {
    LOG.info("I'm starting as Auditor Bookie. ID: {}", bookieIdentifier);
    synchronized (this) {
        // ....
        try {
            // 1. Watch for bookie changes: essentially a ZooKeeper watch on /ledgers/available
            //    for node changes; changes of read-only bookies are watched as well
            watchBookieChanges();
            // Get the list of available bookies from ZooKeeper
            knownBookies = getAvailableBookies();
        } catch (BKException bke) {
            // ....
        }

        try {
            // 2. Register LostBookieRecoveryDelayChangedCb, a callback that runs when the
            //    lostBookieRecoveryDelay setting stored in ZooKeeper changes
            this.ledgerUnderreplicationManager
                    .notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb());
        } catch (UnavailableException ue) {
            // ....
        }

        try {
            // 3. Triggered when the list of under-replicated ledgers changes; also handled
            //    through a callback
            this.ledgerUnderreplicationManager.notifyUnderReplicationLedgerChanged(
                    new UnderReplicatedLedgersChangedCb());
        } catch (UnavailableException ue) {
            // ....
        }

        scheduleBookieCheckTask();
        // Start a thread that checks the state of ledgers
        scheduleCheckAllLedgersTask();
        schedulePlacementPolicyCheckTask();
        scheduleReplicasCheckTask();
    }
}
```
That is the Auditor's startup logic. Next, the ReplicationWorker's startup logic:
```java
public void start() {
    // workerThread is actually a BookieThread
    this.workerThread.start();
}

public void run() {
    workerRunning = true;
    while (workerRunning) {
        try {
            // The core logic is simply to call rereplicate() in a loop
            if (!rereplicate()) {
                LOG.warn("failed while replicating fragments");
                waitBackOffTime(rwRereplicateBackoffMs);
            }
        } catch (InterruptedException e) {
            // ....
        }
    }
    LOG.info("ReplicationWorker exited loop!");
}

private boolean rereplicate() throws InterruptedException, BKException,
        UnavailableException {
    // Get a ledger that needs recovery
    long ledgerIdToReplicate = underreplicationManager.getLedgerToRereplicate();

    Stopwatch stopwatch = Stopwatch.createStarted();
    boolean success = false;
    try {
        // Recover it
        success = rereplicate(ledgerIdToReplicate);
    } finally {
        // ....
    }
    return success;
}
```
AutoRecovery working source code
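Because there is a lot of logic here, the AutoRecovery working flow gets its own section. From the above we know exactly what happens after startup; now let's look at the logic that does the actual AutoRecovery work. When the Auditor starts, it watches ZooKeeper to react to changes: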
```java
public void start() {
    // Callback for changes of the lostBookieRecoveryDelay setting in ZooKeeper; the audit it
    // triggers marks the ledgers stored on lost bookies as under-replicated in ZooKeeper
    this.ledgerUnderreplicationManager
            .notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb());
    // React to changes of the under-replicated ledger list and record them in the metrics
    this.ledgerUnderreplicationManager.notifyUnderReplicationLedgerChanged(
            new UnderReplicatedLedgersChangedCb());
}
```
Both registration methods above react to ZooKeeper events through watches, so the interesting part is the logic in the callback classes. First, LostBookieRecoveryDelayChangedCb:
```java
private class LostBookieRecoveryDelayChangedCb implements GenericCallback<Void> {
    @Override
    public void operationComplete(int rc, Void result) {
        // ....
        // Re-register the watch (ZooKeeper watches fire only once)
        Auditor.this.ledgerUnderreplicationManager
                .notifyLostBookieRecoveryDelayChanged(LostBookieRecoveryDelayChangedCb.this);
        // ....
        // Submit a task that handles the change; let's step into it
        Auditor.this.submitLostBookieRecoveryDelayChangedEvent();
    }
}

synchronized Future<?> submitLostBookieRecoveryDelayChangedEvent() {
    return executor.submit(() -> {
        int lostBookieRecoveryDelay = -1;
        try {
            waitIfLedgerReplicationDisabled();
            lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager
                    .getLostBookieRecoveryDelay();
            // ....
            if (....) {
                // Core logic; let's see what it does
                auditorBookieCheckTask.startAudit(false);
            } else if (auditTask != null) {
                LOG.info("lostBookieRecoveryDelay has been set to {}, so rescheduling AuditTask accordingly",
                        lostBookieRecoveryDelay);
                auditTask = executor.schedule(() -> {
                    auditorBookieCheckTask.startAudit(false);
                    auditTask = null;
                    bookiesToBeAudited.clear();
                }, lostBookieRecoveryDelay, TimeUnit.SECONDS);
                auditorStats.getNumBookieAuditsDelayed().inc();
            }
        } catch (InterruptedException ie) {
            // ....
        } finally {
            if (lostBookieRecoveryDelay != -1) {
                lostBookieRecoveryDelayBeforeChange = lostBookieRecoveryDelay;
            }
        }
    });
}

void startAudit(boolean shutDownTask) {
    try {
        // This looks like the Auditor's main job; keep going
        auditBookies();
        shutDownTask = false;
    } catch (BKException bke) {
        // ....
    }
}

void auditBookies()
        throws ReplicationException.BKAuditException, InterruptedException, BKException {
    // ....
    List<String> availableBookies = getAvailableBookies();
    // find lost bookies
    Set<String> knownBookies = ledgerDetails.keySet();
    // Subtracting the bookies currently registered in ZooKeeper from the bookie set held in
    // memory gives the bookies that have become unavailable
    Collection<String> lostBookies = CollectionUtils.subtract(knownBookies,
            availableBookies);
    // ....
    // If any bookie has become unavailable, handle it with handleLostBookiesAsync
    if (lostBookies.size() > 0) {
        try {
            FutureUtils.result(
                    handleLostBookiesAsync(lostBookies, ledgerDetails), ReplicationException.EXCEPTION_HANDLER);
        } catch (ReplicationException e) {
            // ....
        }
        // ....
    }
    // ....
}

private CompletableFuture<?> handleLostBookiesAsync(Collection<String> lostBookies,
                                                    Map<String, Set<Long>> ledgerDetails) {
    LOG.info("Following are the failed bookies: {}, and searching its ledgers for re-replication", lostBookies);

    return FutureUtils.processList(
            Lists.newArrayList(lostBookies),
            // As the name suggests: find the ledgers on these bookies and recover them.
            // Since the bookie-to-ledger mapping was stored earlier, the ledgers of the
            // unavailable bookies come straight from the ledgerDetails map
            bookieIP -> publishSuspectedLedgersAsync(
                    Lists.newArrayList(bookieIP), ledgerDetails.get(bookieIP)),
            null
    );
}

protected CompletableFuture<?> publishSuspectedLedgersAsync(Collection<String> missingBookies, Set<Long> ledgers) {
    // ....
    LongAdder underReplicatedSize = new LongAdder();
    FutureUtils.processList(
            Lists.newArrayList(ledgers),
            ledgerId ->
                    // Read each ledger's metadata to add up the size of the under-replicated data
                    ledgerManager.readLedgerMetadata(ledgerId).whenComplete((metadata, exception) -> {
                        if (exception == null) {
                            underReplicatedSize.add(metadata.getValue().getLength());
                        }
                    }), null).whenComplete((res, e) -> {
        // ....
    });

    return FutureUtils.processList(
            Lists.newArrayList(ledgers),
            // The main flow continues here
            ledgerId -> ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, missingBookies),
            null
    );
}

public CompletableFuture<Void> markLedgerUnderreplicatedAsync(long ledgerId, Collection<String> missingReplicas) {
    // ....
    final String znode = getUrLedgerZnode(ledgerId);
    // Mark the ledger as needing re-replication
    tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, createFuture);
    return createFuture;
}

private void tryMarkLedgerUnderreplicatedAsync(final String znode,
                                               final Collection<String> missingReplicas,
                                               final List<ACL> zkAcls,
                                               final CompletableFuture<Void> finalFuture) {
    // ....
    // Encode the missing replicas into the protobuf message
    missingReplicas.forEach(builder::addReplica);
    // ....
    ZkUtils.asyncCreateFullPathOptimistic(
            zkc, znode, urLedgerData, zkAcls, CreateMode.PERSISTENT,
            (rc, path, ctx, name) -> {
                if (Code.OK.intValue() == rc) {
                    FutureUtils.complete(finalFuture, null);
                } else if (Code.NODEEXISTS.intValue() == rc) {
                    // The ledger is already marked in ZooKeeper: merge the new missing replicas into it
                    handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
                } else {
                    FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(rc)));
                }
            }, null);
}

private void handleLedgerUnderreplicatedAlreadyMarked(final String znode,
                                                      final Collection<String> missingReplicas,
                                                      final List<ACL> zkAcls,
                                                      final CompletableFuture<Void> finalFuture) {
    // get the existing underreplicated ledger data
    zkc.getData(znode, false, (getRc, getPath, getCtx, existingUrLedgerData, getStat) -> {
        if (Code.OK.intValue() == getRc) {
            // deserialize existing underreplicated ledger data
            final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
            try {
                TextFormat.merge(new String(existingUrLedgerData, UTF_8), builder);
            } catch (ParseException e) {
                // ....
            }
            UnderreplicatedLedgerFormat existingUrLedgerFormat = builder.build();
            boolean replicaAdded = false;
            for (String missingReplica : missingReplicas) {
                if (existingUrLedgerFormat.getReplicaList().contains(missingReplica)) {
                    continue;
                } else {
                    builder.addReplica(missingReplica);
                    replicaAdded = true;
                }
            }
            // ....
            // Write the merged data back, conditioned on the version just read (optimistic concurrency)
            zkc.setData(znode, newUrLedgerData, getStat.getVersion(), (setRc, setPath, setCtx, setStat) -> {
                if (Code.OK.intValue() == setRc) {
                    FutureUtils.complete(finalFuture, null);
                } else if (Code.NONODE.intValue() == setRc) {
                    tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
                } else if (Code.BADVERSION.intValue() == setRc) {
                    handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
                } else {
                    FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(setRc)));
                }
            }, null);
        } else if (Code.NONODE.intValue() == getRc) {
            tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
        } else {
            FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(getRc)));
        }
    }, null);
}
```
Starting from ReplicationWorker's rereplicate method, we get to the actual data recovery:
```java
private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedException, BKException,
        UnavailableException {
    // ....
    // Get a LedgerHandle for the ledger to recover
    try (LedgerHandle lh = admin.openLedgerNoRecovery(ledgerIdToReplicate)) {
        // Break the ledger into smaller recovery units (LedgerFragments); each one is recovered separately
        Set<LedgerFragment> fragments =
                getUnderreplicatedFragments(lh, conf.getAuditorLedgerVerificationPercentage());
        // ....
        for (LedgerFragment ledgerFragment : fragments) {
            // ....
            try {
                // Recover this LedgerFragment
                admin.replicateLedgerFragment(lh, ledgerFragment, onReadEntryFailureCallback);
                numFragsReplicated++;
                if (ledgerFragment.getReplicateType() == LedgerFragment
                        .ReplicateType.DATA_NOT_ADHERING_PLACEMENT) {
                    numNotAdheringPlacementFragsReplicated++;
                }
            } catch (BKException.BKBookieHandleNotAvailableException e) {
                // ....
            }
        }
        // ....
        fragments = getUnderreplicatedFragments(lh, conf.getAuditorLedgerVerificationPercentage());
        // ....
    } catch (BKNoSuchLedgerExistsOnMetadataServerException e) {
        // ....
    } finally {
        // ....
    }
}
```
Next, the implementation of admin.replicateLedgerFragment:
```java
public void replicateLedgerFragment(LedgerHandle lh, final LedgerFragment ledgerFragment,
        final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException, BKException {
    // ....
    // Keep tracing
    replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses, onReadEntryFailureCallback);
}

private void replicateLedgerFragment(LedgerHandle lh,
        final LedgerFragment ledgerFragment,
        final Map<Integer, BookieId> targetBookieAddresses,
        final BiConsumer<Long, Long> onReadEntryFailureCallback)
        throws InterruptedException, BKException {
    // ....
    // Here we can see that the recovery is actually asynchronous; keep going
    asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieSet, onReadEntryFailureCallback);
    // ....
}

private void asyncRecoverLedgerFragment(final LedgerHandle lh,
        final LedgerFragment ledgerFragment,
        final AsyncCallback.VoidCallback ledgerFragmentMcb,
        final Set<BookieId> newBookies,
        final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException {
    // The LedgerFragmentReplicator does the recovery; keep going
    lfr.replicate(lh, ledgerFragment, ledgerFragmentMcb, newBookies, onReadEntryFailureCallback);
}

void replicate(final LedgerHandle lh, final LedgerFragment lf,
        final AsyncCallback.VoidCallback ledgerFragmentMcb,
        final Set<BookieId> targetBookieAddresses,
        final BiConsumer<Long, Long> onReadEntryFailureCallback)
        throws InterruptedException {
    Set<LedgerFragment> partionedFragments = splitIntoSubFragments(lh, lf,
            bkc.getConf().getRereplicationEntryBatchSize());
    // ....
    // Keep reading the implementation
    replicateNextBatch(lh, partionedFragments.iterator(),
            ledgerFragmentMcb, targetBookieAddresses, onReadEntryFailureCallback);
}

private void replicateNextBatch(final LedgerHandle lh,
        final Iterator<LedgerFragment> fragments,
        final AsyncCallback.VoidCallback ledgerFragmentMcb,
        final Set<BookieId> targetBookieAddresses,
        final BiConsumer<Long, Long> onReadEntryFailureCallback) {
    if (fragments.hasNext()) {
        try {
            // Here it comes: wherever there is an "Internal" method there is usually real
            // implementation inside, so let's go in
            replicateFragmentInternal(lh, fragments.next(),
                    new AsyncCallback.VoidCallback() {
                        @Override
                        public void processResult(int rc, String v, Object ctx) {
                            if (rc != BKException.Code.OK) {
                                ledgerFragmentMcb.processResult(rc, null, null);
                            } else {
                                replicateNextBatch(lh, fragments,
                                        ledgerFragmentMcb,
                                        targetBookieAddresses,
                                        onReadEntryFailureCallback);
                            }
                        }
                    }, targetBookieAddresses, onReadEntryFailureCallback);
        } catch (InterruptedException e) {
            // ....
        }
    } else {
        ledgerFragmentMcb.processResult(BKException.Code.OK, null, null);
    }
}

private void replicateFragmentInternal(final LedgerHandle lh,
        final LedgerFragment lf,
        final AsyncCallback.VoidCallback ledgerFragmentMcb,
        final Set<BookieId> newBookies,
        final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException {
    // ....
    // Recover entry by entry; an Entry is the smallest unit of data in BookKeeper
    for (final Long entryId : entriesToReplicate) {
        recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,
                newBookies, onReadEntryFailureCallback);
    }
}

private void recoverLedgerFragmentEntry(final Long entryId,
        final LedgerHandle lh,
        final AsyncCallback.VoidCallback ledgerFragmentEntryMcb,
        final Set<BookieId> newBookies,
        final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException {
    // ....
    long startReadEntryTime = MathUtils.nowInNano();
    /*
     * Read the ledger entry using the LedgerHandle. This will allow us to
     * read the entry from one of the other replicated bookies other than
     * the dead one.
     */
    // This is where the entry is actually read; keep going
    lh.asyncReadEntries(entryId, entryId, new ReadCallback() {
        // ....
    }, null);
}

public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) {
    // ....
    // Asynchronous read
    asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
}

void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb,
                              Object ctx, boolean isRecoveryRead) {
    if (!clientCtx.isClientClosed()) {
        // Keep tracing
        readEntriesInternalAsync(firstEntry, lastEntry, isRecoveryRead)
            .whenCompleteAsync(new FutureEventListener<LedgerEntries>() {
                // ....
            }, clientCtx.getMainWorkerPool().chooseThread(ledgerId));
    } else {
        cb.readComplete(Code.ClientClosedException, LedgerHandle.this, null, ctx);
    }
}

CompletableFuture<LedgerEntries> readEntriesInternalAsync(long firstEntry,
                                                          long lastEntry,
                                                          boolean isRecoveryRead) {
    // Build the read operation
    PendingReadOp op = new PendingReadOp(this, clientCtx,
            firstEntry, lastEntry, isRecoveryRead);
    // Run it; let's take a look inside
    op.run();
    return op.future();
}

public void run() {
    // Nothing else here; keep going
    initiate();
}

void initiate() {
    // ....
    do {
        // Decide whether to read from the replicas in parallel or one after another
        if (parallelRead) {
            entry = new ParallelReadRequest(ensemble, lh.ledgerId, i);
        } else {
            entry = new SequenceReadRequest(ensemble, lh.ledgerId, i);
        }
        seq.add(entry);
        i++;
    } while (i <= endEntryId);
    // read the entries.
    for (LedgerEntryRequest entry : seq) {
        // Core logic: the actual read happens here
        entry.read();
    }
}

void read() {
    // Keep reading
    sendNextRead();
}

synchronized BookieId sendNextRead() {
    // ....
    try {
        BookieId to = ensemble.get(bookieIndex);
        // Send the read request
        sendReadTo(bookieIndex, to, this);
        // ....
    } catch (InterruptedException ie) {
        // ....
    }
}

void sendReadTo(int bookieIndex, BookieId to, LedgerEntryRequest entry) throws InterruptedException {
    if (isRecoveryRead) {
        // ....
    } else {
        // Use the BookKeeper client to read the entry
        clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
                this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE);
    }
}

default void readEntry(BookieId address, long ledgerId, long entryId,
                       ReadEntryCallback cb, Object ctx, int flags) {
    // Keep going
    readEntry(address, ledgerId, entryId, cb, ctx, flags, null);
}

default void readEntry(BookieId address, long ledgerId, long entryId,
                       ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey) {
    // Keep going
    readEntry(address, ledgerId, entryId, cb, ctx, flags, masterKey, false);
}

public void readEntry(final BookieId addr, final long ledgerId, final long entryId,
                      final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey,
                      final boolean allowFastFail) {
    // Look up the client (connection pool) for the target bookie
    final PerChannelBookieClientPool client = lookupClient(addr);
    // ....
    client.obtain((rc, pcbc) -> {
        if (rc != BKException.Code.OK) {
            completeRead(rc, ledgerId, entryId, null, cb, ctx);
        } else {
            // Perform the read
            pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey, allowFastFail);
        }
    }, ledgerId);
}

public void readEntry(final long ledgerId,
                      final long entryId,
                      ReadEntryCallback cb,
                      Object ctx,
                      int flags,
                      byte[] masterKey,
                      boolean allowFastFail) {
    // An "Internal" method again, so there is something inside; keep going
    readEntryInternal(ledgerId, entryId, null, null, false,
                      cb, ctx, (short) flags, masterKey, allowFastFail);
}

private void readEntryInternal(final long ledgerId,
                               final long entryId,
                               final Long previousLAC,
                               final Long timeOutInMillis,
                               final boolean piggyBackEntry,
                               final ReadEntryCallback cb,
                               final Object ctx,
                               int flags,
                               byte[] masterKey,
                               boolean allowFastFail) {
    // ....
    // Build the request object
    ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
            .setLedgerId(ledgerId)
            .setEntryId(entryId);
    // ....
    request = withRequestContext(Request.newBuilder())
            .setHeader(headerBuilder)
            .setReadRequest(readBuilder)
            .build();
    // ....
    // Keep going
    writeAndFlush(channel, completionKey, request, allowFastFail);
}

private void writeAndFlush(final Channel channel,
                           final CompletionKey key,
                           final Object request,
                           final boolean allowFastFail) {
    // ....
    try {
        // ....
        // Here we see that the request is finally sent through the Netty client
        channel.writeAndFlush(request, promise);
    } catch (Throwable e) {
        // ....
    }
}
```
At this point the request has been sent, and we know that the AutoRecovery process reads data from the BookKeeper server over Netty. How does the server handle the request once it arrives? Let's follow the server's request-receiving logic. Because BookKeeper itself also handles network requests with Netty, it is easy to find the channelRead method of BookieRequestHandler, which listens for incoming network requests:
```java
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // ....
    // Clean separation of concerns: BookieRequestHandler only receives requests and hands all
    // processing over to the requestProcessor object
    requestProcessor.processRequest(msg, this);
}

public void processRequest(Object msg, BookieRequestHandler requestHandler) {
    Channel channel = requestHandler.ctx().channel();
    if (msg instanceof BookkeeperProtocol.Request) {
        BookkeeperProtocol.Request r = (BookkeeperProtocol.Request) msg;
        restoreMdcContextFromRequest(r);
        try {
            BookkeeperProtocol.BKPacketHeader header = r.getHeader();
            // A very nice design (Kafka's KafkaApis class follows the same pattern): every
            // operation the server supports is visible right here
            switch (header.getOperation()) {
                case ADD_ENTRY:
                    processAddRequestV3(r, requestHandler);
                    break;
                case READ_ENTRY:
                    // Read requests are handled here; let's go in
                    processReadRequestV3(r, requestHandler);
                    break;
                case FORCE_LEDGER:
                    processForceLedgerRequestV3(r, requestHandler);
                    break;
                // ....
                case WRITE_LAC:
                    processWriteLacRequestV3(r, requestHandler);
                    break;
                case READ_LAC:
                    processReadLacRequestV3(r, requestHandler);
                    break;
                case GET_BOOKIE_INFO:
                    processGetBookieInfoRequestV3(r, requestHandler);
                    break;
                case START_TLS:
                    processStartTLSRequestV3(r, requestHandler);
                    break;
                case GET_LIST_OF_ENTRIES_OF_LEDGER:
                    processGetListOfEntriesOfLedgerProcessorV3(r, requestHandler);
                    break;
                default:
                    // ....
                    break;
            }
        } finally {
            MDC.clear();
        }
    } else {
        // ....
    }
}

private void processReadRequestV3(final BookkeeperProtocol.Request r, final BookieRequestHandler requestHandler) {
    // BookKeeper also supports reading data through long polling
    if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) {
        // ....
        read = new LongPollReadEntryProcessorV3(r, requestHandler, this, fenceThread,
                lpThread, requestTimer);
    } else {
        read = new ReadEntryProcessorV3(r, requestHandler, this, fenceThread);
        // ....
    }

    if (null == threadPool) {
        // Let's look at the implementation
        read.run();
    } else {
        // ....
    }
}

public void run() {
    // ....
    // Execute the read
    executeOp();
}

protected void executeOp() {
    // The design here feels a bit unclear: the data should be read first and the response
    // built afterwards. What do you think?
    ReadResponse readResponse = getReadResponse();
    if (null != readResponse) {
        sendResponse(readResponse);
    }
}

protected ReadResponse getReadResponse() {
    // ....
    try {
        // This is where the entry data is read; let's dig deeper
        return readEntry(readResponse, entryId, startTimeSw);
    } catch (Bookie.NoLedgerException e) {
        // ....
    }
}

protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder,
                                 long entryId,
                                 Stopwatch startTimeSw)
        throws IOException, BookieException {
    // Dig deeper
    return readEntry(readResponseBuilder, entryId, false, startTimeSw);
}

protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder,
                                 long entryId,
                                 boolean readLACPiggyBack,
                                 Stopwatch startTimeSw)
        throws IOException, BookieException {
    // Read the data through the Bookie's readEntry
    ByteBuf entryBody = requestProcessor.getBookie().readEntry(ledgerId, entryId);
    // ....
}

public ByteBuf readEntry(long ledgerId, long entryId)
        throws IOException, NoLedgerException, BookieException {
    // ....
    try {
        LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
        // ....
        // The real read happens here, because the value obtained (entry) is also what is
        // returned. This calls LedgerDescriptor's readEntry method
        ByteBuf entry = handle.readEntry(entryId);
        // ....
        return entry;
    } finally {
        // ....
    }
}

ByteBuf readEntry(long entryId) throws IOException, BookieException {
    // Read the entry through the LedgerStorage interface; the implementation here is
    // SingleDirectoryDbLedgerStorage
    return ledgerStorage.getEntry(ledgerId, entryId);
}

public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieException {
    long startTime = MathUtils.nowInNano();
    try {
        // Keep tracing
        ByteBuf entry = doGetEntry(ledgerId, entryId);
        recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
        return entry;
    } catch (IOException e) {
        // ....
    }
}

private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, BookieException {
    // ....
    // Try the bookie's local write cache first
    ByteBuf entry = localWriteCache.get(ledgerId, entryId);
    if (entry != null) {
        return entry;
    }
    // Then the write cache that is currently being flushed
    entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
    if (entry != null) {
        return entry;
    }
    // Then the read cache
    entry = readCache.get(ledgerId, entryId);
    if (entry != null) {
        return entry;
    }
    // ....
    // Finally read from the entry log files on disk, through the readEntry method of
    // DefaultEntryLogger (an implementation of the EntryLogger interface)
    entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
    // Put the result into the read cache
    readCache.put(ledgerId, entryId, entry);
    // ....
    return entry;
}

public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
        throws IOException, Bookie.NoEntryException {
    // One level further
    return internalReadEntry(ledgerId, entryId, entryLocation, true /* validateEntry */);
}

private ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry)
        throws IOException, Bookie.NoEntryException {
    // Get the id of the entry log file that holds the entry, and the position inside it
    long entryLogId = logIdForOffset(location);
    long pos = posForOffset(location);

    BufferedReadChannel fc = null;
    int entrySize = -1;
    try {
        fc = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);

        ByteBuf sizeBuff = readEntrySize(ledgerId, entryId, entryLogId, pos, fc);
        entrySize = sizeBuff.getInt(0);
        if (validateEntry) {
            validateEntry(ledgerId, entryId, entryLogId, pos, sizeBuff);
        }
    } catch (EntryLookupException e) {
        // ....
    }

    ByteBuf data = allocator.buffer(entrySize, entrySize);
    // Read the data
    int rc = readFromLogChannel(entryLogId, fc, data, pos);
    // ....
    data.writerIndex(entrySize);
    return data;
}

private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, ByteBuf buff, long pos)
        throws IOException {
    BufferedLogChannel bc = entryLogManager.getCurrentLogIfPresent(entryLogId);
    // ....
    // Keep going
    return channel.read(buff, pos);
}

public int read(ByteBuf dest, long pos) throws IOException {
    // Keep going
    return read(dest, pos, dest.writableBytes());
}

public synchronized int read(ByteBuf dest, long pos, int length) throws IOException {
    // ....
    while (length > 0) {
        // Check if the data is in the buffer, if so, copy it.
        if (readBufferStartPosition <= currentPosition
                && currentPosition < readBufferStartPosition + readBuffer.readableBytes()) {
            int posInBuffer = (int) (currentPosition - readBufferStartPosition);
            int bytesToCopy = Math.min(length, readBuffer.readableBytes() - posInBuffer);
            dest.writeBytes(readBuffer, posInBuffer, bytesToCopy);
            currentPosition += bytesToCopy;
            length -= bytesToCopy;
            cacheHitCount++;
        } else {
            // We don't have it in the buffer, so put necessary data in the buffer
            readBufferStartPosition = currentPosition;
            int readBytes = 0;
            // Read from disk into readBuffer; the next loop iteration copies readBuffer's data
            // into dest, which is what gets returned. The disk read uses Java NIO's FileChannel.read
            if ((readBytes = validateAndGetFileChannel().read(readBuffer.internalNioBuffer(0, readCapacity),
                    currentPosition)) <= 0) {
                throw new IOException("Reading from filechannel returned a non-positive value. Short read.");
            }
            readBuffer.writerIndex(readBytes);
        }
    }
    return (int) (currentPosition - pos);
}
```
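The lookup order in doGetEntry can be summarized in a small sketch, assuming plain maps for the three caches and a caller-supplied function in place of the entry log on disk. TieredEntryReader is a hypothetical name, and the sketch deliberately ignores eviction, read-ahead, reference counting of ByteBuf and concurrency, all of which the real storage has to handle.

```java
import java.util.*;
import java.util.function.Function;

/**
 * Sketch (hypothetical class): the lookup order of doGetEntry. Three maps stand in for the
 * write cache, the write cache being flushed and the read cache; a caller-supplied function
 * stands in for reading the entry log on disk. No eviction, read-ahead or locking.
 */
class TieredEntryReader {
    record Key(long ledgerId, long entryId) {}

    final Map<Key, String> writeCache = new HashMap<>();
    final Map<Key, String> writeCacheBeingFlushed = new HashMap<>();
    final Map<Key, String> readCache = new HashMap<>();
    private final Function<Key, String> entryLog;
    int diskReads = 0;

    TieredEntryReader(Function<Key, String> entryLog) {
        this.entryLog = entryLog;
    }

    String getEntry(long ledgerId, long entryId) {
        Key key = new Key(ledgerId, entryId);
        String entry = writeCache.get(key);      // 1. written but not yet flushed
        if (entry != null) {
            return entry;
        }
        entry = writeCacheBeingFlushed.get(key); // 2. flush in progress
        if (entry != null) {
            return entry;
        }
        entry = readCache.get(key);              // 3. recently read from disk
        if (entry != null) {
            return entry;
        }
        entry = entryLog.apply(key);             // 4. entry log file on disk
        diskReads++;
        readCache.put(key, entry);               // later reads of this entry hit step 3
        return entry;
    }

    public static void main(String[] args) {
        TieredEntryReader reader = new TieredEntryReader(k -> "disk:" + k.entryId());
        reader.writeCache.put(new Key(1, 0), "fresh:0");
        System.out.println(reader.getEntry(1, 0)); // prints fresh:0
        System.out.println(reader.getEntry(1, 5)); // prints disk:5
        System.out.println(reader.getEntry(1, 5)); // prints disk:5 (served by the read cache)
        System.out.println(reader.diskReads);      // prints 1
    }
}
```

Checking the write caches first matters because freshly written entries may not be on disk yet; a recovery read of a recent entry would otherwise miss it.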
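Summary
Let's answer the questions from the opening story.
By what rule was Zhang San chosen as the "supervisor"?
He was elected through ZooKeeper. Strictly speaking, the Auditors do not run Paxos themselves: they use a leader election built on ZooKeeper (whose own replication protocol is ZAB, a Paxos-like protocol). In the classic recipe, each candidate creates an ephemeral sequential znode and the candidate with the lowest sequence number becomes the leader.
What if Zhang San also leaves without notice?
Dagou and Ergou also watch Zhang San's status through ZooKeeper. If Zhang San disappears, one of them is elected through ZooKeeper as the new "supervisor".
Why use a sign-in sheet instead of having Zhang San check on everyone one by one?
The sign-in sheet saves Zhang San's time. Otherwise, with many employees and a need to notice departures quickly, Zhang San would have to walk around checking on everyone every few minutes, and before long he would "leave without notice" too. With the sign-in sheet, Zhang San notices right away when someone stops signing in and can act accordingly.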
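The election in the first two answers can be simulated in memory to show why a departing leader is replaced automatically. This is a minimal sketch of the general ZooKeeper recipe, not BookKeeper's AuditorElector code: SequentialElection and its methods are hypothetical, and the sketch recomputes the leader on demand instead of using watches. In the real recipe each non-leader watches only the znode just before its own, so a failure wakes a single candidate rather than all of them.

```java
import java.util.*;

/**
 * Sketch (hypothetical class): the ZooKeeper leader-election recipe simulated in memory.
 * Each candidate "creates an EPHEMERAL_SEQUENTIAL znode", the smallest sequence number leads,
 * and when a session ends its znode disappears, so leadership passes to the next candidate.
 */
class SequentialElection {
    private final TreeMap<Long, String> znodes = new TreeMap<>(); // sequence -> candidate
    private long nextSequence = 0;

    /** Registers a candidate; returns its sequence number. */
    synchronized long join(String candidate) {
        long seq = nextSequence++;
        znodes.put(seq, candidate);
        return seq;
    }

    /** The candidate's session expired: ZooKeeper would delete its ephemeral znode. */
    synchronized void sessionLost(long seq) {
        znodes.remove(seq);
    }

    synchronized Optional<String> leader() {
        return znodes.isEmpty() ? Optional.empty() : Optional.of(znodes.firstEntry().getValue());
    }

    public static void main(String[] args) {
        SequentialElection election = new SequentialElection();
        long zhangSan = election.join("Zhang San");
        election.join("Dagou");
        election.join("Ergou");
        System.out.println(election.leader().orElse("none")); // prints Zhang San
        election.sessionLost(zhangSan);                        // Zhang San leaves without notice
        System.out.println(election.leader().orElse("none")); // prints Dagou
    }
}
```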