多线程事务的回滚是怎么结合进去的?

特别说明countdownlatch
countdownlatch的用法
countdownlatch(num) 简单说明
主线程:mainthreadlatch.await() 和mainthreadlatch.countdown()
子线程:rollbacklatch.await() 和rollbacklatch.countdown()
为什么所有的子线程会在一瞬间就被所有都释放了?
事务的回滚是怎么结合进去的?
主线程类entry
子线程类workthread
代码实际运用踩坑!!!!
特别说明countdownlatch
countdownlatch是一个类springboot自带的类,可以直接用 ,变量atomicboolean 也是可以直接使用
基于 spring boot + mybatis plus + vue & element 实现的后台管理系统 + 用户小程序,支持 rbac 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/yunaiv/ruoyi-vue-pro
视频教程:https://doc.iocoder.cn/video/
countdownlatch的用法
countdownlatch典型用法:
1、某一线程在开始运行前等待n个线程执行完毕。 将countdownlatch的计数器初始化为new countdownlatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countdown(),当计数器的值变为0时,在countdownlatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
2、实现多个线程开始执行任务的最大并行性。 注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的countdownlatch(1),将其计算器初始化为1,多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countdown()时,计数器变为0,多个线程同时被唤醒。
基于 spring cloud alibaba + gateway + nacos + rocketmq + vue & element 实现的后台管理系统 + 用户小程序,支持 rbac 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/yunaiv/yudao-cloud
视频教程:https://doc.iocoder.cn/video/
countdownlatch(num) 简单说明
new 一个 countdownlatch(num) 对象
建立对象的时候 num 代表的是需要等待 num 个线程
// 建立对象的时候 num 代表的是需要等待 num 个线程//主线程countdownlatch mainthreadlatch = new countdownlatch(num);//子线程countdownlatch rollbacklatch  = new countdownlatch(1); 主线程:mainthreadlatch.await() 和mainthreadlatch.countdown()
新建对象
countdownlatch mainthreadlatch = new countdownlatch(num);
卡住主线程,让其等待子线程,代码mainthreadlatch.await(),放在主线程里
mainthreadlatch.await();
代码mainthreadlatch.countdown(),放在子线程里,每一个子线程运行一到这个代码,意味着countdownlatch(num),里面的num-1(自动减一)
mainthreadlatch.countdown();
countdownlatch(num)里面的num减到0,也就是countdownlatch(0),被卡住的主线程mainthreadlatch.await(),就会往下执行
子线程:rollbacklatch.await() 和rollbacklatch.countdown()
新建对象,特别注意:子线程这个num就是1(关于只能为1的解答在后面)
countdownlatch rollbacklatch  = new countdownlatch(1);
卡住子线程,阻止每一个子线程的事务提交和回滚
rollbacklatch.await();
代码rollbacklatch.countdown();放在主线程里,而且是放在主线程的等待代码mainthreadlatch.await();后面。
rollbacklatch.countdown();
为什么所有的子线程会在一瞬间就被所有都释放了?
事务的回滚是怎么结合进去的?
假设总共20个子线程,那么其中一个线程报错了怎么实现所有线程回滚。
引入变量
atomicboolean rollbackflag = new atomicboolean(false)
和字面意思是一样的:根据 rollbackflag 的true或者false 判断子线程里面,是否回滚。
首先我们确定的一点:rollbackflag 是所有的子线程都用着这一个判断
主线程类entry
package org.apache.dolphinscheduler.api.utils;import com.alibaba.fastjson.jsonarray;import com.alibaba.fastjson.jsonobject;import org.apache.dolphinscheduler.api.controller.workthread;import org.apache.dolphinscheduler.common.enums.dbtype;import org.springframework.web.bind.annotation.*;import java.text.simpledateformat;import java.util.arraylist;import java.util.date;import java.util.list;import java.util.timezone;import java.util.concurrent.countdownlatch;import java.util.concurrent.atomic.atomicboolean;@restcontroller@requestmapping(importdatabase)public class entry {    /**     * @param dbid 数据库的id     * @param tablename 表名     * @param sftpfilename 文件名称     * @param head 是否有头文件     * @param splitsign 分隔符     * @param type 数据库类型     */    private static string sftp_host = 192.168.1.92;    private static int sftp_port = 22;    private static string sftp_username = root;    private static string sftp_password = rootroot;    private static string sftp_basepath = /opt/testsftp/;    @postmapping(/thread)    @responsebody    public static jsonobject importdatabase(@requestparam(dbid) int dbid            ,@requestparam(tablename) string tablename            ,@requestparam(sftpfilename) string sftpfilename            ,@requestparam(head) string head            ,@requestparam(splitsign) string splitsign            ,@requestparam(type) dbtype type            ,@requestparam(heads) string heads            ,@requestparam(scolumns) string scolumns            ,@requestparam(tcolumns) string tcolumns ) throws exception {        jsonobject obforretrun = new jsonobject();        try {            jsonarray jsonarray = jsonarray.parsearray(tcolumns);            jsonarray scolumnarray = jsonarray.parsearray(scolumns);            jsonarray headsarray = jsonarray.parsearray(heads);            list listinteger = getrrightdatanum(headsarray,scolumnarray);            jsonarray bodys = sftputils.getsftpcontent(sftp_host,sftp_port,sftp_username,sftp_password,sftp_basepath,sftpfilename,head,splitsign);            int total  = bodys.size();            int num = 20; //一个批次的数据有多少            int count = total/num;//周期            int lastnum =total- count*num;//余数            list list = new arraylist();            simpledateformat sdf = new simpledateformat(hhss:ss);            timezone t = sdf.gettimezone();            t.setrawoffset(0);            sdf.settimezone(t);            long starttime=system.currenttimemillis();            int countforcountdownlatch = 0;            if(lastnum==0){//整除                countforcountdownlatch= count;            }else{                countforcountdownlatch= count + 1;            }            //子线程            countdownlatch rollbacklatch  = new countdownlatch(1);            //主线程            countdownlatch mainthreadlatch = new countdownlatch(countforcountdownlatch);            atomicboolean rollbackflag = new atomicboolean(false);            stringbuffer message = new stringbuffer();            message.append(报错信息:);            //子线程            for(int i=0;ipackage org.apache.dolphinscheduler.api.controller;import com.alibaba.fastjson.jsonarray;import com.alibaba.fastjson.jsonobject;import org.apache.dolphinscheduler.api.service.datasourceservice;import org.apache.dolphinscheduler.common.enums.dbtype;import org.apache.dolphinscheduler.dao.entity.datasource;import org.apache.dolphinscheduler.dao.mapper.datasourcemapper;import org.apache.dolphinscheduler.service.bean.springapplicationcontext;import org.springframework.transaction.platformtransactionmanager;import java.sql.connection;import java.sql.preparedstatement;import java.sql.sqlexception;import java.text.parseexception;import java.text.simpledateformat;import java.util.date;import java.util.list;import java.util.timezone;import java.util.concurrent.countdownlatch;import java.util.concurrent.atomic.atomicboolean;/** * 多线程 */public class workthread implements runnable{ //建立线程的两种方法 1 实现runnable 接口 2 继承 thread 类    private datasourceservice datasourceservice;    private datasourcemapper datasourcemapper;    private integer begin;    private integer end;    private string tablename;    private jsonarray columnarray;    private integer dbid;    private dbtype type;    private jsonarray bodys;    private  list listinteger;    private platformtransactionmanager transactionmanager;    private countdownlatch mainthreadlatch;    private countdownlatch rollbacklatch;    private atomicboolean rollbackflag;    private stringbuffer message;    /**     * @param i     * @param num     * @param tablefrom     * @param columnarrayfrom     * @param dbidfrom     * @param typefrom     */    public workthread(int i, int num, string tablefrom, jsonarray columnarrayfrom, int dbidfrom            , dbtype typefrom, jsonarray bodysfrom, list listintegerfrom            ,countdownlatch mainthreadlatch,countdownlatch rollbacklatch,atomicboolean rollbackflag            ,stringbuffer messagefrom) {        begin=i*num;        end=begin+num;        tablename = tablefrom;        columnarray = columnarrayfrom;        dbid = dbidfrom;        type = typefrom;        bodys = bodysfrom;        listinteger = listintegerfrom;        this.datasourcemapper = springapplicationcontext.getbean(datasourcemapper.class);        this.datasourceservice = springapplicationcontext.getbean(datasourceservice.class);        this.transactionmanager = springapplicationcontext.getbean(platformtransactionmanager.class);        this.mainthreadlatch = mainthreadlatch;        this.rollbacklatch = rollbacklatch;        this.rollbackflag = rollbackflag;        this.message = messagefrom;    }    public void run() {        datasource datasource = datasourcemapper.querydatasourcebyid(dbid);        string cp = datasource.getconnectionparams();        connection con=null;            con =  datasourceservice.getconnection(type,cp);        if(con!=null)        {            simpledateformat sdf = new simpledateformat(hhss:ss);            timezone t = sdf.gettimezone();            t.setrawoffset(0);            sdf.settimezone(t);            long starttime = system.currenttimemillis();            try {                con.setautocommit(false);//---------------------------- 获取字段和类型                string columnstring = null;//活动的字段                int intfortype = 0;                string type[] = new string[columnarray.size()];//类型集合                for(int i=0;i还记得这里有个一批次处理多少数据么,我这边设置了20,实际到运用中的时候客户给了个20w的数据,我批次设置为20,那就有1w个子线程!!!!
这还不是最糟糕的,最糟糕的是每个子线程都会创建一个数据库连接,数据库直接被我搞炸了
所以这里需要把:
int num = 20; //一个批次的数据有多少
改成:
int num = 20000; //一个批次的数据有多少


运放制作的4路抢答器电路图分解
稀土氧化物在MLCC中的应用
韩国SK电讯的5G发展之路探讨
产业园区的运营也从“1.0”到“4.0”正在不断的颠覆传统的思维模式
软件工程师如何走向成功
多线程事务的回滚是怎么结合进去的?
数字电视机顶盒信道解调芯片设计公司
半导体设备业兼并求生 迷雾中的希望
PC并行端口作为数字I/O口的应用
首款3nm芯片:苹果A17性能跑分出炉
物联网安全的薄弱点有哪些
中国家电企业向人工智能靠拢 智能家电领域未来将由中国企业引领世界
AR技术如何与工业4.0结合?
电解电容正负极如何辨别_电解电容的应用
Wireless Xpress如何实现零编程IoT?
腾讯云:不是为了物联网而做物联网
富士康IPO速度“堪比光速” 一路绿灯
电磁阀的原理和一些动态图
“硬着头皮”上的小米5C核心松果处理器,能复制海思的成功?
LED照明行业竞争日益加剧,*ST飞乐撤销退市风险警示