网站建设基础流程图,引擎优化是什么工作,深圳网站开发制作,有没有悬赏做ppt的网站一#xff0c;桥接资源配置及规则配置
Emqx桥接配置流程
1#xff0c;配置资源并测试连接通过
规则引擎——资源——新建——选择MQTT Bridge——填写参数测试连接 参数描述详见3.1资源配置
2#xff0c;配置规则
2.1根据实际业务选择合适sql 规则引擎…一桥接资源配置及规则配置
Emqx桥接配置流程
1配置资源并测试连接通过
规则引擎——资源——新建——选择MQTT Bridge——填写参数测试连接 参数描述详见3.1资源配置
2配置规则
2.1根据实际业务选择合适sql 规则引擎——规则——新建——规则sql sql见06手册/实施部署手册/empx 桥接规则配置模版.xlsx
2.2填写规则id “rule当前节点_upload_目标节点”例如“rule:yantai_upload_shandong”
2.3添加响应动作 动作选择“桥接数据到 MQTT Broker” 关联资源选择配置好的目标资源节点没有目标资源点击新建资源去新建 转发消息主题空着即可 (转发消息时使用的主题。如果未提供则默认为桥接消息的主题) 消息内容模板: 填写”${payload}” (支持变量。若使用空模板默认消息内容为 JSON 格式的所有字段)
3参数配置
3.1资源配置
1资源类型下拉选择MQTT Bridge 2资源ID 一对一“resource:”当前节点_to_目标节点例如“resource:yantai_to_shandong” 一对多“resource:”当前节点_to_目标节点例如“resource:guojia_to_provinces” 3连接池大小设为默认值8 4客户端id当前节点client例如“yantai_client” 5附加GUID设为默认值true附加 GUID 选项设置为 true 时MQTT 连接使用的 clientid 增加随机后缀以保证全局唯一性。 设置为 false 时会导致 clientid 使用同一个连接池中线程互踢EMQX 多个节点之间的桥接也会互踢推荐仅在单节点 EMQX 且连接池大小为 1 时开启此选项。 6用户名连接远程Broker的用户名 7密码 连接远程Broker的密码 8桥接主题的挂载点:示例: 本地节点向 topic1 发消息远程桥接节点的主题会变换为 bridge/aws/${node}/topic1程序中应设置为空 9磁盘缓存设为默认值off 10协议版本设为默认值mqttV4 11心跳间隔设为默认值60s 12重连间隔设为默认值30s 13重传间隔默认值20s 14桥接模式false 15开启SSL连接false 16服务器名称知识指定用于对端证书验证时使用的主机名 或者设置为 disable 以关闭此项验证。默认不填即可
注意配置完毕点击测试连接显示连接成功即可应用
二过多的消息发布
ERRORMQTT(32202): 正在发布过多的消息 解决方案 1增大maxInflight最低需要paho1.2.0版本 2配置多个mqtt client
由于mqttMessageHandler只会引用一个paho客户端并且在内部对paho客户端做了封装所以直接修改MqttPahoMessageHandler复杂度较高我们可以重新写一个MultiMqttMessageHandler内部初始化多个MqttPahoMessageHandler这样通过MessageingGateway发送消息时直接通过MultiMqttMessageHandler来处理mqtt消息MultiMqttMessageHandler可以通过负载均衡的方式来把消息分派给各个MqttPahoMessageHandler
1自定义MyMqttPahoMessageHandler类继承MqttPahoMessageHandler注意权限由protected改成public。handleMessageInternal()会由channel通过dispatcher间接调用重写onInit()用来手动初始化MqttPahoMessageHandler。
Overridepublic void doStop() {super.doStop();}Overridepublic void handleMessageInternal(Message? message) throws Exception {super.handleMessageInternal(message);}Overridepublic void onInit() {try {super.onInit();} catch (Exception e) {e.printStackTrace();}}2自定义MultiMqttMessageHandler类继承AbstractMessageHandler并implements Lifecycle自定义一个MessageHandler添加一个Map成员属性用来维系多个MyMqttPahoMessageHandlerhandlerCount变量可配置多个mqtt client。这里只用了radom随机数来做负载均衡
private final AtomicBoolean running new AtomicBoolean();
private volatile MapInteger, MessageHandler mqttHandlerMap;Value(${spring.mqtt.sender.count})
private Integer handlerCount;Autowired
private MqttSenderConfig senderConfig;Override
public void start() {if (!this.running.getAndSet(true)) {doStart();}
}private void doStart(){mqttHandlerMap new ConcurrentHashMap();for(int i0;ihandlerCount;i){mqttHandlerMap.put(i, senderConfig.createMqttOutbound());}
}Override
public void stop() {if (this.running.getAndSet(false)) {doStop();}
}private void doStop(){for(Map.EntryInteger, MessageHandler e : mqttHandlerMap.entrySet()){MessageHandler handler e.getValue();((MyMqttPahoMessageHandler)handler).doStop();}
}Override
public boolean isRunning() {return this.running.get();
}Override
protected void handleMessageInternal(Message? message) throws Exception {Random random new Random();MyMqttPahoMessageHandler messageHandler (MyMqttPahoMessageHandler)mqttHandlerMap.get(random.nextInt(handlerCount));messageHandler.handleMessageInternal(message);
}3消息发布配置类
Bean
public MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory new DefaultMqttPahoClientFactory();factory.setServerURIs(hostUrl);factory.setUserName(username);factory.setPassword(password);return factory;
}
public MessageHandler createMqttOutbound(){String tempId MqttAsyncClient.generateClientId();MyMqttPahoMessageHandler messageHandler new MyMqttPahoMessageHandler(clientId sender tempId, mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(defaultTopic);messageHandler.setDefaultQos(1);messageHandler.onInit();return messageHandler;
}Bean
ServiceActivator(inputChannel mqttOutboundChannel)
public MessageHandler mqttOutbound() {return new MultiMqttMessageHandler();
}Bean
public MessageChannel mqttOutboundChannel() {return new DirectChannel();
}MessagingGateway(defaultRequestChannel mqttOutboundChannel)
public interface MqttGateway {void sendToMqtt(String data, Header(MqttHeaders.TOPIC) String topic);
}三报文内容过大
调整emqx参数值 zone.external.max_packet_size mqtt.max_packet_size
四队列已满
调整emqx参数值 zone.external.max_mqueue_len 消息队列最大长度。当飞行窗口满或客户端离线后消息会被存储至该队列中。0 表示不限制。
五发送大消息的时候客户端会被强制kill掉
emqx升级到4.4.10版本之后
六其他重点相关优化参数
参数介绍api链接 https://www.emqx.io/docs/zh/v4.3/configuration/configuration.html#cluster
//集群节点发现方式。可选值为:manual: 手动加入集群static: 配置静态节点。配置几个固定的节点新节点通过连接固定节点中的某一个来加入集群。mcast: 使用 UDP 多播的方式发现节点。dns: 使用 DNS A 记录的方式发现节点。etcd: 使用 etcd 发现节点。k8s: 使用 Kubernetes 发现节点。
cluster.discovery//指定多久之后从集群中删除离线节点。
cluster.autoclean//当使用 static 方式集群时指定固定的节点列表多个节点间使用逗号分隔
cluster.static.seeds//节点名。格式为 namehost。其中 host 可以是 IP 地址也可以是 FQDN注意格式限制
node.name//系统调优参数设置 Erlang 允许的最大进程数这将影响 emqx 节点能处理的连接数
//integer 1024 - 134217727 默认2097152
node.process_limit//系统调优参数设置 Erlang 允许的最大 Ports 数量
//integer 1024 - 134217727 1048576
node.max_ports//系统调优参数设置 Erlang 分布式通信使用的最大缓存大小
//bytesize 1KB - 2GB 8MB
node.dist_buffer_size//系统调优参数设置 Erlang 运行时允许的最大 ETS 表数量 integer 默认262144
node.max_ets_tables//系统调优参数设置 Erlang 运行多久强制进行一次全局垃圾回收。默认15m
node.global_gc_interval//系统调优参数设置 Erlang 运行时多少次 generational GC 之后才进行一次 fullsweep GC。
//integer 0 - 65535 默认1000
node.fullsweep_after//系统调优参数当一个节点持续无响应多久之后认为其已经宕机并断开连接 默认120node.dist_net_ticktime//MQTT 服务器会为每个主题存储最新一条保留消息以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。
mqtt.retain_available//是否忽略自己发送的消息默认false
mqtt.ignore_loop_deliver//当收到一定数量的消息或字节就强制执行一次垃圾回收。
//16000|16MB 表示当收到 16000 条消息或 16MB 的字节流入就强制执行一次垃圾回收
zone.external.force_gc_policy//允许客户端订阅主题的最大层级。0 表示不限制层级多会有性能问题zone.external.max_topic_levels//飞行窗口大小。飞行窗口用于存储未被应答的 QoS 1 和 QoS 2 消息
zone.external.max_inflight//消息重发间隔。EMQX 在每个间隔检查是否需要进行消息重发
zone.external.retry_interval//消息队列是否存储 QoS 0 消息。
zone.external.mqueue_store_qos0//ACL机制
MQTT 授权authorization是指对 MQTT 客户端的发布和订阅操作进行 权限控制。 控制的内容主要是哪些客户端可以发布或者订阅哪些 MQTT 主题。
EMQX 支持集中类型的授权。权限列表亦即 ACL。可以从例如 MongoDB MySQLPostgreSQLRedis或者 EMQX 的内置数据库中读取这个列表。加载一个包含全局的 ACL 的文件。动态访问一个 HTTP 后端服务并通过该 HTTP 调用的返回值来客户端是否有访问的权限。通过提取认证过程中携带的授权数据例如 JWT 的某个字段。EMQX 最大文件句柄数doneulimit -n 1048576done: /etc/security/limits.confdone: /etc/sysctl.confdone: /etc/systemd/system.confdone: 重启 emqx 服务ulimit -n 1048576; ./emqx stop; ./emqx startdone: 确认 EMQX Web 后台显示
tcp并发数
/etc/systemd/system.conf查看默认值$ systemctl --user show syncthing | grep LimitNOFILELimitNOFILE4096LimitNOFILESoft1024设置DefaultLimitNOFILE10485767jmeter压测emqx
1. 下载jmeter解压
https://jmeter.apache.org/download_jmeter.cgi 以 5.4.3 为例下载地址 https://dlcdn.apache.org//jmeter/binaries/apache-jmeter-5.4.3.zip linux下解压 unzip apache-jmeter-5.4.3.zip
2. 下载mqtt-jmeter插件
下载地址 https://github.com/emqx/mqtt-jmeter/releases https://github.com/emqx/mqtt-jmeter/releases/download/v2.0.2/mqtt-xmeter-2.0.2-jar-with-dependencies.jar
3. 将插件放置于jmeter的lib/ext目录下windows/linux同样操作
4. 本文先在windows下生成的jmx脚本然后传至linux下使用
4.1 新建两个线程组 第一个仅包含一个 MQTT DisConnect执行一次 第二个里面包含具体的压测开启1000个线程1s内将线程创建完毕无限循环。创建两个计数器pub_counter用来技术发布消息数thread_counter用来线程计数
4.2 事先创建1000个设备名称为cosmoiottest000001 - cosmoiottest000001000可自己定义。添加一次性控制器mqtt连接一次后续pub消息写上配置信息。
4.3 添加循环控制器循环一次。包含固定定时器休眠1000ms一个发布MQTT Pub Sampler即每个线程进来执行一次发布消息然后休眠1000ms进入下一次循环。每个消息包含100个点位(根据自己需要设置)每个点位随机生成一个整数。配置详见截图
4.4 添加观察结果树、汇总报告、聚合报告等可在windows下面查看结果
4.5 配置截图如下:
循环执行线程 
3. mqtt-jmeter 的jar包需要传至lib/ext目录否则不可用
4. 生成报告时报错Consumer failed with message :Begin size 0 is not equal to fixed size 5 将jdk换成8版本
5. jtl结果文件也可拉到windows使用jmeter直接查看新建线程组-聚合报告选择jtl文件