当前位置: 首页 > news >正文

精通网站建设工资多少沪江博客wordpress模板

精通网站建设工资多少,沪江博客wordpress模板,地方建立网站做SEM,设计中国第一架飞机的人是zeebe actor 模型#x1f64b;‍♂️ 如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法#xff0c;那么这篇文章就围绕actor.run 方法#xff0c;说说zeebe actor 的模型。 环境⛅ zeebe release-8.1.14 actor.run() 是怎么开始的#x1f308; Lon…zeebe actor 模型‍♂️ 如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法那么这篇文章就围绕actor.run 方法说说zeebe actor 的模型。 环境⛅ zeebe release-8.1.14 actor.run() 是怎么开始的 LongPollingActivateJobsHandler.java 以LongPollingActivateJobsHandler 的激活任务方法为例我们可以看到run 方法实际上执行ActorControl类的run 方法让我们进到run 方法中。 private ActorControl actor;public void activateJobs(final InflightActivateJobsRequest request) {actor.run(() - {final InFlightLongPollingActivateJobsRequestsState state getJobTypeState(request.getType());if (state.shouldAttempt(failedAttemptThreshold)) {activateJobsUnchecked(state, request);} else {completeOrResubmitRequest(request, false);}});}ActorControl 可以看到scheduleRunnable 的目标是构造ActorJob,然后将job 添加到ActorTask 中添加的方式分为insert 和submit。其实到这里我们就可以认为actor.run 就已经结束了因为insert 和submit 方法主要就是将job 添加到task 的jobQueues 中对于job 的执行要等到队列不断被线程pop 到当前job 的时候。 final ActorTask task;Overridepublic void run(final Runnable action) {scheduleRunnable(action);}private void scheduleRunnable(final Runnable runnable) {final ActorThread currentActorThread ActorThread.current();if (currentActorThread ! null currentActorThread.getCurrentTask() task) {final ActorJob newJob currentActorThread.newJob();newJob.setRunnable(runnable);newJob.onJobAddedToTask(task);// 插入到执行队列task.insertJob(newJob);} else {final ActorJob job new ActorJob();job.setRunnable(runnable);job.onJobAddedToTask(task);// 提交到外部队列// submit 实际上是将task 放到thread group 里边task.submit(job);}}job 是怎么被执行的⚡ 并不是任意一个ActorControl 都可以执行run 方法的按照上图所示Actor 会在broker 生命周期开始要进行注册 ,也就是说ActorControl 中的task 会注册到taskQueues。然后“线程池”不断从taskQueues 中pop 出task每个task 中又会有多个job按照策略选取不同的job 执行我们可以认为job 就是actor.run(Runnable runnable) 中的runnable。 Gateway.java gateway 注册task private CompletableFutureActivateJobsHandler submitActorToActivateJobs(final ActivateJobsHandler handler) {final var future new CompletableFutureActivateJobsHandler();final var actor Actor.newActor().name(ActivateJobsHandler).actorStartedHandler(handler.andThen(t - future.complete(handler))).build();// 将task 注册到TaskQueuesactorSchedulingService.submitActor(actor);return future;}ActorThreadGroup.java 就是上面提到的“线程池”负责初始化每一条ActorThread 线程并为其分配默认的WorkStealingGroup protected final String groupName;protected final ActorThread[] threads;protected final WorkStealingGroup tasks;protected final int numOfThreads;// 构造器初始化每条线程并为其分配一个默认的WorkStealingGroup 任务队列public ActorThreadGroup(final String groupName, final int numOfThreads, final ActorSchedulerBuilder builder) {this.groupName groupName;this.numOfThreads numOfThreads;tasks new WorkStealingGroup(numOfThreads);threads new ActorThread[numOfThreads];for (int t 0; t numOfThreads; t) {final String threadName String.format(%s-%d, groupName, t);final ActorThread thread builder.getActorThreadFactory().newThread(threadName,t,this,tasks,builder.getActorClock(),builder.getActorTimerQueue(),builder.isMetricsEnabled());threads[t] thread;}}// startpublic void start() {for (final ActorThread actorThread : threads) {// 启动每一个ActorThreadactorThread.start();}}ActorThread.java ActorThread 继承自Thread可以看到startrundoWork 的引用流程在doWork 方法中首先从taskScheduler 中获取当前task然后执行当前task // 继承自Thread Overridepublic synchronized void start() {if (UNSAFE.compareAndSwapObject(this, STATE_OFFSET, ActorThreadState.NEW, ActorThreadState.RUNNING)) {// super.start 会执行下面的run 方法super.start();} else {throw new IllegalStateException(Cannot start runner, not in state NEW.);}}// 主要执行doWork 方法Overridepublic void run() {idleStrategy.init();while (state ActorThreadState.RUNNING) {try {doWork();} catch (final Exception e) {LOG.error(Unexpected error occurred while in the actor thread {}, getName(), e);}}state ActorThreadState.TERMINATED;terminationFuture.complete(null);}private void doWork() {submittedCallbacks.drain(this);if (clock.update()) {timerJobQueue.processExpiredTimers(clock);}// 从taskScheduler 中获取当前taskcurrentTask taskScheduler.getNextTask();if (currentTask ! null) {final var actorName currentTask.actor.getName();try (final var timer actorMetrics.startExecutionTimer(actorName)) {// 执行当前任务executeCurrentTask();}if (actorMetrics.isEnabled()) {actorMetrics.updateJobQueueLength(actorName, currentTask.estimateQueueLength());actorMetrics.countExecution(actorName);}} else {idleStrategy.onIdle();}}private void executeCurrentTask() {final var properties currentTask.getActor().getContext();MDC.setContextMap(properties);idleStrategy.onTaskExecuted();boolean resubmit false;try {// 真正执行当前任务resubmit currentTask.execute(this);} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);LOG.error(Unexpected error occurred in task {}, currentTask, e);} finally {MDC.remove(actor-name);clock.update();}if (resubmit) {currentTask.resubmit();}}ActorTask.java ActorTask 的执行流程它会不断从订阅的列表中拉取jobpoll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务那么currentJob ! null 会短路返回true而不进行poll()从这里可以看到submittedJobs 和fastlaneJobs 的区别 找到job 后开始执行当前job public boolean execute(final ActorThread runner) {schedulingState.set(TaskSchedulingState.ACTIVE);boolean resubmit false;// 不断从订阅的列表中拉取jobpoll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务那么currentJob ! null 会短路返回true而不进行poll()while (!resubmit (currentJob ! null || poll())) {currentJob.execute(runner);switch (currentJob.schedulingState) {case TERMINATED - {final ActorJob terminatedJob currentJob;// 从fastlaneJobs任务集合中拉取任务currentJob fastLaneJobs.poll();// 如果是通过订阅触发的任务if (terminatedJob.isTriggeredBySubscription()) {final ActorSubscription subscription terminatedJob.getSubscription();// 如果订阅是一次性的那么在订阅触发后则将订阅移除if (!subscription.isRecurring()) {removeSubscription(subscription);}// 执行订阅的回调任务subscription.onJobCompleted();} else {runner.recycleJob(terminatedJob);}}case QUEUED -// the task is experiencing backpressure: do not retry it right now, instead re-enqueue// the actor task.// this allows other tasks which may be needed to unblock the backpressure to runresubmit true;default - {}}if (shouldYield) {shouldYield false;resubmit currentJob ! null;break;}}if (currentJob null) {resubmit onAllJobsDone();}return resubmit;}private boolean poll() {boolean result false;result | pollSubmittedJobs();result | pollSubscriptions();return result;} ActorJob.java ActorJob 的执行逻辑 还记得上面说过ActorJob 可以理解为runnable 的吗在invoke 中ActorJob 中的runnable 真正执行了至此job 的执行过程结束 void execute(final ActorThread runner) {actorThread runner;observeSchedulingLatency(runner.getActorMetrics());try {// 执行actor 的 callable 或者 runnable 方法invoke();if (resultFuture ! null) {resultFuture.complete(invocationResult);resultFuture null;}} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);task.onFailure(e);} finally {actorThread null;// 无论那种情况成功或者失败都要判断是否job 应该被resubmitted// in any case, success or exception, decide if the job should be resubmittedif (isTriggeredBySubscription() || runnable null) {schedulingState TaskSchedulingState.TERMINATED;} else {schedulingState TaskSchedulingState.QUEUED;scheduledAt System.nanoTime();}}}private void invoke() throws Exception {if (callable ! null) {invocationResult callable.call();} else {// only tasks triggered by a subscription can yield; everything else just executes onceif (!isTriggeredBySubscription()) {final Runnable r runnable;runnable null;r.run();} else {// runnable 真正执行runnable.run();}}}总结 本文中的激活例子其实只是列举了Actor 的实现原理想一想文中提到的功能用一个真正的线程池可以很好的解决。但是actor模型 的特性远不仅如此对于其他特性在zeebe 中是如何实现的还请读者自己去挖掘~ zeebe 团队真的是太喜欢functional programming了找一个方法的调用链头都大了
http://www.hkea.cn/news/14330269/

