修改 网站 数据库,宁波外贸公司为什么这么多,提升学历图片,重庆沛宣网站建设最近我在做一个车上的HMI项目#xff0c;也就是车机应用#xff0c;需要与云端和域控进行通信。HMI的功能已经外包了#xff0c;但消息的统一层留给我自己来做。因为项目组其他人都没有经验#xff0c;所以这个任务就落到了我头上#xff0c;尽管我自己也没有太多经验也就是车机应用需要与云端和域控进行通信。HMI的功能已经外包了但消息的统一层留给我自己来做。因为项目组其他人都没有经验所以这个任务就落到了我头上尽管我自己也没有太多经验但也没办法只能直接上手了。大概架构如下 1、整体设计
设计原则很简单通过阻塞队列进行交互因为要和第三方进行对接这里通过topic和 bytes 进行交互让他们自己转化 看下消息队列定义也没啥就是两个阻塞队列通过队列和第三方交互。
public class MsgQueue
{// 用于存储接收到的二进制消息private static BlockingCollectionMsg recMessageQueue new BlockingCollectionMsg();private static BlockingCollectionMsg sendMessageQueue new BlockingCollectionMsg();public static BlockingCollectionMsg RecvMessageQueue{get { return recMessageQueue; }}public static BlockingCollectionMsg SendMessageQueue{get { return sendMessageQueue; }}
}2、zeroMq功能实现
这次的需求是和域控进行通信主要使用发布订阅模式也就是我本地需要一个client一个server。
2.1 zeroMq 介绍
第一次使用zeroMq 稍微介绍下ZeroMQ 是一个高性能的异步消息库旨在简化分布式或多线程应用程序中的消息传递。它提供了一种灵活且高效的方式来进行数据交换支持多种消息模式能够在不同的进程、机器和网络之间进行通信。以下是 ZeroMQ 的一些关键特性和概念ZeroMQ 支持多种消息模式包括 请求-响应Req-Rep客户端发送请求服务器处理并回复。 发布-订阅Pub-Sub发布者发布消息订阅者接收感兴趣的消息。 推送-拉取Push-Pull用于分布式任务处理推送端将任务发送到拉取端。 管道Pipeline将多个组件连接起来形成数据处理管道。简单说就是一个TCP通信的框架可以在本地作为客户端和服务器 官方网站https://zeromq.org/languages/csharp/
2.2 插件介绍
zeromq的在C#上主要是通过netMq库这玩意好多年不更新了 具体地址https://github.com/zeromq/netmq 这玩意折腾了好久第一次上手主要要注意版本通过Nuget 安装
Install-Package NetMQ不知道为什么我安装不成功unity里还是无法使用我直接拷贝了dll到plugins拷贝的时候注意依赖项总共有三个要不然会报错 2.3 代码实现
using NetMQ;
using NetMQ.Sockets;
using System;
using System.Text;
using System.Threading.Tasks;
using UnityEngine;public class ZeroStarter : MonoBehaviour
{private SubscriberSocket subscriber;private PublisherSocket publisherSocket;private bool isRunning true;void Start(){AsyncIO.ForceDotNet.Force(); // 确保 NetMQ 在 Unity 中正确工作// 初始化发布者publisherSocket new PublisherSocket();publisherSocket.Bind(tcp://*:5557);subscriber new SubscriberSocket();subscriber.Connect(tcp://localhost:5556);// 启动发布和订阅任务Task.Run(() StartPub());Task.Run(() StartSubscriber());}private void StartPub(){while (isRunning){try{// 使用 BlockingCollection 的 Take 方法获取消息Msg message MsgQueue.SendMessageQueue.Take();publisherSocket.SendMoreFrame(message.Topic).SendFrame(message.Data);}catch (Exception e){Debug.LogError(Error in StartPub: e.Message);}}publisherSocket?.Close(); // 确保发布套接字在退出时关闭}private void StartSubscriber(){subscriber.Subscribe();while (isRunning){try{// 通过超时和 TryReceiveFrameString 检查订阅消息if (subscriber.TryReceiveFrameString(out string topic)){byte[] bytes subscriber.ReceiveFrameBytes();MsgQueue.RecvMessageQueue.Add(new Msg(topic, bytes));string str Encoding.Default.GetString(bytes);Debug.Log(${topic} {str});}Task.Delay(10).Wait(); // 使用短暂的延迟来避免过度循环}catch (Exception e){Debug.LogError(Error in StartSubscriber: e.Message);}}subscriber.Close(); // 手动关闭订阅者套接字}private void OnDestroy(){isRunning false; // 设置标志位通知任务退出// 确保发布套接字关闭publisherSocket?.Close();NetMQConfig.Cleanup(); // 清理 NetMQ}
}这里有一个注意点就是zeroMq 不支持topic 但是支持 SendMoreFrame(message.Topic).SendFrame(message.Data) 这个有点风险也很奇怪但是貌似是官方推荐的做法不纠结就这样吧。
3、Mqtt功能实现
3.1 emqx介绍
mqtt也是一个消息队列主要是用在IOT简单来说就是一个TCP服务器消息头会小一些。适合不稳定的网络。我们用的是Emqx 还行挺好用。工具客户端使用的是mqttx下载地址https://mqttx.app/zh 官方网站https://www.emqx.com/zh
3.2 unity插件
这里使用的是官方推荐的插件库 sdk 介绍地址https://docs.emqx.com/zh/emqx/latest/connect-emqx/introduction.html
官方示例https://github.com/emqx/MQTT-Client-Examples
我这里使用的是https://github.com/eclipse/paho.mqtt.m2mqtt 我也使用nuget安装了库也不起作用不知道为毛盲猜是版本的问题打开上面库下载之后自己编译 因为用unity所以打开了mono的库 编译之后 将生成的dll拷贝到plugins下
3.3 代码实现
using System.Text;
using UnityEngine;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;public class MqttStarter : MonoBehaviour
{// Start is called before the first frame updatevoid Start(){string ipAddr 192.168.3.8;int emqxPort 1883;string clientId csharpclientid;//服务器默认密码是这个string username username;string password pwd;MqttClient client new MqttClient(ipAddr, emqxPort, false, null, null, MqttSslProtocols.None);// register to message receivedclient.MqttMsgPublishReceived client_MqttMsgPublishReceived;client.Connect(clientId,username,password);// client.Subscribe(new string[] { idse/cloud2veh/# }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });}private void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e){MsgQueue.RecvMessageQueue.Add(new Msg(e.Topic, e.Message));Debug.Log(mqtt-- e.Topic Encoding.Default.GetString(e.Message));}
}这里需要注意的是m2mqtt 会自动开启线程不需要单独实现线程。其他的都是常规操作。
4、验证
先看下zeromq的收发 验证下Mqtt的收发 5、Java代码zeromq的使用
pom中加入 dependencygroupIdorg.zeromq/groupIdartifactIdjeromq/artifactIdversion0.5.2/version/dependencyzeromq 发布者
package org.example;/*** Hello world!**/
import org.zeromq.ZMQ;
import org.zeromq.ZContext;public class App {public static void main(String[] args) {int i 0;// 创建一个ZeroMQ的上下文try (ZContext context new ZContext()) {// 创建一个发布者socketZMQ.Socket publisher context.createSocket(ZMQ.PUB);// 绑定到指定的端口String address tcp://*:5556;publisher.bind(address);System.out.println(Publisher started at address);// 持续发送消息int messageNumber 0;while (!Thread.currentThread().isInterrupted()) {String message Message messageNumber;publisher.sendMore(t: (messageNumber%2));publisher.send(abc.getBytes());System.out.println(Sent: message);// 增加消息计数并休眠1秒messageNumber;Thread.sleep(1000);}// 关闭发布者socketpublisher.close();} catch (Exception e) {e.printStackTrace();}}
}zeromq 订阅者
package org.example;import org.zeromq.ZMQ;
import org.zeromq.ZContext;import java.nio.ByteBuffer;public class ZmqSubscriber {public static void main(String[] args) {// 创建一个ZeroMQ的上下文try (ZContext context new ZContext()) {// 创建一个订阅者socketZMQ.Socket subscriber context.createSocket(ZMQ.SUB);// 连接到发布者的地址String address tcp://localhost:5557; // 确保使用正确的地址subscriber.connect(address);System.out.println(Subscriber connected to address);// 订阅所有消息subscriber.subscribe(.getBytes()); // 订阅所有消息可以根据需要指定主题// 持续接收消息while (!Thread.currentThread().isInterrupted()) {// 接收消息String topic subscriber.recvStr(0);byte[] message subscriber.recv(0);ByteBuffer wrap ByteBuffer.wrap(message);if (message ! null) {System.out.println(Received: topic wrap.order(java.nio.ByteOrder.LITTLE_ENDIAN).getInt());}}// 关闭订阅者socketsubscriber.close();} catch (Exception e) {e.printStackTrace();}}
}
6、总结
第一次搞zeromq的消息队列和平常用的kafka和rocketmq 差的很多甚至完全不在同一个讨论方向。
还有一些需要研究 在unity中nuget的学习 c# 不同平台的学习 启动后台线程之后无法关闭导致unity死掉。
byte[] bytes BitConverter.GetBytes(66666);C#侧生成的bytes 是小端序列