网站全站开发,wordpress timestamp,快速收录网,大连网站制作的公司哪家好背景介绍
1#xff0c;最近有一个大数据量插入的操作入库的业务场景#xff0c;需要先做一些其他修改操作#xff0c;然后在执行插入操作#xff0c;由于插入数据可能会很多#xff0c;用到多线程去拆分数据并行处理来提高响应时间#xff0c;如果有一个线程执行失败最近有一个大数据量插入的操作入库的业务场景需要先做一些其他修改操作然后在执行插入操作由于插入数据可能会很多用到多线程去拆分数据并行处理来提高响应时间如果有一个线程执行失败则全部回滚。
2在spring中可以使用Transactional注解去控制事务使出现异常时会进行回滚在多线程中这个注解则不会生效如果主线程需要先执行一些修改数据库的操作当子线程在进行处理出现异常时主线程修改的数据则不会回滚导致数据错误。
3下面用一个简单示例演示多线程事务。
公用的类和方法
/*** 平均拆分list方法.* param source* param n* param T* return*/
public static T ListListT averageAssign(ListT source,int n){ListListT resultnew ArrayListListT();int remaidersource.size()%n; int numbersource.size()/n; int offset0;//偏移量for(int i0;in;i){ListT valuenull;if(remaider0){valuesource.subList(i*numberoffset, (i1)*numberoffset1);remaider--;offset;}else{valuesource.subList(i*numberoffset, (i1)*numberoffset);}result.add(value);}return result;
}
/** 线程池配置* version V1.0*/
public class ExecutorConfig {private static int maxPoolSize Runtime.getRuntime().availableProcessors();private volatile static ExecutorService executorService;public static ExecutorService getThreadPool() {if (executorService null){synchronized (ExecutorConfig.class){if (executorService null){executorService newThreadPool();}}}return executorService;}private static ExecutorService newThreadPool(){int queueSize 500;int corePool Math.min(5, maxPoolSize);return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue(queueSize),new ThreadPoolExecutor.AbortPolicy());}private ExecutorConfig(){}
}
/** 获取sqlSession* author 86182* version V1.0*/
Component
public class SqlContext {Resourceprivate SqlSessionTemplate sqlSessionTemplate;public SqlSession getSqlSession(){SqlSessionFactory sqlSessionFactory sqlSessionTemplate.getSqlSessionFactory();return sqlSessionFactory.openSession();}
示例事务不成功操作
/*** 测试多线程事务.* param employeeDOList*/
Override
Transactional
public void saveThread(ListEmployeeDO employeeDOList) {try {//先做删除操作,如果子线程出现异常,此操作不会回滚this.getBaseMapper().delete(null);//获取线程池ExecutorService service ExecutorConfig.getThreadPool();//拆分数据,拆分5份ListListEmployeeDO listsaverageAssign(employeeDOList, 5);//执行的线程Thread []threadArray new Thread[lists.size()];//监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭CountDownLatch countDownLatch new CountDownLatch(lists.size());AtomicBoolean atomicBoolean new AtomicBoolean(true);for (int i 0;ilists.size();i){if (ilists.size()-1){atomicBoolean.set(false);}ListEmployeeDO list lists.get(i);threadArray[i] new Thread(() - {try {//最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException(001,出现异常);}//批量添加,mybatisPlus中自带的batch方法this.saveBatch(list);}finally {countDownLatch.countDown();}});}for (int i 0; i lists.size(); i){service.execute(threadArray[i]);}//当子线程执行完毕时,主线程再往下执行countDownLatch.await();System.out.println(添加完毕);}catch (Exception e){log.info(error,e);throw new ServiceException(002,出现异常);}finally {connection.close();}
}
数据库中存在一条数据 //测试用例
RunWith(SpringRunner.class)
SpringBootTest(classes { ThreadTest01.class, MainApplication.class})
public class ThreadTest01 {Resourceprivate EmployeeBO employeeBO;/*** 测试多线程事务.* throws InterruptedException*/Testpublic void MoreThreadTest2() throws InterruptedException {int size 10;ListEmployeeDO employeeDOList new ArrayList(size);for (int i 0; isize;i){EmployeeDO employeeDO new EmployeeDO();employeeDO.setEmployeeName(loli);employeeDO.setAge(18);employeeDO.setGender(1);employeeDO.setIdNumber(iXX);employeeDO.setCreatTime(Calendar.getInstance().getTime());employeeDOList.add(employeeDO);}try {employeeBO.saveThread(employeeDOList);System.out.println(添加成功);}catch (Exception e){e.printStackTrace();}}
}
测试结果 可以发现子线程组执行时有一个线程执行失败其他线程也会抛出异常但是主线程中执行的删除操作没有回滚Transactional注解没有生效。 使用sqlSession控制手动提交事务 ResourceSqlContext sqlContext;/*** 测试多线程事务.* param employeeDOList*/
Override
public void saveThread(ListEmployeeDO employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession sqlContext.getSqlSession();Connection connection sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);//获取mapperEmployeeMapper employeeMapper sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);//获取执行器ExecutorService service ExecutorConfig.getThreadPool();ListCallableInteger callableList new ArrayList();//拆分listListListEmployeeDO listsaverageAssign(employeeDOList, 5);AtomicBoolean atomicBoolean new AtomicBoolean(true);for (int i 0;ilists.size();i){if (ilists.size()-1){atomicBoolean.set(false);}ListEmployeeDO list lists.get(i);//使用返回结果的callable去执行,CallableInteger callable () - {//让最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException(001,出现异常);}return employeeMapper.saveBatch(list);};callableList.add(callable);}//执行子线程ListFutureInteger futures service.invokeAll(callableList);for (FutureInteger future:futures) {//如果有一个执行不成功,则全部回滚if (future.get()0){connection.rollback();return;}}connection.commit();System.out.println(添加完毕);}catch (Exception e){connection.rollback();log.info(error,e);throw new ServiceException(002,出现异常);}finally {connection.close();}
}
// sql
insert idsaveBatch parameterTypeListINSERT INTOemployee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)valuesforeach collectionlist itemitem indexindex separator,(#{item.employeeId},#{item.age},#{item.employeeName},#{item.birthDate},#{item.gender},#{item.idNumber},#{item.creatTime},#{item.updateTime},#{item.status})/foreach/insert
数据库中一条数据 测试结果抛出异常 删除操作的数据回滚了数据库中的数据依旧存在说明事务成功了。 成功操作示例 Resource
SqlContext sqlContext;
/*** 测试多线程事务.* param employeeDOList*/
Override
public void saveThread(ListEmployeeDO employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession sqlContext.getSqlSession();Connection connection sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);EmployeeMapper employeeMapper sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);ExecutorService service ExecutorConfig.getThreadPool();ListCallableInteger callableList new ArrayList();ListListEmployeeDO listsaverageAssign(employeeDOList, 5);for (int i 0;ilists.size();i){ListEmployeeDO list lists.get(i);CallableInteger callable () - employeeMapper.saveBatch(list);callableList.add(callable);}//执行子线程ListFutureInteger futures service.invokeAll(callableList);for (FutureInteger future:futures) {if (future.get()0){connection.rollback();return;}}connection.commit();System.out.println(添加完毕);}catch (Exception e){connection.rollback();log.info(error,e);throw new ServiceException(002,出现异常);// throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);}
}
测试结果
数据库中数据
删除的删除了添加的添加成功了测试成功。