当前位置: 首页 > news >正文

广州手机网站设计优秀企业网站制作

广州手机网站设计,优秀企业网站制作,网站建设入固定资产,开发app需要什么技术ABP - 事件总线之分布式事件总线 1. 分布式事件总线的集成1.2 基于 RabbitMQ 的分布式事件总线 2. 分布式事件总线的使用2.1 发布2.2 订阅2.3 事务和异常处理 3. 自己扩展的分布式事件总线实现 事件总线可以实现代码逻辑的解耦#xff0c;使代码模块之间功能职责更清晰。而分布… ABP - 事件总线之分布式事件总线 1. 分布式事件总线的集成1.2 基于 RabbitMQ 的分布式事件总线 2. 分布式事件总线的使用2.1 发布2.2 订阅2.3 事务和异常处理 3. 自己扩展的分布式事件总线实现 事件总线可以实现代码逻辑的解耦使代码模块之间功能职责更清晰。而分布式事件总线的功能不止这些它允许我们通过消息队列进行中转发布和订阅跨应用/服务边界传输的事件经常被用作微服务或不同应用程序之间异步发送和接收消息的手段属于分布式应用通讯的方式之一。 分布式事件总线依赖于 消息队列 中间件ABP 框架中提供了4种开箱即用的提供程序我们也可以基于抽象的接口自行实现分布式事件总线提供程序已有的四种分别适用于 RabbitMQ、Kafka、Rebus 等消息队列还有一种是默认实现进程内的分布式事件总线它允许我们在没有接入消息队列时也能够编写与分布式体系结构兼容的代码方便日后可能的微服务拆分这时它的工作方式与本地事件总线一样整体的设计思想就和微软的分布式缓存一样。 1. 分布式事件总线的集成 以下的演示还是基于控制台程序分布式事件总线不会默认集成在 ABP 启动模板之中需要我们自行集成Web 应用的集成方式也是一样的。 通过以下命令创建一个控制台程序启动模板 abp new AbpDistributeEventBusSample -t console之后再打开解决方案由于分布式事件总线是在不同应用程序之间进行通讯的所以还需要再创建一个控制台项目进行演示将解决方案中的项目复制一份即可。 本地分布式事件总线 首先讲一下进程内的事件总线的集成这个还是有些必要的如果有考虑后续进行微服务拆分的情况下前期对于事件总线的使用可以基于这个进行开发。当然并不是说本地事件总线就不推荐使用从我自己的日常工作经验中很多时候还是分布式事件总线和本地事件总线搭配使用的。 首先分布式事件总线的核心依赖包为 Volo.Abp.EventBus和本地事件总线一样。我们在需要集成的项目的根目录下通过以下命令进行集成 abp add-package Volo.Abp.EventBus由于是本地分布式事件总线所以这种方式下是没办法跨进程通讯的使用方式和上一篇讲的本地事件总线类似不过使用的时候不再通过 ILocalEventBus接口而是通过 IDistributedEventBus 接口主要是用于业务逻辑的解耦同时也为后续可能的分布式拆分做好准备。具体的使用方式下面细讲。 1.2 基于 RabbitMQ 的分布式事件总线 ABP 框架提供了三种开箱即用的分布式事件总线提供程序分别对应 RabbitMQ、Kafka、Rebus通过结合第三方消息队列实现真正基于消息跨进程通讯的分布式事件总线这里主要讲一下基于 RabbitMQ 的方式其他方式用法类似。 首先基于 RabbitMQ 的分布式事件总线需要安装 Volo.Abp.EventBus.RabbitMQ 驱动程序包。可通过一下方式安装 abp add-package Volo.Abp.EventBus.RabbitMQ上面创建的两个工程都要安装因为我们要演示两个进程间的通讯。 之后需要部署 RabbitMQ 消息队列这里我通过 docker 快速启动一个带有管理平台的 RabbitMQ命令如下 docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq rabbitmq:managementRabbitMQ 默认用户密码为guest / guest这里只是用于测试就直接使用了。生产环境中大家最好将默认用户禁用另行创建自己的用户。RabbitMQ 相关的更详细的使用和配置这里就不细讲了详细内容可见 [[2.1 RabbitMQ基本概念]] 系列文章。 然后添加分布式事件总线相应的配置我们可以在 appsettings.json 文件中添加以下配置节点 RabbitMQ: {Connections: {// 这里的配置支持 RabbitMQ 官方 sdk 中的 ConnectionFactory 的任意属性的配置Default: {HostName: localhost, // rabbitmq 地址集群环境下多个ip用逗号分隔Port: 5672, // rabbbitmq 端口默认5672UserName: guest,Password: guest}// 允许配置多个 rabbitmq 连接但只能有一个用于事件总线//Second: {// HostName: xxx.xxx.xxx.xxx, // rabbitmq 地址集群环境下多个ip用逗号分隔// Port: 5672 // rabbbitmq 端口默认5672//}},EventBus: {ClientName: MyClientName, // 用于事件总线的队列名ExchangeName: MyExchangeName, // 用于事件总线的交换机名称// ConnectionName: Default // 配置多个连接时指定用于事件总线的 RabbitMQ 连接默认是 Default}}以上配置最后都会被转换为 AbpRabbitMqOptions 和 AbpRabbitMqEventBusOptions所以我们也可以直接在代码中对这两个选项进行配置 ConfigureAbpRabbitMqOptions(options {options.Connections.Default.UserName guest;options.Connections.Default.Password guest;options.Connections.Default.HostName localhost;options.Connections.Default.Port 5672; });ConfigureAbpRabbitMqEventBusOptions(options {options.ClientName TestApp1;options.ExchangeName TestMessages; });两种方式选择一种即可如果两种方式同时使用代码配置优先于配置文件。解决方案的两个项目都需要进行配置进行通讯的两个项目需要连接到同一个队列。 完成上面的配置之后启动应用即可看到 RabbitMQ 中创建了我们配置的交换机和队列 2. 分布式事件总线的使用 2.1 发布 事件发布需要一个事件对象官方将之称为 Eto(事件传输对象)这是一个普通类用于保存和事件相关的数据一般以 Eto 作为后缀。就算一个事件不需要传输任何数据也必须创建一个空类这和上一章的本地事件总线是一样的由于在分布式事件触发之后事件对象会被序列化传输到消息队列中所以事件对象应避免循环引用、多态、私有setter并提供默认(空)构造函数如果你有其他的构造函数(虽然某些序列化器可能会正常工作)。 下面是一个用于测试的 Eto 对象的定义。 [EventName(helloEvent)] public class HelloEto {public string Who { get; set; }public DateTime When { get; set; }public string ToWho { get; set; } }默认情况下事件名将事件名称将是事件类的全名我们可以通过 EventNameattribute 特性指定事件名称。 分布式事件的发布通过 IDistributedEventBus 接口只需将其注入到相应的类中使用即可使用方式和本地事件总线一样。 public class HelloWorldService : ITransientDependency {private readonly IDistributedEventBus _distributedEventBus;public HelloWorldService(IDistributedEventBus distributedEventBus){_distributedEventBus distributedEventBus;}public async Task SayHelloAsync(){await _distributedEventBus.PublishAsync(new HelloEto{Who Jerry,When DateTime.Now,ToWho Jack});} }以上代码写在 AbpDistributeSample 项目中这里是事件发布的进程。 2.2 订阅 事件的订阅也和本地事件总线类似这里通过实现了 IDistributedEventHandlerTEvent 接口的处理器来处理事件当前像上一章本地事件总线中讲到的我们也可以通过 IDistributedEventBus 来自己订阅事件。 public class HelloDistribuedEventHandler : IDistributedEventHandlerHelloEto, ITransientDependency {public Task HandleEventAsync(HelloEto eventData){Console.WriteLine(${eventData.Who} Say Hello To {eventData.ToWho} at { eventData.When.ToString(yyyy-MM-dd HH:mm:ss) });return Task.CompletedTask;} }一个事件处理程序可以同时处理多种事件只需要实现多个针对不同 ETO 的 IDistributedEventHandler 泛型接口即可。 之后通过 vs 设置多项目启动 应用启动之后可以看到控制台的输出如下一个简单的基于消息队列的跨进程通讯已经完成 同时在 RabbitMQ 上可以看到已经连接上来了2个消费者每个进程既作为生产者也作为消费者 通过源码可以看到分布式事件总线中会进行初始化其实就是根据配置连接了 RabbitMQ 队列并创建了一个消费者自动订阅消息队列上的事件当接受到消息之后会从消息中获取消息的类型和具体的数据触发消息执行处理程序如果事件处理程序成功执行(没有抛出任何异常)它将向消息代理发送确认(ACK)。 这里的 EventTypes 其实就是维护消息类型和它的类名的对应关系的一个字典在 SubscribeHandlers 中初始化消息类型和执行器对应关系的时候维护的这个就和前一篇的本地事件总线大同小异了。 而消息的发布过程就更简单了也就是将消息序列化发送到 RabbitMQ 队列中。 ABP 分布式事件总线从 5.0 版本开始还加入了 Inbox 、Outbox 机制我们可以通过 AbpDistributedEventBusOptions 选项进行配置例如 ConfigureAbpDistributedEventBusOptions(option {option.Inboxes.Configure(config config.UseDbContextTDbContext());option.Outboxes.Configure(config config.UseDbContextTDbContext()); });从配置方式也可以看出这里是借助了数据库的实际上 Inbox 就是从消息队列接收到消息之后将消息先存入数据库中没有直接执行。 而在应用启动的时候消息队列模块会注册一个 InboxProcessManager实际上这就是一个后台工作者这里面又用到 IInboxProcessor 在 InboxProcessor 中再通过定时器每个一段时间从数据库中读取消息真正地去执行并且将数据库中的记录删除。 OutBox 的机制也是一样的这样做大概就是起到缓冲的作用避免短时间内大量的消息对消息队列或者我们的消费者造成冲击导致应用崩溃而是将这些消息的发送、执行均匀地处理。 2.3 事务和异常处理 分布式事件总线默认实现是 LocalDistributedEventBus在没有集成 RabbitMQ 等第三方消息队列中间件并且没有使用发件箱/收件箱模式的情况事件发布和事件订阅是在同一个进程的。如果当前事件发布的模块使用了工作单元事件总线是在和发布事件的同一工作单元范围内执行事件处理程序如果事件处理程序抛出异常那么相关的工作单元数据库事务将被回滚。这样我们的应用程序逻辑和事件处理逻辑就具有事务性原子。如果想忽略事件处理程序中的错误则必须在处理程序中使用try-catch块并且不应该重新抛出异常。 当我们切换到真正的分布式事件总线提供程序时事件处理程序将在不同的进程/应用程序中执行。在这种情况下实现事务性事件发布的唯一方法是使用发件箱/收件箱模式。这篇文章上面的内容简单地提了一下发件箱/收件箱之后还会有文章详细梳理它的工作模式。 如果在未真正使用分布式事件总线的情况下想在工作单元中立即发布事件而不是等到工作单元中的逻辑执行完成之后再发布可以在使用IDistributedEventBus.PublishAsync方法时将onUnitOfWorkComplete设置为false。如果接入 RabbitMQ 等第三方消息队列实现了分布式事件总线想要立即发布事件则还得将 useOutbox 设置为 false。 3. 自己扩展的分布式事件总线实现 要注意的一点是ABP 框架的分布式事件总线只能配置使用一个队列这就意味着如果一个进程需要和多个进程进行通讯时是无法通过不同的队列进行区分的。当多个进程都连接到同一个队列中时我们无法控制一个进程发送的消息会被哪个进程所消费无法单独通知某一个消费者这是需要注意的。 有一种折中的方式那就是如果一个事件只想发送给某一个进程那就只在这个进程中实现其处理程序其他没有实现处理程序的进程接收到消息之后因为没有处理会抛出异常消息重新返回消息队列中。这种方式依旧是存在一些问题只是将工作中的一点经验供大家参考一下。 public interface IWantDistributedEventBus {Task PublishAsync(Type eventType, object eventData, string queueName); }public class RabbitMqWantDistributedEventBus : IWantDistributedEventBus, ISingletonDependency {protected ILocalEventBus LocalEventBus { get; }protected IConnectionPool ConnectionPool { get; }protected WantRabbitMqEventBusOptions WantRabbitMqEventBusOptions { get; }protected AbpLocalEventBusOptions AbpLocalEventBusOptions { get; }protected IRabbitMqSerializer Serializer { get; }protected IRabbitMqMessageConsumer Consumer { get; private set; }protected IRabbitMqMessageConsumerFactory MessageConsumerFactory { get; }protected ConcurrentDictionarystring, Type EventTypes { get; }public RabbitMqWantDistributedEventBus(IConnectionPool connectionPool,IOptionsWantRabbitMqEventBusOptions options,IOptionsAbpLocalEventBusOptions localEventBusOptions,IRabbitMqSerializer serializer,IRabbitMqMessageConsumerFactory rabbitMqMessageConsumerFactory,ILocalEventBus localEventBus){ConnectionPool connectionPool;WantRabbitMqEventBusOptions options.Value;Serializer serializer;MessageConsumerFactory rabbitMqMessageConsumerFactory;LocalEventBus localEventBus;AbpLocalEventBusOptions localEventBusOptions.Value;EventTypes new ConcurrentDictionarystring, Type();}/// summary/// 队列消费者初始化/// /summarypublic void Initialize(){foreach (var queueName in WantRabbitMqEventBusOptions.ConsumeQueueNames){Consumer MessageConsumerFactory.Create(new ExchangeDeclareConfiguration(WantRabbitMqEventBusOptions.ExchangeName,type: direct,durable: true),new QueueDeclareConfiguration(queueName,durable: true,exclusive: false,autoDelete: false),WantRabbitMqEventBusOptions.ConnectionName);Consumer.BindAsync(queueName);Consumer.OnMessageReceived(ProcessEventAsync);}LoadEventTypes(AbpLocalEventBusOptions.Handlers);}/// summary/// 消息消费逻辑/// /summary/// param namemessage从队列接收到的消息/param/// returns/returnsprivate async Task ProcessEventAsync(IModel channel, BasicDeliverEventArgs ea){var msgData (MessageData)Serializer.Deserialize(ea.Body.ToArray(), typeof(MessageData));var eventName msgData.Type;var eventType EventTypes.GetOrDefault(eventName);// eventType为空, 即不存在类型对应的Handler,消息不应该被消费if (eventType null){// return;throw new NotFoundHandlerException($不存在{eventName}消息Handler!);}// 通过消息类型转发LocalHandler进行具体逻辑处理var eventData Serializer.Deserialize(Serializer.Serialize(msgData.Data), eventType);await LocalEventBus.PublishAsync(eventType, eventData);}/// summary/// 根据现有注册的Handler构建消息类型集合/// /summary/// param namehandlers当前应用中Handler类型集合/param/// returns/returnspublic Task LoadEventTypes(ITypeListIEventHandler handlers){foreach (var handler in handlers){var interfaces handler.GetInterfaces();foreach (var interface in interfaces){if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(interface)){continue;}var genericArgs interface.GetGenericArguments();if (genericArgs.Length 1){var eventTypeName genericArgs[0].FullName;if (!EventTypes.ContainsKey(eventTypeName)){EventTypes[eventTypeName] genericArgs[0];}}}}return Task.CompletedTask;}/// summary/// 发布消息/// /summary/// param nameeventType消息类型/param/// param nameeventData消息内容/param/// param namequeueName队列名称/param/// returns/returnspublic Task PublishAsync(Type eventType, object eventData, string queueName){var msg new MessageData { Type eventType.FullName, Data eventData };var body Serializer.Serialize(msg);using (var channel ConnectionPool.Get(WantRabbitMqEventBusOptions.ConnectionName).CreateModel()){channel.ExchangeDeclare(WantRabbitMqEventBusOptions.ExchangeName,direct,durable: true);var properties channel.CreateBasicProperties();properties.DeliveryMode RabbitMqConsts.DeliveryModes.Persistent;var queue channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);channel.QueueBind(queueName, WantRabbitMqEventBusOptions.ExchangeName, queueName);channel.BasicPublish(exchange: WantRabbitMqEventBusOptions.ExchangeName,routingKey: queueName,mandatory: true,basicProperties: properties,body: body);}return Task.CompletedTask;} }public class WantRabbitMqEventBusOptions {public string ConnectionName { get; set; }public string ClientName { get; set; }public string ExchangeName { get; set; }public IListstring ConsumeQueueNames { get; }public WantRabbitMqEventBusOptions(){ConsumeQueueNames new Liststring();} }public class MessageData {public string Type { get; set; }public object Data { get; set; } }[DependsOn(typeof(AbpRabbitMqModule))] [DependsOn(typeof(AbpEventBusModule))] public class WantAbpEventBusRabbitMqModule : AbpModule {public override void ConfigureServices(ServiceConfigurationContext context){var configuration context.Services.GetConfiguration();ConfigureWantRabbitMqEventBusOptions(configuration.GetSection(RabbitMQ:EventBus));}public override void OnApplicationInitialization(ApplicationInitializationContext context){context.ServiceProvider.GetRequiredServiceRabbitMqWantDistributedEventBus().Initialize();} }以上就是 ABP 框架下分布式事件总线的基本知识点其中也提到了 发件箱/收件箱 等新特性ABP 框架经过长时间的发展基本与 .NET 版本同步更新到现在 9.0 版本各种组件的特性也在逐步迭代更新。分布式事件总线除了这篇文章中提到的基本内容之外还有一些新特性后续会有一篇文章专门讲一下这些特性。 参考文档 ABP 官方文档 - 分布式事件总线
http://www.hkea.cn/news/14326591/

