多线程事务回滚方法
目录：介绍；案例演示（线程池配置、异常类、实体类、控制层、业务层、mapper、工具类、验证）；解决方案（使用 sqlSession 控制手动提交事务、SqlSessionTemplate 注入容器中、改造业务层、验证）；成功操作示例（业务层改造）。介绍
1. 最近有一个大数据量插入的操作入库的业务场景，需要先做一… 多线程事务回滚方法 介绍 案例演示 线程池配置 异常类 实体类 控制层 业务层 mapper 工具类 验证 解决方案 使用 sqlSession 控制手动提交事务 SqlSessionTemplate 注入容器中 改造业务层 验证 成功操作示例 业务层改造 介绍
1. 最近有一个大数据量插入并入库的业务场景：需要先做一些其他修改操作，然后再执行插入操作。由于插入的数据可能会很多，需要用多线程拆分数据并行处理来缩短响应时间；如果有一个线程执行失败，则全部回滚。
2. 在 Spring 中可以使用 @Transactional 注解去控制事务，使出现异常时进行回滚；但在多线程中这个注解不会生效（Spring 的事务上下文绑定在当前线程上，不会传播到子线程）。如果主线程需要先执行一些修改数据库的操作，当子线程在处理过程中出现异常时，主线程修改的数据不会回滚，导致数据错误。
案例演示
下面是事务不成功案例演示。
线程池配置
package com.mry.rollback.config;import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Thread pool configuration.
 *
 * <p>Exposes one lazily created, process-wide {@link ExecutorService}. The pool is a
 * {@link ThreadPoolExecutor} with a bounded queue of 500 tasks and
 * {@link ThreadPoolExecutor.AbortPolicy}, so submissions are rejected with an exception once
 * both the pool and the queue are full.
 */
public class ExecutorConfig {

    /** Maximum number of pool threads: one per available processor. */
    private static final int MAX_POOL_SIZE = Runtime.getRuntime().availableProcessors();

    /** Capacity of the bounded work queue. */
    private static final int QUEUE_SIZE = 500;

    /** Lazily initialised shared pool; volatile so the unsynchronised read sees a fully built instance. */
    private static volatile ExecutorService executorService;

    private ExecutorConfig() {
    }

    /**
     * Returns the shared thread pool, creating it on first use.
     *
     * @return the single shared {@link ExecutorService}; the same instance on every call
     */
    public static ExecutorService getThreadPool() {
        ExecutorService pool = executorService;
        if (pool == null) {
            synchronized (ExecutorConfig.class) {
                // Re-check under the lock: another thread may have created the pool meanwhile.
                pool = executorService;
                if (pool == null) {
                    pool = newThreadPool();
                    executorService = pool;
                }
            }
        }
        return pool;
    }

    /**
     * Builds the pool: core size is min(5, processors), maximum size is the processor count,
     * idle non-core threads are kept for 10 seconds.
     */
    private static ExecutorService newThreadPool() {
        int corePool = Math.min(5, MAX_POOL_SIZE);
        return new ThreadPoolExecutor(
                corePool,
                MAX_POOL_SIZE,
                10000L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(QUEUE_SIZE),
                new ThreadPoolExecutor.AbortPolicy());
    }
}
异常类
package com.mry.rollback.exception;import lombok.Data;/*** 异常类*/
Data
public class ServiceException extends RuntimeException {private static final long serialVersionUID 1L;private String msg;private int code 500;public ServiceException(String msg) {super(msg);this.msg msg;}public ServiceException(String msg, Throwable e) {super(msg, e);this.msg msg;}public ServiceException(String msg, int code) {super(msg);this.msg msg;this.code code;}public ServiceException(String msg, int code, Throwable e) {super(msg, e);this.msg msg;this.code code;}}实体类
package com.mry.rollback.entity;import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.ToString;
import java.util.Date;ToString
Data
TableName(employee)
public class Employee {private Integer employeeId;private Integer age;private String employeeName;private Date birthDate;private Integer gender;private String idNumber;private Date createTime;private Date updateTime;private Integer status;
}控制层
package com.mry.rollback.controller;import com.mry.rollback.entity.Employee;
import com.mry.rollback.service.EmployeeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;Slf4j
RestController
RequestMapping(/sys)
public class EmployeeController {AutowiredEmployeeService employeeService;GetMapping(/add)public String batchAddEmployee(){int size 10;ListEmployee employeeDOList new ArrayList(size);for (int i 0; isize;i){Employee employee new Employee();employee.setEmployeeName(loli);employee.setAge(18);employee.setGender(1);employee.setBirthDate(Calendar.getInstance().getTime());employee.setIdNumber(iXX);employee.setStatus(1);employee.setCreateTime(Calendar.getInstance().getTime());employee.setUpdateTime(Calendar.getInstance().getTime());employeeDOList.add(employee);}try {employeeService.saveThread(employeeDOList);System.out.println(添加成功);}catch (Exception e){e.printStackTrace();}return 添加成功;}}
业务层
package com.mry.rollback.service;import com.mry.rollback.entity.Employee;
import java.util.List;

