wordpress站长之家,网站建设业务介绍,做下载网站挣钱吗,图片式网站利于做优化吗情况说明
在本地和服务器分别启动im服务，当本地发送消息时，会发现服务器上并没有收到消息。初版im只支持单机版，不支持分布式的情况。此次针对该情况对项目进行优化，文档中贴出的代码非完整代码，可自行查看参考资料[2]
代码结构调…情况说明
在本地和服务器分别启动im服务，当本地发送消息时，会发现服务器上并没有收到消息。初版im只支持单机版，不支持分布式的情况。此次针对该情况对项目进行优化，文档中贴出的代码非完整代码，可自行查看参考资料[2]。
代码结构调整
本次调整需要增加一个redis的渠道，为了方便后续再进行渠道的增加，对现有代码结构进行调整。
IBaseSendExecutor
渠道扩充接口，后续再增加渠道都可以实现该接口。
package com.example.im.infra.executor.send;/*** author PC* 通信处理*/
// Extension point for message delivery channels: a new channel is added by implementing this interface.
public interface IBaseSendExecutor {

    /**
     * Returns the communication type handled by this executor; the preset types are the
     * default one and redis.
     *
     * @return the communication type
     */
    String getCommunicationType();

    /**
     * Sends a message to specific users.
     *
     * @param sendUserName name of the sender
     * @param message      the message
     */
    void sendToUser(String sendUserName, String message);

