当前位置: 首页 > news >正文

seo查询网站是什么上海专业网站建设机构

seo查询网站是什么,上海专业网站建设机构,深圳市建设注册执业资格中心网站,拍摄微电影公司前言 ThreadPoolTaskExecutor是Spring框架提供的一个线程池实现#xff0c;它是对Java标准库中ThreadPoolExecutor的封装#xff0c;提供了更便捷的配置和集成方式#xff0c;特别适合在Spring环境中使用。相关线程池概念见线程线程池相关 CompletableFuture 是 Java…前言 ThreadPoolTaskExecutor是Spring框架提供的一个线程池实现它是对Java标准库中ThreadPoolExecutor的封装提供了更便捷的配置和集成方式特别适合在Spring环境中使用。相关线程池概念见线程线程池相关 CompletableFuture 是 Java 8 引入的异步编程工具实现了 Future 和CompletionStage 接口。它不仅提供了异步任务执行能力还支持强大的函数式编程风格允许开发者以声明式方式组合多个异步操作处理复杂的异步编程场景。相关概念及API使用 功能描述 创建了一个ThreadPoolTaskExecutor的管理类用于监控线程池状态、动态调整线程池配置定义线程池注册为Spring Bean创建基于分页查询和同步的CompletableFuture异步任务使用自定义的核心线程池提交任务最终主线程获取异步结果也可以引申主线程返回任务执行中记录任务ID主动获取任务执行结果通知主线程实现页面操作非阻塞性。 代码示例 用于记录线程池状态和调整线程池参数的实体类 package gov.zwfw.iam.uc.threadpoolconfig;import lombok.AllArgsConstructor; import lombok.Data;Data AllArgsConstructor public class ThreadPoolStatusPo {private String poolName;private int corePoolSize;private int maxPoolSize;private int currentPoolSize;private int activeCount;private int largestPoolSize;private long taskCount;private long completedTaskCount;private int queueSize;private int queueRemainingCapacity;private int queueCapacity;private int keepAliveSeconds;private String rejectedHandlerType;private String threadNamePrefix;private String queueName; } package gov.zwfw.iam.uc.threadpoolconfig;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;import java.util.concurrent.RejectedExecutionHandler;/*** 用于动态调整线程池配置的实体类* 包括核心线程数、最大线程数、队列大小、拒绝策略、线程存活时间*/ Data NoArgsConstructor AllArgsConstructor public class ThreadPoolJudgePo {private int corePoolSize;private int maxPoolSize;private int keepAliveSeconds;private int queueCapacity;private RejectedExecutionHandler rejectedExecutionHandler; } 用于监控线程池状态和调整线程池参数的管理类 package gov.zwfw.iam.uc.threadpoolconfig;import org.apache.tomcat.util.threads.TaskQueue; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component;import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor;Component public class ThreadPoolManager {//存储所有注册的线程池private static final MapString, ThreadPoolTaskExecutor threadPoolMap new ConcurrentHashMap();//存储线程池原始配置用于重置private static final MapString, ThreadPoolJudgePo originalConfigMap new ConcurrentHashMap();/*** 注册线程池* param poolName* param threadPoolTaskExecutor* param threadPoolJudgePo*/public void registerThreadPool(String poolName, ThreadPoolTaskExecutor threadPoolTaskExecutor, ThreadPoolJudgePo threadPoolJudgePo){threadPoolMap.put(poolName,threadPoolTaskExecutor);originalConfigMap.put(poolName, threadPoolJudgePo);}/*** 获取所有线程池状态* return*/public MapString,ThreadPoolStatusPo getAllThreadPoolStatus(){MapString,ThreadPoolStatusPo statusMap new HashMap();threadPoolMap.forEach((name,executor)-{statusMap.put(name,getThreadPoolStatus(name,executor));});return statusMap;}/*** 获取单个线程池状态* param name* return*/public ThreadPoolStatusPo getSingleThreadPoolStatus(String name){ThreadPoolTaskExecutor threadPoolTaskExecutor threadPoolMap.get(name);return getThreadPoolStatus(name,threadPoolTaskExecutor);}/*** 问题为什么有的属性从executorThreadPoolTaskExecutor获取有的从threadPoolTaskExecutorThreadPoolExecutor获取** 原因分析** ThreadPoolTaskExecutor是Spring对Java原生ThreadPoolExecutor的包装它提供了一些额外的配置和功能同时内部持有一个ThreadPoolExecutor实例。* 线程池的核心状态如核心线程数、最大线程数、当前线程数、活跃线程数、历史最大线程数、任务总数、已完成任务数、队列大小等都是ThreadPoolExecutor原生提供的所以直接从ThreadPoolExecutor实例获取。* 但是Spring的ThreadPoolTaskExecutor在配置线程池时有一些属性是它自己扩展的或者需要从它那里获取配置值例如* keepAliveSeconds在ThreadPoolExecutor中存活时间是通过getKeepAliveTime(TimeUnit)方法获取的但是需要转换单位。而Spring的ThreadPoolTaskExecutor直接提供了getKeepAliveSeconds()方法返回的是以秒为单位的值这样更方便。* threadNamePrefix这个前缀是Spring的ThreadPoolTaskExecutor在创建线程工厂时使用的用于设置线程的名称前缀ThreadPoolExecutor本身没有提供直接获取线程名称前缀的方法所以只能从ThreadPoolTaskExecutor获取。* 另外拒绝策略的处理ThreadPoolExecutor提供了getRejectedExecutionHandler()方法可以获取到拒绝策略处理器然后通过getClass().getName()得到其类名。这里没有使用Spring的包装因为拒绝策略处理器是直接设置在底层的ThreadPoolExecutor上的。* 因此总结如下** 大多数运行时状态动态的都是从ThreadPoolExecutor即threadPoolTaskExecutor中获取。* 而一些配置信息特别是Spring包装后提供的配置如keepAliveSeconds和threadNamePrefix则从ThreadPoolTaskExecutor即executor中获取。* 注意代码中有一个属性是queueCapacity队列总容量它是通过queue.size() queue.remainingCapacity()计算得到的因为队列的剩余容量加上当前已使用的容量就是总容量。** 所以这样的设计是合理的充分利用了Spring的ThreadPoolTaskExecutor提供的便捷方法同时也直接使用原生的ThreadPoolExecutor来获取运行时指标。** 但是这里有一个潜在的问题Spring的ThreadPoolTaskExecutor的getKeepAliveSeconds()返回的是配置的存活时间秒而实际上ThreadPoolExecutor内部是以纳秒为单位保存的。不过由于我们在配置时也是以秒为单位所以这里获取的值是一致的。** 另外关于拒绝策略这里获取的是处理器的类名这样我们可以知道具体是哪种拒绝策略。* param name* param executor* return*/private ThreadPoolStatusPo getThreadPoolStatus(String name, ThreadPoolTaskExecutor executor) {ThreadPoolExecutor threadPoolTaskExecutor executor.getThreadPoolExecutor();return new ThreadPoolStatusPo(name,threadPoolTaskExecutor.getCorePoolSize(),threadPoolTaskExecutor.getMaximumPoolSize(),threadPoolTaskExecutor.getPoolSize(),threadPoolTaskExecutor.getActiveCount(),threadPoolTaskExecutor.getLargestPoolSize(),threadPoolTaskExecutor.getTaskCount(),threadPoolTaskExecutor.getCompletedTaskCount(),threadPoolTaskExecutor.getQueue().size(),threadPoolTaskExecutor.getQueue().remainingCapacity(),threadPoolTaskExecutor.getQueue().size() threadPoolTaskExecutor.getQueue().remainingCapacity(),executor.getKeepAliveSeconds(),threadPoolTaskExecutor.getRejectedExecutionHandler().getClass().getName(),executor.getThreadNamePrefix(),executor.getThreadPoolExecutor().getQueue().getClass().getName());}/*** 动态调整线程池* param name* param corePoolSize* param maxPoolSize* param queueCapacity*/public void adjustThreadPool(String name,Integer corePoolSize,Integer maxPoolSize,Integer queueCapacity){ThreadPoolTaskExecutor executor threadPoolMap.get(name);if(null executor){throw new RuntimeException(name线程池不存在);}ThreadPoolExecutor threadPoolExecutor executor.getThreadPoolExecutor();//调整核心线程数if(null ! corePoolSize corePoolSize 0){threadPoolExecutor.setCorePoolSize(corePoolSize);}//调整最大线程数if(null ! maxPoolSize maxPoolSize 0){threadPoolExecutor.setMaximumPoolSize(maxPoolSize);}//调整队列容量if(null ! queueCapacity queueCapacity 0){//在Spring的ThreadPoolTaskExecutor中我们设置队列容量时它实际上创建的就是TaskQueue//考虑使用Spring提供的setQueueCapacity方法通过ThreadPoolTaskExecutor对象这样更安全。但是这个方法会重新设置队列容量但不会改变队列实例因为内部会调用TaskQueue.setCapacity如果队列是TaskQueue的话//所以我们可以直接调用executor.setQueueCapacity(queueCapacity)来实现executor.setQueueCapacity(queueCapacity);}}/*** 重置线程池* param name*/public void resetThreadPool(String name){ThreadPoolJudgePo threadPoolJudgePo originalConfigMap.get(name);if(null threadPoolJudgePo){throw new RuntimeException(name线程池初始化配置不存在);}adjustThreadPool(name,threadPoolJudgePo.getCorePoolSize(),threadPoolJudgePo.getMaxPoolSize(),threadPoolJudgePo.getQueueCapacity());}} 用于SpringBoot项目注册线程池Bean的配置类 package gov.zwfw.iam.uc.threadpoolconfig;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor;/*** 线程池配置* 用于执行异步任务*/ Configuration EnableAsync public class ThreadPoolConfig {AutowiredThreadPoolManager threadPoolManager;/*** 核心线程池配置* return*/Bean(name coreTaskExecutor)public Executor coreTaskExecutor(){ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();//核心线程数executor.setCorePoolSize(10);//最大线程数executor.setMaxPoolSize(20);//队列容量在创建ThreadPoolExecutor时如果队列是LinkedBlockingQueue且queueCapacity0则将其替换为TaskQueue。executor.setQueueCapacity(500);//空闲线程存活时间executor.setKeepAliveSeconds(60);//拒绝策略使用调用者运行策略也可以自定义策略以增强可用性这里只是简单推送人员信息量不是特别大没必要费劲executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//线程名前缀executor.setThreadNamePrefix(coreTaskExecutor-);//优雅停机配置//等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown(true);//等待终止时间executor.setAwaitTerminationSeconds(60);executor.initialize();//注册到监控threadPoolManager.registerThreadPool(coreTaskExecutor,executor,new ThreadPoolJudgePo(executor.getCorePoolSize(),executor.getMaxPoolSize(),executor.getKeepAliveSeconds(),executor.getThreadPoolExecutor().getQueue().size(),executor.getThreadPoolExecutor().getRejectedExecutionHandler()));System.out.println(executor.getThreadPoolExecutor().getQueue().getClass().getName());return executor;}Bean(name commonTaskExecutor)public Executor commonTaskExecutor(){ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();//核心线程数executor.setCorePoolSize(5);//最大线程数executor.setMaxPoolSize(10);//队列容量executor.setQueueCapacity(100);//空闲线程存活时间executor.setKeepAliveSeconds(120);//拒绝策略抛弃策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());executor.setThreadNamePrefix(commonTaskExecutor-);executor.initialize();return executor;} } 用于获取线程池状态和调整线程池配置的控制类 package gov.zwfw.iam.uc.threadpoolconfig;import com.alibaba.fastjson.JSONObject; import gov.zwfw.iam.base.util.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors;RestController RequestMapping(/pool) public class ThreadPoolManageController {AutowiredThreadPoolManager threadPoolManager;RequestMapping(value /getStatus,method RequestMethod.GET)public String getStatus(RequestParam(value name,required false) String name){if(StringUtils.isEmpty(name)){ListStringresult new ArrayList();MapString, ThreadPoolStatusPo allThreadPoolStatus threadPoolManager.getAllThreadPoolStatus();allThreadPoolStatus.keySet().forEach(key-{ThreadPoolStatusPo threadPoolStatusPo allThreadPoolStatus.get(key);String s key:JSONObject.toJSONString(threadPoolStatusPo);result.add(s);});return result.stream().collect(Collectors.joining(\n));}else{ThreadPoolStatusPo singleThreadPoolStatus threadPoolManager.getSingleThreadPoolStatus(name);return name:JSONObject.toJSONString(singleThreadPoolStatus);}}RequestMapping(value /adjust,method RequestMethod.POST)public String adjust(RequestParam(value name) String name,RequestParam(value corePoolSize,required false)Integer corePoolSize,RequestParam(value maxPoolSize,required false)Integer maxPoolSize,RequestParam(value queueCapacity,required false)Integer queueCapacity){threadPoolManager.adjustThreadPool(name,corePoolSize,maxPoolSize,queueCapacity);return 调整成功;}RequestMapping(value /reset,method RequestMethod.POST)public String reset(RequestParam(value name) String name){threadPoolManager.resetThreadPool(name);return 重置成功;} } 提交异步任务的业务实现这里是分页查询用户信息同步到第三方平台首先是页面点击实现的全量同步和失败重试接口 Value(${batch.push.size:5})private int batchSize;RequestMapping(/syncUser)ResponseBodypublic Result syncUser(RequestParam String resId) {Result result new Result();//校验当天是否存在失败未重试任务String key FAIL_INDEX_resId_new SimpleDateFormat(yyyyMMdd).format(new Date());if(CodisUtil.lLen(key)0){result.setCode(-5);result.setMsg(存在失败未重试任务请点击失败重试按钮处理);return result;}String resUrl ;try{resUrl staffApi.selectUserSyncUrl(resId);if(gov.zwfw.iam.base.util.StringUtils.isEmpty(resUrl)){result.setCode(-1);result.setMsg(同步地址为空);return result;}logger.info(同步地址{}, resUrl);//分页查询用户信息分批推送int userNum staffApi.countUser();long startTime System.nanoTime();CompletableFutureJSONObject jsonObject staffApi.resolveTask(resId,resUrl,userNum,batchSize, null);JSONObject futureJson jsonObject.get();long endTime System.nanoTime();logger.info(同步耗时{}纳秒, (endTime-startTime));result.setCode(futureJson.getString(code));result.setMsg(futureJson.getString(msg));}catch (Exception e){logger.error(同步失败同步地址{}失败原因{}, resUrl, e.getMessage());result.setCode(-1);result.setMsg(e.getMessage());}return result;}RequestMapping(/syncUserFail)ResponseBodypublic Result syncUserFail(RequestParam String resId) {Result result new Result();String key FAIL_INDEX_resId_new SimpleDateFormat(yyyyMMdd).format(new Date());if(CodisUtil.lLen(key)0){String resUrl ;try{resUrl staffApi.selectUserSyncUrl(resId);if(gov.zwfw.iam.base.util.StringUtils.isEmpty(resUrl)){result.setCode(-1);result.setMsg(同步地址为空);return result;}logger.info(同步地址{}, resUrl);ListString failIndexList CodisUtil.lRange(key,0,-1);CodisUtil.delKey(key);long startTime System.nanoTime();CompletableFutureJSONObject jsonObject staffApi.resolveTask(resId,resUrl,0,batchSize,failIndexList);JSONObject futureJson jsonObject.get();long endTime System.nanoTime();logger.info(同步耗时{}纳秒, (endTime-startTime));result.setCode(futureJson.getString(code));result.setMsg(futureJson.getString(msg));}catch (Exception e){logger.error(同步失败同步地址{}失败原因{}, resUrl, e.getMessage());result.setCode(-1);result.setMsg(e.getMessage());}}else{result.setCode(-6);result.setMsg(不存在失败未重试任务);}return result;}然后是真正实现分页任务以及提交执行的核心类 Resource(name coreTaskExecutor)private Executor coreTaskExecutor;Async(coreTaskExecutor)public CompletableFutureJSONObject resolveTask(String resId, String resUrl, int total, int batchSize, ListString failIndexList) {//1、分页任务列表ListCompletableFutureString futures new ArrayList();//失败的下表存储到redis的key使用list类型String key FAIL_INDEX_resId_new SimpleDateFormat(yyyyMMdd).format(new Date());//2、如果是全量推送计算起始位置执行分页查询并推送如果是失败数据推送那么使用失败的下表分页查询并推送if(total !0 null failIndexList){for(int i0;itotal;ibatchSize){int startIndex i;CompletableFutureString future getFuture(startIndex, batchSize, resUrl, key);futures.add(future);}}else{for (int i 0; i failIndexList.size(); i) {int startIndex Integer.parseInt(failIndexList.get(i));CompletableFutureString future getFuture(startIndex, batchSize, resUrl, key);futures.add(future);}}//5、等待所有任务执行完成并处理结果CompletableFutureVoid allFutures CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));return allFutures.thenApply(r-{ListString results futures.stream().map(CompletableFuture::join).collect(Collectors.toList());//6、构建响应信息JSONObject resultJson new JSONObject();int failCount Math.toIntExact(results.stream().filter(result -{JSONObject jsonObject JSONObject.parseObject(result);if(jsonObject.containsKey(startIndex)){String startIndex jsonObject.getString(startIndex);CodisUtil.lPush(key,startIndex);logger.error(失败index{},startIndex);}return !jsonObject.getString(code).equals(0);}).count());resultJson.put(code,failCount0?-1:0);resultJson.put(msg,failCount0?部分数据推送失败请点击失败重试按钮重新推送:推送成功);CodisUtil.expireKey(key,60*60*24);return resultJson;});}public int countUser() {return govStaffService.countUser();}public CompletableFutureString getFuture(int startIndex, int batchSize, String resUrl, String key){CompletableFutureString future CompletableFuture.supplyAsync(() - {try{//3、分页查询ListUcGovStaff list govStaffService.selectListByPage(startIndex, batchSize);logger.info(查询到第(startIndex/batchSize1)页数据数量为{}, list.size());String syncRes ;if(null list || list.isEmpty()){JSONObject jsonObject new JSONObject();jsonObject.put(code,-2);jsonObject.put(msg,推送数据为空);return jsonObject.toJSONString();}//4、执行推送任务syncRes govStaffService.syncUser(startIndex,list, resUrl);return syncRes;}catch (Exception e){logger.error(分页任务异常{},e.getMessage());JSONObject jsonObject new JSONObject();jsonObject.put(code,-3);jsonObject.put(msg,任务执行失败);CodisUtil.lPush(key,String.valueOf(startIndex));CodisUtil.expireKey(key,60*60*24);return jsonObject.toJSONString();}},coreTaskExecutor);return future;}public String syncUser(int startIndex, ListUcGovStaff list, String resUrl) {String data JSON.toJSONString(list);JSONObject jsonObject new JSONObject();jsonObject.put(data, data);String s ;try {s WebUtils.doPost(resUrl,jsonObject);jsonObject JSONObject.parseObject(s);if(!0.equals(jsonObject.getString(code))){jsonObject.put(startIndex,startIndex);}s jsonObject.toJSONString();} catch (IOException e) {logger.error(同步人员异常{},e.getMessage());jsonObject new JSONObject();jsonObject.put(code,-1);jsonObject.put(msg,网络请求异常);jsonObject.put(startIndex,startIndex);s jsonObject.toJSONString();}return s;}注意使用异步任务一定要在启动类添加EnableAsync注解同时真正执行异步任务的方法上添加Async(coreTaskExecutor)注解注解里的参数对应的是提交任务的线程池名称。下面是获取线程池状态以及调整线程池配置的示例 总结 这次业务实现了基于ThreadPoolTaskExecutorCompletableFuture的数据推送业务可以引申为其他的多线程异步任务实现实现了全量数据推送和失败重试机制对于处理大批量任务很有帮助由于业务中主线程和异步任务是同步实现的因此会阻塞主线程直至异步任务执行完成如果要实现主线程同步返回异步执行后续任务只需要Async注解提交resolveTask任务即可。
http://www.hkea.cn/news/14471931/