相关文章:

  • 优秀自适应网站建设哪家好网站使用帮助内容
  • 阿里云网站建设 部署与发布答案网页框架与布局
  • 中国八大设计院排名seort什么意思
  • 精品课程网站怎么做王占山战斗英雄
  • 关于加强内网网站建设的通知不会编程怎样建设网站
  • 公司网站兰州建设需要多少钱公司招聘网站有哪些
  • 软件工程是干什么的百度推广优化排名
  • 怎么给企业制作网站专题类响应式网站建设
  • 建设银行互联网网站wordpress ddos攻击
  • 南京企业网站设计制作网站怎么做seo、
  • 无锡富通电力建设有限公司网站全国知名vi设计公司
  • 哈尔滨做网站价格百度seo2022
  • 建设中网站首页长沙软件开发
  • 平顶山建设网站宁海做网站
  • 网站建设高端公司适合做网站的软件有哪些
  • 电子类网站模板网站建设技术课程设计
  • 广州外贸网站信息黑龙江恒泰建设集团网站
  • 4昌平区网站建设郑州便宜网站建设报价
  • 找外包网站 和自己做群晖wordpress怎么用
  • 济南川芎网站建设公司网站设计一年费用
  • 工程公司手机网站网站推广效果怎么样
  • 网站外链怎么发布网站查询平台
  • 南京好的网站设计网页制作啥专业
  • 云尚网络科技有限公司网站建设成都设计公司logo
  • 专业格泰建站app网站下载免费
  • 福田网站设计处理二级域名子域名大全
  • 傻瓜式自助建站系统想做网站的公司好
  • 上海网站建设免石家庄怎样做网站
  • 适合高中生做网站的主题快速网站模板公司
  • 企业网站.net工业信息化部网站备案