微网站建设第一步是进行什么的设置,合肥企业网站制作公司,酒店seo是什么意思,免费网络推广软件有哪些前言
本篇博客是一篇elasticsearch的使用案例#xff0c;包括结合MybatisPlus使用ES#xff0c;如何保证MySQL和es的数据一致性#xff0c;另外使用了RabbitMQ进行解耦#xff0c;自定义了发消息的方法。
其他相关的Elasticsearch的文章列表如下#xff1a; Elasticsear…
前言
本篇博客是一篇elasticsearch的使用案例包括结合MybatisPlus使用ES如何保证MySQL和es的数据一致性另外使用了RabbitMQ进行解耦自定义了发消息的方法。
其他相关的Elasticsearch的文章列表如下 Elasticsearch的Docker版本的安装和参数设置 端口开放和浏览器访问 Elasticsearch的可视化Kibana工具安装 IK分词器的安装和使用 Elasticsearch的springboot整合 Kibana进行全查询和模糊查询 目录 前言引出结合MybatisPlus使用ES1.引入依赖2.进行配置3.实体类上加入注解4.创建操作的 Repository5.初始化es中的数据6.进行全查询以及分页带条件分页查询 es和mysql的数据一致性延迟双删加锁的方式 用rabbitmq进行解耦配置yml文件rabbitmq的配置类callback回调方法自定义发消息工具类进行消息的发送接收到消息更新es 总结 引出 1.elasticsearch的使用案例包括结合MybatisPlus使用ES 2.如何保证MySQL和es的数据一致性 3.使用了RabbitMQ进行解耦自定义了发消息的方法。
结合MybatisPlus使用ES
1.引入依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-elasticsearch/artifactId/dependency!--mysql驱动--dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdscoperuntime/scope/dependency!-- druid--dependencygroupIdcom.alibaba/groupIdartifactIddruid-spring-boot-starter/artifactId/dependency!-- springboot 整合mybaits plus --dependencygroupIdcom.baomidou/groupIdartifactIdmybatis-plus-boot-starter/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId/dependency2.进行配置
package com.tianju.es.config;import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;/*** 你也可以不继承 AbstractElasticsearchConfiguration 类而将 ESConfig 写成一般的配置类的型式。* 不过继承 AbstractElasticsearchConfiguration 好处在于它已经帮我们配置好了elasticsearchTemplate 直接使用。*/
Configuration
public class ESConfig extends AbstractElasticsearchConfiguration {Overridepublic RestHighLevelClient elasticsearchClient() {ClientConfiguration clientConfiguration ClientConfiguration.builder().connectedTo(192.168.111.130:9200).build();return RestClients.create(clientConfiguration).rest();}
}3.实体类上加入注解 package com.tianju.es.entity;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;import java.math.BigDecimal;/*** 产品包括库存价格信息*/
Data
NoArgsConstructor
AllArgsConstructor
TableName(finance_sku)
Document(indexName finance_sku)
public class FinanceSkuES {TableId(value ID,type IdType.AUTO)private Long id;TableField(finance_sku_describe)Field(index true,analyzer ik_smart,searchAnalyzer ik_smart,type FieldType.Text)private String detail; // 详情TableField(finance_sku_price)private BigDecimal price;TableField(finance_sku_stock)private Long stock;TableField(finance_state)private Integer status;
}
参数解释
Document(indexName books, shards 1, replicas 0)
Data
public class Book {IdField(type FieldType.Integer)private Integer id;Field(type FieldType.Keyword)private String title;Field(type FieldType.Text)private String press;Field(type FieldType.Keyword)private String author;Field(type FieldType.Keyword,indexfalse)private BigDecimal price;Field(type FieldType.Text)private String description;
}Document 注解会对实体中的所有属性建立索引 indexName “books” 表示创建一个名称为 “books” 的索引 shards 1 表示只使用一个分片 replicas 0 表示不使用复制备份 index false 不能索引查询Field(type FieldType.Keyword) 用以指定字段的数据类型。
4.创建操作的 Repository 从它的祖先们那里继承了大量的现成的方法除此之外它还可以按 spring data 的规则定义特定的方法。 package com.tianju.es.mapper;import com.tianju.es.entity.FinanceSkuES;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;/*** 操作es类似于之前的mapper*/
Repository
public interface SkuESMapper extends ElasticsearchRepositoryFinanceSkuES, Long {/*** 根据关键字进行 分词 分页查询 sku数据* param detail 查询条件* param pageable 分页* return*/PageFinanceSkuES findFinanceSkuESByDetail(String detail, Pageable pageable);/*** 根据id进行删除* param id*/void removeFinanceSkuESById(Long id);}
5.初始化es中的数据 运行的后台信息 查看es页面的信息index management 6.进行全查询以及分页 进行全查询 {content: [{id: 1,detail: HUAWEI MateBook X Pro 2023 微绒典藏版 13代酷睿i7 32GB 2TB 14.2英寸3.1K原色全面屏 墨蓝,price: 13999.0,stock: 50,status: 1},{id: 2,detail: HUAWEI Mate 60 Pro 16GB1TB 宣白,price: 9999.0,stock: 60,status: 1},{id: 3,detail: iPhone 15 Pro Max 超视网膜 XDR 显示屏,price: 9299.0,stock: 46,status: 1},{id: 4,detail: MacBook Air Apple M2 芯片 8 核中央处理器 8 核图形处理器 8GB 统一内存 256GB 固态硬盘,price: 8999.0,stock: 60,status: 1}],pageable: {sort: {empty: true,sorted: false,unsorted: true},offset: 0,pageSize: 4,pageNumber: 0,paged: true,unpaged: false},totalElements: 4,last: true,totalPages: 1,number: 0,size: 4,sort: {empty: true,sorted: false,unsorted: true},numberOfElements: 4,first: true,empty: false
}带条件分页查询 注意分页查询的page从0开始尝试发现需要输入分词器分词后最小单元比如hu不是最小单元而HUAWEI是 分词器进行分词的结果 es和mysql的数据一致性
延迟双删 Overridepublic FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {// 把es看做是缓存如何保证es 和 mysql的 数据一致性// 延迟双删的模式// 1.先删除缓存 esskuESMapper.deleteAll();// 2.更新数据库 mysqlupdateById(financeSkuES);// 3.延时操作try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}// 4.再次删除缓存 esskuESMapper.deleteAll();// 5.最后更新缓存 esskuESMapper.saveAll(list());OptionalFinanceSkuES byId skuESMapper.findById(financeSkuES.getId());log.debug(byId: byId);return byId.get();}上面代码有不妥的地方我这里是修改结果一开始直接从es中全部删除应该是根据id把修改的数据删除然后把修改好的数据set进es里面
加锁的方式
感觉好像没什么用的样子就是用了一下加锁 用rabbitmq进行解耦 配置yml文件 spring:main:allow-circular-references: truedatasource:driver-class-name: com.mysql.cj.jdbc.Driver### 本地的数据库url: jdbc:mysql://127.0.0.1:3306/consumer_finance_product?useUnicodetruecharacterEncodingutf8serverTimezoneGMT%2B8allowMultiQueriestrueusername: rootpassword: 123# redis的相关配置redis:host: 119.3.162.127port: 6379database: 0password: Pet3927# rabbitmq相关rabbitmq:host: 192.168.111.130port: 5672username: adminpassword: 123virtual-host: /test# 生产者保证消息可靠性publisher-returns: truepublisher-confirm-type: correlated# 设置手动确认listener:simple:acknowledge-mode: manual
rabbitmq的配置类 将Java对象转换成json字符串传输 package com.tianju.es.rabbit;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitConfig {public static final String ES_EXCHANGE es_exchange;public static final String ES_QUEUE es_queue;public static final String ES_KEY es_key;Beanpublic DirectExchange directExchange(){return new DirectExchange(ES_EXCHANGE);}Beanpublic Queue esQueue(){return new Queue(ES_QUEUE);}Beanpublic Binding esQueueToDirectExchange(){return BindingBuilder.bind(esQueue()).to(directExchange()).with(ES_KEY);}/*** 将对象转换为json字符串* return*/Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter());// 修改转换器return rabbitTemplate;}} callback回调方法
package com.tianju.es.rabbit;import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;/*** 生产者消息可靠性*/
// RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback
Configuration
Slf4j
public class CallbackConfigimplements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {Autowiredprivate RabbitTemplate rabbitTemplate;// 初始化PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);rabbitTemplate.setMandatory(true);}/*** 不管成功或者失败都会执行* param correlationData correlation对象需要在 发送消息时候 给* param ack true表示成功false表示发送失败* param cause 如果失败的话会写失败原因如果成功返回为null*/Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.debug(ack是否成功ack);log.debug(cause信息cause);if (correlationData!null){JSONObject jsonObject JSON.parseObject(correlationData.getReturnedMessage().getBody());String exchange correlationData.getReturnedMessage().getMessageProperties().getReceivedExchange();String routingKey correlationData.getReturnedMessage().getMessageProperties().getReceivedRoutingKey();log.debug(消息体jsonObject);log.debug(交换机exchange);log.debug(路由keyroutingKey);}if (ack){return;}// 失败了// 1、重试重试上限次数默认值5每重试一次时间间隔会增加// 2、把消息、交换机名称、路由键等相关的消息保存到数据库有一个程序定时扫描相关的消息然后重新发送消息。// 重发上限次数默认值5超过阈值会转人工处理// 2、把消息体、交换机名称、路由键等相关的消息保存到数据库有一个程序定时扫描相关的消息然后重新发送消息。// 重发上限次数默认值5超过阈值会转人工处理// 2.1需要把相关的信息存放到数据中表字段消息体、交换机名称、路由键、状态、次数// 2.2定时任务单体spring定时任务 分布式XxL-job),发送消息}/*** 只有失败了才会执行* param message* param replyCode* param replyText* param exchange* param routingKey*/Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {// 2、把消息、交换机名称、路由键等相关的消息保存到数据库有一个程序定时扫描相关的消息然后重新发送消息。}
}
自定义发消息工具类 package com.tianju.common.util;import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;Slf4j
public class RabbitUtil {/*** 延迟队列发送消息到达时间后进入死信队列中* param rabbitTemplate 调用的rabbitTemplate* param redisTemplate 用来在redis里面存token* param msg 发送的消息* param token 发送的token用于保证幂等性* param ttl 如果是延迟消费则消息的过期时间到达改时间后进入死信交换机到死信队列中* param exchange 交换机名字* param routingKey 路由键名字* param T 发送消息的实体类*/public static T void sendMsg(RabbitTemplate rabbitTemplate,StringRedisTemplate redisTemplate,T msg,String token,Integer ttl,String exchange,String routingKey) {log.debug(给交换机[{}]通过路由键[{}]发送消息 {}token为{},exchange,routingKey,msg,token);MessagePostProcessor messagePostProcessor new MessagePostProcessor() {Overridepublic Message postProcessMessage(Message message) throws AmqpException {redisTemplate.opsForValue().set(token, token,5*60000);message.getMessageProperties().setMessageId(token);if (ttl!null){message.getMessageProperties().setExpiration(ttl.toString());}return message;}};CorrelationData correlationData new CorrelationData();// 消息体Message message new Message(JSON.toJSONBytes(msg));// 交换机名称message.getMessageProperties().setReceivedExchange(exchange);// 路由键message.getMessageProperties().setReceivedRoutingKey(routingKey);correlationData.setReturnedMessage(message);// 发送MQ消息rabbitTemplate.convertAndSend(exchange, // 发给交换机routingKey, // 根据这个routingKey就会给到TTL队列到时间成死信发给死信交换机到死信队列msg,messagePostProcessor,correlationData);}
}
进行消息的发送 接口 package com.tianju.es.service;import com.baomidou.mybatisplus.extension.service.IService;
import com.tianju.es.entity.FinanceSkuES;public interface SkuService extends IServiceFinanceSkuES {/*** 延迟双删的方式保证es 缓存 和 mysql数据库的数据一致性* param financeSkuES 修改的数据* return*/FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES);/*** 加锁的方式不过感觉没啥用的样子* param financeSkuES* return*/FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES);/*** 通过rabbitmq进行解耦* param financeSkuES* return*/String updateByIdRabbitMQ(FinanceSkuES financeSkuES);
} 实现类 package com.tianju.es.service.impl;import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.tianju.common.util.RabbitUtil;
import com.tianju.es.entity.FinanceSkuES;
import com.tianju.es.mapper.SkuESMapper;
import com.tianju.es.mapper.SkuMybatisPlusMapper;
import com.tianju.es.rabbit.RabbitConfig;
import com.tianju.es.service.SkuService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;import java.util.Collection;
import java.util.Optional;
import java.util.UUID;Service
public class SkuServiceImpl extends ServiceImplSkuMybatisPlusMapper,FinanceSkuESimplements SkuService {Autowiredprivate SkuESMapper skuESMapper;Autowiredprivate StringRedisTemplate stringRedisTemplate;Autowiredprivate RabbitTemplate rabbitTemplate;Overridepublic FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {// 把es看做是缓存如何保证es 和 mysql的 数据一致性// 延迟双删的模式// 1.先删除缓存 esskuESMapper.deleteAll();// 2.更新数据库 mysqlupdateById(financeSkuES);// 3.延时操作try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}// 4.再次删除缓存 esskuESMapper.deleteAll();// 5.最后更新缓存 esskuESMapper.saveAll(list());OptionalFinanceSkuES byId skuESMapper.findById(financeSkuES.getId());log.debug(byId: byId);return byId.get();}Overridepublic FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES) {// 第二种方式加锁String uuid UUID.randomUUID().toString();// 相当于setnx指令Boolean skuLock stringRedisTemplate.opsForValue().setIfAbsent(skuLock, uuid);try {if (skuLock){ // 抢到了锁skuESMapper.deleteAll();updateById(financeSkuES);}}finally {if (uuid.equals(stringRedisTemplate.opsForValue().get(skuLock))){stringRedisTemplate.delete(skuLock);}}skuESMapper.saveAll(list());OptionalFinanceSkuES byId skuESMapper.findById(financeSkuES.getId());log.debug(byId: byId);return byId.get();}Overridepublic String updateByIdRabbitMQ(FinanceSkuES financeSkuES) {// 采用rabbitmq进行解耦updateById(financeSkuES);FinanceSkuES skuES getById(financeSkuES.getId());String uuid IdUtil.fastUUID();RabbitUtil.sendMsg(rabbitTemplate,stringRedisTemplate,skuES,uuid,null,RabbitConfig.ES_EXCHANGE,RabbitConfig.ES_KEY);return 已经发送消息skuES;}
} 接收到消息更新es
接收到消息进行es的更新把原来的删除把最新的set进去 package com.tianju.es.rabbit;import com.rabbitmq.client.Channel;
import com.tianju.es.entity.FinanceSkuES;
import com.tianju.es.mapper.SkuESMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.io.IOException;Slf4j
Component
public class ESListener {Autowiredprivate StringRedisTemplate redisTemplate;Autowiredprivate SkuESMapper skuESMapper;RabbitListener(queues RabbitConfig.ES_QUEUE)public void esUpdate(FinanceSkuES financeSkuES, Message message, Channel channel) {String messageId message.getMessageProperties().getMessageId();log.debug(进行业务---- 监听到队列{}的消息messageId为{},financeSkuES,messageId);try {// 幂等性if (redisTemplate.delete(messageId)){// 根据id删除原有的 es 数据// 然后把新的数据set进来log.debug(处理es的业务删除原有的替换最新的);skuESMapper.removeFinanceSkuESById(financeSkuES.getId());skuESMapper.save(financeSkuES);}// 手动签收消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch (Exception e){// 幂等性redisTemplate.opsForValue().set(messageId,messageId,5*60000);// 1、重试重试上限次数默认值5 每重试一次时间间隔会增加// 2、把消息、交换机名称、路由键等相关的消息保存到数据库有一个程序定时扫描相关的消息然后重新发送消息。// 重发上限次数默认值5超过阈值会转人工处理// 已知的消息交换机路由器消息 message.getBody() 消息发送给的是监听的队列try {channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException ex) {throw new RuntimeException(ex);}}}
} 后台打印的日志 总结
1.elasticsearch的使用案例包括结合MybatisPlus使用ES 2.如何保证MySQL和es的数据一致性 3.使用了RabbitMQ进行解耦自定义了发消息的方法。