- 什么是JUC
- 了解ThreadLocal
- 了解volatile
- 了解final
- CAS
- 原子类
- 锁机制
- Lock接口
- 线程通讯
- 线程八锁
- 集合不安全
- Callable创建线程
- JUC辅助类
- LockSupport
- AQS
- JUC读写锁
- JUC阻塞队列
- ThreadPool线程池
- java8流式计算
- 分支合并框架
- 异步回调CompletableFuture
- 打造高性能缓存
- 避坑必看
什么是JUC
- java.util.concurrent在并发编程中使用的工具类
进程与线程
- 进程:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。
- 大四的时候写论文,用word写论文,同时用QQ音乐放音乐,同时用QQ聊天,多个进程。
- 线程:通常在一个进程中可以包含若干个线程,当然一个进程中至少有一个线程,不然没有存在的意义。线程可以利用进程所拥有的资源,在引入线程的操作系统中,通常都是把进程作为分配资源的基本单位,而把线程作为独立运行和独立调度的基本单位,由于线程比进程更小,基本上不拥有系统资源,故对它的调度所付出的开销就会小得多,能更高效的提高系统多个程序间并发执行的程度。
- word如没有保存,停电关机,再通电后打开word可以恢复之前未保存的文档,word也会检查你的拼写,两个线程:容灾备份,语法检查
- 线程的几种状态
Thread.State
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
NEW,(新建)
/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
RUNNABLE,(准备就绪)
/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
BLOCKED,(阻塞)
/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
WAITING,(不见不散)
/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING,(过时不候)
/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
TERMINATED;(终结)
}
wait与sleep
- wait放开手去睡,放开手里的锁
- sleep握紧手去睡,醒了手里还有锁
并发与并行
- 并发:同一时刻多个线程在访问同一个资源,多个线程对一个点
- 例子:小米9今天上午10点,限量抢购,春运抢票,电商秒杀
- 并行:多项工作一起执行,之后再汇总
- 例子:泡方便面,电水壶烧水,一边撕调料倒入桶中
了解ThreadLocal
-
ThreadLocalMap类是每个线程Thread类里边的变量,里面最重要的一个键值对数组Entry[] table,可以认为是一个map
键:这个ThreadLocal
值:实际需要的成员变量,比如user或simpleDateFormat对象
-
使用场景
每个线程需要一个独享的对象(通常是工具类,比如SimpleDateFormat,Random)
/** * 描述: 利用ThreadLocal,给每个线程分配自己的dateFormat对象,保证了线程安全,高效利用内存 */ public class ThreadLocalNormalUsage05 { public static ExecutorService threadPool = Executors.newFixedThreadPool(10); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 1000; i++) { int finalI = i; threadPool.submit(new Runnable() { @Override public void run() { String date = new ThreadLocalNormalUsage05().date(finalI); System.out.println(date); } }); } threadPool.shutdown(); } public String date(int seconds) { //参数的单位是毫秒,从1970.1.1 00:00:00 GMT计时 Date date = new Date(1000 * seconds); // SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat dateFormat = ThreadSafeFormatter.dateFormatThreadLocal2.get(); return dateFormat.format(date); } } class ThreadSafeFormatter { public static ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = new ThreadLocal<SimpleDateFormat>() { @Override protected SimpleDateFormat initialValue() { return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); } }; //java8 public static ThreadLocal<SimpleDateFormat> dateFormatThreadLocal2 = ThreadLocal .withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); }
每个线程内需要保存全局变量,例如在拦截器中获取用户信息,可以让不同方法直接调用,避免参数传递的麻烦
/** * 描述: 演示ThreadLocal用法2:避免传递参数的麻烦 */ public class ThreadLocalNormalUsage06 { public static void main(String[] args) { new Service1().process(""); } } class Service1 { public void process(String name) { User user = new User("超哥"); UserContextHolder.holder.set(user); new Service2().process(); } } class Service2 { public void process() { User user = UserContextHolder.holder.get(); ThreadSafeFormatter.dateFormatThreadLocal.get(); System.out.println("Service2拿到用户名:" + user.name); new Service3().process(); } } class Service3 { public void process() { User user = UserContextHolder.holder.get(); System.out.println("Service3拿到用户名:" + user.name); UserContextHolder.holder.remove(); } } class UserContextHolder { public static ThreadLocal<User> holder = new ThreadLocal<>(); } class User { String name; public User(String name) { this.name = name; } }
主要方法
-
initialValue
该方法会返回当前线程对应的初始值,这是一个延迟加载的方法,只有在调用get方法时触发
如果调用remove方法后再调用get,则可以再次调用此方法
如果不重写此方法,就会返回null
-
set:为这个线程设置一个新值
-
get
得到这个线程对应的value
先取出当前线程的ThreadLocalMap,然后调用map.getEntry方法,把本ThreadLocald的引用作为参数传入,取出map中属于本ThreadLocal的值
map中的键值对是存放在线程中的,而不是在ThreadLocal中
-
remove:删除线程对应的值
内存泄露与空指针
-
threadLocalMap的每个Entry都是一个对key的弱引用,同时每个Entry都包含了一个对value的强引用,正常情况下,当线程终止,保存在threadLocal中的value会被垃圾回收,因为没有任何强引用了,但是,如果线程不终止,那么key对应的value就不能被回收,因为有以下调用链
Thread—ThreadLocalMap—Entry(key为null)—value
-
JDK考虑到这点,在set,remove,rehash方法中会扫描key为null的Entry,并把对应的value也设置为null,这样value就可以被回收了
-
调用remove方法,就会删除对应的Entry对象,可以避免内存泄漏,所以使用完ThreadLocal后,应该调用remove方法。
-
空指针问题
/**
* 描述: TODO
*/
public class ThreadLocalNPE {
ThreadLocal<Long> longThreadLocal = new ThreadLocal<Long>();
public void set() {
longThreadLocal.set(Thread.currentThread().getId());
}
//自动拆箱导致空指针
public long get() {
return longThreadLocal.get();
}
public static void main(String[] args) {
ThreadLocalNPE threadLocalNPE = new ThreadLocalNPE();
System.out.println(threadLocalNPE.get());
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
threadLocalNPE.set();
System.out.println(threadLocalNPE.get());
}
});
thread1.start();
}
}
了解volatile
- volatile是java虚拟机提供的轻量级同步机制
- volatile可以保证可见性,但是不保证原子性,还可以禁止指令重排
package thread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class VolatileDemo {
public static void main(String[] args) {
volatileVisibilityDemo();
atomicDemo();
}
private static void atomicDemo() {
System.out.println("原子性测试");
MyData myData=new MyData();
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j <1000 ; j++) {
myData.addPlusPlus();
myData.addAtomic();
}
},String.valueOf(i)).start();
}
while (Thread.activeCount()>2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+"\t int type finally number value: "+myData.number);
System.out.println(Thread.currentThread().getName()+"\t AtomicInteger type finally number value: "+myData.atomicInteger);
}
//volatile可以保证可见性,及时通知其它线程主物理内存的值已被修改
private static void volatileVisibilityDemo() {
System.out.println("可见性测试");
MyData myData=new MyData();//资源类
//启动一个线程操作共享数据
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t come in");
try {
TimeUnit.SECONDS.sleep(3);
myData.setTo60();
System.out.println(Thread.currentThread().getName()+"\t update number value: "+myData.number);
}catch (InterruptedException e)
{
e.printStackTrace();
}
},"AAA").start();
while (myData.number==0){
//main线程持有共享数据的拷贝,一直为0
}
System.out.println(Thread.currentThread().getName()+"\t mission is over. main get number value: "+myData.number);
}
}
class MyData{
int number=0;
//volatile int number=0;
AtomicInteger atomicInteger=new AtomicInteger();
public void setTo60(){
this.number=60;
}
//此时number前面已经加了volatile,但是不保证原子性
public void addPlusPlus(){
number++;
}
public void addAtomic(){
atomicInteger.getAndIncrement();
}
}
JMM
- JMM(java内存模型java memory model)本身是一种抽象的概念并不真实存在,他描述的是一组规范或规则,通过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。
- JMM关于同步的规定:
- 线程解锁前,必须把共享变量的值刷新回主内存
- 线程加锁前,必须读取主内存的最新值到自己的工作内存。
- 加锁和解锁是同一把锁。
- 由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方称为栈空间),工作内存是每个线程的私有数据区域,而java内存模型中规定所有变量都存储在主内存,主内存是共享内存区域,所有线程都可以访问,但线程对变量的操作(读取和赋值等)必须在工作内存中进行,首先需要将变量从主内存拷贝到自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,不能直接操作主内存中的变量,各个线程中的工作内存中存储着内存中的变量副本拷贝,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成
- 可见性问题:可能存在一个线程AAA修改了共享变量X的值但未写回主内存时,另外一个线程BBB又对主内存中同一个共享变量X进行操作,但此时AAA线程工作内存中共享变量X对线程BBB来说并不可见,这种工作内存与主内存同步延迟现象就造成了可见性问题。
- 原子性:number++在多线程下是非线程安全的,为何不加synchronized解决?
class MyData{
volatile int number = 0;
Object object = new Object();
public void addTo60(){
this.number = 60;
}
public void addPlusPlus(){
this.number++;
}
AtomicInteger atomicInteger = new AtomicInteger();
public void addAtomic(){
atomicInteger.getAndIncrement();
}
}
/**
* 验证volatile的可见性
* 1.当number未被volatile修饰时,new Thread将number值改为60,但main线程并不知道,会一直在循环中出不来
* 2.当number使用volatile修饰,new Thread改变number值后,会通知main线程主内存的值已被修改,结束任务。体现了可见性
*
* 验证volatile不保证原子性
* 1.原子性是指,某个线程在执行某项业务时,中间不可被加塞或分割,需要整体完整。要么同时成功,要么同时失败
*
* 如何解决呢?
* 1.使用synchronize
* 2.使用AtomicInteger
*
*/
public class VolatileDemo {
public static void main(String[] args) {
//seeByVolatile();
atomic();
}
//验证原子性
public static void atomic() {
MyData myData = new MyData();
for (int i = 1; i <= 20; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 1; j <= 1000; j++) {
/*synchronized (myData.object){
myData.addPlusPlus();
}*/
myData.addPlusPlus();
myData.addAtomic();
}
}
}).start();
}
//等待上面20个线程全部计算结束
while (Thread.activeCount() > 2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + "int finally number is " + myData.number);
System.out.println(Thread.currentThread().getName() + "AtomicInteger finally number is " + myData.atomicInteger);
}
//验证可见性的方法
public static void seeByVolatile() {
MyData myData = new MyData();
//第一个线程
new Thread(){
public void run(){
System.out.println(Thread.currentThread().getName() + " come in");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
myData.addTo60();
System.out.println(Thread.currentThread().getName() + " update number to " + myData.number);
}
}.start();
//第二个线程 main
while (myData.number == 0){
}
System.out.println(Thread.currentThread().getName() + "mission is over");
}
}
- 有序性:
- 计算机在执行程序时,为了提高性能,编译器和处理器常常会对指令做重排序,一般分为三种:源代码—编译器优化的重排—指令并行的重排—内存系统的重排—最终执行的指令
- 单线程环境里面确保程序最终执行结果和代码顺序执行的结果一致
- 处理器在进行重排序时必须要考虑指令之间的数据依赖性
- 多线程环境中线程交替执行,但由于编译器优化重排的存在,两个线程中使用的变量能否保证一致性是无法确定的,结果无法预测。
{
int x = 11;//1
int y = 12;//2
x = x + 5;//3
y = x * x;//4
}
1234
2134
1324
问题:请问语句4可以重排后变成第一条吗?
不行,因为存在数据依赖性
- 案例
public class ReSortSeqDemo{
int a = 0;
boolean flag = false;
public void method1(){
a= 1; //语句1
flag = true; //语句2
}
public void method2(){
if(flag){
a = a+5; //语句3
sout("***retValue:"+a);
}
}
}
//线程操作资源类,线程1访问method1,线程2访问method2,正常情况顺序执行,a=6
//多线程下假设出现了指令重排,语句2在语句1之前,当执行完flag=true后,另一个线程马上执行method2,a=5
- volatile实现禁止指令重排优化,从而避免多线程环境下程序出现乱序执行的现象。
- 内存屏障(memory barrier)又称为内存栅栏,是一个CPU指令,他的作用有两个
- 一是保证特定操作的执行顺序
- 二是保证某些变量的内存可见性(利用该特性实现volatile的内存可见性)
- 由于编译器和处理器都能执行指令重排优化。如果在指令间插入一条内存屏障则会告诉编译器和CPU,不管什么指令都不能和这条内存屏障指令重排序,也就是说通过插入内存屏障禁止在内存屏障前后的指令执行重排序优化。内存屏障另外一个作用是强制刷出各种CPU的缓存数据,因此任何CPU上的线程都能读取到这些数据的最新版本。
- 线程安全性获得保证:
- 工作内存和主内存同步延迟现象导致的可见性问题可以使用synchronized或volatile关键字解决,他们都可以使一个线程修改后的变量立即对其他线程可见。
- 对于指令重排导致的可见性问题和有序性问题,可以利用volatile关键字解决,因为volatile的另一个作用就是禁止重排序优化。
应用场景
- 单例模式DCL
public class SingletonDemo {
//加volatile禁止指令重排
private static volatile SingletonDemo instance = null;
private SingletonDemo(){
System.out.println(Thread.currentThread().getName() + "构造方法");
}
//DCL双端加锁机制,也就是双重检查锁
public static SingletonDemo getInstance(){
if (instance == null){
synchronized (SingletonDemo.class){
if (instance == null){
instance = new SingletonDemo();
}
}
}
return instance;
}
}
//这种写法在多线程条件下可能正确率为99.999999%,但可能由于指令重排出错
- 单例模式volatile分析
- 指令重排只会保证串行语义的执行一致性(单线程),但并不会关心多线程间的语义一致性。
- 当一条线程访问instance不为null时,由于instance实例未必已初始化完成,也就造成了线程安全问题
了解final
- 如果对象在被创建后,状态就不能被修改,那么他就是不可变的。
- 具有不变性的对象一定是线程安全的,我们不需要对其采取任何额外的安全措施,也能保证线程安全。
CAS
- 我认为V的值应该是A,如果是的话就把他改成B,如果不是A(说明被别人修改过),那我就不修改了,避免多人同时修改导致出错
- CAS有三个操作数,内存值V,预期值A,要修改的值B。仅当预期值A和内存值V相等时,才将内存值修改为B,否则什么也不做,最后返回现在的V值。
- 使用场景:原子类,并发工具
- 使用volatile来修饰value字段,保证可见性
比较交换
- this.getIntVolatile(var1,var2) 获取var1这个对象在var2这个地址上的值
- getandIncrement()方法底层调用的是Unsafe类的getAndAddInt()方法,底层是CAS思想,如果比较成功,加1;否则,重新获得再比较,直至成功
/**
* 1.CAS是什么? -->compareAndSet
* 比较并交换
*/
public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(5);
//期望结果为5,若符合,替换值为2019
System.out.println(atomicInteger.compareAndSet(5, 2019));
System.out.println(atomicInteger.compareAndSet(5, 2020));
}
}
//true
//false
- 等价代码
/**
* 描述: 模拟CAS操作,等价代码
*/
public class TwoThreadsCompetition implements Runnable {
private volatile int value;
public synchronized int compareAndSwap(int expectedValue, int newValue) {
int oldValue = value;
if (oldValue == expectedValue) {
value = newValue;
}
return oldValue;
}
public static void main(String[] args) throws InterruptedException {
TwoThreadsCompetition r = new TwoThreadsCompetition();
r.value = 0;
Thread t1 = new Thread(r,"Thread 1");
Thread t2 = new Thread(r,"Thread 2");
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(r.value);
}
@Override
public void run() {
compareAndSwap(0, 1);
}
}
底层原理
- 以atomicInteger.getAndIncrement()来说明
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
/*
this :当前对象
valueOffset :内存偏移量(内存地址)
为什么AtomicInteger能解决i++多线程下不安全的问题,靠的就是底层的Unsafe类
*/
- Unsafe类详解
- unsafe.getAndAddInt
- 小总结
- CAS:比较当前工作内存中的值和主内存中的值,如果相同则执行规定操作,否则继续比较,直到主内存和工作内存中的值一致为止。
- CAS应用:CAS有3个操作数,内存值V,旧的预期值A,要修改的更新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。
缺点
- 循环时间长,开销很大
- 只能保证一个共享变量的原子操作:当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作。但是,对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁来保证原子性。
- CAS会引起ABA问题
ABA问题
-
ABA问题的产生
CAS算法实现一个重要前提就是需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差中会导致数据的变化
比如一个线程one从内存位置V取出A,这时另一个线程two也从内存中取出A,并且线程two进行了一些操作将值变成了B,然后线程two又将V位置的数据变成A,这时候线程one进行CAS操作发现内存中依然是A,然后线程one操作成功。
尽管线程one的CAS操作成功,但是并不代表这个过程是没有问题的。
-
原子引用:AtomicReference
class User{
String name;
int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
public class AtomicReferenceDemo {
public static void main(String[] args) {
User z3 = new User("z3", 22);
User l4 = new User("l4", 25);
AtomicReference<User> atomicReference = new AtomicReference<>();
atomicReference.set(z3);
System.out.println(atomicReference.compareAndSet(z3, l4) + "\t" + atomicReference.toString());
System.out.println(atomicReference.compareAndSet(z3, l4) + "\t" + atomicReference.toString());
}
}
//true User{name='l4', age=25}
//false User{name='l4', age=25}
- 解决ABA问题的方法:使用时间戳原子引用AtomicStampedReference
- 案例
/**
* ABA问题的解决 AtomicStampedReference
*/
public class ABADemo {
static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);
public static void main(String[] args) {
System.out.println("-----------------ABA问题的产生--------------------");
new Thread("t1"){
@Override
public void run() {
atomicReference.compareAndSet(100, 101);
atomicReference.compareAndSet(101, 100);
}
}.start();
new Thread("t2"){
@Override
public void run() {
try {
//线程t2休眠1秒钟,确保t1完成一次ABA操作
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicReference.compareAndSet(100, 2020) + "\t" + atomicReference.get());
}
}.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-----------------ABA问题的解决--------------------");
new Thread("t3"){
@Override
public void run() {
int stamp = atomicStampedReference.getStamp();
System.out.println(getName() + "\t第一次版本号:" + stamp);
try {
//t3线程休眠1秒中,确保t4也拿到初始的版本号
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(getName() + "\t第二次版本号:" + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(getName() + "\t第三次版本号:" + atomicStampedReference.getStamp());
}
}.start();
new Thread("t4"){
@Override
public void run() {
int stamp = atomicStampedReference.getStamp();
System.out.println(getName() + "\t第一次版本号:" + stamp);
try {
//t4线程休眠3秒中,确保t3完成一次ABA操作
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean result = atomicStampedReference.compareAndSet(100, 2020, stamp, stamp + 1);
System.out.println(getName() + "\t是否修改成功," + result + "\t当前最新实际版本号:" + atomicStampedReference.getStamp());
System.out.println(getName() + "\t当前实际最新值:" + atomicStampedReference.getReference());
}
}.start();
}
}
/*
-----------------ABA问题的产生--------------------
true 2020
-----------------ABA问题的解决--------------------
t3 第一次版本号:1
t4 第一次版本号:1
t3 第二次版本号:2
t3 第三次版本号:3
t4 是否修改成功,false 当前最新实际版本号:3
t4 当前实际最新值:100
*/
原子类
- 原子类的作用和锁类似,是为了保证并发情况下线程安全。
- 粒度更细:原子变量可以把竞争范围缩小到变量级别,通常锁的粒度都要大于原子变量的粒度。
- 效率更高:使用原子类的效率会比使用锁的效率更高,除了高度竞争的情况。
原子类 | 实现 |
---|---|
Atomic*基本原子类 | AtomicInteger AtomicLong AtomicBoolean |
Atomic*Array数组类型原子类 | AtomicIntegerArray AtomicLongArray AtomicReferenceArray |
Atomic*Reference引用类型原子类 | AtomicReference AtomicStampedReference AtomicMarkableReference |
Atomic*FieldUpdater升级类型原子类 | AtomicIntegerFieldUpdater AtomicLongFieldUpdater AtomicReferenceFieldUpdater |
Adder累加器 | LongAdder DoubleAdder |
Accumulator累加器 | LongAccumulator DoubleAccumulator |
基本类型原子类
- ActomicInteger主要方法:
- public final int get():获取当前的值
- public final int getAndSet(int newValue):获取当前的值,并设置新的值
- public final int getAndIncrement():获取当前的值,并自增
- public final int getAndDecrement():获取当前的值,并自减
- public final int getAndAdd(int data):获取当前的值,并加上预期的值
- boolean compareAndSet(int expect,int update):如果当前的数值等于预期值,则以原子方式将该值设置为输入值(update)
- 演示ActomicInteger
/**
* 描述: 演示AtomicInteger的基本用法,对比非原子类的线程安全问题,使用了原子类之后,不需要加锁,也可以保证线程安全。
*/
public class AtomicIntegerDemo1 implements Runnable {
private static final AtomicInteger atomicInteger = new AtomicInteger();
public void incrementAtomic() {
atomicInteger.getAndAdd(-90);
}
private static volatile int basicCount = 0;
public synchronized void incrementBasic() {
basicCount++;
}
public static void main(String[] args) throws InterruptedException {
AtomicIntegerDemo1 r = new AtomicIntegerDemo1();
Thread t1 = new Thread(r);
Thread t2 = new Thread(r);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("原子类的结果:" + atomicInteger.get());
System.out.println("普通变量的结果:" + basicCount);
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
incrementAtomic();
incrementBasic();
}
}
}
- 原子数组
/**
* 描述: 演示原子数组的使用方法
*/
public class AtomicArrayDemo {
public static void main(String[] args) {
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(1000);
Incrementer incrementer = new Incrementer(atomicIntegerArray);
Decrementer decrementer = new Decrementer(atomicIntegerArray);
Thread[] threadsIncrementer = new Thread[100];
Thread[] threadsDecrementer = new Thread[100];
for (int i = 0; i < 100; i++) {
threadsDecrementer[i] = new Thread(decrementer);
threadsIncrementer[i] = new Thread(incrementer);
threadsDecrementer[i].start();
threadsIncrementer[i].start();
}
// Thread.sleep(10000);
for (int i = 0; i < 100; i++) {
try {
threadsDecrementer[i].join();
threadsIncrementer[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int i = 0; i < atomicIntegerArray.length(); i++) {
// if (atomicIntegerArray.get(i)!=0) {
// System.out.println("发现了错误"+i);
// }
System.out.println(atomicIntegerArray.get(i));
}
System.out.println("运行结束");
}
}
class Decrementer implements Runnable {
private AtomicIntegerArray array;
public Decrementer(AtomicIntegerArray array) {
this.array = array;
}
@Override
public void run() {
for (int i = 0; i < array.length(); i++) {
array.getAndDecrement(i);
}
}
}
class Incrementer implements Runnable {
private AtomicIntegerArray array;
public Incrementer(AtomicIntegerArray array) {
this.array = array;
}
@Override
public void run() {
for (int i = 0; i < array.length(); i++) {
array.getAndIncrement(i);
}
}
}
引用类型原子类
- AtomicReference可以让一个对象保证原子性,用法和AtomicInteger类似
变量升级原子类
- 偶尔需要一个原子get-set操作
/**
* 描述: 演示AtomicIntegerFieldUpdater的用法
*/
public class AtomicIntegerFieldUpdaterDemo implements Runnable{
static Candidate tom;
static Candidate peter;
public static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater
.newUpdater(Candidate.class, "score");
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
peter.score++;
scoreUpdater.getAndIncrement(tom);
}
}
public static class Candidate {
volatile int score;
}
public static void main(String[] args) throws InterruptedException {
tom=new Candidate();
peter=new Candidate();
AtomicIntegerFieldUpdaterDemo r = new AtomicIntegerFieldUpdaterDemo();
Thread t1 = new Thread(r);
Thread t2 = new Thread(r);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("普通变量:"+peter.score);
System.out.println("升级后的结果"+ tom.score);
}
}
累加器
-
高并发下LongAdder比AtomicInteger效率高,本质是空间换时间
-
竞争激烈的时候,LongAdder把不同线程对应到不同的Cell上进行修改,降低了冲突的概率,是多段锁的概念,提高了并发性。
-
AtomicInteger的实现原理是每一次加法都需要做同步,所以在高并发时会导致冲突比较多,降低了效率
-
LongAdder,每个线程会有一个自己的计数器,仅用来在自己线程内计数,这样一来就不会和其他线程的计数器干扰
-
LongAdder引入分段累加概念,内部有一个base变量和一个Cell[]数组共同参与计数
base变量:竞争不激烈,直接累加到该变量上
Cell数组:竞争激烈,各个线程分散累加到自己的槽Cell[i]中
-
LongAdder适用于统计求和计数的场景
-
AtomicLong
/**
* 描述: 演示高并发场景下,LongAdder比AtomicLong性能好
*/
public class AtomicLongDemo {
public static void main(String[] args) throws InterruptedException {
AtomicLong counter = new AtomicLong(0);
ExecutorService service = Executors.newFixedThreadPool(20);
long start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
service.submit(new Task(counter));
}
service.shutdown();
while (!service.isTerminated()) {
}
long end = System.currentTimeMillis();
System.out.println(counter.get());
System.out.println("AtomicLong耗时:" + (end - start));
}
private static class Task implements Runnable {
private AtomicLong counter;
public Task(AtomicLong counter) {
this.counter = counter;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
counter.incrementAndGet();
}
}
}
}
- LongAdder
/**
* 描述: 演示高并发场景下,LongAdder比AtomicLong性能好
*/
public class LongAdderDemo {
public static void main(String[] args) throws InterruptedException {
LongAdder counter = new LongAdder();
ExecutorService service = Executors.newFixedThreadPool(20);
long start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
service.submit(new Task(counter));
}
service.shutdown();
while (!service.isTerminated()) {
}
long end = System.currentTimeMillis();
System.out.println(counter.sum());
System.out.println("LongAdder耗时:" + (end - start));
}
private static class Task implements Runnable {
private LongAdder counter;
public Task(LongAdder counter) {
this.counter = counter;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
counter.increment();
}
}
}
}
- LongAccumulator
/**
* 描述: 演示LongAccumulator的用法
*/
public class LongAccumulatorDemo {
public static void main(String[] args) {
LongAccumulator accumulator = new LongAccumulator((x, y) -> 2 + x * y, 1);
ExecutorService executor = Executors.newFixedThreadPool(8);
IntStream.range(1, 10).forEach(i -> executor.submit(() -> accumulator.accumulate(i)));
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println(accumulator.getThenReset());
}
}
锁机制
锁的分类
乐观锁和悲观锁
-
常见的悲观锁就是synchronized和lock。在数据库中体现为 select for update
-
悲观锁的劣势:
阻塞和唤醒带来的性能劣势
永久阻塞,如果持有锁的线程被永久阻塞,比如遇到无限循环,死锁等活跃性问题,那么等待该线程释放锁的那几个线程将永远得不到执行
-
乐观锁
认为自己在处理操作的时候不会有其他线程来干扰,所以并不会锁住被操作对象
在更新的时候,去对比在我修改数据期间有没有其他人修改过,若没被修改,就说明只有我在操作,那我就去正常修改数据
如果数据和我一开始拿到的不一样了,说明其他人在这段时间内改过数据,那我就不能继续刚才的更改数据过程了。会选择放弃,报错,重试等策略。
乐观锁的实现一般都是用CAS算法实现的
典型的乐观锁是原子类和并发容器
在数据库中用一个version字段就是乐观锁
先查询这个更新语句的version:select * from table 然后更新:update set sum = 2,version = version+1 where version = 1 and id = 5; 如果version被更新了等于2,不一样就会报错,
-
开销对比
- 悲观锁的原始开销要高于乐观锁,但是特点是一劳永逸,临界区持锁时间就算越来越差,也不会对互斥锁的开销产生影响。
- 虽然乐观锁的开销一开始比悲观锁小,但是如果自旋时间越长或重试次数越多,那么消耗的资源也越多。
- 悲观锁适合并发写入多的情况,适用于临界区持锁时间比较长的情况,悲观锁可以避免大量无用自旋等消耗
- 乐观锁适合并发写入少,大部分是读取的场景,不加锁能让读取性能提高
公平与非公平锁
- 公平锁:是指多个线程按照申请锁的顺序来获取锁,类似排队买饭,先来后到。
- 公平情况:假设线程1234是按照顺序调用lock的,后续等待的线程会进入wait queue中,按照顺序依次执行。在线程1执行unlock释放锁后,由于此时线程2的等待时间最久,所以线程2先得到执行,然后是线程3和4
- 非公平锁:是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁,在高并发情况下,有可能造成优先级反转或者饥饿现象。优势:更快,吞吐量更大
- 非公平也同样不提倡插队行为,这里的非公平是指在合适的时机插队,而不是盲目插队。
- 非公平情况:如果在线程1释放锁的时候,线程5恰好去执行lock操作,由于ReentrantLock发现此时并没有线程持有lock这把锁(线程2还没来得及获取到,因为获取需要时间),线程5可以插队,直接拿到这把锁,这也是ReentrantLock默认的公平策略,也就是不公平
- 并发包中ReentrantLock的创建可以指定构造函数的boolean类型来得到公平锁或非公平锁,默认是非公平锁。如果在创建ReentrantLock时参数填写为true,那么这就是公平锁
- 针对tryLock方法,它不遵守设定的公平规则,当有线程执行tryLock的时候,一旦有线程释放了锁,那么这个正在tryock的线程就会获取到锁,即使在它之前已经有其他线程在等待队列里了。
- 两者区别:
- 公平锁:在并发环境中,每个线程在获取锁时会先查看此锁维护的等待队列,如果是空,或者当前线程是等待队列的第一个,就占有锁,否则就会加入到等待队列中,以后会按照FIFO的规则从队列中取到自己。
- 非公平锁:线程一开始就尝试占有锁,如果尝试失败,就再采用类似公平锁的那种方式。非公平锁的优点在于吞吐量比公平锁大。对于synchronized而言,也是一种非公平锁。
- 演示读锁和写锁
public class CinemaReadWrite {
private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
private static ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
private static ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
private static void read() {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "得到了读锁,正在读取");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放读锁");
readLock.unlock();
}
}
private static void write() {
writeLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "得到了写锁,正在写入");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放写锁");
writeLock.unlock();
}
}
public static void main(String[] args) {
new Thread(()->read(),"Thread1").start();
new Thread(()->read(),"Thread2").start();
new Thread(()->write(),"Thread3").start();
new Thread(()->write(),"Thread4").start();
}
}
//=========================================================================
/**
* 描述: TODO
*/
public class CinemaReadWriteQueue {
private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(false);
private static ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
private static ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
private static void read() {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "得到了读锁,正在读取");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放读锁");
readLock.unlock();
}
}
private static void write() {
writeLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "得到了写锁,正在写入");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放写锁");
writeLock.unlock();
}
}
public static void main(String[] args) {
new Thread(()->write(),"Thread1").start();
new Thread(()->read(),"Thread2").start();
new Thread(()->read(),"Thread3").start();
new Thread(()->write(),"Thread4").start();
new Thread(()->read(),"Thread5").start();
}
}
- 演示公平和非公平
/**
* 描述: 演示非公平和公平的ReentrantReadWriteLock的策略
*/
public class NonfairBargeDemo {
private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(
true);
private static ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
private static ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
private static void read() {
System.out.println(Thread.currentThread().getName() + "开始尝试获取读锁");
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "得到读锁,正在读取");
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
System.out.println(Thread.currentThread().getName() + "释放读锁");
readLock.unlock();
}
}
private static void write() {
System.out.println(Thread.currentThread().getName() + "开始尝试获取写锁");
writeLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "得到写锁,正在写入");
try {
Thread.sleep(40);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
System.out.println(Thread.currentThread().getName() + "释放写锁");
writeLock.unlock();
}
}
public static void main(String[] args) {
new Thread(()->write(),"Thread1").start();
new Thread(()->read(),"Thread2").start();
new Thread(()->read(),"Thread3").start();
new Thread(()->write(),"Thread4").start();
new Thread(()->read(),"Thread5").start();
new Thread(new Runnable() {
@Override
public void run() {
Thread thread[] = new Thread[1000];
for (int i = 0; i < 1000; i++) {
thread[i] = new Thread(() -> read(), "子线程创建的Thread" + i);
}
for (int i = 0; i < 1000; i++) {
thread[i].start();
}
}
}).start();
}
}
可重入和非可重入锁(递归锁)
-
可重入锁指的是同一线程外层函数获得锁之后,内层递归函数仍然能获得该锁的代码,在同一线程在外层方法获取锁的时候,在进入内层方法会自动获取锁,也就是说,线程可以进入任何一个它已经拥有的锁所同步着的代码块(前提,锁对象是同一个对象)。
-
ReentrantLock和synchronized就是一种典型的可重入锁。
-
可重入锁的最大作用就是避免死锁
-
可重入锁种类
-
隐式锁(即synchronized关键字使用的锁)默认是可重入锁
-
//调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的 public class ReEnterLockDemo { static Object objectLockA = new Object(); public static void m1(){ new Thread(() -> { synchronized (objectLockA){ System.out.println(Thread.currentThread().getName()+"\t"+"------外层调用"); synchronized (objectLockA){ System.out.println(Thread.currentThread().getName()+"\t"+"------中层调用"); synchronized (objectLockA) { System.out.println(Thread.currentThread().getName()+"\t"+"------内层调用"); } } } },"t1").start(); } public static void main(String[] args) { m1(); } } /* 可重入锁:可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁,这样的锁就叫做可重入锁。 * * 在一个synchronized修饰的方法或代码块的内部 * 调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的 */ public class ReEnterLockDemo { public synchronized void m1(){ System.out.println("=====外层"); m2(); } public synchronized void m2() { System.out.println("=====中层"); m3(); } public synchronized void m3(){ System.out.println("=====内层"); } public static void main(String[] args) { new ReEnterLockDemo().m1(); } }
-
-
Synchronized的重入的实现机理
-
每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。 当执行monitorenter时,如果目标锋对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加i。 在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么Java虚拟机可以将其计数器加1,否则需要等待,直至持有线程释放该锁。 当执行monitorexit时,Java虚拟机则需将锁对象的计数器减1。计数器为零代表锁已被释放。
-
显式锁(即Lock)也有ReentrantLock这样的可重入锁。
- ```java
/**
- 可重入锁:可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁,这样的锁就叫做可重入锁。 *
- 在一个synchronized修饰的方法或代码块的内部
- 调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的 */
public class ReEnterLockDemo {
static Lock lock = new ReentrantLock();
public static void main(String[] args) { new Thread(() -> { lock.lock(); //lock.lock(); try{ System.out.println("=======外层"); lock.lock(); try{ System.out.println("=======内层"); }finally { lock.unlock(); } } finally { //实现加锁次数和释放次数不一样 //由于加锁次数和释放次数不一样,第二个线程始终无法获取到锁,导致一直在等待。 lock.unlock(); //lock.unlock(); //正在情况,加锁几次就要解锁几次 } },"t1").start(); new Thread(() -> { lock.lock(); try{ System.out.println("b thread----外层调用lock"); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } },"b").start();
} } ```
- 案例
/**
* 可重入锁(递归锁)
*/
class Phone implements Runnable {
public synchronized void sendSMS() throws Exception{
System.out.println(Thread.currentThread().getName() + "\t invoked sendSMS()");
sendEmail();
}
public synchronized void sendEmail() throws Exception{
System.out.println(Thread.currentThread().getName() + "\t invoked sendEmail()");
}
//----------------------------------------------------------------------------------------
Lock lock = new ReentrantLock();
@Override
public void run() {
get();
}
public void get(){
//可以写多次,但加几次、解几次
lock.lock();
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t invoked get()");
set();
} finally {
lock.unlock();
lock.unlock();
}
}
public void set(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t invoked set()");
} finally {
lock.unlock();
}
}
}
/**
* demo 1 证明synchronized是可重入锁
* t1 invoked sendSMS() t1线程在外层方法获取锁的时候
* t1 invoked sendEmail() t1在进入内层方法会自动获取锁
* t2 invoked sendSMS()
* t2 invoked sendEmail()
* --------------------------------------------------------
* demo 2 证明ReentranLock是可重入锁
* t3 invoked get()
* t3 invoked set()
* t4 invoked get()
* t4 invoked set()
*/
public class ReenterLockDemo {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread("t1"){
@Override
public void run() {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
new Thread("t2"){
@Override
public void run() {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println();
Thread t3 = new Thread(phone, "t3");
t3.start();
Thread t4 = new Thread(phone, "t4");
t4.start();
}
}
/*
t1 invoked sendSMS()
t1 invoked sendEmail()
t2 invoked sendSMS()
t2 invoked sendEmail()
t3 invoked get()
t3 invoked set()
t4 invoked get()
t4 invoked set()
*/
- isHeldByCurrentThread方法可以看出锁是否被当前线程持有
- getQueueLength方法可以返回当前正在等待这把锁的队列长度
- 使用 getHoldCount()方法可以获得持有锁的次数,lock一次就加一,unlock一次就减一
自旋锁
- 阻塞或唤醒一个java线程需要操作系统切换CPU来实现,这种状态转换需要消耗处理器时间。
- 如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户执行代码的时间还要长,在很多场景中,同步资源的锁定时间很短,为了这一小段时间去切换线程,线程的挂起和恢复的花费可能让系统得不偿失
- 如果有物理机有多个处理器,能让两个或以上线程同时并行执行,我们就可以让后面那个请求锁的线程不放弃CPU的执行时间,看看持有锁的线程会否很快释放锁
- 为了让当前线程稍等一下,我们需要让当前线程自旋,如果在自旋完成后前面锁定同步资源的线程已经释放了锁,那么当前线程就可以不必阻塞而直接获取同步资源,从而避免切换线程的开销,这就是自旋锁。
-
如果锁被占用很长时间,那么自旋的线程只会白浪费处理器资源。在自旋过程中,一直消耗CPU,所以虽然自旋锁的起始消耗低于悲观锁,但是随着自旋时间的增长,开销也是线性增长的。
- 是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU。
- unsafe类的getAndAddIt方法就是典型的自旋锁
- 案例
/**
* 自旋锁:循环比较获取直到成功为止,没有类似wait的阻塞
*
* 通过CAS操作完成自旋锁,A线程先进来调用myLock方法自己持有锁5秒钟,B随后进来后发现
* 当前有线程持有锁,不是null,所以只能通过自旋等待,直到A释放锁后B抢到
*
*/
public class SpinLockDemo {
//原子引用线程
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\tcome in.");
while (!atomicReference.compareAndSet(null, thread)){
}
}
public void myUnlock(){
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName() + "\tinvoke myUnlock().");
}
public static void main(String[] args) {
SpinLockDemo demo = new SpinLockDemo();
new Thread("t1"){
@Override
public void run() {
demo.myLock();
try {
sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
demo.myUnlock();
}
}.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread("t2"){
@Override
public void run() {
demo.myLock();
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
demo.myUnlock();
}
}.start();
}
}
/*
t1come in.
---1s后-----
t2come in.
---5s后-----
t1invoke myUnlock().
t2invoke myUnlock().
*/
-
使用场景:
自旋锁一般用于多核服务器,在并发度不是特别高的情况下,比阻塞锁的效率高。
自旋锁适用于临界区比较短小的情况,否则如果临界区很大(线层一旦拿到锁,很久以后才会释放),那也是不合适的。
独占,共享与互斥锁
- 独占锁:是指该锁一次只能被一个线程所持有,对ReentrantLock和synchronized而言都是独占锁
- 共享锁:是指锁可被多个线程所持有。
- 以ReentrantReadWriteLock读写锁为例,实现了ReadWriteLock接口,使用readLock()方法来获取读锁,用writeLock()方法来获取写锁。其中读锁是共享锁,写锁是排它锁。在读的地方使用读锁,写的地方使用写锁,如果没有写锁的情况下,读是无阻塞的,提高程序执行效率。
- 读锁的共享锁可保证并发读是非常高效的,读写,写读,写写过程是互斥的。
- 读写锁的规则
- 多个线程都申请读锁,都可以申请到
- 如果已经有一个线程占有了读锁,此时如果其他线程去申请写锁,则申请写锁的线程一直等待读锁释放
- 若已经有线程占有了写锁,此时其他线程去申请写锁或读锁,都会等待,直到写锁被释放
- 一句话总结:要么是一个或多个线程同时有读锁,要么是一个线程有写锁,但是两者不会同时出现(要么多读,要么一写)
- 读锁插队策略
- 公平锁:不允许插队。
- 非公平:假设线程2和线程4正在同时读取,线程3想要写入,拿不到锁,于是进入等待队列,线程5不在队列中,现在过来想要读取。 策略1:读可以插队,提高效率,但可能会导致写入线程饥饿。 策略2:读不可以插队,必须进入队列进行等待,可以避免饥饿。 ReentrantReadWriteLock选用的是策略2。
- 案例
class MyCache { //资源类
private volatile Map<String, Object> map = new HashMap<>();
public void put(String key, Object value){
System.out.println(Thread.currentThread().getName() + "\t正在写入:" + key);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "\t写入完成");
}
public void get(String key){
System.out.println(Thread.currentThread().getName() + "\t正在读取:");
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object value = map.get(key);
System.out.println(Thread.currentThread().getName() + "\t读取完成:" + value);
}
}
/**
* 多个线程同时读一个资源类没问题,所以为了满足并发量,读取共享资源该可以同时进行
* 但是,
* 如果有一个线程想去写共享资源,就不该再有其它线程可以对该资源进行读或写
* 写操作:原子+独占,整个过程必须是一个完整的统一体,中间不允许被分割、被打断
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//5个线程写
for (int i = 1; i <= 5; i++) {
final int tempInt = i;
new Thread(String.valueOf(i)) {
@Override
public void run() {
myCache.put(String.valueOf(tempInt), tempInt);
}
}.start();
}
//5个线程读
for (int i = 1; i <= 5; i++) {
final int tempInt = i;
new Thread(String.valueOf(i)) {
@Override
public void run() {
myCache.get(String.valueOf(tempInt));
}
}.start();
}
}
}
/*
2 正在写入:2
1 正在写入:1
3 正在写入:3
4 正在写入:4
5 正在写入:5
1 正在读取:
2 正在读取:
3 正在读取:
4 正在读取:
5 正在读取:
1 写入完成
3 写入完成
4 写入完成
5 写入完成
2 写入完成
3 读取完成:null
1 读取完成:1
2 读取完成:2
5 读取完成:5
4 读取完成:4
*/
class MyCache { //资源类
private volatile Map<String, Object> map = new HashMap<>();
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public void put(String key, Object value){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t正在写入:" + key);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "\t写入完成");
}catch (Exception e) {
e.printStackTrace();
}finally {
readWriteLock.writeLock().unlock();
}
}
public void get(String key){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t正在读取:");
/*try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
Object value = map.get(key);
System.out.println(Thread.currentThread().getName() + "\t读取完成:" + value);
}catch (Exception e) {
e.printStackTrace();
}finally {
readWriteLock.readLock().unlock();
}
}
}
/**
* 多个线程同时读一个资源类没问题,所以为了满足并发量,读取共享资源该可以同时进行
* 但是,
* 如果有一个线程想去写共享资源,就不该再有其它线程可以对该资源进行读或写
* 写操作:原子+独占,整个过程必须是一个完整的统一体,中间不允许被分割、被打断
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//5个线程写
for (int i = 1; i <= 5; i++) {
final int tempInt = i;
new Thread(String.valueOf(i)) {
@Override
public void run() {
myCache.put(String.valueOf(tempInt), tempInt);
}
}.start();
}
//5个线程读
for (int i = 1; i <= 5; i++) {
final int tempInt = i;
new Thread(String.valueOf(i)) {
@Override
public void run() {
myCache.get(String.valueOf(tempInt));
}
}.start();
}
}
}
/*
2 正在写入:2
2 写入完成
1 正在写入:1
1 写入完成
3 正在写入:3
3 写入完成
5 正在写入:5
5 写入完成
4 正在写入:4
4 写入完成
2 正在读取:
2 读取完成:2
1 正在读取:
1 读取完成:1
4 正在读取:
5 正在读取:
5 读取完成:5
3 正在读取:
3 读取完成:3
4 读取完成:4
*/
可中断锁
- synchronized是不可中断锁,Lock是可中断锁,因为tryLock(time)和lockInterruptibly()都能响应中断。
public class LockInterruptibly implements Runnable {
private Lock lock = new ReentrantLock();
public static void main(String[] args) {
LockInterruptibly lockInterruptibly = new LockInterruptibly();
Thread thread0 = new Thread(lockInterruptibly);
Thread thread1 = new Thread(lockInterruptibly);
thread0.start();
thread1.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
thread1.interrupt();
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "尝试获取锁");
try {
lock.lockInterruptibly();
try {
System.out.println(Thread.currentThread().getName() + "获取到了锁");
Thread.sleep(5000);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "睡眠期间被中断了");
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + "释放了锁");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "获得锁期间被中断了");
}
}
}
锁的升降级
- 支持锁的降级,不支持升级,避免死锁
/**
* 描述: 演示ReentrantReadWriteLock可以降级,不能升级
*/
public class Upgrading {
private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(
false);
private static ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
private static ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
private static void readUpgrading() {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "得到了读锁,正在读取");
Thread.sleep(1000);
System.out.println("升级会带来阻塞");
writeLock.lock();
System.out.println(Thread.currentThread().getName() + "获取到了写锁,升级成功");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放读锁");
readLock.unlock();
}
}
private static void writeDowngrading() {
writeLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "得到了写锁,正在写入");
Thread.sleep(1000);
readLock.lock();
System.out.println("在不释放写锁的情况下,直接获取读锁,成功降级");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readLock.unlock();
System.out.println(Thread.currentThread().getName() + "释放写锁");
writeLock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
// System.out.println("先演示降级是可以的");
// Thread thread1 = new Thread(() -> writeDowngrading(), "Thread1");
// thread1.start();
// thread1.join();
// System.out.println("------------------");
// System.out.println("演示升级是不行的");
Thread thread2 = new Thread(() -> readUpgrading(), "Thread2");
thread2.start();
}
}
锁的优化
1:缩小同步代码块 2:尽量不要锁住方法 3:减少请求锁的次数 4:避免人为制造“热点” 5:锁中尽量不要包含锁 6:选择适合场景的锁
Lock接口
- Lock并不是用来替代synchronized的,而是当它不满足需求时,用来提供高级功能的。
- Lock接口最常见的实现类是ReentrantLock,通常情况下,Lock只允许一个线程来访问共享资源,不过有时候一些特殊的实现也可允许并发访问,比如ReadWriteLock里的ReadLock。
- lock不会像synchronized一样在异常时自动释放锁
- synchronized效率低,锁的释放情况少,试图获取锁时不能设定超时,不能中断一个正在试图获取锁的线程
- synchronized不够灵活(读写锁更灵活),加锁和释放的时机单一,每个锁仅有单一的条件(某个对象),可能是不够的,并且无法知道是否成功获取到了锁
synchronized
- 原则:高内聚低耦合
- 套路:线程操作资源类
- 实现步骤:
- 创建资源类
- 资源类里创建同步方法,同步代码块
- 案例:卖票
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 题目:三个售票员卖30张票
* 多线程企业级套路加模板
* 在高内聚低耦合前提下(线程,操作,资源类)
*/
public class SaleTicket1 {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.saleTicket();
}
},"A").start();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.saleTicket();
}
},"B").start();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.saleTicket();
}
},"C").start();
}
}
class Ticket{
private int number = 30;
public synchronized void saleTicket() {
try {
if (number > 0) {
System.out.println(Thread.currentThread().getName()+"\t卖出第:"+(number--)+"\t还剩下:"+number);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
ReentrantLock
- 一个可重入互斥Lock具有与使用synchronized方法和语句访问的隐式监视锁相同的基本行为和语义,但具有扩展功能。
-
ReentrantLock由线程拥有 ,最后成功锁定,但尚未解锁。 调用lock的线程将返回,成功获取锁,当锁不是由另一个线程拥有。 如果当前线程已经拥有该锁,该方法将立即返回。 这可以使用方法isHeldByCurrentThread()和getHoldCount()进行检查。
- 锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。
- 建议的做法是始终立即跟随lock与try块的通话,最常见的是在之前/之后的建设,如:
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}
- 用tryLock来避免死锁
/**
* 描述: 用tryLock来避免死锁
*/
public class TryLockDeadlock implements Runnable {
int flag = 1;
static Lock lock1 = new ReentrantLock();
static Lock lock2 = new ReentrantLock();
public static void main(String[] args) {
TryLockDeadlock r1 = new TryLockDeadlock();
TryLockDeadlock r2 = new TryLockDeadlock();
r1.flag = 1;
r1.flag = 0;
new Thread(r1).start();
new Thread(r2).start();
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
if (flag == 1) {
try {
if (lock1.tryLock(800, TimeUnit.MILLISECONDS)) {
try {
System.out.println("线程1获取到了锁1");
Thread.sleep(new Random().nextInt(1000));
if (lock2.tryLock(800, TimeUnit.MILLISECONDS)) {
try {
System.out.println("线程1获取到了锁2");
System.out.println("线程1成功获取到了两把锁");
break;
} finally {
lock2.unlock();
}
} else {
System.out.println("线程1获取锁2失败,已重试");
}
} finally {
lock1.unlock();
Thread.sleep(new Random().nextInt(1000));
}
} else {
System.out.println("线程1获取锁1失败,已重试");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (flag == 0) {
try {
if (lock2.tryLock(3000, TimeUnit.MILLISECONDS)) {
try {
System.out.println("线程2获取到了锁2");
Thread.sleep(new Random().nextInt(1000));
if (lock1.tryLock(800, TimeUnit.MILLISECONDS)) {
try {
System.out.println("线程2获取到了锁1");
System.out.println("线程2成功获取到了两把锁");
break;
} finally {
lock1.unlock();
}
} else {
System.out.println("线程2获取锁1失败,已重试");
}
} finally {
lock2.unlock();
Thread.sleep(new Random().nextInt(1000));
}
} else {
System.out.println("线程2获取锁2失败,已重试");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
对比
- 对比synchronized:
- 首先synchronized是java内置关键字,属于jvm层面,monitorenter(底层通过monitor对象来完成,其实wait/notify等方法也依赖于monitor对象,只有在同步代码块或同步方法中才能调用wait/notify等方法),monitorexit。
- Lock是个java类(java.util.concurrent.locks.lock)是api层面的锁;
- synchronized无法判断是否获取锁的状态,Lock可以判断是否获取到锁;
- synchronized不可中断,除非抛出异常或正常执行结束。
- ReentrantLock可中断, 设置超时方法tryLock(long timeout, TimeUnit unit)
- synchronized会自动释放锁(a 线程执行完同步代码会释放锁 ;b 线程执行过程中发生异常会释放锁),Lock需在finally中手工释放锁(unlock()方法释放锁),否则容易造成线程死锁;
- synchronized的锁可重入、不可中断、非公平,而Lock锁可重入、可判断、可公平(两者皆可)
- synchronized没有锁的绑定条件;ReentrantLock用condition来实现分组唤醒需要唤醒的线程,可以精确唤醒,而不是像synchronized随即唤醒一个或者全部唤醒。
- Lock锁适合大量同步的代码的同步问题,synchronized锁适合代码少量的同步问题。
- 案例
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 题目:三个售票员卖30张票
* 多线程企业级套路加模板
* 在高内聚低耦合前提下(线程,操作,资源类)
*/
public class SaleTicket1 {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.saleTicket();
}
},"A").start();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.saleTicket();
}
},"B").start();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.saleTicket();
}
},"C").start();
}
}
class Ticket{
private int number = 30;
//juc可重入锁
private Lock lock = new ReentrantLock();
public void saleTicket() {
lock.lock();
try {
if (number > 0) {
System.out.println(Thread.currentThread().getName()+"\t卖出第:"+(number--)+"\t还剩下:"+number);
}
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
线程通讯
- 线程间通信:1、生产者+消费者2、通知等待唤醒机制
sync+wait+notify实现
- 两个线程对同一个变量循环加减
package com.atguigu.thread;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
*
* @Description:
*现在两个线程,
* 可以操作初始值为零的一个变量,
* 实现一个线程对该变量加1,一个线程对该变量减1,
* 交替,来10轮。
* @author xialei
*
* * 笔记:Java里面如何进行工程级别的多线程编写
* 1 多线程变成模板(套路)-----上
* 1.1 线程 操作 资源类
* 1.2 高内聚 低耦合
* 2 多线程变成模板(套路)-----下
* 2.1 判断
* 2.2 干活
* 2.3 通知
* 3 防止虚假唤醒用while
*/
public class NotifyWaitDemoOne
{
public static void main(String[] args)
{
ShareDataOne sd = new ShareDataOne();
new Thread(() -> {
for (int i = 1; i < 10; i++) {
try {
sd.increment();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 1; i < 10; i++) {
try {
sd.decrement();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, "B").start();
}
}
class ShareDataOne//资源类
{
private int number = 0;//初始值为零的一个变量
public synchronized void increment() throws InterruptedException
{
//1判断
//这里会存在虚假唤醒问题
if(number !=0 ) {
this.wait();
}
//2干活
++number;
System.out.println(Thread.currentThread().getName()+"\t"+number);
//3通知
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException
{
// 1判断
if (number == 0) {
this.wait();
}
// 2干活
--number;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 3通知
this.notifyAll();
}
}
/*
A 1
B 0
A 1
B 0
A 1
B 0
A 1
B 0
A 1
B 0
*/
虚假唤醒
- 换成4个线程(两个线程加,两个线程减)会导致错误,虚假唤醒。原因:在java多线程判断时,不能用if,程序出事出在了判断上面, 突然有一添加的线程进到if了,突然中断了交出控制权,没有进行验证,而是直接走下去了,加了两次,甚至多次
- 中断和虚假唤醒是可能产生的,所以要用loop循环,if只判断一次,while是只要唤醒就要拉回来再判断一次。if换成while
//判断
while(number!=1) {
this.wait();
}
//干活
--number;
System.out.println(Thread.currentThread().getName()+" \t "+number);
//通知
this.notifyAll();
lock+condition+signal实现
- Lock替换synchronized方法和语句的使用, Condition取代了对象监视器方法的使用。
- 一个Condition实例本质上绑定到一个锁。 要获得特定Condition实例的Condition实例,请使用其newCondition()方法。
- 官方案例
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
- 案例实现
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 多线程之间按顺序调用
* 实现A-B-C
* AA打印5次,BB打印10次,CC打印15次,循环10轮
*
* 改变标志位,用lock实现精准唤醒
*/
public class ThreadOrderAccess3 {
public static void main(String[] args) {
Resource resource = new Resource();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
resource.print5();
}
},"A").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
resource.print10();
}
},"B").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
resource.print15();
}
},"C").start();
}
}
class Resource {
//标志位
private int number = 1; //1:A,2:B,3:C
//锁
private Lock lock = new ReentrantLock();
//钥匙
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void print5() {
lock.lock();
try {
while (number != 1) {
condition1.await();
}
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void print10() {
lock.lock();
try {
while (number != 2) {
condition2.await();
}
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
number = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void print15() {
lock.lock();
try {
while (number != 3) {
condition3.await();
}
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
number = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
package com.atguigu.thread;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.omg.IOP.Codec;
/**
*
* @Description:
*现在两个线程,
* 可以操作初始值为零的一个变量,
* 实现一个线程对该变量加1,一个线程对该变量减1,
* 交替,来10轮。
*
* * 笔记:Java里面如何进行工程级别的多线程编写
* 1 多线程变成模板(套路)-----上
* 1.1 线程 操作 资源类
* 1.2 高内聚 低耦合
* 2 多线程变成模板(套路)-----下
* 2.1 判断
* 2.2 干活
* 2.3 通知
*/
public class NotifyWaitDemo
{
public static void main(String[] args)
{
ShareData sd = new ShareData();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
sd.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
sd.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
sd.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
sd.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
class ShareData//资源类
{
private int number = 0;//初始值为零的一个变量
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws InterruptedException
{
lock.lock();
try {
//判断
while(number!=0) {
condition.await();
}
//干活
++number;
System.out.println(Thread.currentThread().getName()+" \t "+number);
//通知
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() throws InterruptedException
{
lock.lock();
try {
//判断
while(number!=1) {
condition.await();
}
//干活
--number;
System.out.println(Thread.currentThread().getName()+" \t "+number);
//通知
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
线程八锁
import java.util.concurrent.TimeUnit;
/**
* 题目:多线程8锁
*
* 1 标准访问,先打印邮件还是短信?(邮件)
* 2 邮件方法暂停四秒(后续情况都是暂停四秒),先打印邮件还是短信?(邮件)
* 3 新增一个没有synchronized的普通方法hello(),先打印邮件还是hello?(hello)
* 4 两部手机,先打印邮件还是短信?(短信)
* 5 两个静态同步方法,一部手机,先打印邮件还是短信?(邮件)
* 6 两个静态同步方法,两部手机,先打印邮件还是短信?(邮件)
* 7 1个普通同步方法,1个静态同步方法,一部手机,先打印邮件还是短信?(短信)
* 8 1个普通同步方法,1个静态同步方法,两部手机,先打印邮件还是短信?(短信)
*
*
* 笔记
* 一个对象里边如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中一个synchronized方法了,
* 其他线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized方法,
* 锁的是当前对象this,被锁定后,其他的线程都不能进入到当前对象的其他synchronized方法
*
* 加个普通方法后,发现和同步锁无关
* 换成两个对象后,不是同一把锁了,所以互不影响
*
* 换成静态同步方法后,情况发生改变,此时锁的是整个类(class)
*
* 所有的非静态同步方法用的都是同一把锁,那就是实例对象本身
*
* synchronized实现同步的基础,java中的每个对象都可以作为锁
* 具体表现为以下三种形式
* 对于普通同步方法,锁是当前实例对象
* 对于静态同步方法,锁的是当前类的class对象
* 对于同步方法块,锁是synchronized括号里的对象
*
* 当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁
*
* 也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁
* 可是其他实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁
* 所以无需等待该实例对象已获得锁的非静态同步方法释放锁就可以获取他们自己的锁
*
* 所有的静态同步方法用的也是同一把锁:类对象本身
* 这两把锁(this,class)是两个不同的对象,所以静态同步方法和非静态同步方法之间是不会有竞争的
* 但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获得锁
* 而不管是同一个实例对象的静态同步方法之间,还是不同实例对象的静态同步方法之间,只要他们同一个类的实例对象
*/
public class Lock4 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(() -> {
phone.sendEmail();
}).start();
Thread.sleep(100);
new Thread(() -> {
phone.sendSMS();
}).start();
}
}
class Phone {
public synchronized void sendEmail() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-----email");
}
public synchronized void sendSMS() {
System.out.println("-----sms");
}
}
死锁
- 死锁是指两个或两个以上的线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉那他们都将无法运行下去,如果系统资源充足,线程的资源请求都能得到满足,死锁出现的可能性就很低,否则就会因争夺有限的资源而陷入死锁。
- 手写一个死锁
class HoldLockThread implements Runnable {
private String lockA;
private String lockB;
public HoldLockThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + "\t持有" + lockA + ",想要获得" + lockB);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + "\t持有" + lockB + ",想要获得" + lockA);
}
}
}
}
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new HoldLockThread(lockA, lockB), "A").start();
new Thread(new HoldLockThread(lockB, lockA), "B").start();
}
}
/*
A 持有lockA,想要获得lockB
B 持有lockB,想要获得lockA
*/
- 排查死锁
- 使用jps命令定位进程号
- 使用jstack找到死锁位置
集合不安全
ConcurrentHashMap
-
HashMap在高并发情况下会发生死循环(仅在JDK7以前存在)
-
java7中的ConcurrentHashMap最外层是多个segment,每个segment的底层数据结构与HashMap类似,仍然是数组和链表组成的拉链法。每个segment上有个独立的ReentrantLock锁,每个segment之间互不影响,提高了并发效率。
-
ConcurrentHashMap默认有16个segment,最多可同时支持16个线程并发写(操作分别分布在不同的segment上),这个默认值可以在初始化的时候设置为其他值,但是一旦初始化之后,是不可以扩容的
-
putValue流程
判断key,value不为空
计算hash值
根据对应位置节点的类型,来赋值,或者helpTransfer,或者增长链表,或者给红黑树增加节点
检查满足阈值就红黑树化
返回oldValue
-
get流程
计算hash值
找到对应位置,根据情况进行:
直接取值
红黑树里找值
遍历链表取值
返回找到的结果
-
组合操作不安全
/**
* 描述: 组合操作并不保证线程安全
*/
public class OptionsNotSafe implements Runnable {
private static ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<String, Integer>();
public static void main(String[] args) throws InterruptedException {
scores.put("小明", 0);
Thread t1 = new Thread(new OptionsNotSafe());
Thread t2 = new Thread(new OptionsNotSafe());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(scores);
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
while (true) {
Integer score = scores.get("小明");
Integer newScore = score + 1;
boolean b = scores.replace("小明", score, newScore);
if (b) {
break;
}
}
}
}
}
- ConcurrentHashMap 只能保证提供的原子性读写操作是线程安全的
Set<String> set = new HashSet<>();//线程不安全
Set<String> set = new CopyOnWriteArraySet<>();//线程安全
//HashSet底层数据结构是什么?是HashMap。但HashSet的add是放一个值,而HashMap是放K、V键值对
public HashSet() {
map = new HashMap<>();
}
private static final Object PRESENT = new Object();
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
//------------------------------------------------------------
Map<String,String> map = new HashMap<>();//线程不安全
Map<String,String> map = new ConcurrentHashMap<>();//线程安全
- ArrayList在迭代的时候如果同时对其进行修改就会抛出java.util.ConcurrentModificationException异常 并发修改异常
List<String> list = new ArrayList<>();
for (int i = 0; i <30 ; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(list);
},String.valueOf(i)).start();
}
//看ArrayList的源码
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
//没有synchronized线程不安全
-
解决方案:
- 使用vector
List<String> list = new Vector<>(); 看Vector的源码 public synchronized boolean add(E e) { modCount++; ensureCapacityHelper(elementCount + 1); elementData[elementCount++] = e; return true; } 有synchronized线程安全
- 使用Collections
- 写时复制:CopyOnWriteArrayList是arraylist的一种线程安全变体, 其中所有可变操作(add、set等)都是通过生成底层数组的新副本来实现的。
JUC写时复制CopyOnWrite
- 创建新副本,读写分离
- 代替Vector和SynchronizedList,就和ConcurrentHashMap代替SynchronizedMap的原因一样
- 读写锁规则升级:读取是完全不用加锁的,写入也不会阻塞读取操作,只有写入和写入之间需要进行同步等待;
- 数据一致性问题:CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性,如果希望写入的数据马上能读到,就不要使用CopyOnWrite容器
-
内存占用问题:因为CopyOnWrite的写是复制机制,所以在进行写操作时,内存中会同时驻扎两个对象的内存。
- 不加锁性能提升,出错误,加锁数据一致性能下降
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
/*
CopyOnWrite容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器Object[]添加,
而是先将当前容器Object[]进行Copy,复制出一个新的容器Object[] newElements,然后向新的容器Object[] newElements里添加元素。
添加元素后,再将原容器的引用指向新的容器setArray(newElements)。
这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。
所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
*/
- 可以修改数组内容
/**
* 描述:演示CopyOnWriteArrayList可以在迭代的过程中修改数组内容,但是ArrayList不行,对比
*/
public class CopyOnWriteArrayListDemo1 {
public static void main(String[] args) {
//ArrayList<String> list = new ArrayList<>();
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
Iterator<String> iterator = list.iterator();
while (iterator.hasNext()) {
System.out.println("list is" + list);
String next = iterator.next();
System.out.println(next);
if (next.equals("2")) {
list.remove("5");
}
if (next.equals("3")) {
list.add("3 found");
}
}
}
}
Callable创建线程
- Callable接口类似于Runnable ,因为它们都是为其实例可能由另一个线程执行的类设计的。 然而,Runnable不返回结果,也不能抛出被检查的异常。
- 这是一个函数式接口,因此可以用作lambda表达式或方法引用的赋值对象。
- 对比runnable:
- 是否有返回值
- 是否抛异常
- 落地方法不一样,一个是run,一个是call
Future和FutureTask
-
可以用Future.get来获取Callable接口返回的执行结果,可以通过Future.isDone()来判断任务是否已经完成。
-
在call方法未执行完毕之前,调用get方法的线程会被阻塞,直到call方法返回了结果后,此时Future.get()才会得到结果,然后主线程才切换到runnable状态
-
Future是一个存储器,它存储了call这个任务的结果,而这个任务的执行时间是不确定的。
-
for循环批量获取future结果时,容易发生一部分线程很慢的情况,get方法调用时应该使用timeout限制
-
Future的生命周期不能后退,和线程池的生命周期一样,一旦全部执行完毕就永久停在已完成状态。
-
get方法的行为取决于Callable的状态,主要分为五种
任务正常完成,get方法立刻返回结果
任务尚未完成(未开始或执行中),get将阻塞直到任务完成
任务执行中抛出Exception,get方法会抛出ExecutionException
任务被取消,get方法抛出CancellationException
任务超时,get方法有一个超时时间重载方法,若到了时间还未获取到结果,get方法抛出TimeoutException。
-
cancle方法:取消任务的执行
如果这个任务还没开始执行,任务会被正常取消,未来也不会执行,返回true
若任务已完成或已取消,那么这个方法会执行失败,返回false
若任务已经开始执行,则不会直接取消任务,而是根据参数mayInterruptIfRunning做判断
-
FutureTask未来的任务,用它就干一件事,异步调用
-
main方法就像一个冰糖葫芦,一个个方法由main串起来。但解决不了一个问题:正常调用挂起堵塞问题
-
在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成 当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。
一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。get方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。
只计算一次,get方法放到最后
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
class MyThread2 implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName()+"come in callable");
return 200;
}
}
public class CallableDemo {
public static void main(String[] args) throws Exception {
//FutureTask<Integer> futureTask = new FutureTask(new MyThread2());
FutureTask<Integer> futureTask = new FutureTask(()->{
System.out.println(Thread.currentThread().getName()+" come in callable");
TimeUnit.SECONDS.sleep(4);
return 1024;
});
FutureTask<Integer> futureTask2 = new FutureTask(()->{
System.out.println(Thread.currentThread().getName()+" come in callable");
TimeUnit.SECONDS.sleep(4);
return 2048;
});
new Thread(futureTask,"zhang3").start();
new Thread(futureTask2,"li4").start();
//System.out.println(futureTask.get());
//System.out.println(futureTask2.get());
//1、一般放在程序后面,直接获取结果
//2、只会计算结果一次
while(!futureTask.isDone()){
System.out.println("***wait");
}
System.out.println(futureTask.get());
System.out.println(Thread.currentThread().getName()+" come over");
}
}
JUC辅助类
- 控制并发流程的工具类,作用就是帮助线程之间的合作
CountDownLatch减少计数
-
数量递减到0时触发,让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒。
- CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
- 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
- CountDownLatch是不能重用的,如果需要重新计数,可以考虑使用CyclicBarrier或者创建新的CountDownLatch实例。
- 一等多
package com.atguigu.thread;
import java.util.concurrent.CountDownLatch;
/**
*
* 解释:6个同学陆续离开教室后值班同学才可以关门。
*
* main主线程必须要等前面6个线程完成全部工作后,自己才能开干
*/
public class CountDownLatchDemo
{
public static void main(String[] args) throws InterruptedException
{
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <=6; i++) //6个上自习的同学,各自离开教室的时间不一致
{
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"\t 号同学离开教室");
//计数器减一
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
//直到计数器减为0才继续运行
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"\t****** 班长关门走人,main线程是班长");
}
}
- 多等一
/**
* 描述: 模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。
*/
public class CountDownLatchDemo2 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch begin = new CountDownLatch(1);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("No." + no + "准备完毕,等待发令枪");
try {
begin.await();
System.out.println("No." + no + "开始跑步了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
service.submit(runnable);
}
//裁判员检查发令枪...
Thread.sleep(5000);
System.out.println("发令枪响,比赛开始!");
begin.countDown();
}
}
- 结合
/**
* 描述: 模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。当所有人都到终点后,比赛结束。
*/
public class CountDownLatchDemo1And2 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch begin = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(5);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("No." + no + "准备完毕,等待发令枪");
try {
begin.await();
System.out.println("No." + no + "开始跑步了");
Thread.sleep((long) (Math.random() * 10000));
System.out.println("No." + no + "跑到终点了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
end.countDown();
}
}
};
service.submit(runnable);
}
//裁判员检查发令枪...
Thread.sleep(5000);
System.out.println("发令枪响,比赛开始!");
begin.countDown();
end.await();
System.out.println("所有人到达终点,比赛结束");
}
}
CyclicBarrier循环栅栏
-
和CountDownLatch的区别在于:CountDownLatch用于事件,CyclicBarrier用于线程。CountDownLatch只能使用一次,CyclicBarrier可以循环使用
- 允许一组线程全部等待彼此达到共同屏障点的同步辅助。 循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。 屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。
- 线程进入屏障通过CyclicBarrier的await()方法。
package com.atguigu.thread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* CyclicBarrier
* 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,
* 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,
* 直到最后一个线程到达屏障时,屏障才会开门,所有
* 被屏障拦截的线程才会继续干活。
* 线程进入屏障通过CyclicBarrier的await()方法。
*
* 集齐7颗龙珠就可以召唤神龙
*/
public class CyclicBarrierDemo
{
private static final int NUMBER = 7;
public static void main(String[] args)
{
//CyclicBarrier(int parties, Runnable barrierAction)
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, ()->{System.out.println("*****集齐7颗龙珠就可以召唤神龙");}) ;
for (int i = 1; i <= 7; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+"\t 星龙珠被收集 ");
//在栅栏前等待其他线程,当凑够7个线程在此等待时才会开门
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
/*
收集到第1颗龙珠.
收集到第5颗龙珠.
收集到第2颗龙珠.
收集到第6颗龙珠.
收集到第3颗龙珠.
收集到第4颗龙珠.
收集到第7颗龙珠.
召唤神龙
*/
Semaphore信号灯
- 可以用来限制或管理数量有限资源的使用情况。
-
信号量的作用是维护一个“许可证”的计数
- acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。
- release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
- 获取和释放的许可证数量必须一致,注意在初始化时设置公平性
- 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
package com.atguigu.thread;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* 在信号量上我们定义两种操作:
* acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),
* 要么一直等下去,直到有线程释放信号量,或超时。
* release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
*
* 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
*/
public class SemaphoreDemo
{
public static void main(String[] args)
{
Semaphore semaphore = new Semaphore(3);//模拟3个停车位
for (int i = 1; i <=6; i++) //模拟6部汽车
{
new Thread(() -> {
try
{
//信号量减一
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"\t 抢到了车位");
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName()+"\t------- 离开");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//信号量加一
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
/*
1 抢到车位.
3 抢到车位.
2 抢到车位.
1 停车3秒钟后离开车位.
3 停车3秒钟后离开车位.
6 抢到车位.
4 抢到车位.
2 停车3秒钟后离开车位.
5 抢到车位.
4 停车3秒钟后离开车位.
6 停车3秒钟后离开车位.
5 停车3秒钟后离开车位.
*/
Condition接口
- 当线程1需要等待某个条件时,它就去执行condition.await()方法,一旦执行了await()方法,线程就会进入await状态。
- 然后通常会有另一个线程,假设是线程2,去执行对应的条件,直到这个条件达成的时候,线程2就会去执行condition.signal()方法,这时JVM就会从被阻塞的线程里找哪些等待该condition的线程,当线程1收到可执行信号时,他就会变为runnable可执行状态。
- signalAll会唤醒所有在等待的线程
- signal是公平的,只会唤醒那个等待时间最长的
- Lock用来代替synchronized,condition来代替Object.await/notify;await自动释放所持有的lock锁,调用await必须持有锁,否则报错。
- 基本使用
/**
* 描述: 演示Condition的基本用法
*/
public class ConditionDemo1 {
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
void method1() throws InterruptedException {
lock.lock();
try{
System.out.println("条件不满足,开始await");
condition.await();
System.out.println("条件满足了,开始执行后续的任务");
}finally {
lock.unlock();
}
}
void method2() {
lock.lock();
try{
System.out.println("准备工作完成,唤醒其他的线程");
condition.signal();
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ConditionDemo1 conditionDemo1 = new ConditionDemo1();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
conditionDemo1.method2();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
conditionDemo1.method1();
}
}
- 生产者消费者模式
/**
* 描述: 演示用Condition实现生产者消费者模式
*/
public class ConditionDemo2 {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
ConditionDemo2 conditionDemo2 = new ConditionDemo2();
Producer producer = conditionDemo2.new Producer();
Consumer consumer = conditionDemo2.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread {
@Override
public void run() {
consume();
}
private void consume() {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println("队列空,等待数据");
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
notFull.signalAll();
System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
} finally {
lock.unlock();
}
}
}
}
class Producer extends Thread {
@Override
public void run() {
produce();
}
private void produce() {
while (true) {
lock.lock();
try {
while (queue.size() == queueSize) {
System.out.println("队列满,等待有空余");
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(1);
notEmpty.signalAll();
System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
} finally {
lock.unlock();
}
}
}
}
}
LockSupport
- LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。
- LockSupport中的park()和unpark()的作用分别是阻塞线程和解除阻塞线程
线程等待唤醒机制(wait/notify)
-
3种让线程等待和唤醒的方法
- 方式1: 使用Object中的wait()方法让线程等待, 使用Object中的notify()方法唤醒线程
- 方式2: 使用JUC包中Condition的await()方法让线程等待,使用signal()方法唤醒线程
- 方式3: LockSupport类可以阻塞当前线程以及唤醒指定被阻塞的线程
-
Object类中的wait和notify方法实现线程等待和唤醒
-
private static void synchronizedWaitNotify() { new Thread(() - > { synchronized(objectLock) { System.out.println(Thread.currentThread().getName() + "\t" + "------come in"); try { objectLock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t" + "------被唤醒"); } }, "A").start(); new Thread(() - > { synchronized(objectLock) { objectLock.notify(); System.out.println(Thread.currentThread().getName() + "\t" + "------通知"); } }, "B").start(); }
-
异常1
- ```java
/**
- 要求: t1线程等待3秒钟,3秒钟后t2线程唤醒t1线程继续工作
- 以下异常情况:
- 2 wait方法和notify方法,两个都去掉同步代码块后看运行效果
- 2.1 异常惰况
- Exception in thread “t1” java.Lang.ILlegalLNonitorStateException at java.lang.Object.wait(Native Method)
- Exception in thread “t2” java.lang.ILlegalWonitorStateException at java.lang.Object.notify(Native Method) *
- 2.2 结论
- Object类中的wait、notify、notifyALlL用于线程等待和唤醒的方法,都必须在synchronized内部执行(必须用到关键字synchronized) * */
public class LockSupportDemo {
static Object objectLock = new Object(); public static void main(String[] args) { new Thread(() - > { synchronized(objectLock) { System.out.println(Thread.currentThread().getName() + "\t" + "------come in"); try { objectLock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t" + "------被唤醒"); } }, "A").start(); new Thread(() - > { synchronized(objectLock) { objectLock.notify(); System.out.println(Thread.currentThread().getName() + "\t" + "------通知"); } }, "B").start(); } } ```
-
异常2
- ```java
/**
- 要求: t1线程等待3秒钟,3秒钟后t2线程唤醒t1线程继续工作 *
- 3 将notify放在wait方法前先执行,t1先notify 了,3秒钟后t2线程再执行wait方法
- 3.1程序一直无法结柬
- 3.2结论
- 先wait后notify、notifyall方法,等待中的线程才会被唤醒,否则无法唤醒 * */
public class LockSupportDemo {
static Object objectLock = new Object(); public static void main(String[] args) { new Thread(() - > { synchronized(objectLock) { System.out.println(Thread.currentThread().getName() + "\t" + "------come in"); try { objectLock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t" + "------被唤醒"); } }, "A").start(); new Thread(() - > { synchronized(objectLock) { objectLock.notify(); System.out.println(Thread.currentThread().getName() + "\t" + "------通知"); } }, "B").start(); } } ```
-
wait和notify方法必须要在同步块或者方法里面且成对出现使用;先wait后notify才OK
-
Condition接口中的await后signal方法实现线程的等待和唤醒
- ```java
/**
- 要求: t1线程等待3秒钟,3秒钟后t2线程唤醒t1线程继续工作 *
- 3 将notify放在wait方法前先执行,t1先notify 了,3秒钟后t2线程再执行wait方法
- 3.1程序一直无法结柬
- 3.2结论
- 先wait后notify、notifyall方法,等待中的线程才会被唤醒,否则无法唤醒 * */
public class LockSupportDemo {
static Object objectLock = new Object(); static Lock lock = new ReentrantLock(); static Condition condition = lock.newCondition(); public static void main(String[] args) { new Thread(() - > { lock.lock(); try { System.out.println(Thread.currentThread().getName() + "\t" + "------come in"); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t" + "------被唤醒"); } finally { lock.unlock(); } }, "A").start(); new Thread(() - > { lock.lock(); try { condition.signal(); System.out.println(Thread.currentThread().getName() + "\t" + "------通知"); } finally { lock.unlock(); } }, "B").start(); }
} ```
- 传统的synchronized和Lock实现等待唤醒通知的约束:线程先要获得并持有锁,必须在锁块(synchronized或lock)中;必须要先等待后唤醒,线程才能够被唤醒
LockSupport类中的park等待和unpark唤醒
-
通过park()和unpark(thread)方法来实现阻塞和唤醒线程的操作
-
LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,每个线程都有一个许可(permit),permit只有两个值1和零,默认是零。可以把许可看成是一种(0,1)信号量(Semaphore),但与Semaphore不同的是,许可的累加上限是1。
-
阻塞:park()/park(Object blocker);阻塞当前线程/阻塞传入的具体线程;permit默认是O,所以一开始调用park()方法,当前线程就会阻塞,直到别的线程将当前线程的permit设置为1时,park方法会被唤醒,然后会将permit再次设置为O并返回。
-
唤醒:unpark(Thread thread);唤醒处于阻断状态的指定线程;调用unpark(thread)方法后,就会将thread线程的许可permit设置成1(注意多次调用unpark方法,不会累加,permit值还是1)会自动唤醒thread线程,即之前阻塞中的LockSupport.park()方法会立即返回。
-
正常+无锁块
-
/** LockSupport:俗称 锁中断 以前的两种方式: 1.以前的等待唤醒通知机制必须synchronized里面有一个wait和notify 2.lock里面有await和signal 这上面这两个都必须要持有锁才能干, LockSupport它的解决的痛点 1。LockSupport不用持有锁块,不用加锁,程序性能好, 2。先后顺序,不容易导致卡死 */ Thread t1 = new Thread(() - > { System.out.println(Thread.currentThread().getName() + "\t ----begi" + System.currentTimeMillis()); LockSupport.park(); //阻塞当前线程 System.out.println(Thread.currentThread().getName() + "\t ----被唤醒" + System.currentTimeMillis()); }, "t1"); t1.start(); LockSupport.unpark(t1); System.out.println(Thread.currentThread().getName() + "\t 通知t1...");
-
之前错误的先唤醒后等待,LockSupport照样支持
-
Thread t1 = new Thread(() - > { try { TimeUnit.SECONDS.sleep(5 L); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t ----begi" + System.currentTimeMillis()); LockSupport.park(); //阻塞当前线程 System.out.println(Thread.currentThread().getName() + "\t ----被唤醒" + System.currentTimeMillis()); }, "t1"); t1.start(); try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } LockSupport.unpark(t1); System.out.println(Thread.currentThread().getName() + "\t 通知t1..."); }
重点总结
- LockSupport提供park()和unpark()方法实现阻塞线程和解除线程阻塞的过程
- LockSupport和每个使用它的线程都有一个许可(permit)关联。permit相当于1,0的开关,默认是0,调用一次unpark就加1变成1,调用一次park会消费permit,也就是将1变成o,同时park立即返回。如再次调用park会变成阻塞(因为permit为零了会阻塞在这里,一直到permit变为1),这时调用unpark会把permit置为1。
- 每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累凭证。
- 线程阻塞需要消耗凭证(permit),这个凭证最多只有1个。
- 当调用park方法时, 如果有凭证,则会直接消耗掉这个凭证然后正常退出;如果无凭证,就必须阻塞等待凭证可用;
- 而unpark则相反,它会增加一个凭证,但凭证最多只能有1个,累加无效。
- 为什么可以先唤醒线程后阻塞线程?因为unpark获得了一个凭证,之后再调用park方法,就可以名正言顺的凭证消费,故不会阻塞。
- 为什么唤醒两次后阻塞两次,但最终结果还会阻塞线程?因为凭证的数量最多为1,连续调用两次unpark和调用一次unpark效果一样,只会增加一个凭证;而调用两次park却需要消费两个凭证,证不够,不能放行。
AQS
- AQS是一个用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石, 通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量表示持有锁的状态。
- AQS全名为AbstractQueuedSynchronizer,从java5加入的一个基于FIFO等待队列实现的一个同步器的基础框架。
- AQS为什么是JUC中最重要的基石
- ReentrantLock,CountDownLatch,ReentrantReadWriteLock,Semaphore中都有一个 abstract static class Sync extends AbstractQueuedSynchronizer{}
- 进一步理解锁和同步器的关系: 锁,面向锁的使用者;定义了程序员和锁交互的使用层API,隐藏了实现细节,你调用即可。 同步器,面向锁的实现者;比如Java并发大神Douglee,提出统一规 范并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。
- 加锁会导致阻塞:有阻塞就需要排队,实现排队必然需要有某种形式的队列来进行管理
- 抢到资源的线程直接使用办理业务,抢占不到资源的线程的必然涉及一种排队等候机制,抢占资源失败的线程继续去等待(类似办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。既然说到了排队等候机制,那么就一定 会有某种队列形成,这样的队列是什么数据结构呢?如果共享资源被占用,就需要一定的阻 塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node) ,通过CAS、自旋以及LockSuport.park()的方式,维护state变量的状态,使并发达到同步的效果。
AQS初识
-
AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的 FIFO队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成 一个Node节点来实现锁的分配,通过CAS完成对State值的修改。
-
AQS内部体系结构
-
CLH队列:通过自旋等待,state变量判断是否阻塞,从尾部入队,从头部出队。
-
Node节点
-
static final class Node{ //共享 static final Node SHARED = new Node(); //独占 static final Node EXCLUSIVE = null; //线程被取消了 static final int CANCELLED = 1; //后继线程需要唤醒 static final int SIGNAL = -1; //等待condition唤醒 static final int CONDITION = -2; //共享式同步状态获取将会无条件地传播下去 static final int PROPAGATE = -3; //初始为0,状态是上面的几种 volatile int waitStatus; //前置节点 volatile Node prev; //后置节点 volatile Node next; }
-
AQS同步队列的基本结构
- AQS底层是怎么排队的?是用LockSupport.pork()来进行排队的
三要素
-
AQS最核心的三大部分
state
控制线程抢锁的FIFO队列
协作工具类去实现获取和释放等方法
-
state会根据实现类的不同而不同。在Semaphore里代表“剩余的许可证数量”;在CountDownLatch中表示“还需要倒数的数量”。state是用volatile修饰的,会被并发的修改,所有修改state的方法(getState,setState,compareAndSetState)都要保证线程安全;在ReentrantLock中表示锁的占有情况,包括可重入计数。当state为0时,标识这个lock不被任何线程所持有;
-
这个FIFO队列是用来存放“等待的线程”,AQS就是排队管理器,当多个线程征用同一把锁时,必须有排队机制将那些没能拿到锁的线程串联在一起,当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁。
在CountDownLatch中应用
- 调用await方法时,便会尝试获取“共享锁”,但是一开始是获取不到锁的,线程被阻塞。
- 共享锁被获取到的条件就是锁计数器的值为0;锁计数器的初始值为count,每当一个线程调用countDown方法时,才将锁计数器减一。
- count个线程调用countDown后,锁计数器才为0,前面等待锁的线程才能继续运行。
在Semaphore中应用
- 在Semaphore中,state表示许可证的剩余数量
- tryAcquire方法,判断nofairTryAcquireShared大于等于0的话,表示成功。
- 先检查剩余许可证数量够不够这次需要的,用减法计算,如果不够就返回负数,表示失败,如果够了,就用自旋加compareAndSetState来改变state状态,直到改变成功就返回正数。如果期间被人修改导致数量不足也返回负数。
在ReentrantLock中应用
-
释放锁的方法tryRelease:由于是可重入的,所以state代表冲入的次数,每次释放锁,先判断是不是当前持有锁的线程释放的,如果不是就抛异常,如果是就重入次数减一,如果减到了0,就说明完全释放了,于是free就是true,并且把state设置为0。
-
public class AQSDemo { public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); //带入一个银行办理业务的案例来模拟我们的AQS如何进行线程的管理和通知唤醒机制 //3个线程模拟3个来银行网点,受理窗口办理业务的顾客 //A顾客就是第一个顾客,此时受理窗口没有任何人,A可以直接去办理 new Thread(() - > { lock.lock(); try { System.out.println("-----A thread come in"); try { TimeUnit.MINUTES.sleep(20); } catch (Exception e) { e.printStackTrace(); } } finally { lock.unlock(); } }, "A").start(); //第二个顾客,第二个线程---》由于受理业务的窗口只有一个(只能一个线程持有锁),此时B只能等待, //进入候客区 new Thread(() - > { lock.lock(); try { System.out.println("-----B thread come in"); } finally { lock.unlock(); } }, "B").start(); //第三个顾客,第三个线程---》由于受理业务的窗口只有一个(只能一个线程持有锁),此时C只能等待, //进入候客区 new Thread(() - > { lock.lock(); try { System.out.println("-----C thread come in"); } finally { lock.unlock(); } }, "C").start(); } }
-
Lock接口的实现类,基本都是通过【聚合】了一个【队列同步器】的子类完成线程访问控制的
-
ReentrantLock原理
-
从最简单的lock方法开始看看公平和非公平
非公平锁lock()
- 对比公平锁和非公平锁的tryAcqure()方法的实现代码, 其实差别就在于非公平锁获取锁时比公平锁中少了一个判断!hasQueuedPredecessors()
- hasQueuedPredecessors()中判断了是否需要排队,导致公平锁和非公平锁的差异如下:
- 公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;
- 非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一 个排队线程在unpark(), 之后还是需要竞争锁(存在线程竞争的情况下)
- lock()
- acquire()三大流程走向
- 占位
- tryAcquire(arg)
- nonfairTryAcquire(acquires):return false继续推进条件,走下一个方法addWaiter;return true结束
- addWaiter(Node.EXCLUSIVE)
- enq(node);双向链表中,第一个节点为虚节点(也叫哨兵节点),其实并不存储任何信息,只是占位。 真正的第一个有数据的节点,是从第二个节点开始的。
- 假如3号ThreadC线程进来 prev compareAndSetTail next
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
- 假如再抢抢失败就会进入
- shouldParkAterFailedAcquire和parkAndCheckInterrupt方法中
- 占位
- shouldParkAfterFailedAcquire:如果前驱节点的waitstatus是SIGNAL状态(-1),即shouldParkAfterFailedAcquire方法会返回true 程序会继续向下执行parkAndCheckInterrupt方法,用于将当前线程挂起
- parkAndCheckInterrupt
非公平锁unlock()
- sync.release(1);
- tryRelease(arg)
- unparkSuccessor
考点
- AQS里面有个变量叫State,它的值有几种? 3个状态:没占用是0,占用了是1,大于1是可重入锁
- 如果AB两个线程进来了以后,请问这个总共有多少个Node节点? 3个
JUC读写锁
- 如果多个线程同时读和写会出现什么问题?
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
class MyCache{
private volatile Map<String,Object> map = new HashMap<>();
public void put(String key,Object value){
System.out.println(Thread.currentThread().getName()+"\t 正在写"+key);
//暂停一会儿线程
try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace(); }
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"\t 写完了"+key);
}
public Object get(String key){
Object result = null;
System.out.println(Thread.currentThread().getName()+"\t 正在读"+key);
try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace(); }
result = map.get(key);
System.out.println(Thread.currentThread().getName()+"\t 读完了"+result);
return result;
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 1; i <= 5; i++) {
final int num = i;
new Thread(()->{
myCache.put(num+"",num+"");
},String.valueOf(i)).start();
}
for (int i = 1; i <= 5; i++) {
final int num = i;
new Thread(()->{
myCache.get(num+"");
},String.valueOf(i)).start();
}
}
}
- ReadWriteLock维护一对关联的locks ,一个用于只读操作,一个用于写入。
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
public void put(String key, Object value) {
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t 正在写" + key);
//暂停一会儿线程
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "\t 写完了" + key);
System.out.println();
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.writeLock().unlock();
}
}
public Object get(String key) {
rwLock.readLock().lock();
Object result = null;
try {
System.out.println(Thread.currentThread().getName() + "\t 正在读" + key);
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
result = map.get(key);
System.out.println(Thread.currentThread().getName() + "\t 读完了" + result);
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
return result;
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 1; i <= 5; i++) {
final int num = i;
new Thread(() -> {
myCache.put(num + "", num + "");
}, String.valueOf(i)).start();
}
for (int i = 1; i <= 5; i++) {
final int num = i;
new Thread(() -> {
myCache.get(num + "");
}, String.valueOf(i)).start();
}
}
}
JUC阻塞队列
- 当队列是空的,从队列中获取元素的操作将会被阻塞
- 当队列是满的,从队列中添加元素的操作将会被阻塞
- 在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
- 好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
种类分析
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
- SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。没有容量,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put 1.");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + "\t put 2.");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + "\t put 3.");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
new Thread(() -> {
try {
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + "\t take" + blockingQueue.take());
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + "\t take" + blockingQueue.take());
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + "\t take" + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "B").start();
}
}
/*
A put 1.
3s后
B take1
A put 2.
3s后
B take2
A put 3.
3s后
B take3
*/
- LinkedTransferQueue:由链表组成的无界阻塞队列。
- LinkedBlockingDeque:由链表组成的双向阻塞队列。
核心方法
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
检查 | element | peek() | 不可用 | 不可用 |
抛出异常 | 当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException:Queue full。当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException |
---|---|
特殊值 | 插入方法,成功ture失败false。移除方法,成功返回出队列的元素,队列里没有就返回null |
一直阻塞 | 当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产者线程直到put数据or响应中断退出。当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用 |
超时退出 | 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出 |
- 案例
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("a"));
}
}
true
true
true
Exception in thread "main" java.lang.IllegalStateException: Queue full
-------------------------------------------------------------------------
public class BlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.element());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
}
}
true
true
true
a
a
a
a
Exception in thread "main" java.util.NoSuchElementException
-------------------------------------------------------------------------
public class BlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
}
true
true
true
false
a
a
a
a
null
-------------------------------------------------------------------------
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("a");
blockingQueue.put("a");
blockingQueue.put("a");
}
}
-------------------------------------------------------------------------
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("a");
blockingQueue.put("a");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
}
}
-------------------------------------------------------------------------
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a", 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a", 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a", 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a", 2, TimeUnit.SECONDS));
}
}
true
true
true
2s后,
false
-------------------------------------------------------------------------
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a", 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a", 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a", 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
}
}
true
true
true
a
a
a
2s后,
null
ThreadPool线程池
- 线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
- 它的主要特点为:线程复用;控制最大并发数;管理线程。
- 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
- 第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
- 第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
- Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类
三种常用线程池
-
Executors.newFixedThreadPool(int):
执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } //newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的是LinkedBlockingQueue
/**
* 第4种获得/使用java多线程的方式——线程池
*/
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5); //假设银行5个窗口
try {
for (int i = 1; i <= 10; i++) { //10个人来办业务
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t办理业务.");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
/*
pool-1-thread-1 办理业务.
pool-1-thread-3 办理业务.
pool-1-thread-4 办理业务.
pool-1-thread-2 办理业务.
pool-1-thread-4 办理业务.
pool-1-thread-3 办理业务.
pool-1-thread-1 办理业务.
pool-1-thread-5 办理业务.
pool-1-thread-4 办理业务.
pool-1-thread-2 办理业务.
*/
-
Executors.newSingleThreadExecutor()
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定的顺序执行。
corePoolSize和maximumPoolSize都设置为1,它使用的LinkedBlockingQueue。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } //newSingleThreadExecutor 创建的线程池corePoolSize和maximumPoolSize值都是1,它使用的是LinkedBlockingQueue
/**
* 第4种获得/使用java多线程的方式——线程池
*/
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor(); //周末银行只有1个窗口值班
try {
for (int i = 1; i <= 10; i++) { //10个人来办业务
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t办理业务.");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
/*
pool-1-thread-1办理业务.
pool-1-thread-1办理业务.
pool-1-thread-1办理业务.
pool-1-thread-1办理业务.
pool-1-thread-1办理业务.
pool-1-thread-1办理业务.
pool-1-thread-1办理业务.
pool-1-thread-1办理业务.
pool-1-thread-1办理业务.
pool-1-thread-1办理业务.
*/
-
Executors.newCachedThreadPool()
执行很多短期异步任务,线程池根据需要创建新线程, 但在先前构建的线程可用时将重用它们。可扩容,遇强则强
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } //newCachedThreadPool创建的线程池将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,它使用的是SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
-
案例
/**
* 第4种获得/使用java多线程的方式——线程池
*/
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newCachedThreadPool(); //一池多个线程,根据实际业务扩容变动
try {
for (int i = 1; i <= 10; i++) { //10个人来办业务
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t办理业务.");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
/*
pool-1-thread-1办理业务.
pool-1-thread-3办理业务.
pool-1-thread-2办理业务.
pool-1-thread-4办理业务.
pool-1-thread-5办理业务.
pool-1-thread-2办理业务.
pool-1-thread-4办理业务.
pool-1-thread-7办理业务.
pool-1-thread-6办理业务.
pool-1-thread-1办理业务.
*/
七大参数
-
corePoolSize:线程池中的常驻核心线程数
在创建了线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近似理解为今日当值线程
当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
-
maximumPoolSize:线程池中能够容纳同时执行的最大线程数,此值必须大于等于1
-
keepAliveTime:多余的空闲线程的存活时间当前池中线程数量超过corePoolSize时,当空闲时间达到keepAliveTime时,多余线程会被销毁直到只剩下corePoolSize个线程为止
-
unit:keepAliveTime的单位
-
workQueue:任务队列,被提交但尚未被执行的任务
-
threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程,一般默认的即可
-
handler:拒绝策略,表示当队列满了,并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何来拒绝请求执行的runnable的策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
底层原理
- 在创建了线程池后,开始等待请求。
- 当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
- 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
- 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
- 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
- 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行。
- 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
- 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
- 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
- 验证从core扩容到maximum后,立即运行当前到达的任务,而不是队列中的
public class T1 {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
100,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
try {
for (int i = 1; i <= 8; i++) {
final int tempInt = i;
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "号窗口,服务顾客" + tempInt);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
/*
core=2,所以1号窗口对应1号顾客,2号窗口对应2号顾客,但是接下来,3、4、5号顾客又来了,进入容量为3的队列中排队,接下来6、7、8号顾客又来了,1、2号窗口正在服务,且队列也满了,此时应该开启3、4、5号窗口来提供服务,为6、7、8号顾客提供服务,然后再由这5个窗口为3、4、5号顾客提供服务
pool-1-thread-1号窗口,服务顾客1
pool-1-thread-3号窗口,服务顾客6
pool-1-thread-4号窗口,服务顾客7
pool-1-thread-5号窗口,服务顾客8
pool-1-thread-2号窗口,服务顾客2
3s后,
pool-1-thread-4号窗口,服务顾客3
pool-1-thread-5号窗口,服务顾客4
pool-1-thread-2号窗口,服务顾客5
*/
合理设置线程池参数
- 拒绝策略:等待队列已经排满了,再也塞不下新任务了同时,线程池中的max线程也达到了,无法继续为新任务服务。这个是时候我们就需要拒绝策略机制合理的处理这个问题。
- JDK内置的拒绝策略:
- AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
- CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不 会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中 尝试再次提交当前任务。
- DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。 如果允许任务丢失,这是最好的一种策略。
- 以上内置拒绝策略均实现了RejectedExecutionHandle接口
- 推荐自定义线程池
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
/**
* 线程池
* Arrays
* Collections
* Executors
*/
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(),
//new ThreadPoolExecutor.AbortPolicy()
//new ThreadPoolExecutor.CallerRunsPolicy()
//new ThreadPoolExecutor.DiscardOldestPolicy()
new ThreadPoolExecutor.DiscardOldestPolicy()
);
//10个顾客请求
try {
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
- CPU密集型:该任务需要大量的运算,而没有阻塞,CPU会一直全速运行。CPU密集任务只有在真正的多核CPU上才能得到加速(通过多线层),而在单核CPU上,无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些。CPU密集型任务配置尽可能少的线程数量:一般公式:CPU核数+1个线程的线程池。
- IO密集型:由于IO密集型任务线程并不是一直在执行任务,则应该配置尽可能多的线程,如CPU核数*2
java8流式计算
- 四大函数式接口
-
//R apply(T t);函数型接口,一个参数,一个返回值 Function<String,Integer> function = t ->{return t.length();}; System.out.println(function.apply("abcd")); //boolean test(T t);断定型接口,一个参数,返回boolean Predicate<String> predicate = t->{return t.startsWith("a");}; System.out.println(predicate.test("a")); // void accept(T t);消费型接口,一个参数,没有返回值 Consumer<String> consumer = t->{ System.out.println(t); }; consumer.accept("javaXXXX"); //T get(); 供给型接口,无参数,有返回值 Supplier<String> supplier =()->{return UUID.randomUUID().toString();}; System.out.println(supplier.get());
-
stream流的特点
- Stream 自己不会存储元素
- Stream 不会改变源对象。相反,他们会返回一个持有结果的新Stream
- Stream 操作是延迟执行的。这意味着他们会等到需要结果的时候才执行。
-
stream使用流程
- 创建一个Stream:一个数据源(数组、集合)
- 中间操作:一个中间操作,处理数据源数据
- 终止操作:一个终止操作,执行中间操作链,产生结果
分支合并框架
-
Fork:把一个复杂任务进行分拆,大事化小
-
Join:把分拆任务的结果进行合并
-
相关类
- ForkJoinPool
- ForkJoinTask
- RecursiveTask
-
实例
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
class MyTask extends RecursiveTask<Integer>{
private static final Integer ADJUST_VALUE = 10;
private int begin;
private int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
if((end - begin)<=ADJUST_VALUE){
for(int i =begin;i <= end;i++){
result = result + i;
}
}else{
int middle = (begin + end)/2;
MyTask task01 = new MyTask(begin,middle);
MyTask task02 = new MyTask(middle+1,end);
task01.fork();
task02.fork();
result = task01.join() + task02.join();
}
return result;
}
}
/**
* 分支合并例子
* ForkJoinPool
* ForkJoinTask
* RecursiveTask
*/
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask myTask = new MyTask(0,100);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
System.out.println(forkJoinTask.get());
forkJoinPool.shutdown();
}
}
异步回调CompletableFuture
- CompletableFuture是jdk8的新特性。CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步会点、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
- 创建异步任务:supplyAsync是创建带有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法
// 带返回值异步请求,默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 带返回值的异步请求,可以自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
- 常用方法
thnCombine
-合并两个线程任务的结果,并进一步处理。
applyToEither
-两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。
acceptEither
-两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。
runAfterEither
-两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。
runAfterBoth-
两个线程任务相比较,两个全部执行完成,才进行下一步操作,不关心运行结果。
anyOf-anyOf
方法的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个 CompletableFuture。
allOf-allOf
方法用来实现多 CompletableFuture 的同时返回。
- 案例模拟
//小A进入咖啡馆,点了一杯咖啡,服务员开始制作咖啡,在制作的过程小A也不闲,同时刷了一会抖音后,咖啡制作完成,于是小A走咖啡厅去公司上班
public class Demo2 {
public static void main(String[] args) {
ThreadTool.printTimeAndThread("小A进入咖啡厅");
ThreadTool.printTimeAndThread("小A在咖啡厅和服务员点了一杯咖啡");
// 创建一个带返回值的线程
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
ThreadTool.printTimeAndThread("服务员开始制作咖啡");
ThreadTool.sleepMillis(100);
return "咖啡制作好了";
// money表示上面的 咖啡制作好了
}).thenApply(result -> {
ThreadTool.printTimeAndThread(result + "->服务员开始打包");
ThreadTool.sleepMillis(100);
return "咖啡打包好了";
});
ThreadTool.printTimeAndThread("小A正在刷抖音");
ThreadTool.printTimeAndThread(String.format("小A接到服务员%s,接着去公司上班", completableFuture.join()));
}
- 办法A执行完再开始执行B
// 办法A执行完再开始执行B
public class Demo2 {
public static void main(String[] args) {
long time = System.currentTimeMillis();
ThreadTool.printTimeAndThread("小A进入咖啡店");
ThreadTool.printTimeAndThread("小A点了2杯咖啡");
//CompletableFuture<String>返回结果代表String类型
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
ThreadTool.printTimeAndThread("服务员A开始制作第1杯咖啡");
ThreadTool.sleepMillis(200);
//返回执行结果
return "服务员制作好了第一杯咖啡";
//传入dist代表返货到上面的返回结果 ->"服务员制作好咖啡机了"
}).thenCompose(dish -> CompletableFuture.supplyAsync(() -> {
ThreadTool.printTimeAndThread("服务员A开始制作第2杯咖啡");
ThreadTool.sleepMillis(100);
return dish + "服务员制作好了第二杯咖啡";
}));
ThreadTool.printTimeAndThread("小A正在刷抖音");
//completableFuture示例的jion方法,这个join方法的返回类型就是上面的CompletableFuture的泛型
//join方法会等待任务结束,然后返回任务的结果
ThreadTool.printTimeAndThread(String.format("%s ,小A开始喝咖啡", completableFuture.join()));
long ms = System.currentTimeMillis() - time;
System.out.println("办法总消耗的时间:" + ms + " ms");
}
}
- 当方法A与B同时执行完,再执行C
// 当方法A与B同时执行完,再执行C
public class Demo3 {
public static void main(String[] args) {
long time = System.currentTimeMillis();
ThreadTool.printTimeAndThread("小A进入咖啡店");
ThreadTool.printTimeAndThread("小A点了2杯咖啡");
//CompletableFuture<String>返回结果代表String类型
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
// 业务一
ThreadTool.printTimeAndThread("服务员A开始制作第1杯咖啡");
ThreadTool.sleepMillis(200);
//返回执行结果
return "服务员制作好了第一杯咖啡";
//传入dist代表返货到上面的返回结果 ->"服务员制作好咖啡机了"
//thenCombine作用表示第一个任务和第二个个任务同时执行
}).thenCombine(CompletableFuture.supplyAsync(() -> {
// 业务二
ThreadTool.printTimeAndThread("服务员B开始制作第2杯咖啡");
ThreadTool.sleepMillis(100);
return "服务员制作好了第二杯咖啡";
// dist是第一个业务执行结果,rece是第二个业务执行结果
}), (dish, rece) -> {
//当第一个业务和第二个业务同时执行完毕后,第三个业务开始执行
ThreadTool.printTimeAndThread("服务员C开始开始打包第一杯咖啡喝第二杯咖啡");
ThreadTool.sleepMillis(100);
return String.format("%s +%s 好了", dish, rece);
});
ThreadTool.printTimeAndThread("小A正在刷抖音");
//completableFuture示例的jion方法,这个join方法的返回类型就是上面的CompletableFuture的泛型
//join方法会等待任务结束,然后返回任务的结果
ThreadTool.printTimeAndThread(String.format("%s ,小A开始拿到咖啡喝", completableFuture.join()));
long ms = System.currentTimeMillis() - time;
System.out.println("办法总消耗的时间:" + ms + " ms");
}
}
- 当业务A与业务B同时执行,哪个优先执行完,则返回结果
// 小A等公交车,200路公交和500公交可以回家,哪个公交优先到坐哪个,如果出现车坏, 则叫出租车回家
public class Demo3 {
public static void main(String[] args) {
ThreadTool.printTimeAndThread("小A在公交站等车");
ThreadTool.printTimeAndThread("小A 回家可以坐200路和205路的公交车回家");
// 创建一个带返回值的线程
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
ThreadTool.printTimeAndThread("200公交正在开来");
int a=10/0;
ThreadTool.sleepMillis(100);
return "等到200路公交";
//applyToEither 表示,上个任务和下个任务同时执行,哪个优先执行完,就把结果返回给CompletableFuture(只有一个结果)
}).applyToEither(CompletableFuture.supplyAsync(() -> {
ThreadTool.printTimeAndThread("205公交正在开来");
ThreadTool.sleepMillis(100);
return "等到250路公交";
}),firstComBus -> {
if (firstComBus.startsWith("200公交车坏了")){
ThreadTool.printTimeAndThread("小A打网约车");
}
return firstComBus;
}).exceptionally(e ->{
//如果上面代码出现异常,则执行这里
ThreadTool.printTimeAndThread(e.getMessage());
ThreadTool.printTimeAndThread("小A打网约车");
return "网约车到达";
});
ThreadTool.printTimeAndThread(String.format("小A坐%s公交车回家", completableFuture.join()));
}
}
异步回调
import java.util.concurrent.CompletableFuture;
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
//同步,异步,异步回调
//同步
// CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{
// System.out.println(Thread.currentThread().getName()+"\t completableFuture1");
// });
// completableFuture1.get();
//异步回调
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+"\t completableFuture2");
int i = 10/0;
return 1024;
});
completableFuture2.whenComplete((t,u)->{
System.out.println("-------t="+t);
System.out.println("-------u="+u);
}).exceptionally(f->{
System.out.println("-----exception:"+f.getMessage());
return 444;
}).get();
}
}
错误处理
public void updatePurchase(Purchase purchase) {
// 使用allOf()方法来构建CompletableFuture的步骤。每个参数都是一个并行任务;在allOf()方法结束时,我们只是构建了这些任务,但尚未执行任何操作
CompletableFuture.allOf(
//首先使用supplyAsync()方法提供了一个Supplier,从这个Supplier中我们将检索数据。然后,我们使用thenAccept()来消费supplyAsync()的结果;建议为supplyAsync()方法指定一个自定义的ExecutorService是一个好习惯,这样我们可以对线程池参数有更多的控制
CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
.thenAccept(purchase::setOrderDescription),
CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
.thenAccept(purchase::setPaymentDescription),
CompletableFuture.supplyAsync(() -> getUserName(purchase.getUserId()))
.thenAccept(purchase::setBuyerName)
// 设置超时时间5s
.orTimeout(5, TimeUnit.SECONDS)
.handle((result, exception) -> {
if (exception instanceof TimeoutException) {
// 异常处理
return null ;
}
return result ;
})
).join() ;
// 在所有任务构建完毕后调用join()方法来并行运行所有任务并收集它们的结果。由于join()是一个线程阻塞操作,我们只在最后调用它,而不是在每个任务步骤之后调用,这是为了通过减少线程阻塞来优化应用程序性能
}
打造高性能缓存
HashMap
- 使用map简单实现
/**
* 描述: 最简单的缓存形式:HashMap
*/
public class ImoocCache1 {
private final HashMap<String,Integer> cache = new HashMap<>();
public synchronized Integer computer(String userId) throws InterruptedException {
Integer result = cache.get(userId);
//先检查HashMap里面有没有保存过之前的计算结果
if (result == null) {
//如果缓存中找不到,那么需要现在计算一下结果,并且保存到HashMap中
result = doCompute(userId);
cache.put(userId, result);
}
return result;
}
private Integer doCompute(String userId) throws InterruptedException {
TimeUnit.SECONDS.sleep(5);
return new Integer(userId);
}
public static void main(String[] args) throws InterruptedException {
ImoocCache1 imoocCache1 = new ImoocCache1();
System.out.println("开始计算了");
Integer result = imoocCache1.computer("13");
System.out.println("第一次计算结果:"+result);
result = imoocCache1.computer("13");
System.out.println("第二次计算结果:"+result);
}
}
ConcurrentHashMap
- 用线程安全map
- Computable
/**
* 描述: 有一个计算函数computer,用来代表耗时计算,每个计算器都要实现这个接口,这样就可以无侵入实现缓存功能
*/
public interface Computable <A,V>{
V compute(A arg) throws Exception;
}
- ExpensiveFunction
/**
* 描述: 耗时计算的实现类,实现了Computable接口,但是本身不具备缓存能力,不需要考虑缓存的事情
*/
public class ExpensiveFunction implements Computable<String, Integer>{
@Override
public Integer compute(String arg) throws Exception {
Thread.sleep(5000);
return Integer.valueOf(arg);
}
}
- MayFail
/**
* 描述: 耗时计算的实现类,有概率计算失败
*/
public class MayFail implements Computable<String, Integer>{
@Override
public Integer compute(String arg) throws Exception {
double random = Math.random();
if (random > 0.5) {
throw new IOException("读取文件出错");
}
Thread.sleep(3000);
return Integer.valueOf(arg);
}
}
- test
/**
* 描述: 缩小了synchronized的粒度,提高性能,但是依然并发不安全
*/
public class ImoocCache6<A, V> implements Computable<A, V> {
private final Map<A, V> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public ImoocCache6(Computable<A, V> c) {
this.c = c;
}
@Override
public V compute(A arg) throws Exception {
System.out.println("进入缓存机制");
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
public static void main(String[] args) throws Exception {
ImoocCache6<String, Integer> expensiveComputer = new ImoocCache6<>(
new ExpensiveFunction());
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveComputer.compute("666");
System.out.println("第一次的计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveComputer.compute("666");
System.out.println("第三次的计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveComputer.compute("667");
System.out.println("第二次的计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
Future
- 重复计算问题(两个线程一前一后,做同样的计算)
/**
* 描述: 利用Future,避免重复计算
*/
public class ImoocCache9<A, V> implements Computable<A, V> {
private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public ImoocCache9(Computable<A, V> c) {
this.c = c;
}
@Override
public V compute(A arg) throws InterruptedException, ExecutionException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> callable = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<>(callable);
f = cache.putIfAbsent(arg, ft);
if (f == null) {
f = ft;
System.out.println("从FutureTask调用了计算函数");
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
System.out.println("被取消了");
cache.remove(arg);
throw e;
} catch (InterruptedException e) {
cache.remove(arg);
throw e;
} catch (ExecutionException e) {
System.out.println("计算错误,需要重试");
cache.remove(arg);
}
}
}
public static void main(String[] args) throws Exception {
ImoocCache9<String, Integer> expensiveComputer = new ImoocCache9<>(
new MayFail());
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveComputer.compute("666");
System.out.println("第一次的计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveComputer.compute("666");
System.out.println("第三次的计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveComputer.compute("667");
System.out.println("第二次的计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
缓存过期
- 为每个结果指定过期时间,并定期扫描过期的元素
- 若同时过期,同时拿不到缓存,会导致直接打爆CPU和MySQL,造成缓存雪崩,缓存击穿
- 缓存过期时间设置为随机
/**
* 描述: 出于安全性考虑,缓存需要设置有效期,到期自动失效,否则如果缓存一直不失效,那么带来缓存不一致等问题
*/
public class ImoocCache10<A, V> implements Computable<A, V> {
private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public ImoocCache10(Computable<A, V> c) {
this.c = c;
}
@Override
public V compute(A arg) throws InterruptedException, ExecutionException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> callable = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<>(callable);
//原子组合
f = cache.putIfAbsent(arg, ft);
if (f == null) {
f = ft;
System.out.println("从FutureTask调用了计算函数");
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
System.out.println("被取消了");
cache.remove(arg);
throw e;
} catch (InterruptedException e) {
cache.remove(arg);
throw e;
} catch (ExecutionException e) {
System.out.println("计算错误,需要重试");
cache.remove(arg);
}
}
}
public V computeRandomExpire(A arg) throws ExecutionException, InterruptedException {
long randomExpire = (long) (Math.random() * 10000);
return compute(arg, randomExpire);
}
public final static ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
public V compute(A arg, long expire) throws ExecutionException, InterruptedException {
if (expire>0) {
executor.schedule(new Runnable() {
@Override
public void run() {
expire(arg);
}
}, expire, TimeUnit.MILLISECONDS);
}
return compute(arg);
}
public synchronized void expire(A key) {
Future<V> future = cache.get(key);
if (future != null) {
if (!future.isDone()) {
System.out.println("Future任务被取消");
future.cancel(true);
}
System.out.println("过期时间到,缓存被清除");
cache.remove(key);
}
}
public static void main(String[] args) throws Exception {
ImoocCache10<String, Integer> expensiveComputer = new ImoocCache10<>(
new MayFail());
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveComputer.compute("666",5000L);
System.out.println("第一次的计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveComputer.compute("666");
System.out.println("第三次的计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveComputer.compute("667");
System.out.println("第二次的计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(6000L);
Integer result = expensiveComputer.compute("666");
System.out.println("主线程的计算结果:" + result);
}
}
避坑必看
1线程重用导致信息错乱
-
我们知道,ThreadLocal 适用于变量在线程间隔离,而在方法或类间共享的场景。
-
如果用户信息的获取比较昂贵(比如从数据库查询用户信息),那么在 ThreadLocal 中缓存数据 是比较合适的做法。
-
ThreadLocal可以理解为绑定到线程的Map,相同线程的不同逻辑需要共享数据(但又无法通过传值来共享数据),或为了避免相同线程重复创建对象希望重用数据,可以考虑使用ThreadLocal
-
使用 Spring Boot 创建一个 Web 应用程序,使用 ThreadLocal 存放一个 Integer 的值,来暂且代表需要在线程中保存的用户信息,这个值初始是 null。在业务逻辑中,我先从ThreadLocal 获取一次值,然后把外部传入的参数设置到 ThreadLocal 中,来模拟从当前上下文获取到用户信息的逻辑,随后再获取一次值,最后输出两次获得的值和线程名称。
-
@RestController @RequestMapping("threadlocal") public class ThreadLocalMisuseController { private static final ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null); @GetMapping("wrong") public Map wrong(@RequestParam("userId") Integer userId) { //设置用户信息之前先查询一次ThreadLocal中的用户信息 String before = Thread.currentThread().getName() + ":" + currentUser.get(); //设置用户信息到ThreadLocal currentUser.set(userId); //设置用户信息之后再查询一次ThreadLocal中的用户信息 String after = Thread.currentThread().getName() + ":" + currentUser.get(); //汇总输出两次查询结果 Map result = new HashMap(); result.put("before", before); result.put("after", after); return result; } }
-
按理说,在设置用户信息之前第一次获取的值始终应该是 null,但我们要意识到,程序运行在 Tomcat 中,执行程序的线程是 Tomcat 的工作线程,而 Tomcat 的工作线程是基于线程池的。顾名思义,线程池会重用固定的几个线程,一旦线程重用,那么很可能首次从ThreadLocal 获取的值是之前其他用户的请求遗留的值。这时,ThreadLocal 中的用户信息就是其他用户的信息。
-
设置一下 Tomcat 的参数,把工作线程池最大线程数设置为 1,这样始终是同一个线程在处理请求
-
server.tomcat.max-threads=1
-
运行程序后先让用户 1 来请求接口,可以看到第一和第二次获取到用户 ID 分别是 null 和1,符合预期。随后用户 2 来请求接口,这次就出现了 Bug,第一和第二次获取到用户 ID 分别是 1 和2,显然第一次获取到了用户 1 的信息,原因就是 Tomcat 的线程池重用了线程。从图中可以看到,两次请求的线程都是同一个线程
-
因为线程的创建比较昂贵,所以 Web 服务器往往会使用线程池来处理请求,这就意味着线程会被重用。这时,使用类似 ThreadLocal 工具来存放一些数据时,需要特别注意在代码运行完后,显式地去清空设置的数据。如果在代码中使用了自定义的线程池,也同样会遇到这个问题。
-
修正这段代码的方案是,在代码的 finally 代码块中,显式清除ThreadLocal 中的数据。这样一来,新的请求过来即使使用了之前的线程也不会获取到错误的用户信息了
-
@GetMapping("right") public Map right(@RequestParam("userId") Integer userId) { String before = Thread.currentThread().getName() + ":" + currentUser.get(); currentUser.set(userId); try { String after = Thread.currentThread().getName() + ":" + currentUser.get(); Map result = new HashMap(); result.put("before", before); result.put("after", after); return result; } finally { //在finally代码块中删除ThreadLocal中的数据,确保数据不串 currentUser.remove(); } }
- ThreadLocal 是利用独占资源的方式,来解决线程安全问题,那如果我们确实需要有资源在线程之前共享,应该怎么办呢?这时,我们可能就需要用到线程安全的容器了
2使用并发安全工具并不一定安全
-
有一个含 900 个元素的Map,现在再补充 100 个元素进去,这个补充操作由 10 个线程并发进行。开发人员误以为使用了 ConcurrentHashMap 就不会有线程安全问题,于是不加思索地写出了下面的代 码:在每一个线程的代码逻辑中先通过 size 方法拿到当前元素数量,计算ConcurrentHashMap 目前还需要补充多少元素,并在日志中输出了这个值,然后通过putAll 方法把缺少的元素添加进去。
-
@RestController @RequestMapping("concurrenthashmapmisuse") @Slf4j public class ConcurrentHashMapMisuseController { //线程个数 private static int THREAD_COUNT = 10; //总元素数量 private static int ITEM_COUNT = 1000; //帮助方法,用来获得一个指定元素数量模拟数据的ConcurrentHashMap private ConcurrentHashMap<String, Long> getData(int count) { return LongStream.rangeClosed(1, count) .boxed() .collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(), (o1, o2) -> o1, ConcurrentHashMap::new)); } @GetMapping("wrong") public String wrong() throws InterruptedException { ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100); //初始900个元素 log.info("init size:{}", concurrentHashMap.size()); //使用线程池并发处理逻辑 ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT); forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> { //查询还需要补充多少个元素 int gap = ITEM_COUNT - concurrentHashMap.size(); log.info("gap size:{}", gap); //补充元素 concurrentHashMap.putAll(getData(gap)); })); //等待所有任务完成 forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1, TimeUnit.HOURS); //最后元素个数会是1000吗? log.info("finish size:{}", concurrentHashMap.size()); return "OK"; } }
-
最后 HashMap 的总项目数是 1536,显然不符合填充满 1000 的预期。因为concurrentHashMap只能保证同一时间只能有一个线程向里边添加元素,但不能保证同一时间只能有一个线程可以查看里面的元素。
-
诸如 size、isEmpty 和 containsValue 等聚合方法,不能确保原子性,在并发情况下可能会反映ConcurrentHashMap 的中间状态。因此在并发情况下,这些方法的返回值只能用作参考,而不能用于流程控制。
-
代码的修改方案很简单,整段逻辑加锁即可
-
@GetMapping("right") public String right() throws InterruptedException { ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100); log.info("init size:{}", concurrentHashMap.size()); ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT); forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> { //下面的这段复合逻辑需要锁一下这个ConcurrentHashMap synchronized (concurrentHashMap) { int gap = ITEM_COUNT - concurrentHashMap.size(); log.info("gap size:{}", gap); concurrentHashMap.putAll(getData(gap)); } })); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1, TimeUnit.HOURS); log.info("finish size:{}", concurrentHashMap.size()); return "OK"; }
- 使用 ConcurrentHashMap 全程加锁,还不如使用普通的HashMap 呢。其实不完全是这样。ConcurrentHashMap 提供了一些原子性的简单复合逻辑方法,用好这些方法就可以发挥其威力。
3发挥concurrentHashMap真正实力
-
使用 Map 来统计 Key 出现次数,使用 ConcurrentHashMap 来统计,Key 的范围是 10。使用最多 10 个并发,循环操作 1000 万次,每次操作累加随机的 Key。如果 Key 不存在的话,首次设置值为 1。
-
@RestController @RequestMapping("concurrenthashmapperformance") @Slf4j public class ConcurrentHashMapPerformanceController { //循环次数 private static int LOOP_COUNT = 10000000; //线程数量 private static int THREAD_COUNT = 10; //元素数量 private static int ITEM_COUNT = 10; //测试方法 @GetMapping("good") public String good() throws InterruptedException { StopWatch stopWatch = new StopWatch(); stopWatch.start("normaluse"); Map<String, Long> normaluse = normaluse(); stopWatch.stop(); //校验元素数量 Assert.isTrue(normaluse.size() == ITEM_COUNT, "normaluse size error"); //校验累计总数 Assert.isTrue(normaluse.entrySet().stream() .mapToLong(item -> item.getValue()).reduce(0, Long::sum) == LOOP_COUNT , "normaluse count error"); stopWatch.start("gooduse"); Map<String, Long> gooduse = gooduse(); stopWatch.stop(); Assert.isTrue(gooduse.size() == ITEM_COUNT, "gooduse size error"); Assert.isTrue(gooduse.entrySet().stream() .mapToLong(item -> item.getValue()) .reduce(0, Long::sum) == LOOP_COUNT , "gooduse count error"); log.info(stopWatch.prettyPrint()); return "OK"; } //正常思路 private Map<String, Long> normaluse() throws InterruptedException { ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT); ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT); forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> { //获得一个随机的Key String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT); synchronized (freqs) { if (freqs.containsKey(key)) { //Key存在则+1 freqs.put(key, freqs.get(key) + 1); } else { //Key不存在则初始化为1 freqs.put(key, 1L); } } } )); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1, TimeUnit.HOURS); return freqs; } //更好的思路 private Map<String, Long> gooduse() throws InterruptedException { ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT); ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT); forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> { String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT); //利用computeIfAbsent()方法来实例化LongAdder,然后利用LongAdder来进行操作 freqs.computeIfAbsent(key, k -> new LongAdder()).increment(); } )); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1, TimeUnit.HOURS); //因为我们的Value是LongAdder而不是Long,所以需要做一次转换才能返回 return freqs.entrySet().stream() .collect(Collectors.toMap( e -> e.getKey(), e -> e.getValue().longValue()) ); } }
-
使用 ConcurrentHashMap 的原子性方法 computeIfAbsent 来做复合逻辑操作,判断Key 是否存在 Value,如果不存在则把 Lambda 表达式运行后的结果放入 Map 作为Value,也就是新创建一个 LongAdder 对象,最后返回 Value。由于 computeIfAbsent 方法返回的 Value 是 LongAdder,是一个线程安全的累加器,因此可以直接调用其 increment 方法进行累加。
- computeIfAbsent 为什么如此高效呢?Java 自带的 Unsafe 实现的 CAS。它在虚拟机层面确保了写入数据的原子性,比加锁的效率高得多
4错误的使用CopyOnWrite
-
CopyOnWrite 是一个时髦的技术。在 Java 中,CopyOnWriteArrayList 虽然是一个线程安全的 ArrayList,但因为其实现方式是,每次修改数据时都会复制一份数据出来,所以有明显的适用场景,即读多写少或者说希望无锁读的场景。如果读写比例均衡或者有大量写操作的话,使用 CopyOnWriteArrayList 的性能会非常糟糕。
-
比较下使用 CopyOnWriteArrayList 和普通加锁方式 ArrayList的读写性能
-
@RestController @RequestMapping("copyonwritelistmisuse") @Slf4j public class CopyOnWriteListMisuseController { //测试并发写的性能 @GetMapping("write") public Map testWrite() { List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>(); List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>()); StopWatch stopWatch = new StopWatch(); int loopCount = 100000; stopWatch.start("Write:copyOnWriteArrayList"); //循环100000次并发往CopyOnWriteArrayList写入随机元素 IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount))); stopWatch.stop(); stopWatch.start("Write:synchronizedList"); //循环100000次并发往加锁的ArrayList写入随机元素 IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount))); stopWatch.stop(); log.info(stopWatch.prettyPrint()); Map result = new HashMap(); result.put("copyOnWriteArrayList", copyOnWriteArrayList.size()); result.put("synchronizedList", synchronizedList.size()); return result; } //帮助方法用来填充List private void addAll(List<Integer> list) { list.addAll(IntStream.rangeClosed(1, 1000000).boxed().collect(Collectors.toList())); } //测试并发读的性能 @GetMapping("read") public Map testRead() { //创建两个测试对象 List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>(); List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>()); //填充数据 addAll(copyOnWriteArrayList); addAll(synchronizedList); StopWatch stopWatch = new StopWatch(); int loopCount = 1000000; int count = copyOnWriteArrayList.size(); stopWatch.start("Read:copyOnWriteArrayList"); //循环1000000次并发从CopyOnWriteArrayList随机查询元素 IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count))); stopWatch.stop(); stopWatch.start("Read:synchronizedList"); //循环1000000次并发从加锁的ArrayList随机查询元素 IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count))); stopWatch.stop(); log.info(stopWatch.prettyPrint()); Map result = new HashMap(); result.put("copyOnWriteArrayList", copyOnWriteArrayList.size()); result.put("synchronizedList", synchronizedList.size()); return result; } }
- 大量写的场景(10 万次 add 操作),CopyOnWriteArray 几乎比同步的 ArrayList 慢一百倍。而在大量读的场景下(100 万次 get 操作),CopyOnWriteArray 又比同步的 ArrayList快五倍以上。为何在大量写的场景下,CopyOnWriteArrayList 会这么慢呢?以 add 方法为例,每次 add 时,都会用 Arrays.copyOf 创建一个新数组,频繁 add 时内存的申请释放消耗会很大
5手动创建线程池
- 《阿里巴巴 Java 开发手册》中提到,禁止使用这些方法来创建线程池,而应该手动 new ThreadPoolExecutor 来创建线程池。这一条规则的背后,是大量血淋淋的生产事故,最典型的就是 newFixedThreadPool 和 newCachedThreadPool,可能因为资源耗尽导致OOM 问题。
- 初始化一个单线程的 FixedThreadPool,循环 1 亿次向线程池提交任务,每个任务都会创建一个比较大的字符串然后休眠一小时,执行程序后不久,日志中就出现了如下 OOM。翻看 newFixedThreadPool 方法的源码不难发现,线程池的工作队列直接 new 了一个LinkedBlockingQueue,而默认构造方法的 LinkedBlockingQueue 是一个Integer.MAX_VALUE 长度的队列,可以认为是无界的。虽然使用 newFixedThreadPool 可以把工作线程控制在固定的数量上,但任务队列是无界的。如果任务较多并且执行较慢的话,队列可能会快速积压,撑爆内存导致 OOM。
- 改为使用 newCachedThreadPool 方法来获得线程池。程序运行不久后,同样看到了如下 OOM 异常。这次 OOM 的原因是无法创建线程,翻看 newCachedThreadPool 的源码可以看到,这种线程池的最大线程数是 Integer.MAX_VALUE,可以认为是没有上限的,而其工作队列 SynchronousQueue 是一个没有存储空间的阻塞队列。由于我们的任务需要 1 小时才能执行完成,大量的任务进来后会创建大量的线程。我们知道线程是需要分配一定的内存空间作为线程栈的,比如 1MB,因此无限制创建线程必然会导致 OOM
- 线程池默认的工作行为:
- 不会初始化 corePoolSize 个线程,有任务来了才创建工作线程
- 当核心线程满了之后不会立即扩容线程池,而是把任务堆积到工作队列中
- 当工作队列满了后扩容线程池,一直到线程个数达到 maximumPoolSize 为止
- 如果队列已满且达到了最大线程后还有任务进来,按照拒绝策略处理
- 当线程数大于核心线程数时,线程等待 keepAliveTime 后还是没有任务需要处理的话, 收缩线程到核心线程数
- 我们有没有办法让线程池更激进一点,优先开启更多的线程,而把队列当成一个后备方案呢?由于线程池在工作队列满了无法入队的情况下会扩容线程池,那么我们是否可以重写队列的 offer 方法,造成这个队列已满的假象呢?由于我们 Hack 了队列,在达到了最大线程后势必会触发拒绝策略,那么能否实现一个自定义的拒绝策略处理程序,这个时候再把任务真正插入队列呢?
6线程池本身是不是可复用的
-
某项目生产环境时不时有报警提示线程数过多,超过2000 个,收到报警后查看监控发现,瞬时线程数比较多但过一会儿又会降下来,线程数抖动很厉害,而应用的访问量变化不大。一般而言,线程池肯定是复用的,有 5 个以内的线程池都可以认为正常,而 1000 多个线程池肯定不正常。
-
@RestController @RequestMapping("threadpoolreuse") @Slf4j public class ThreadPoolReuseController { /* 调用 ThreadPoolHelper的 getThreadPool 方法来获得线程池,然后提交数个任务到线程池处理 但是,来到 ThreadPoolHelper 的实现让人大跌眼镜,getThreadPool 方法居然是每次都使用 Executors.newCachedThreadPool 来创建一个线程池,线程池没有复用 */ @GetMapping("wrong") public String wrong() throws InterruptedException { ThreadPoolExecutor threadPool = ThreadPoolHelper.getThreadPool(); IntStream.rangeClosed(1, 10).forEach(i -> { threadPool.execute(() -> { String payload = IntStream.rangeClosed(1, 1000000) .mapToObj(__ -> "a") .collect(Collectors.joining("")) + UUID.randomUUID().toString(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } log.debug(payload); }); }); return "OK"; } /* 解决方案 使用一个静态字段来存放线程池的引用,返回线程池的代码直接返回这个静态字段即可 */ static class ThreadPoolHelper { private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 10, 50, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get()); static ThreadPoolExecutor getRightThreadPool() { return threadPoolExecutor; } } }
7线程池的混用策略
- 要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列。对于执行比较慢、数量不大的 IO 任务,或许要考虑更多的线程数,而不需要太大的队列。而对于吞吐量较大的计算型任务,线程数量不宜过多,可以是 CPU 核数或核数 *2(理由是,线程一定调度到某个 CPU 进行执行,如果任务本身是 CPU 绑定的任务,那么过多的线程只会增加线程切换的开销,并不能提升吞吐量),但可能需要较长的队列来做缓冲。
- 线程池使用的是CallerRunsPolicy 策略,所以直接使用这个线程池进行异步计算的话,当线程池饱和的时 候,计算任务会在执行 Web 请求的 Tomcat 线程执行,这时就会进一步影响到其他同步处理的线程,甚至造成整个应用程序崩溃
- 解决方案很简单,使用独立的线程池来做这样的“计算任务”即可。可以看到,盲目复用线程池混用线程的问题在于,别人定义的线程池属性不一定适合你的任务,而且混用会相互干扰。
- Java 8 的 parallel stream 功能,可以让我们很方便地并行处理集合中的元素,其背后是共享同一个 ForkJoinPool,默认并行度是CPU 核数 -1。对于 CPU 绑定的任务来说,使用这样的配置比较合适,但如果集合操作涉及同步 IO 操作的话(比如数据库操作、外部服务调用等),建议自定义一个ForkJoinPool(或普通线程池)。