相关文章:

  • 龙岗互联网公司北京seo薪资
  • 做一个信息发布网站要多少钱苏州商动力网络科技有限公司
  • 网站怎么做动态图施工效果图怎么做
  • 做二手货车网站公司wordpress影院插件
  • 有名的网站建设电商网站开发平台实验
  • 做装饰公司网站在线教育培训系统
  • 可以用腾讯企业邮箱域名做网站嘉兴网站建设企业
  • 江苏盐城建筑公司网站网站建设优化去哪学
  • 做餐饮培训网站广告区块链网站建设
  • 网站建设的一般要素网上购物网站开发的背景
  • 蒙古文网站建设网站结构组成部分有那些
  • 有没有建网站的app学会网站建设的重要性
  • 汽车网站建设代理加盟包站长ppt网站
  • 建设网站多少钱坪山新区城市建设局网站
  • 厂字形网页布局网站莱芜租房
  • 北京建设制作网站c2c网站代表有哪些
  • 界面设计网站微信的企业网站模板
  • 二级区域网站名工厂生产管理app
  • 河南省网站建设哪家好创建网站免费
  • 张家界建设局网站电话网店模板图片
  • 制作模板网站报价建站平台选择建议
  • 东莞高端商城网站建设怎么看网站空间
  • 在成都如何找到做网站的公司嘉兴网站建设费用
  • 网站建设与运营 市场分析短链接生成官网
  • 做网站认证违法吗福田网站建设运营费用
  • 沧州网站建设公司电话电子商务网站开发开发背景
  • 求个没封的a站yw1129cm居然设计家官网
  • 两个公司的网站建设建设工程合同管理的主要内容
  • 做淘宝客如何建立网站重庆网站建设运营
  • 公司网站怎样备案网站关键字语法