相关文章:

  • 公司网站建设入什么费用中信建设有限责任公司内部网站
  • seo sem是指什么意思seo技巧分享
  • 如何制作动漫网站模板wordpress时间轴插件
  • 网站建设比较合理的流程织梦网站如何生成伪静态
  • 一起做网商网站怎么样遵义网络科技有限公司
  • 做seo网站优化多少钱网站建设与管理教学视频教程
  • 公司网站域名是什么意思盘锦做网站电话
  • 做淘宝需要知道什么网站全球设计行
  • 怎样建设免费网站软件技术职业生涯规划书
  • 湛江市住房和城乡建设网站宿州网站建设贰聚思诚信
  • 南宁码科网站建设徐州关键字优化资讯
  • 电话推销网站建设上海数据开放网站建设
  • 余姚网站建设yyshj毕业设计博客网站开发
  • 搭建网站的步骤和顺序酒泉建设局网站
  • 给网站做排名优化学什么好处wordpress怎么更改后台路径
  • 青岛网站设计报价免费可商用的cms
  • 合肥市城乡和建设网站单位网站建设制作
  • 网站设计 中高端用什么软件做网站最简单 最方便
  • 最新免费网站源码如何免费建站
  • 黄江镇网站建设公司国内专业的企业展厅设计
  • 龙岗做网站哪里找网站管理与维护的优势
  • 站长统计app软件下载官网wordpress xss漏洞
  • asp网站建设 win7河南省新闻出版学校
  • 网络营销与网站推广的北京注册公司核名网站
  • 建设银行的网站是什么情况宁波seo外包公司
  • 天津通用网站建设收费河南网站建设的公司
  • 保定网络公司网站wordpress微博同步
  • 手机可以做3d动漫视频网站有哪些黑科技广告推广神器
  • 建德市住房和城乡建设局网站怎么更改网站栏目id
  • 怎么做仲博注册网站百度收录查询网址