线程池批量插入大量数据¶
场景介绍¶
需求就是有大量的数据需要通过一个接口保存到数据库中,以100万数据量为例子,首先就是正常导入,写一个接口,单线程批量导入, 这都还好,技术上面的要求不高,事务控制都很简单。但是单线程导入大量数据,要考虑到内存影响和时间的效率,单线程资源开销肯定很大。 以MyBatis-Plus框架的批量插入为例,100万数据插入,表只有2个字段,带一个主键ID,用时30s左右,那这个接口的响应就相对较慢。
那正常的思路就是使用多线程
去优化,将数据分给每个线程,分别去插入,同时根据业务情况还可以考虑做成异步。由此业务总结一下相关技术。
1.线程相关知识点¶
首先先讲一些理论相关的东西。
1.1首先什么是进程,什么是线程呢?¶
进程就是操作系统正在运行的程序,它代表了程序所占用的内存区域,一个进程可以有多个线程,比如打开Windows系统的任务管理器,打开进程卡, 这里面一个个的都是属于进程。
而线程是操作系统OS能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位,一个进程可以开启多个线程,其中有一个主线程来调用本进程中的其他线程,我们看到的进程的切换,切换的也是不同进程的主线程,多线程扩展了多进程的概念,使的同一个进程可以同时并发处理多个任务。
那么两者是什么关系?一个操作系统中可以有多个进程,一个进程中可以包含一个线程(单线程程序),也可以包含多个线程(多线程程序),每个线程在共享同一个进程中的内存的同时,又有自己独立的内存空间。 所以想使用线程技术,得先有进程,进程的创建是OS操作系统来创建的,一般都是C或者C++完成。
同时操作系统还有协程的概念,Java最近几年为了对标Golang语言的协程优势,推出了虚拟线程,这些就不展开讲了。
1.2多线程特性¶
多进程其实就是一个个运行起来的软件,在技术上归根还是要回到多线程的理解上。那多线程是什么?就好比你自己一个人开车干活搬运货物, 一辆车一个一个运固然可以把活干完,但是未免效率太低,活干的太慢了,如果可以两个人一起开车运,是不是一下就减少了工作时间,这也是多线程的思路, 2个或者多个线程一起处理任务,就是多线程,核心思想就是多个处理一个,目的是为了提高效率。
随机性¶
在计算机里面,它有哪些特性?有一点就是随机性:
插一嘴
之前给软件写一个进度条的功能,是给导入文件的导入进度做展示,分子是当前已经执行的进度,分母是程序预计执行的时间。 那么这个功能分子使其很好获取,难点就是分母,因为你无法去预测一段代码的执行时间,同样一段代码每一次执行花费的时间都是不一样的, 归根就是因为CPU时钟信号是随机的,你无法保证它每一次执行的每一次随机都是一样的。最后就是只能写一个假动画,略作展示。
CPU分时调度¶
CPU分配给各个线程的一个时间段,称作它的时间片,即该线程被允许运行的时间,如果在时间片用完时线程还在执行,那CPU将被剥夺并分配给另一个线程,
将当前线程挂起,如果线程在时间片用完之前阻塞或结束,则CPU当即进行切换,从而避免CPU资源浪费,当再次切换到之前挂起的线程,恢复现场,继续执行。
注意:我们无法控制OS选择执行哪些线程,OS底层有自己规则,如:FCFS
(First Come First Service 先来先服务算法) SJS
(Short Job Service短服务算法)
线程的状态(三态模型)¶
由于线程状态比较复杂,我们由易到难,先搞清楚线程的三种基础状态及其转换,简称“三态模型” :
① 就绪(可运行)状态:线程已经准备好运行,只要获得CPU调度,就可立即执行
② 执行(运行)状态:线程已经获得CPU,其程序正在运行的状态
③ 阻塞状态:正在运行的线程由于某些事件(I/O请求等)暂时无法执行的状态,即线程执行阻塞
那么它们之间的状态是如何流转的呢?
- 就绪 → 执行:为就绪线程分配CPU即可变为执行状态
- 执行 → 就绪:正在执行的线程由于时间片用完被剥夺CPU暂停执行,就变为就绪状态
- 执行 → 阻塞:由于发生某事件,使正在执行的线程受阻,无法执行,则由执行变为阻塞(例如线程正在访问临界资源,而资源正在被其他线程访问) 反之,如果获得了之前需要的资源,则由阻塞变为就绪状态,等待分配CPU再次执行
线程的状态(五态模型)¶
如果只是三种,对于描述线程状态在某些时候可能还不够,我们可以再添加两种状态:
- ④ 创建状态:线程的创建比较复杂,需要先申请PCB,然后为该线程运行分配必须的资源,并将该线程转为就绪状态插入到就绪队列中
- ⑤ 终止状态:等待OS进行善后处理,最后将PCB清零,并将PCB返回给系统
PCB(Process Control Block):为了保证参与并发执行的每个线程都能独立运行,OS配置了特有的数据结构PCB来描述线程的基本情况和活动过程,进而控制和管理线程
由此解释Java中线程的生命周期¶
线程生命周期,主要有五种状态:
- 新建状态(New): 当线程对象创建后就进入了新建状态.如:
Thread t = new MyThread();
- 就绪状态(Runnable):当调用线程对象的start()方法,线程即为进入就绪状态.(处于就绪(可运行)状态的线程,只是说明线程已经做好准备,随时等待CPU调度执行,并不是执行了t.start()此线程立即就会执行)
- 运行状态(Running):当CPU调度了处于就绪状态的线程时,此线程才是真正的执行,即进入到运行状态(
就绪状态是进入运行状态的唯一入口
,也就是线程想要进入运行状态状态执行,先得处于就绪状态) - 阻塞状态(Blocked):处于运行状态中的线程由于某种原因,暂时放弃对CPU的使用权,停止执行,此时进入阻塞状态,直到其进入就绪状态才有机会被CPU选中再次执行.根据阻塞状态产生的原因不同,阻塞状态又可以细分成三种:
- 1.
等待阻塞
:运行状态中的线程执行wait()方法,本线程进入到等待阻塞状态 - 2.
同步阻塞
:线程在获取synchronized同步锁失败(因为锁被其他线程占用),它会进入同步阻塞状态 - 3.
其他阻塞
:调用线程的sleep()或者join()或发出了I/O请求时,线程会进入到阻塞状态.当sleep()状态超时.join()等待线程终止或者超时或者I/O处理完毕时线程重新转入就绪状态
- 1.
- 死亡状态(Dead):线程执行完了或者因异常退出了run()方法,该线程结束生命周期。
1.3线程的创建¶
一共有三种:Thread、Runable、Callable
1️⃣继承Thread类¶
- 优点: 编写简单,如果需要访问当前线程,无需使用Thread.currentThread()方法,直接使用this即可获得当前线程
- 缺点: 自定义的线程类已继承了Thread类,所以后续无法再继承其他的类
public class ExtendsThread extends Thread {
/**
* 最后:为了修改线程名称,不再使用系统分配的默认名称,需要提供含参构造,并在构造中调用父类给线程起名的构造函数
*/
public ExtendsThread() {
super();
}
public ExtendsThread(String s) {
super(s);
}
@Override
public void run() {
// 具体实现,填写业务需求,但在当前线程中执行,不会启动新线程
System.out.println(Thread.currentThread().getName() + "->执行业务");
}
public static void main(String[] args) {
// 如果只是调用两个线程的run(),那么会按顺序先执行完一个线程,再执行另一个线程,不会有多线程的效果
// 1️⃣run()只是一个普通方法执行的效果,也就是单线程顺序执行的效果,没有多线程的线现象
ExtendsThread mythread1 = new ExtendsThread("线程名字1");
ExtendsThread mythread2 = new ExtendsThread("线程名字2");
ExtendsThread mythread3 = new ExtendsThread("线程名字2");
mythread1.run();
mythread2.run();
mythread3.run();
// 2️⃣只有调用start()才会使线程从新建状态变成可运行状态,才把线程加入就绪队列
mythread1.start();
mythread2.start();
mythread3.start();
}
}
执行完的效果:
2️⃣实现Runnable接口(推荐)¶
- 优点: 自定义的线程类只是实现了Runnable接口或Callable接口,后续还可以继承其他类,在这种方式下,多个线程可以共享同一个target对象,所以非常适合多个相同线程来处理同一份资源的情况,从而可以将CPU、代码、还有数据分开(解耦),形成清晰的模型,较好地体现了面向对象的思想,同时实现Runnable接口避免了多继承局限
- 缺点: 编程稍微复杂,如想访问当前线程,则需使用Thread.currentThread()方法
public class ExtendsRunnable implements Runnable {
@Override
public void run() {
// 具体实现,填写业务需求
System.out.println(Thread.currentThread().getName() + "->执行业务");
}
public static void main(String[] args) {
ExtendsRunnable target = new ExtendsRunnable();
// 把接口的实现类和Thread类绑定:接口没有start()方法
Thread thread1 = new Thread(target);
// 以多线程编程的方式启动,需要创建多个线程对象并启动
// 使用指定的构造函数修改线程的名称--使用Thread类的含参构造
Thread thread2 = new Thread(target, "杰克");
Thread thread3 = new Thread(target, "露丝");
thread1.start();
thread2.start();
thread3.start();
}
}
执行完的效果:
3️⃣实现Callable接口¶
这种方式最大的好处是可以拿到线程执行的结果。
public class ExtendsCallable implements Callable<String> {
@Override
public String call() {
for (int i = 5; i > 0; i--) {
System.out.println(Thread.currentThread() + "当前数" + i);
}
return "执行完毕";
}
public static void main(String[] args) throws Exception {
ExtendsCallable callable = new ExtendsCallable();
FutureTask<String> futureTask=new FutureTask<>(callable);
Thread myThread = new Thread(futureTask);
Thread myThread2 = new Thread(futureTask);
Thread myThread3 = new Thread(futureTask);
myThread.start();
myThread2.start();
myThread3.start();
System.out.println(futureTask.get());
}
}
执行完的效果:
Thread[Thread-0,5,main]当前数5
Thread[Thread-0,5,main]当前数4
Thread[Thread-0,5,main]当前数3
Thread[Thread-0,5,main]当前数2
Thread[Thread-0,5,main]当前数1
执行完毕
1.4多线程的安全和竞争¶
多线程的使用也有一些缺点,就是会增加系统的复杂性,而且它的运行状态往往不透明,你不知道它到底运行的怎么样,线程的创建和销毁也很增加系统的开销, 而且从计算机科学诞生开始,多线程的出现都有一个安全性问题,就是对共享变量的修改是不安全的,考量多线程的使用也就多了很多的维度。 这个东西即使是放到现实中也是一样的,还是两个司机开车运货的例子,当A开始搬最后一批货物,搬完就没有了,此时B司机不知道这个情况, 可能就要多跑一趟回去,放到计算机里就是让库存变为负数,这个就是安全性的一个例子,这个东西从物理逻辑上就是无解的。
判断程序有没有可能出现线程安全问题,主要有以下三个条件:
在多线程程序中 + 有共享数据 + 多条语句操作共享数据
所以要解决线程安全性问题就需要上锁,对共享变量的访问加以控制,一个时间段只有一个线程能访问修改共享变量。锁相关的知识点太多太多, 这里也不一一展开讲,只说一说简单一点的、理论的、八股文的。
乐观锁和悲观锁¶
悲观锁:悲观锁在操作数据时比较悲观,认为别人会同时修改数据。因此
假定会发生并发冲突,屏蔽一切可违反数据完整性的操作,同一时刻只能有一个线程执行写操作
悲观锁认为竞争总是会发生,因此每次对某资源进行操作时,都会持有一个独占的锁,就像synchronized,不管三七二十一,直接上了锁就操作资源了。
例如:synchronized,Lock,WriteReadLock
乐观锁:乐观锁在操作数据时非常乐观,认为别人不会同时修改数据。因此
假设不发生冲突,只在提交操作时检查是否违反数据完整性,多个线程可以并发执行写操作,但是只能有一个线程执行写操作成功。
乐观锁认为竞争不总是会发生,因此它不需要持有锁,将”比较-替换”这两个动作作为一个原子操作尝试去修改内存中的变量,如果失败则表示发生冲突,那么就应该有相应的重试逻辑。
例如:Java中的CAS算法(依赖硬件CPU)、AtomicInteger
1️⃣synchronized(悲观锁,有罪假设)¶
这个是Java自带的关键字,被它修饰的方法和代码块,在同一时间只能有一个线程访问,是最经典的Java锁。 采用synchronized修饰符实现的同步机制叫做互斥锁机制,它所获得的锁叫做互斥锁。每个对象都有一个monitor(锁标记), 当线程拥有这个锁标记时才能访问这个资源,没有锁标记便进入锁池。任何一个对象系统都会为其创建一个互斥锁,这个锁是为了分配给线程的, 防止打断原子操作。每个对象的锁只能分配给一个线程,因此叫做互斥锁。
2️⃣ReentrantLock(悲观锁,有罪假设)¶
这个是JUC包下的一个类,ReentrantLock是排他锁,排他锁在同一时刻仅有一个线程可以进行访问,实际上独占锁是一种相对比较保守的锁策略, 在这种情况下任何“读/读”、“读/写”、“写/写”操作都不能同时发生,这在一定程度上降低了吞吐量。然而读操作之间不存在数据竞争问题, 如果”读/读”操作能够以共享锁的方式进行,那会进一步提升性能。
3️⃣ReentrantReadWriteLock(乐观锁,无罪假设)¶
- 因此引入了ReentrantReadWriteLock,顾名思义,ReentrantReadWriteLock是Reentrant(可重入)Read(读)Write(写)Lock(锁),我们下面称它为读写锁。
读写锁内部又分为读锁和写锁,
读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的
。 读锁和写锁分离从而提升程序性能,读写锁主要应用于读多写少的场景。 - 与互斥锁相比,读-写锁允许对共享数据进行更高级别的并发访问。虽然一次只有一个线程(writer 线程)可以修改共享数据,但在许多情况下,任何数量的线程可以同时读取共享数据(reader 线程)从理论上讲,与互斥锁定相比,使用读-写锁允许的并发性增强将带来更大的性能提高。
注意
需要注意的是,用synchronized修饰的方法或者语句块在代码执行完之后锁会自动释放,而用 Lock需要我们手动释放锁, 所以为了保证锁最终被释放(发生异常情况),要把互斥区放在try内,释放锁放在finally内!
4️⃣volatile¶
他也是Java关键字之一,本身不是锁,是对锁机制的一种功能补充
被volatile修饰的变量能够保证每个线程能够获取该变量的最新值,从而避免出现数据脏读的现象。
一旦一个共享变量(类的成员变量、类的静态成员变量)被volatile修饰之后,那么就具备了两层语义:
①、
(1)当写一个volatile变量时,JMM会把该线程本地内存中的变量强制刷新到主内存中去;
(2)这个写会操作会导致其他线程中的volatile变量缓存无效。
②、
重排序是指编译器和处理器为了优化程序性能而对指令序列进行排序的一种手段。重排序需要遵守一定规则:
- (1)重排序操作不会对存在数据依赖关系的操作进行重排序。
比如:a=1;b=a; 这个指令序列,由于第二个操作依赖于第一个操作,所以在编译时和处理器运行时这两个操作不会被重排序。
- (2)重排序是为了优化性能,但是不管怎么重排序,单线程下程序的执行结果不能被改变
比如:a=1;b=2;c=a+b这三个操作,第一步(a=1)和第二步(b=2)由于不存在数据依赖关系,所以可能会发生重排序,但是c=a+b这个操作是不会被重排序的,因为需要保证最终的结果一定是c=a+b=3。重排序在单线程下一定能保证结果的正确性,但是在多线程环境下,可能发生重排序,影响结果
③、
④、volatile原理
volatile可以保证线程可见性且提供了一定的有序性,但是无法保证原子性。
在JVM底层volatile是采用"内存屏障"来实现的。观察加入volatile关键字和没有加入volatile关键字时所生成的汇编代码发现,加入volatile关键字时,会多出一个lock前缀指令,lock前缀指令实际上相当于一个内存屏障(也成内存栅栏),内存屏障会提供3个功能;
(1)它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;即在执行到内存屏障这句指令时,在它前面的操作已经全部完成;
(2)它会强制将对缓存的修改操作立即写入主存;
(3)如果是写操作,它会导致其他CPU中对应的缓存行无效。
5️⃣死锁¶
Java线程死锁的发生需要满足以下四个条件:
- 互斥条件(Mutual Exclusion):至少有一个资源必须处于非共享模式,即一次只能被一个线程占用。如果另一个线程请求该资源,请求线程必须等待。
- 占有并等待条件(Hold and Wait):一个线程必须占有至少一个资源,并等待获取其他线程占有的资源。
- 不可抢占条件(No Preemption):资源不能被强制从占有它的线程中剥夺,只能由占有它的线程显式释放。
- 循环等待条件(Circular Wait):存在一组等待线程,其中每个线程都在等待下一个线程占有的资源,形成一个循环链。
public class DeadLockDemo {
private static Object resource1 = new Object();
private static Object resource2 = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (resource1) {
System.out.println(Thread.currentThread().getName() + "已经获得锁对象:resource1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在等待获得锁对象:resource2......");
synchronized (resource2) {
System.out.println(Thread.currentThread().getName() + "已经获得锁对象:resource2");
}
}
}, "线程一").start();
new Thread(() -> {
synchronized (resource2) {
System.out.println(Thread.currentThread().getName() + "已经获得锁对象:resource2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在等待获得锁对象:resource1......");
synchronized (resource1) {
System.out.println(Thread.currentThread().getName() + "已经获得锁对象:resource1");
}
}
}, "线程二").start();
}
}
public class DeadLockDemo {
private static final Object lockA = new Object();
private static final Object lockB = new Object();
public void methodA() {
synchronized (lockA) {
synchronized (lockB) {
// 操作资源
}
}
}
public void methodB() {
synchronized (lockA) { // 先获取lockA,再获取lockB
synchronized (lockB) {
// 操作资源
}
}
}
}
public class DeadLockDemo {
private static final Object lockA = new Object();
private static final Object lockB = new Object();
public void methodA() {
synchronized (lockA) {
try {
if (!lockB.tryLock(100, TimeUnit.MILLISECONDS)) {
// 超时处理
return;
}
try {
// 操作资源
} finally {
lockB.unlock();
}
} catch (InterruptedException e) {
// 处理中断
}
}
}
public void methodB() {
synchronized (lockA) { // 先获取lockA,再获取lockB
try {
if (!lockB.tryLock(100, TimeUnit.MILLISECONDS)) {
// 超时处理
return;
}
try {
// 操作资源
} finally {
lockB.unlock();
}
} catch (InterruptedException e) {
// 处理中断
}
}
}
}
1.5多线程效率的理解¶
首先并发并不是减少了单个任务的执行时间,而是减少了多个任务的执行时间。如果只是单纯的将任务丢进线程池,而没有将任务拆分开来, 让多个线程并发执行,那么也只是让任务由一个线程切换到另一个线程执行,反而会因为上下文切换,导致效率降低,这就不是正确的使用姿势。
public class ThreadPoolDemo1 {
private static final int MAX_NUM = 10000;
private static int currentNumber = 0;
public static void main(String[] args) throws InterruptedException {
StopWatch stopWatch1 = singleThreadPrinter();
System.out.println("单线程共计耗时: " + stopWatch1.getTotalTimeMillis() + "ms");
}
// 耗时23756ms左右
private static StopWatch singleThreadPrinter() {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (int i = 0; i < MAX_NUM; i++) {
try {
Thread.sleep(2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
stopWatch.stop();
return stopWatch;
}
}
public class ThreadPoolDemo1 {
private static final int MAX_NUM = 10000;
private static int currentNumber = 0;
public static void main(String[] args) throws InterruptedException {
StopWatch stopWatch2 = threadPoolPrinter();
System.out.println("多线程共计耗时: " + stopWatch2.getTotalTimeMillis() + "ms");
}
// 耗时仍然在23798ms左右
private static StopWatch threadPoolPrinter() throws InterruptedException {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
8 + 1,
15,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
Printer printer = new Printer();
// 这里没有拆分任务
executor.execute(printer);
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
Runtime.getRuntime().gc();
stopWatch.stop();
return stopWatch;
}
private static class Printer implements Runnable {
@Override
public void run() {
// 单个线程只需要自己打印就行
while (currentNumber < MAX_NUM) {
currentNumber++;
try {
Thread.sleep(2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
public class ThreadPoolDemo1 {
private static final int MAX_NUM = 10000;
private static int currentNumber = 0;
public static void main(String[] args) throws InterruptedException {
StopWatch stopWatch2 = threadPoolPrinter();
System.out.println("多线程共计耗时: " + stopWatch2.getTotalTimeMillis() + "ms");
}
/**
* MAX_NUM = 10000时多线程在2700-2900ms之间(有一点小误差忽略)
*/
private static StopWatch threadPoolPrinter() throws InterruptedException {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
8 + 1,
15,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
Printer printer = new Printer();
// 一定要将任务拆分
for (int i = 0; i < 9; i++) {
executor.execute(printer);
}
executor.shutdown();
// 等待所有任务执行完毕
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
Runtime.getRuntime().gc();
stopWatch.stop();
return stopWatch;
}
private static class Printer implements Runnable {
@Override
public void run() {
// 单个线程只需要自己打印就行
while (currentNumber < MAX_NUM) {
currentNumber++;
try {
Thread.sleep(2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
1.6池化思想--线程池¶
什么是池化思想?讲到底就是复用,对经常要用的一些计算机资源进行统一管理。优点就是提高复用和性能,代码比自己来更简单,就像做出租车一样, 你自己要用了,打一辆就行,其他的你不用考虑,出租车一直都在,想用就用。当然池化思想也有缺点,万一你不是经常用,会对系统资源有一定开销, 而且对系统资源池化也有一定的维护成本。池化比如有数据库连接池、线程池等。
常用的几个创建线程池方法
newCachedThreadPool
:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。newFixedThreadPool
:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。newScheduledThreadPool
:创建一个定长线程池,支持定时及周期性任务执行。newSingleThreadExecutor
:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
1️⃣线程数怎么设置一般看业务场景,分为IO密集型和CPU密集型场景¶
-
CPU密集型场景:理论上CPU核数和线程数一致最合适,实际工程上线程数会设置成CPU核数+1, 这样当线程因为因为额外的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上。
-
IO密集型场景:可以根据公式计算,一般可设置为2N,
线程阻塞时间和线程忙碌时间可以通过压测与代码埋点统计获取, 比如本机R7-5800H 8核16线程CPU
公式
手动创建线程池:
创建线程池的7个参数
- 1️⃣
corePoolSize
:线程池的核心线程数 - 2️⃣
maximumPoolSize
:能容纳的最大线程数,最大线程池数量,当线程数>=corePoolSize,且任务队列已满时,线程池会创建新线程来处理任务;任务队列已满时, 且当线程数=maxPoolSize,,线程池会拒绝处理任务而抛出异常,也一样根据业务来,太大消耗资源,太小容易满 - 3️⃣
keepAliveTime
:空闲线程存活时间 - 4️⃣
unit
: 存活的时间单位 - 5️⃣
workQueue
: 存放提交但未执行任务的队列 - 6️⃣
threadFactory
:创建线程的工厂类 - 7️⃣
handler
: 等待队列满后的拒绝策略
2️⃣例子¶
@Bean("taskAsyncExecutor")
public AsyncListenableTaskExecutor taskExector() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//获取到服务器的cpu内核
int i = Runtime.getRuntime().availableProcessors();
// CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为N(CPU 核心数)+1,
// 多出来的一个线程是为了防止某些原因导致的线程阻塞(如IO操作,线程sleep,等待锁)而带来的影响。
// 一旦某个线程被阻塞,释放了CPU资源,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
// *******************************************************************************
// I/O 密集型任务(2N): 系统的大部分时间都在处理 IO 操作,此时线程可能会被阻塞,释放CPU资源,
// 这时就可以将 CPU 交出给其它线程使用。因此在 IO 密集型任务的应用中,可以多配置一些线程,
// 具体的计算方法:最佳线程数 = CPU核心数 * (1/CPU利用率) = CPU核心数 * (1 + (IO耗时/CPU耗时)),
// 一般可设置为2N。
executor.setCorePoolSize(2 * i + 1);
// 最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
// 任务队列已满时, 且当线程数=maxPoolSize,,线程池会拒绝处理任务而抛出异常
// 也一样根据业务来,太大消耗资源,太小容易满
executor.setMaxPoolSize(100);
// 阻塞队列 当核心线程数达到最大时,新任务会放在队列中排队等待执行
// 这个也是需要根据业务需要来,太大消耗资源,太小线程池队列就容易满,根据拒绝策略来分情况会出现不一样的可能
// 如AbortPolicy(),就会抛RejectedExecutionException异常
executor.setQueueCapacity(1024);
// 线程空闲时间
executor.setKeepAliveSeconds(60);
// spring 提供的 ThreadPoolTaskExecutor 线程池,是有setThreadNamePrefix() 方法的。
// jdk 提供的ThreadPoolExecutor 线程池是没有 setThreadNamePrefix() 方法的
executor.setThreadNamePrefix("taskAsyncExecutor->");
// rejection-policy:拒绝策略:当线程数已经达到maxSize的时候,如何处理新任务
// CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行, (个人推荐)
// AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
// DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
// DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
// 具体时间看业务需要,也不能一直等,太久也不合适
executor.setAwaitTerminationSeconds(60);
// 设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁
executor.setWaitForTasksToCompleteOnShutdown(true);
// 如果设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
executor.setAllowCoreThreadTimeOut(true);
executor.initialize();
return executor;
}
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
10,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10000),
threadFactory
);
3️⃣jdk中提供了四种工作队列(这个也很重要,实际开发也需要很关注):¶
1️⃣ArrayBlockingQueue
:
2️⃣LinkedBlockingQueue
:
3️⃣SynchronousQueue
:
4️⃣PriorityBlockingQueue
:
handler 拒绝策略¶
1️⃣CallerRunsPolicy:该策略下,在调用者线程中直接执行被拒绝任务的run方法,除非线程池shutdown,则直接抛弃任务
2️⃣AbortPolicy:该策略下,直接丢弃任务,并抛出RejectedExecutionException异常。
3️⃣DiscardPolicy:该策略下,直接丢弃任务,什么都不做。
4️⃣DiscardOldestPolicy:该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列
1.7多线程批量插入大量数据¶
那就是使用线程池。以下开始用只有2个字段,带一个主键ID的本地数据库表,来展示思路和实践,表没有MyBatis-Plus等框架的自动填充,他会影响一定的效率, 表除去主键无任何索引,测试的机子是在Windows 11平台,是一台拯救者笔记本,CPU是R7-5800H,磁盘是海力士,内存条是金士顿DDR4的32G。
1️⃣先来测试一下使用MyBatis-Plus框架自带的insertBatch(Collection<T> entities)
方法,使用单线程插入数据,耗时29477ms¶
@GetMapping("/insertBatchSingle")
@ApiOperation(value = "测试大数据量的批量插入,使用单线程,仅仅测试一下效率")
public R<String> insertBatchSingle(@RequestParam @NotNull Integer size) {
// 构造数据
ArrayList<@Nullable Test> arrayList = constructData1(size);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
bigDataInsertService.insertBatchSingle(arrayList);
stopWatch.stop();
log.info("插入完毕,用时:{}毫秒", stopWatch.getTotalTimeMillis());
return success("数据插入成功");
}
private static ArrayList<@Nullable Test> constructData1(Integer size) {
// 构造数据
ArrayList<@Nullable Test> arrayList = Lists.newArrayList();
for (int i = 0; i < size; i++) {
Test topic = new Test();
topic.setId(UUID.randomUUID().toString().replaceAll("-", ""));
topic.setName("title");
arrayList.add(topic);
}
return arrayList;
}
那么多线程怎么做?如果需要一些功能的,可以使用JUC下面的CompletableFuture工具,使用Test表,MyBatis-Plus的InsertBatch,插入100万数据需要11.98秒¶
@Bean("taskAsyncExecutor")
public ThreadPoolTaskExecutor taskExector() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int i = Runtime.getRuntime().availableProcessors();
executor.setCorePoolSize(2 * i + 1);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("taskAsyncExecutor->");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.setAwaitTerminationSeconds(60);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAllowCoreThreadTimeOut(true);
executor.initialize();
return executor;
}
@Autowired
private ExecturConfig execturConfig;
@GetMapping("/insertBatchUseMp")
@ApiOperation(value = "测试大数据量的批量插入")
public R<String> insertBatchUseMp(@RequestParam @NotNull Integer size) {
// 构造数据
ArrayList<@Nullable Test> arrayList = constructData1(size);
// 构造任务
ArrayList<Runnable> tasks = bigDataInsertService.insertBatchUseMp(arrayList);
// 提交任务
List<CompletableFuture<Void>> taskFutureList = new CopyOnWriteArrayList<>();
tasks.forEach(task -> taskFutureList.add(CompletableFuture.runAsync(task, execturConfig.taskExector())
.thenRun(() -> log.info("本线程完成"))
.exceptionally(ex -> {
log.error("系统错误,线程{}执行任务失败:{}", task, ex.getMessage());
return null;
})));
try {
CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
} catch (InterruptedException | ExecutionException e) {
log.error("系统错误,线程执行任务失败:{}", e.getMessage());
throw new ServiceException(INTERNAL_SERVER_ERROR);
}
return created("任务新建成功,数据正在同步...");
}
@Override
@Transactional(rollbackFor = Exception.class)
public ArrayList<Runnable> insertBatchUseMp(ArrayList<Test> arrayList) {
// 将数据按照固定的数量分批
List<List<Test>> partitions = Lists.partition(arrayList, SIZE);
ArrayList<Runnable> taskList = Lists.newArrayList();
partitions.forEach(partition -> taskList.add(() -> testMapper.insertBatch(partition)));
return taskList;
}
还可以测试一下MyBatis的xml插入效率,使用Test表,MyBatis的xml的foreach,插入100万数据需要16秒¶
@Autowired
private ExecturConfig execturConfig;
@GetMapping("/useMybatisForEach")
@ApiOperation(value = "测试大数据量的批量插入")
public R<String> insertBatchTest(@RequestParam @NotNull Integer size) {
// 构造数据
ArrayList<@Nullable Test> arrayList = constructData1(size);
// 构造任务
ArrayList<Runnable> tasks = bigDataInsertService.constructRunnablesForTest(arrayList);
// 提交任务
List<CompletableFuture<Void>> taskFutureList = new CopyOnWriteArrayList<>();
tasks.forEach(task -> taskFutureList.add(CompletableFuture.runAsync(task, execturConfig.taskExector())
.thenRun(() -> log.info("本线程完成"))
.exceptionally(ex -> {
log.error("系统错误,线程{}执行任务失败:{}", task, ex.getMessage());
return null;
})));
try {
CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
} catch (InterruptedException | ExecutionException e) {
log.error("系统错误,线程执行任务失败:{}", e.getMessage());
throw new ServiceException(INTERNAL_SERVER_ERROR);
}
return created("任务新建成功,数据正在同步...");
}
@Override
@Transactional(rollbackFor = Exception.class)
public ArrayList<Runnable> constructRunnablesForTest(ArrayList<Test> topics) {
// 将数据按照固定的数量分批
List<List<Test>> partitions = Lists.partition(topics, SIZE);
ArrayList<Runnable> taskList = Lists.newArrayList();
partitions.forEach(partition -> taskList.add(() -> testMapper.bigDataInsertBatch(partition)));
return taskList;
}
如果不需要额外功能,简单一点的,可以这样写,用的也是MyBatis的xml方法¶
@Autowired
private ExecturConfig execturConfig;
@GetMapping("/insertBatchAsync1")
@ApiOperation(value = "测试大数据量的批量插入1")
public R<String> insertBatchAsync1(@RequestParam @NotNull Integer size) {
// 构造数据
ArrayList<@Nullable TopicD> arrayList = constructData(size);
// 计算平均每个线程插入多少数据
List<List<TopicD>> partitions = Lists.partition(arrayList, 70000);
ArrayList<Runnable> tasks = Lists.newArrayList();
partitions.forEach(partition -> tasks.add(() -> {
bigDataInsertMapper.bigDataInsertBatch(partition);
log.info("完成!");
}));
// 提交任务
tasks.forEach(task -> execturConfig.taskExector().submit(task));
return created("任务新建成功,数据正在同步...");
}
那么还能不能效率更好一些,时间再短一点?可以考虑使用更为底层的方法,即JDBC。这里没有做异步,示例代码添加了重试。使用test表,只有主键索引和一个name字段,100万数据插入使用了7.02s¶
@GetMapping("/insertJdbcBatch")
@ApiOperation(value = "测试大数据量的批量插入")
public R<String> insertJdbcBatch(@RequestParam @NotNull Integer size) {
// 构造数据
ArrayList<@Nullable Test> arrayList = constructData1(size);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
// 决定多少数据为一组
List<List<Test>> partitions = Lists.partition(arrayList, 100000);
CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
partitions.forEach(partition -> execturConfig.taskExector().execute(()
-> bigDataInsertService.insertJdbcBatch(countDownLatch, partition)
));
// 等待所有线程执行完
try {
countDownLatch.await();
} catch (Exception e) {
log.error("等待所有线程执行完异常,e:{}", e.getMessage(), e);
}
stopWatch.stop();
log.info("共计耗时:{}", stopWatch.getTotalTimeMillis());
// 提前将不再使用的集合清空,释放资源
arrayList.clear();
partitions.clear();
log.info("线程 {} 执行完毕", Thread.currentThread().getName());
return created("任务新建成功,数据正在同步...");
}
@Override
public void insertJdbcBatch(CountDownLatch countDownLatch, List<Test> partition) {
// 设置重试次数
int retryCount = 3;
boolean success = false;
while (retryCount > 0 && !success) {
Connection connection = null;
PreparedStatement preparedStatement = null;
try {
connection = druidConfig.druid().getConnection();
connection.setAutoCommit(false);
String sql = "insert into test_for_insertbigdata (id, name) values (?,?)";
preparedStatement = connection.prepareStatement(sql);
for (Test topicD : partition) {
preparedStatement.setString(1, topicD.getId());
preparedStatement.setString(2, topicD.getName());
preparedStatement.addBatch();
}
Assert.notNull(preparedStatement, "数据库连接存在问题,无法获取preparedStatement对象");
preparedStatement.executeBatch();
connection.commit();
// 插入成功,跳出重试循环
success = true;
log.info("数据插入完毕");
} catch (Exception e) {
log.error("启动线程失败,原因:{}", e.getMessage(), e);
try {
if (connection != null) {
connection.rollback();
}
} catch (SQLException rollbackException) {
log.error("事务回滚失败,原因:{}", rollbackException.getMessage(), rollbackException);
}
} finally {
try {
if (preparedStatement != null) {
preparedStatement.close();
}
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
log.error("连接或预处理语句关闭失败");
}
}
retryCount--;
if (!success && retryCount > 0) {
log.warn("插入失败,重试剩余次数: {}", retryCount);
try {
// 重试前等待固定时间,避免频繁重试导致系统负载过高
Thread.sleep(1000);
} catch (InterruptedException ie) {
log.error("线程中断,e:{}", ie.getMessage(), ie);
Thread.currentThread().interrupt();
}
}
}
if (!success) {
log.error("重试3次后仍然失败,数据插入失败");
// 可以在这里添加其他处理逻辑,例如记录失败的数据到日志或数据库
}
// 执行完一个线程减1,直到执行完
countDownLatch.countDown();
}
统计测试结果¶
如果基础硬件一致,IO一致,代码逻辑配置环境等等都一致,只有一个分批次的批次数量不一定,那么一致从小到大,一致测试,很有可能是一个正态分布
,
它是一个倒“u
”型的曲线,批次数量多了也不行,少了也不行,只有刚刚好的那个批次才是最优解。这个测试结果有机会可以做一做,同一批次测试三次,
取耗时的平均值,然后调整批次数量,一直统计,最终就是一张统计图,最后再来分析最优解的范围。
下面是我自己手动举的例子,并不是实际的测试结果,横坐标是批次量,纵坐标是耗时。
那么还有哪些思路?¶
- 1.还可以使用消息队列
- 2.使用MySQL自带的
LOAD DATA INFILE
语句,专门为了导入数据的,但是可能和业务不一定能结合
1.8事务控制¶
其实可以看到,多线程可以很明显的提高插入效率,当然我们还需要注意数据量,表的结构,因为索引数量会延缓插入效率,线程池的参数,
不要插入的时候OOM了。同时还有一点,这些个方式,都有一个缺陷,无法做到强一致性。业务层面你可以这样控制,每一批次数据是一个事务,
插入失败的单独拿出来做补偿(重试是一种),或者让用户将失败数据在导一次,或者将事务不提交,等待其他事务完成一起提交,最终一致性还是可以做到的。
但是极海Channel
对这个概念的理解,
视频链接
其实到最后,这个就是有一点分布式事务
的感觉了,你无法在单一进程里面实现,那就加一层中间层,俗话说:“没有什么是加一层中间层解决不了的”,
可即使强如seata框架,也不是100%安全的。
那我就是要实现“多线程事务”怎么办?我去请教了开源项目easy-query的作者,图是他的思路
这个方案的一致性要求已经很好了,可能性能差一些,和分布式事务一样。效率和一致性就是不可调和的矛盾,是现实逻辑的映射。
1.9贴一张工具类¶
这个工具类的逻辑至少是可以保证提交的时候都提交成功,提交的时候数据库不挂的强一致性。来自于B站up程序员路人
,主页
这个类使用时,要特别注意线程数不要超过数据库连接池的数量,不然容易死锁,比如50个连接池数量,来了100个,其中50个可以拿到连接, 执行完后业务代码不提交而持有数据库连接,等待其他50个按照代码逻辑,但是另外50个会一直等待前面的数据库连接资源,因为前50个一直不释放, 只能等他们用完了,此时就是死锁,就是一个bug。总的来说还是不建议使用。
@Slf4j
public class MultiThreadTransactionUtils {
/**
* 多线程事务处理,适用于需要在多线程环境下执行多个数据库操作,并且这些操作要么全部成功,要么全部失败的场景
* ⚠️⚠️需要注意任务列表的大小不要超出数据库连接池的大小和数据库本身要求的连接数⚠️⚠️
*
* @param platformTransactionManager Spring的事务管理器,用于控制事务的提交和回滚
* @param taskList 一个可变数量的Runnable任务,每个任务代表一个数据库操作
* @return 如果所有任务都可以执行成功,返回true
*/
public static boolean execute(PlatformTransactionManager platformTransactionManager, List<Runnable> taskList) {
if (taskList == null || taskList.isEmpty()) {
throw new RuntimeException("任务列表不能为空");
}
// 任务数量
int taskSize = taskList.size();
// 任务成功数量计数器
AtomicInteger taskSuccessCount = new AtomicInteger(0);
ArrayList<Future<?>> taskFutureList = new ArrayList<>(taskSize);
// 循环屏障,用于让多线程事务一起提交或者一起回滚
CyclicBarrier cyclicBarrier = new CyclicBarrier(taskSize);
int i = 1;
// 这里就是自定义了,参数看业务
ThreadPoolTaskExecutor exector = taskExector();
try {
// 使用线程池执行循环处理任务,每个任何会交给线程池中的一个线程执行
for (Runnable task : taskList) {
final int taskIndex = i;
Future<?> future = exector.submit(() -> {
TransactionStatus transactionStatus = null;
try {
// 使用spring编程式事务,开启事务
transactionStatus = platformTransactionManager.getTransaction(new DefaultTransactionAttribute());
// 执行任务
task.run();
// 成功数量+1
taskSuccessCount.incrementAndGet();
log.debug("task: {} 等待事务提交", taskIndex);
} catch (Throwable e) {
log.error("task: {}, 执行异常,异常原因:{}", taskIndex, e.getMessage());
} finally {
// 走到这里,会阻塞,直到当前线程池中所有的任务都执行到这个位置后,才会被唤醒,继续向下走
try {
cyclicBarrier.await();
} catch (Exception e) {
log.error("cyclicBarrier.await error: {}", e.getMessage(), e);
}
}
if (transactionStatus != null) {
// 如果所有任务都成功(successAccount的值等于任务总数),则一起提交事务,如果有任何任务失败,则一起回滚事务
if (taskSuccessCount.get() == taskSize) {
log.debug("task: {} 提交事务", taskIndex);
platformTransactionManager.commit(transactionStatus);
} else {
log.debug("task: {} 回滚事务", taskIndex);
platformTransactionManager.rollback(transactionStatus);
}
}
});
taskFutureList.add(future);
i++;
}
for (Future<?> future : taskFutureList) {
try {
future.get();
} catch (Exception e) {
log.error("future.get error: {}", e.getMessage(), e);
}
}
} finally {
exector.shutdown();
}
return taskSuccessCount.get() == taskSize;
}
public static ThreadPoolTaskExecutor taskExector() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int i = Runtime.getRuntime().availableProcessors();
executor.setCorePoolSize(2 * i + 1);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("taskAsyncExecutor->");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.setAwaitTerminationSeconds(60);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAllowCoreThreadTimeOut(true);
executor.initialize();
return executor;
}
}