/**
 * Employee persistence operations used by the multi-threaded insert demo.
 */
public interface EmployeeService {

    /**
     * Saves the given employees, splitting the work across several threads.
     *
     * @param employeeList employees to insert
     */
    void saveThread(List<Employee> employeeList);
}
package com.mry.rollback.service.impl;import com.mry.rollback.config.ExecutorConfig;
import com.mry.rollback.entity.Employee;
import com.mry.rollback.exception.ServiceException;
import com.mry.rollback.mapper.EmployeeMapper;
import com.mry.rollback.service.EmployeeService;
import com.mry.rollback.util.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;Slf4j
Service(employeeService)
public class EmployeeServiceImpl implements EmployeeService {AutowiredEmployeeMapper employeeMapper;OverrideTransactionalpublic void saveThread(ListEmployee employeeList) {try {//先做删除操作,如果子线程出现异常,此操作不会回滚employeeMapper.delete(null);//获取线程池ExecutorService service ExecutorConfig.getThreadPool();//拆分数据,拆分5份ListListEmployee lists ThreadUtil.averageAssign(employeeList, 5);//执行的线程Thread []threadArray new Thread[lists.size()];//监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭CountDownLatch countDownLatch new CountDownLatch(lists.size());AtomicBoolean atomicBoolean new AtomicBoolean(true);for (int i 0;ilists.size();i){if (ilists.size()-1){atomicBoolean.set(false);}ListEmployee list lists.get(i);threadArray[i] new Thread(() - {try {//最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException(出现异常,001);}//批量添加,mybatisPlus中自带的batch方法employeeMapper.saveBatchEmployee(list);}finally {countDownLatch.countDown();}});}for (int i 0; i lists.size(); i){service.execute(threadArray[i]);}//当子线程执行完毕时,主线程再往下执行countDownLatch.await();System.out.println(添加完毕);}catch (Exception e){log.info(error,e);throw new ServiceException(出现异常,002);}finally {//connection.close();}}
}
mapper
package com.mry.rollback.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.mry.rollback.entity.Employee;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;
import java.util.List;Repository
public interface EmployeeMapper extends BaseMapperEmployee {Insert(script insert into employee (age, employee_name, birth_date, gender, id_number, create_time, update_time, status) values foreach collectionemployeeList itememployee indexindex separator, (#{employee.age}, #{employee.employeeName}, #{employee.birthDate}, #{employee.gender}, #{employee.idNumber}, #{employee.createTime}, #{employee.updateTime}, #{employee.status}) /foreach /script)public void saveBatchEmployee(Param(employeeList) ListEmployee employeeList);}
工具类
package com.mry.rollback.util;import java.util.ArrayList;
import java.util.List;

/**
 * List-splitting helper used to distribute work across threads.
 */
public class ThreadUtil {

    /**
     * Splits {@code source} into exactly {@code n} consecutive chunks of near-equal size.
     *
     * <p>The first {@code source.size() % n} chunks hold one extra element. When
     * {@code source} has fewer than {@code n} elements the trailing chunks are empty. Each
     * chunk is a {@link List#subList(int, int)} view of {@code source}, not a copy.
     *
     * @param source the list to split
     * @param n      the number of chunks; must be positive
     * @param <T>    the element type
     * @return a list of {@code n} chunks, in source order
     * @throws IllegalArgumentException if {@code n} is not positive
     */
    public static <T> List<List<T>> averageAssign(List<T> source, int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive, got " + n);
        }
        List<List<T>> result = new ArrayList<List<T>>(n);
        int baseSize = source.size() / n;
        int remainder = source.size() % n;
        int from = 0;
        for (int i = 0; i < n; i++) {
            // The first `remainder` chunks absorb one leftover element each.
            int to = from + baseSize + (i < remainder ? 1 : 0);
            result.add(source.subList(from, to));
            from = to;
        }
        return result;
    }
}
验证
1. 数据库中存在一条数据。 2. 请求接口：http://127.0.0.1:8866/sys/add 3. 控制台信息 4. 数据库信息 注意：可以发现，子线程组执行时有一个线程执行失败，其他线程也会抛出异常，但是主线程中执行的删除操作没有回滚，@Transactional 注解没有生效。
解决方案
使用sqlSession控制手动提交事务
SqlSessionTemplate注入容器中
package com.mry.rollback.config;import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;/*** 获取sqlSession*/
Component
public class SqlContext {Resourceprivate SqlSessionTemplate sqlSessionTemplate;public SqlSession getSqlSession(){SqlSessionFactory sqlSessionFactory sqlSessionTemplate.getSqlSessionFactory();return sqlSessionFactory.openSession();}}
改造业务层
package com.mry.rollback.service.impl;import com.mry.rollback.config.ExecutorConfig;
import com.mry.rollback.config.SqlContext;
import com.mry.rollback.entity.Employee;
import com.mry.rollback.exception.ServiceException;
import com.mry.rollback.mapper.EmployeeMapper;
import com.mry.rollback.service.EmployeeService;
import com.mry.rollback.util.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;Slf4j
Service(employeeService)
public class EmployeeServiceImpl implements EmployeeService {AutowiredEmployeeMapper employeeMapper;ResourceSqlContext sqlContext;Overridepublic void saveThread(ListEmployee employeeList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession sqlContext.getSqlSession();Connection connection sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);//获取mapperEmployeeMapper employeeMapper sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);//获取执行器ExecutorService service ExecutorConfig.getThreadPool();ListCallableInteger callableList new ArrayList();//拆分listListListEmployee lists ThreadUtil.averageAssign(employeeList, 5);AtomicBoolean atomicBoolean new AtomicBoolean(true);for (int i 0;ilists.size();i){if (ilists.size()-1){atomicBoolean.set(false);}ListEmployee list lists.get(i);//使用返回结果的callable去执行,CallableInteger callable () - {//让最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException(出现异常,001);}return employeeMapper.saveBatchEmployee(list);};callableList.add(callable);}//执行子线程ListFutureInteger futures service.invokeAll(callableList);for (FutureInteger future:futures) {//如果有一个执行不成功,则全部回滚if (future.get()0){connection.rollback();return;}}connection.commit();System.out.println(添加完毕);}catch (Exception e){connection.rollback();log.info(error,e);throw new ServiceException(出现异常,002);}finally {connection.close();}}
}
验证
1. 数据库中存在一条数据。
2. 请求接口：http://127.0.0.1:8877/sys/add 3. 控制台信息
4. 数据库信息 注意：删除操作的数据回滚了，数据库中的数据依旧存在，说明事务成功了。
成功操作示例业务层改造
package com.mry.rollback.service.impl;import com.mry.rollback.config.ExecutorConfig;
import com.mry.rollback.config.SqlContext;
import com.mry.rollback.entity.Employee;
import com.mry.rollback.exception.ServiceException;
import com.mry.rollback.mapper.EmployeeMapper;
import com.mry.rollback.service.EmployeeService;
import com.mry.rollback.util.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;Slf4j
Service(employeeService)
public class EmployeeServiceImpl implements EmployeeService {AutowiredEmployeeMapper employeeMapper;ResourceSqlContext sqlContext;Overridepublic void saveThread(ListEmployee employeeList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession sqlContext.getSqlSession();Connection connection sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);EmployeeMapper employeeMapper sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);ExecutorService service ExecutorConfig.getThreadPool();ListCallableInteger callableList new ArrayList();ListListEmployee listsThreadUtil.averageAssign(employeeList, 5);for (int i 0;ilists.size();i){ListEmployee list lists.get(i);CallableInteger callable () - employeeMapper.saveBatchEmployee(list);callableList.add(callable);}//执行子线程ListFutureInteger futures service.invokeAll(callableList);for (FutureInteger future:futures) {if (future.get()0){connection.rollback();return;}}connection.commit();System.out.println(添加完毕);}catch (Exception e){connection.rollback();log.info(error,e);throw new ServiceException(出现异常,002);// throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);}}}
控制台日志输出
数据库中数据
注意：删除的删除了，添加的添加成功了，测试成功。