    /**
     * Sends a message to everyone.
     *
     * @param sendUserName name of the sender
     * @param message      the message
     */
    void sendToAll(String sendUserName, String message);
}
AbstractBaseSendExecutor
通信处理抽象类，将一些预定义的渠道所需要的公有方法提取出来。
package com.example.im.infra.executor.send;import com.example.im.config.WebSocketProperties;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;/*** author PC*/
public abstract class AbstractBaseSendExecutor implements IBaseSendExecutor {protected WebSocketProperties webSocketProperties;Autowiredpublic void setWebSocketProperties(WebSocketProperties webSocketProperties) {this.webSocketProperties webSocketProperties;}/*** 获取接收人信息** param sendUserName 发送人* param message 消息* return 接收人列表*/protected ListString getReceiverName(String sendUserName, String message) {if (!StringUtils.contains(message, webSocketProperties.getReceiverSeparator())) {return new ArrayList();}String[] names StringUtils.split(message, webSocketProperties.getReceiverSeparator());return Stream.of(names).skip(1).filter(receiver -!(webSocketProperties.getReceiverExcludesHimselfFlag() StringUtils.equals(sendUserName, receiver))).collect(Collectors.toList());}/*** 根据配置处理发送的信息** param message 原消息* return 被处理后的消息*/protected String generatorMessage(String message) {return BooleanUtils.isTrue(webSocketProperties.getExcludeReceiverInfoFlag()) ?StringUtils.substringBefore(message, webSocketProperties.getReceiverSeparator()) : message;}
}DefaultSendExecutor
原有消息发送逻辑
package com.example.im.infra.executor.send;import com.example.im.endpoint.WebSocketEndpoint;
import com.example.im.infra.constant.ImConstants;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;/*** author PC* 默认执行*/
Component
public class DefaultSendExecutor extends AbstractBaseSendExecutor {private final static Logger logger LoggerFactory.getLogger(DefaultSendExecutor.class);private TaskExecutor taskExecutor;Autowiredpublic void setTaskExecutor(TaskExecutor taskExecutor) {this.taskExecutor taskExecutor;}Overridepublic String getCommunicationType() {return ImConstants.CommunicationType.DEFAULT;}Overridepublic void sendToUser(String sendUserName, String message) {ListString receiverNameList getReceiverName(sendUserName, message);CountDownLatch countDownLatch new CountDownLatch(receiverNameList.size());SetString notOnlineReceiverSet ConcurrentHashMap.newKeySet();SetString finalNotOnlineReceiverSet notOnlineReceiverSet;receiverNameList.forEach(receiverName - taskExecutor.execute(() - {try {if (WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.containsKey(receiverName)) {WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.get(receiverName).getSession().getBasicRemote().sendText(generatorMessage(message));} else {finalNotOnlineReceiverSet.add(receiverName);}} catch (IOException ioException) {logger.error(send error: ioException);} finally {countDownLatch.countDown();}}));try {countDownLatch.await();} catch (InterruptedException interruptedException) {logger.error(error.countDownLatch.await);}notOnlineReceiverSet notOnlineReceiverSet.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toSet());if (CollectionUtils.isNotEmpty(notOnlineReceiverSet)) {logger.info(not online number is notOnlineReceiverSet.size());logger.info(The user : {} is not online, String.join(,, notOnlineReceiverSet));}}Overridepublic void sendToAll(String sendUserName, String message) {for (Map.EntryString, WebSocketEndpoint webSocketEndpointEntry : WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.entrySet()) {taskExecutor.execute(() - {if (webSocketProperties.getReceiverExcludesHimselfFlag() StringUtils.equals(sendUserName, webSocketEndpointEntry.getKey())) {return;}try 
{webSocketEndpointEntry.getValue().getSession().getBasicRemote().sendText(generatorMessage(message));} catch (IOException ioException) {logger.error(send error: ioException);}});}}
}SendExecutorFactory
发送渠道工厂
package com.example.im.infra.executor.send;import com.example.im.config.WebSocketProperties;
import com.example.im.infra.executor.config.ExecutorConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Optional;/*** author PC* 发送逻辑工厂*/
Component
public class SendExecutorFactory {private final WebSocketProperties webSocketProperties;private ExecutorConfiguration executorConfiguration;Autowiredpublic SendExecutorFactory(WebSocketProperties webSocketProperties) {this.webSocketProperties webSocketProperties;}Autowiredpublic void setExecutorConfiguration(ExecutorConfiguration executorConfiguration) {this.executorConfiguration executorConfiguration;}public void onMessage(String sendUserName, String message) {IBaseSendExecutor iBaseSendExecutor Optional.ofNullable(executorConfiguration.getBaseSendExecutorMap().get(webSocketProperties.getCommunicationType())).orElse(new DefaultSendExecutor());//包含发给指定人否则发给全部人if (StringUtils.contains(message, webSocketProperties.getReceiverSeparator())) {iBaseSendExecutor.sendToUser(sendUserName, message);} else {iBaseSendExecutor.sendToAll(sendUserName, message);}}
}ExecutorConfiguration
加载
package com.example.im.infra.executor.config;import com.example.im.infra.executor.send.IBaseSendExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;/*** author PC* Executor配置*/
Component
public class ExecutorConfiguration implements ApplicationContextAware {private final static Logger logger LoggerFactory.getLogger(ExecutorConfiguration.class);private MapString, IBaseSendExecutor baseSendExecutorMap new HashMap(16);private static ApplicationContext applicationContext;Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {ExecutorConfiguration.applicationContext applicationContext;//加载IBaseSendExecutor实现类this.initBaseSendExecutor(applicationContext);}/*** 加载IBaseSendExecutor实现类* 如果一个服务的发送渠道是固定的可以使用Bean搭配ConditionalOnProperty的方式* 但是考虑到后续可能会有一个服务不同发送渠道的场景采用当前加载方式** param applicationContext 上下文*/private void initBaseSendExecutor(ApplicationContext applicationContext) {logger.info(Start loading IBaseSendExecutor);MapString, IBaseSendExecutor baseSendExecutorMap applicationContext.getBeansOfType(IBaseSendExecutor.class);for (Map.EntryString, IBaseSendExecutor iBaseSendExecutorEntry : baseSendExecutorMap.entrySet()) {String communicationType iBaseSendExecutorEntry.getValue().getCommunicationType();this.baseSendExecutorMap.put(communicationType, iBaseSendExecutorEntry.getValue());logger.info(initBaseSendExecutorcommunicationType:{},className:{}, communicationType, iBaseSendExecutorEntry.getValue().getClass().getName());}logger.info(IBaseSendExecutor loading is complete);}public static ApplicationContext getApplicationContext() {return applicationContext;}public MapString, IBaseSendExecutor getBaseSendExecutorMap() {return baseSendExecutorMap;}public void setBaseSendExecutorMap(MapString, IBaseSendExecutor baseSendExecutorMap) {this.baseSendExecutorMap baseSendExecutorMap;}
}添加redis通信渠道
pom.xml
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
application.yml
server:
  port: 18080
cus:
  ws:
    exclude-receiver-info-flag: true
    receiver-excludes-himself-flag: true
    communication-type: redis
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    username: root
    password: root
    database: ${SPRING_REDIS_DATABASE:1}
    # Redis连接超时时间
    connect-timeout: ${SPRING_REDIS_CONNECT_TIMEOUT:2000}
    # Redis读取超时时间
    timeout: ${SPRING_REDIS_READ_TIMEOUT:5000}
    lettuce:
      pool:
        # 资源池中最大连接数
        # 默认8，-1表示无限制；可根据服务并发redis情况及服务端的支持上限调整
        max-active: ${SPRING_REDIS_POOL_MAX_ACTIVE:50}
        # 资源池运行最大空闲的连接数
        # 默认8，-1表示无限制；可根据服务并发redis情况及服务端的支持上限调整，一般建议和max-active保持一致，避免资源伸缩带来的开销
        max-idle: ${SPRING_REDIS_POOL_MAX_IDLE:50}
        # 当资源池连接用尽后，调用者的最大等待时间(单位为毫秒)
        # 默认 -1 表示永不超时，设置5秒
        max-wait: ${SPRING_REDIS_POOL_MAX_WAIT:5000}
RedisSendExecutor
redis发送
package com.example.im.infra.executor.send.redis;import com.example.im.infra.constant.ImConstants;
import com.example.im.infra.executor.send.AbstractBaseSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.executor.send.dto.ScopeOfSendingEnum;
import com.example.im.infra.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;/*** author PC* redis执行*/
Component
public class RedisSendExecutor extends AbstractBaseSendExecutor {private final static Logger logger LoggerFactory.getLogger(RedisSendExecutor.class);private RedisTemplateString, String redisTemplate;Autowiredpublic void setRedisTemplate(RedisTemplateString, String redisTemplate) {this.redisTemplate redisTemplate;}Overridepublic String getCommunicationType() {return ImConstants.CommunicationType.REDIS;}Overridepublic void sendToUser(String sendUserName, String message) {MessageInfo messageInfo new MessageInfo();messageInfo.setSendUserName(sendUserName);messageInfo.setMessage(message);messageInfo.setScopeOfSending(ScopeOfSendingEnum.USER);logger.debug(send to user redis websocket, channel is redis-websocket);redisTemplate.convertAndSend(redis-websocket-user, JsonUtils.toJson(messageInfo));}Overridepublic void sendToAll(String sendUserName, String message) {MessageInfo messageInfo new MessageInfo();messageInfo.setSendUserName(sendUserName);messageInfo.setMessage(message);messageInfo.setScopeOfSending(ScopeOfSendingEnum.ALL);logger.debug(send to all redis websocket, channel is redis-websocket);redisTemplate.convertAndSend(redis-websocket-all, JsonUtils.toJson(messageInfo));}
}RedisMessageListener
redis监听
package com.example.im.infra.executor.send.redis;import com.example.im.infra.executor.send.DefaultSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;/*** author PC* redis监听*/
Component
public class RedisMessageListener implements MessageListener {private final static Logger logger LoggerFactory.getLogger(RedisMessageListener.class);private DefaultSendExecutor defaultSendExecutor;Autowiredpublic void setDefaultSendExecutor(DefaultSendExecutor defaultSendExecutor) {this.defaultSendExecutor defaultSendExecutor;}Overridepublic void onMessage(Message message, byte[] pattern) {//消息内容String messageJson new String(message.getBody(), StandardCharsets.UTF_8);MessageInfo messageInfo JsonUtils.toObjectByTypeReference(messageJson, new TypeReferenceMessageInfo() {});switch (messageInfo.getScopeOfSending()) {case USER:defaultSendExecutor.sendToUser(messageInfo.getSendUserName(), messageInfo.getMessage());break;case ALL:defaultSendExecutor.sendToAll(messageInfo.getSendUserName(), messageInfo.getMessage());break;default://一般来说不会出现该情况除非用户覆盖了ScopeOfSending后续可以开个扩展发送范围的口子logger.warn(invalid sending range: messageInfo.getScopeOfSending().getScopeCode());break;}}
}测试
本地服务发送消息
服务器接收到了消息
常见问题
打包报错
执行 mvn clean package 打包时出现以下错误：
[ERROR] contextLoads Time elapsed: 0.001 s ERROR!
java.lang.IllegalStateException: Failed to load ApplicationContext
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'serverEndpoint' defined in class path resource [com/example/im/config/WebSocketConfig.class]: Invocation of init method failed; nested exception is java.lang.IllegalStateException: javax.websocket.server.ServerContainer not available
Caused by: java.lang.IllegalStateException: javax.websocket.server.ServerContainer not available
查看ServerContainer接口，发现其有两个接口实现类，其中有一个是test包的，将其排除后即可正常打包。
jar包启动时no main manifest attribute问题
需将pom的plugin标签中的skip标签删除或设置为false。
参考资料
[1].初版im文档
[2].im项目地址