当前位置: 首页 > news >正文

装饰网站模板下载网络营销前景和现状分析

装饰网站模板下载,网络营销前景和现状分析,南通建设工程造价信息网站,永久有效域名zeebe actor 模型🙋‍♂️ 如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法,那么这篇文章就围绕actor.run 方法,说说zeebe actor 的模型。 环境⛅ zeebe release-8.1.14 actor.run() 是怎么开始的🌈 Lon…

zeebe actor 模型🙋‍♂️

如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法,那么这篇文章就围绕actor.run 方法,说说zeebe actor 的模型。

环境⛅

zeebe release-8.1.14

actor.run() 是怎么开始的🌈

zeebe actor 模型

LongPollingActivateJobsHandler.java

以LongPollingActivateJobsHandler 的激活任务方法为例,我们可以看到run 方法实际上执行ActorControl类的run 方法,让我们进到run 方法中。

	private ActorControl actor;public void activateJobs(final InflightActivateJobsRequest request) {actor.run(() -> {final InFlightLongPollingActivateJobsRequestsState state =getJobTypeState(request.getType());if (state.shouldAttempt(failedAttemptThreshold)) {activateJobsUnchecked(state, request);} else {completeOrResubmitRequest(request, false);}});}

ActorControl

可以看到scheduleRunnable 的目标是构造ActorJob,然后将job 添加到ActorTask 中,添加的方式分为insert 和submit。其实到这里我们就可以认为actor.run 就已经结束了,因为insert 和submit 方法主要就是将job 添加到task 的jobQueues 中,对于job 的执行要等到队列不断被线程pop 到当前job 的时候。

	final ActorTask task;@Overridepublic void run(final Runnable action) {scheduleRunnable(action);}private void scheduleRunnable(final Runnable runnable) {final ActorThread currentActorThread = ActorThread.current();if (currentActorThread != null && currentActorThread.getCurrentTask() == task) {final ActorJob newJob = currentActorThread.newJob();newJob.setRunnable(runnable);newJob.onJobAddedToTask(task);// 插入到执行队列task.insertJob(newJob);} else {final ActorJob job = new ActorJob();job.setRunnable(runnable);job.onJobAddedToTask(task);// 提交到外部队列// submit 实际上是将task 放到thread group 里边task.submit(job);}}

job 是怎么被执行的⚡

并不是任意一个ActorControl 都可以执行run 方法的,按照上图所示,Actor 会在broker 生命周期开始要进行注册 ,也就是说ActorControl 中的task 会注册到taskQueues。然后“线程池”不断从taskQueues 中pop 出task,每个task 中又会有多个job,按照策略选取不同的job 执行,我们可以认为job 就是actor.run(Runnable runnable) 中的runnable。

Gateway.java

gateway 注册task

  private CompletableFuture<ActivateJobsHandler> submitActorToActivateJobs(final ActivateJobsHandler handler) {final var future = new CompletableFuture<ActivateJobsHandler>();final var actor =Actor.newActor().name("ActivateJobsHandler").actorStartedHandler(handler.andThen(t -> future.complete(handler))).build();// 将task 注册到TaskQueuesactorSchedulingService.submitActor(actor);return future;}

ActorThreadGroup.java

就是上面提到的“线程池”,负责初始化每一条ActorThread 线程,并为其分配默认的WorkStealingGroup

	protected final String groupName;protected final ActorThread[] threads;protected final WorkStealingGroup tasks;protected final int numOfThreads;// 构造器,初始化每条线程,并为其分配一个默认的WorkStealingGroup 任务队列public ActorThreadGroup(final String groupName, final int numOfThreads, final ActorSchedulerBuilder builder) {this.groupName = groupName;this.numOfThreads = numOfThreads;tasks = new WorkStealingGroup(numOfThreads);threads = new ActorThread[numOfThreads];for (int t = 0; t < numOfThreads; t++) {final String threadName = String.format("%s-%d", groupName, t);final ActorThread thread =builder.getActorThreadFactory().newThread(threadName,t,this,tasks,builder.getActorClock(),builder.getActorTimerQueue(),builder.isMetricsEnabled());threads[t] = thread;}}// startpublic void start() {for (final ActorThread actorThread : threads) {// 启动每一个ActorThreadactorThread.start();}}

ActorThread.java

ActorThread 继承自Thread,可以看到start=>run=>doWork 的引用流程,在doWork 方法中,首先从taskScheduler 中获取当前task,然后执行当前task

// 继承自Thread @Overridepublic synchronized void start() {if (UNSAFE.compareAndSwapObject(this, STATE_OFFSET, ActorThreadState.NEW, ActorThreadState.RUNNING)) {// super.start 会执行下面的run 方法super.start();} else {throw new IllegalStateException("Cannot start runner, not in state 'NEW'.");}}// 主要执行doWork 方法@Overridepublic void run() {idleStrategy.init();while (state == ActorThreadState.RUNNING) {try {doWork();} catch (final Exception e) {LOG.error("Unexpected error occurred while in the actor thread {}", getName(), e);}}state = ActorThreadState.TERMINATED;terminationFuture.complete(null);}private void doWork() {submittedCallbacks.drain(this);if (clock.update()) {timerJobQueue.processExpiredTimers(clock);}// 从taskScheduler 中获取当前taskcurrentTask = taskScheduler.getNextTask();if (currentTask != null) {final var actorName = currentTask.actor.getName();try (final var timer = actorMetrics.startExecutionTimer(actorName)) {// 执行当前任务executeCurrentTask();}if (actorMetrics.isEnabled()) {actorMetrics.updateJobQueueLength(actorName, currentTask.estimateQueueLength());actorMetrics.countExecution(actorName);}} else {idleStrategy.onIdle();}}private void executeCurrentTask() {final var properties = currentTask.getActor().getContext();MDC.setContextMap(properties);idleStrategy.onTaskExecuted();boolean resubmit = false;try {// 真正执行当前任务resubmit = currentTask.execute(this);} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);LOG.error("Unexpected error occurred in task {}", currentTask, e);} finally {MDC.remove("actor-name");clock.update();}if (resubmit) {currentTask.resubmit();}}

ActorTask.java

ActorTask 的执行流程,它会不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll(),从这里可以看到submittedJobs 和fastlaneJobs 的区别!

找到job 后开始执行当前job

public boolean execute(final ActorThread runner) {schedulingState.set(TaskSchedulingState.ACTIVE);boolean resubmit = false;// 不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll()while (!resubmit && (currentJob != null || poll())) {currentJob.execute(runner);switch (currentJob.schedulingState) {case TERMINATED -> {final ActorJob terminatedJob = currentJob;// 从fastlaneJobs任务集合中拉取任务currentJob = fastLaneJobs.poll();// 如果是通过订阅触发的任务if (terminatedJob.isTriggeredBySubscription()) {final ActorSubscription subscription = terminatedJob.getSubscription();// 如果订阅是一次性的,那么在订阅触发后则将订阅移除if (!subscription.isRecurring()) {removeSubscription(subscription);}// 执行订阅的回调任务subscription.onJobCompleted();} else {runner.recycleJob(terminatedJob);}}case QUEUED ->// the task is experiencing backpressure: do not retry it right now, instead re-enqueue// the actor task.// this allows other tasks which may be needed to unblock the backpressure to runresubmit = true;default -> {}}if (shouldYield) {shouldYield = false;resubmit = currentJob != null;break;}}if (currentJob == null) {resubmit = onAllJobsDone();}return resubmit;}private boolean poll() {boolean result = false;result |= pollSubmittedJobs();result |= pollSubscriptions();return result;}

ActorJob.java

ActorJob 的执行逻辑

还记得上面说过ActorJob 可以理解为runnable 的吗,在invoke 中ActorJob 中的runnable 真正执行了,至此job 的执行过程结束

	void execute(final ActorThread runner) {actorThread = runner;observeSchedulingLatency(runner.getActorMetrics());try {// 执行actor 的 callable 或者 runnable 方法invoke();if (resultFuture != null) {resultFuture.complete(invocationResult);resultFuture = null;}} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);task.onFailure(e);} finally {actorThread = null;// 无论那种情况,成功或者失败,都要判断是否job 应该被resubmitted// in any case, success or exception, decide if the job should be resubmittedif (isTriggeredBySubscription() || runnable == null) {schedulingState = TaskSchedulingState.TERMINATED;} else {schedulingState = TaskSchedulingState.QUEUED;scheduledAt = System.nanoTime();}}}private void invoke() throws Exception {if (callable != null) {invocationResult = callable.call();} else {// only tasks triggered by a subscription can "yield"; everything else just executes onceif (!isTriggeredBySubscription()) {final Runnable r = runnable;runnable = null;r.run();} else {// runnable 真正执行runnable.run();}}}

总结📝

本文中的激活例子其实只是列举了Actor 的实现原理,想一想文中提到的功能用一个真正的线程池可以很好的解决。但是actor模型 的特性远不仅如此,对于其他特性在zeebe 中是如何实现的还请读者自己去挖掘🤏~

zeebe 团队真的是太喜欢functional programming了,找一个方法的调用链头都大了😅

http://www.hkea.cn/news/254250/

相关文章:

  • 啥网站都能看的浏览器下载百度收录查询工具
  • 福田附近公司做网站建设哪家效益快奶糖 seo 博客
  • 临沂免费自助建站模板品牌整合营销
  • iis做本地视频网站找客户资源的网站
  • 做调查用哪个网站网络推广有多少种方法
  • 开发一个交易网站多少钱在线工具
  • 网站平台怎么建立的软文范例
  • 移动应用开发专业学什么东莞seo软件
  • 做宣传网站的公司手机百度极速版app下载安装
  • 私人可以做慈善网站吗外贸如何推广
  • 网站页面模板页面布局如何成为百度广告代理商
  • 瑞安外贸网站建设曲靖百度推广
  • 先做网站还是服务器销售营销方案100例
  • 用卫生纸做的礼物街网站免费网页空间到哪申请
  • 手游网站做cpc还是cpm广告号厦门网页搜索排名提升
  • 人个做外贸用什么网站好宁波百度seo点击软件
  • 诈骗网站怎么做的企业网站seo案例分析
  • 如何做网站接口湖南营销型网站建设
  • 进入兔展网站做PPt软文营销ppt
  • app网站新闻危机公关
  • 东莞关键词优化实力乐云seo南宁seo外包服务商
  • 做网站都是用源码么免费注册个人网站不花钱
  • 建设网站需要两种服务支持官网设计公司
  • 安庆做网站seo建站收费地震
  • 绵阳住房和城市建设局网站官网seo排名优化联系13火星软件
  • 网站开发建设费用关键词异地排名查询
  • 网站建设企业电话广州优化疫情防控举措
  • 重庆模板网站建设百度网站域名注册
  • 安徽建设厅网站地址网络广告推广方式
  • 门户网站内容管理建设方案企业关键词优化推荐