当前位置: 首页 > news >正文

建站公司的服务内容杭州房产信息网官网

建站公司的服务内容,杭州房产信息网官网,seo产品推广,网站开发使用软件有哪些队列这种数据结构都不陌生#xff0c;特点就是先进先出。有很多常用的消息中间件可以有现成的该部分功能#xff0c;这里使用zookeeper基于发布订阅模式来实现分布式队列。对应的会有一个生产者和一个消费者。 这里理论上还是使用顺序节点。生产者不断产生新的顺序子节点特点就是先进先出。有很多常用的消息中间件可以有现成的该部分功能这里使用zookeeper基于发布订阅模式来实现分布式队列。对应的会有一个生产者和一个消费者。 这里理论上还是使用顺序节点。生产者不断产生新的顺序子节点消费者watcher监听节点新增事件来消费消息。 生产者 CuratorFramework client ... client.start(); String path /testqueue; client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path,11.getBytes()) 消费者 CuratorFramework client ... client.start(); String path /testqueue; PathChildrenCache pathCache new PathChildrenCache(client,path,true); pathCache.getListenable().addListener(new PathChildrenCacheListener() {Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {if(event.getType() PathChildrenCacheEvent.Type.CHILD_ADDED){ChildData data event.getData();//handle msgclient.delete().forPath(data.getPath());}} }); pathCache.start();使用curator queue 先来使用基本的队列类DistributedQueue。 DistributedQueue的初始化需要提交准备几个参数 client连接就不多说了: CuratorFramework client ...QueueSerializer这个主要是用来指定对消息data进行序列化和反序列化 这里就搞一个简单的字符串类型 QueueSerializerString serializer new QueueSerializerString() {Overridepublic byte[] serialize(String item) {return item.getBytes();}Overridepublic String deserialize(byte[] bytes) {return new String(bytes);} };QueueConsumer消息consumer当有新消息来的时候会调用consumer.consumeMessage()来处理消息 这里也搞个简单的string类型的处理consumer QueueConsumerString consumer new QueueConsumerString() {Overridepublic void consumeMessage(String s) throws Exception {System.out.println(receive msg:s);}Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {//TODO} };队列消息发布 //队列节点路径 String queuePath /queue; //使用上面准备的几个参数构造DistributedQueue对象 DistributedQueueString queue QueueBuilder.builder(client,consumer,serializer,queuePath).buildQueue(); queue.start(); //调用put方法生产消息 queue.put(hello); queue.put(msg); Thread.sleep(2000); queue.put(3);这样在启动测试程序在consumer的consumeMessage方法就会收到queue.put的消息。 这里有个问题有没有发现在初始化queue的时候需要指定consumer那岂不是只能同一个程序中生产消费何来的分布式 其实这里在queue对象创建的时候consumer可以为null这个时候queue就只生产消息。具体的逻辑需要看下DistributedQueue类的源码。 在DistributedQueue类的构造函数有一步设置isProducerOnly属性 isProducerOnly (consumer null);然后在start()方法会根据isProducerOnly来判断启动方式 if ( !isProducerOnly || (maxItems ! QueueBuilder.NOT_SET) ) {childrenCache.start(); }if ( !isProducerOnly ) {service.submit(new CallableObject(){Overridepublic Object call(){runLoop();return null;}}); }这里看到consumer为空两个if不成立不会初始化对那个的消息消费逻辑wather监听。只需要在另一个程序里创建queue启动时指定consumer即可。 源码分析 先从消息的发布也就是put方法 首先调用makeItemPath()获取创建节点路径 ZKPaths.makePath(queuePath, QUEUE_ITEM_NAME);这里QUEUE_ITEM_NAME“queue-”。 然后调用internalPut()方法来创建节点路径 //先累加消息数量putCount putCount.incrementAndGet(); //使用serializer序列化消息数据 byte[] bytes ItemSerializer.serialize(multiItem, serializer); //根据background来创建节点 if ( putInBackground ) {doPutInBackground(item, path, givenMultiItem, bytes); } else {doPutInForeground(item, path, givenMultiItem, bytes); }看doPutInForeground里就是具体的创建节点了 //创建节点 client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path, bytes); //哦错了这里putCount不是总消息数是正在创建消息数创建完再回减 synchronized(putCount) {putCount.decrementAndGet();putCount.notifyAll(); }//如果有对应的lisener依次调用 putListenerContainer.forEach(listener - {if ( item ! null ){listener.putCompleted(item);}else{listener.putMultiCompleted(givenMultiItem);} });消息的发布就完成了。 然后是消息的consumer这里肯定是使用的watcher。这里还是回到前面start方法处根据isProducerOnly属性判断有两步操作 1、childrenCache.start(); childrenCache初始化是在queue的构造函数里 childrenCache new ChildrenCache(client, queuePath)其start方法会调用 private final CuratorWatcher watcher new CuratorWatcher() {Overridepublic void process(WatchedEvent event) throws Exception{if ( !isClosed.get() ){sync(true);}} };private final BackgroundCallback callback new BackgroundCallback(){Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( event.getResultCode() KeeperException.Code.OK.intValue() ){setNewChildren(event.getChildren());}}};void start() throws Exception{sync(true);}private synchronized void sync(boolean watched) throws Exception{if ( watched ){//走这里client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);}else{client.getChildren().inBackground(callback).forPath(path);}} 这里先把代码都贴上看到内部定义了一个watcher和callback。这里inBackground就是watcher到事件使用callback进行处理最后是调用到setNewChildren方法 private synchronized void setNewChildren(ListString newChildren) {if ( newChildren ! null ){Data currentData children.get();//将数据设置到children变量里消息版本1children.set(new Data(newChildren, currentData.version 1));//notifyAll() 等待线程获取消息notifyFromCallback();} }这里有引入了一个children变量然后将数据设置到了该变量里。 private final AtomicReferenceData children new AtomicReferenceData(new Data(Lists.StringnewArrayList(), 0));children其实是线程间通信一个共享数据容器变量。这里设置了数据然后具体的数据消费在下一步。 2、线程池里丢了个任务去执行runLoop();方法。 回到DistributedQueue.start的第二步执行runLoop()方法看名字就应该知道了一直轮询获取消息。 还是来看代码吧 private void runLoop() {long currentVersion -1;long maxWaitMs -1;//while一直轮询while ( state.get() State.STARTED ){try{//从childrenCache里获取数据ChildrenCache.Data data (maxWaitMs 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);currentVersion data.version;ListString children Lists.newArrayList(data.children);sortChildren(children); // makes sure items are processed in the correct orderif ( children.size() 0 ){maxWaitMs getDelay(children.get(0));if ( maxWaitMs 0 ){continue;}}else{continue;}/**处理数据 这里取出消息后会删除节点然后使用serializer反序列化节点数据调用consumer.consumeMessage来处理消息**/processChildren(children, currentVersion);}}} }这里获取数据使用了childrenCache.blockingNextGetData synchronized Data blockingNextGetData(long startVersion, long maxWait, TimeUnit unit) throws InterruptedException {long startMs System.currentTimeMillis();boolean hasMaxWait (unit ! null);long maxWaitMs hasMaxWait ? unit.toMillis(maxWait) : -1;//数据版本没变一直wait等待while ( startVersion children.get().version ){if ( hasMaxWait ){long elapsedMs System.currentTimeMillis() - startMs;long thisWaitMs maxWaitMs - elapsedMs;if ( thisWaitMs 0 ){break;}wait(thisWaitMs);}else{wait();}}return children.get(); }这里就有wait阻塞等消息当消息来时候会被唤醒。 其它类型队列 curator对优先队列(DistributedPriorityQueue)、延迟队列(DistributedDelayQueue)都有对应的实现有兴趣的自己看吧。
http://www.hkea.cn/news/14589853/

相关文章:

  • 如何建设和优化一个网站步骤基于h5的企业网站建设
  • 安丘市建设局网站网站空间就是主机吗
  • wordpress 微信导航站网站开发概要设计
  • 自己做网站费用wordpress 面包屑导航代码
  • 网站优化服务合同建网站能赚钱吗
  • 网站 not found链接式友谊
  • 做新媒体的小说网站建站程序下载
  • 网站建设php怎么安装好看的论坛网站模板
  • 个人网站需要多大空间市场调研报告包括哪些内容
  • 有没有好的网站可以学做头发网站利用e4a做app
  • 一件代发48个货源网站廊坊网站建设哪家权威
  • 企业网站站内优化嵌入式转行到网站开发
  • 广州制作网站的公司域名注册商平台
  • 搭建一个网站需要哪些技术前端 模板 网站
  • 怎么做网站下单wordpress mysql瓶颈
  • 重庆工程建设招标网官方网站微信开发者中心
  • 找素材的网站大全做网站seo的公司
  • 学校门户网站建设报告做好公司网站
  • 网站建设项目外包大冶市建设部门网站
  • 长春网站建设4435考证培训机构
  • 手机网站设计理念网站设计包含哪些技术
  • 一一影视网站源码app推广拉新公司
  • 单页式网站系统煎蛋wordpress模板
  • 网站经营性备案哪个网站可以做加工
  • 淘宝上做进出口网站有哪些好的seo公司
  • 深圳建设网官方网站杭州python做网站
  • 电商网站建设步骤psd 下载网站
  • 网站建设汇报稿网站建设与管理的实训
  • 档案网站建设规范有哪些sem竞价代运营公司
  • 公司网站怎么维护做遗嘱的网站有哪些