百味皆苦 java后端开发攻城狮

(MK)并发编程精讲

2019-10-25
百味皆苦

线程八大核心基础

多线程实现方式

  • 实现多线程的方式有一种,两种还是四种?

    • 官方指明实现多线程的方式有两种
    • 方式一:继承Thread类
    //描述:     用Thread方式实现线程
    public class ThreadStyle extends Thread {
      
        @Override
        public void run() {
            System.out.println("用Thread类实现线程");
        }
      
        public static void main(String[] args) {
            new ThreadStyle().start();
        }
    }
    
    • 方式二:实现Runnable接口(优先选择)
    //描述:     用Runnable方式创建线程
    public class RunnableStyle implements Runnable {
      
        public static void main(String[] args) {
            Thread thread = new Thread(new RunnableStyle());
            thread.start();
        }
      
        @Override
        public void run() {
            System.out.println("用Runnable方法实现线程");
        }
    }
    
    • 同时使用两种方式会出现什么情况
    package threadcoreknowledge.createthreads;
      
    //描述:     同时使用Runnable和Thread两种实现线程的方式
    public class BothRunnableThread {
      
        public static void main(String[] args) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("我来自Runnable");
                }
            }) {
                @Override
                public void run() {
                    System.out.println("我来自Thread");
                }
            }.start();
        }
    }
      
    //结果:我来自Thread
    //原因:看Thread源码
    //因为重写了Thread的run方法,所以原本的run方法就不被执行了
    
    • 总结来说,创建线程只有一种方式,那就是构造Thread类,而实现线程的执行单元有两种方式。

      方式一:实现Runnable接口的run方法,并把Runnable实例传给Thread类

      方式二:重写Thread的run方法(集成Thread类)

  • 错误观点:这些方式都是一些表面现象,本质都是那两种方式

    • 线程池创建线程也算是一种新建线程的方式:内部实现也是创建的Thread对象
    //描述:     线程池创建线程的方法
    public class ThreadPool5 {
      
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i = 0; i < 1000; i++) {
                executorService.submit(new Task() {});
            }
        }
    }
      
    class Task implements Runnable {
      
        @Override
        public void run() {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        }
    }
    
    • 通过Callable和FutureTask创建线程,也算是一种创建线程的方式
    • 无返回值是实现Runnable接口,有返回值是实现callable接口,所以callable是实现线程新方式
    • 定时器,匿名内部类,lambda表达式
    //描述:     定时器创建线程
    public class DemoTimmerTask {
      
        public static void main(String[] args) {
            Timer timer = new Timer();
            timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            }, 1000, 1000);
        }
    }
    
    //描述:     匿名内部类的方式
    public class AnonymousInnerClassDemo {
      
        public static void main(String[] args) {
            new Thread() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            }.start();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            }).start();
        }
    }
    
    //描述:     lambda表达式创建线程
    public class Lambda {
      
        public static void main(String[] args) {
            new Thread(() -> System.out.println(Thread.currentThread().getName())).start();
        }
    }
    

启动线程正确方式

  • 怎样才是启动线程的正确方式?

  • start方法的含义

    启动新线程

    //描述:     对比start和run两种启动线程的方式
    public class StartAndRunMethod {
      
        public static void main(String[] args) {
          Runnable runnable = () -> {
                System.out.println(Thread.currentThread().getName());
      
            };
          runnable.run();
      
            new Thread(runnable).start();
      }
    }
      
    

    准备工作

    不能重复start,否则会抛出异常,因为线程有一个状态属性,每次启动线程都会去检查

    package threadcoreknowledge.startthread;
      
    //描述:     演示不能两次调用start方法,否则会报错
    public class CantStartTwice {
        public static void main(String[] args) {
            Thread thread = new Thread();
            thread.start();
            thread.start();
        }
      
    }
      
    
  • run方法的含义

    Runnable target = null;
    if(target != null){
      target.run();
    }
    
    
  • 启动线程应该调用start方法,从而间接的调用run方法

正确停止线程

  • 如何正确停止线程?
  • 原理介绍:使用interrupt来通知,而不是强制
  • 正常中断线程
package threadcoreknowledge.stopthreads;

//描述:     run方法内没有sleep或wait方法时,停止线程
public class RightWayStopThreadWithoutSleep implements Runnable {

    @Override
    public void run() {
        int num = 0;
      //通过检查线程是否已经被中断
        while (!Thread.currentThread().isInterrupted() && num <= Integer.MAX_VALUE / 2) {
            if (num % 10000 == 0) {
                System.out.println(num + "是10000的倍数");
            }
            num++;
        }
        System.out.println("任务运行结束了");
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new RightWayStopThreadWithoutSleep());
        thread.start();
        Thread.sleep(2000);
        thread.interrupt();
    }
}

  • 睡眠情况下中断

//描述:     带有sleep的中断线程的写法
public class RightWayStopThreadWithSleep {

    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = () -> {
            int num = 0;
            try {
                while (num <= 300 && !Thread.currentThread().isInterrupted()) {
                    if (num % 100 == 0) {
                        System.out.println(num + "是100的倍数");
                    }
                    num++;
                }
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        Thread thread = new Thread(runnable);
        thread.start();
        Thread.sleep(500);
        thread.interrupt();
    }
}

  • 循环内睡眠
//描述:     如果在执行过程中,每次循环都会调用sleep或wait等方法,那么不需要每次迭代都检查是否已中断
public class RightWayStopThreadWithSleepEveryLoop {
    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = () -> {
            int num = 0;
            try {
                while (num <= 10000) {
                    if (num % 100 == 0) {
                        System.out.println(num + "是100的倍数");
                    }
                    num++;
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        Thread thread = new Thread(runnable);
        thread.start();
        Thread.sleep(5000);
        thread.interrupt();
    }
}

  • 中断失效,如果在sleep中进行中断,会清除中断标识,所以去检测线程是否被中断是检测不到的
//描述:     如果while里面放try/catch,会导致中断失效
public class CantInterrupt {

    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = () -> {
            int num = 0;
            while (num <= 10000 && !Thread.currentThread().isInterrupted()) {
                if (num % 100 == 0) {
                    System.out.println(num + "是100的倍数");
                }
                num++;
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Thread thread = new Thread(runnable);
        thread.start();
        Thread.sleep(5000);
        thread.interrupt();
    }
}

  • 优先选择:传递中断

//描述:     最佳实践:catch了InterruptedExcetion之后的优先选择:在方法签名中抛出异常 那么在run()就会强制try/catch
public class RightWayStopThreadInProd implements Runnable {

    @Override
    public void run() {
        while (true && !Thread.currentThread().isInterrupted()) {
            System.out.println("go");
            try {
                throwInMethod();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                //保存日志、停止程序
                System.out.println("保存日志");
                e.printStackTrace();
            }
        }
    }

    private void throwInMethod() throws InterruptedException {
      		//向上抛出异常
            Thread.sleep(2000);
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new RightWayStopThreadInProd());
        thread.start();
        Thread.sleep(1000);
        thread.interrupt();
    }
}

  • 不想或无法传递:恢复中断
//描述:最佳实践2:在catch子语句中调用Thread.currentThread().interrupt()来恢复设置中断状态,以便于在后续的执行中,依然能够检查到刚才发生了中断
//回到刚才RightWayStopThreadInProd补上中断,让它跳出
public class RightWayStopThreadInProd2 implements Runnable {

    @Override
    public void run() {
        while (true) {
            if (Thread.currentThread().isInterrupted()) {
                System.out.println("Interrupted,程序运行结束");
                break;
            }
            reInterrupt();
        }
    }

    private void reInterrupt() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
          //中断
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new RightWayStopThreadInProd2());
        thread.start();
        Thread.sleep(1000);
        thread.interrupt();
    }
}

  • 不应屏蔽中断

  • 响应中断的方法:

    • Object.wait()/wait(long)/wait(long,int)
    • Thread.sleep(long)/Thread.sleep(long,int)
    • Thread.join()/Thread.join(long)/Thread.join(long,int)
    • java.util.concurrent.BlockingQueue.take()/put(E)
    • java.util.concurrent.locks.Lock.lockInterruptibly()
    • java.util.concurrent.CountDownLatch.await()
    • java.util.concurrent.CyclicBarrier.await()
    • java.util.concurrent.Exchanger.exchange(V)
    • java.nio.channels.InterruptiableChannel相关方法
    • java.nio.channels.Selector的相关方法
  • 正确使用interrupt中断线程的好处:

    通过使用interrupt来发出一个信号,让线程自己去处理,使线程代码更加安全,也完成了清理工作,数据的完整性也得到了保障

错误的停止线程

  • 被弃用的stop,suspend,和resume方法
//描述:     错误的停止方法:用stop()来停止线程,会导致线程运行一半突然停止,没办法完成一个基本单位的操作(一个连队),会造成脏数据(有的连队多领取少领取装备)。
public class StopThread implements Runnable {

    @Override
    public void run() {
        //模拟指挥军队:一共有5个连队,每个连队10人,以连队为单位,发放武器弹药,叫到号的士兵前去领取
        for (int i = 0; i < 5; i++) {
            System.out.println("连队" + i + "开始领取武器");
            for (int j = 0; j < 10; j++) {
                System.out.println(j);
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("连队"+i+"已经领取完毕");
        }
    }

    public static void main(String[] args) {
        Thread thread = new Thread(new StopThread());
        thread.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        thread.stop();
        //会导致某个连队中某些人没有领取到
    }
}

  • 用volatile设置boolean标记位
//描述:     演示用volatile的局限:part1 看似可行,但是程序并没有停止
public class WrongWayVolatile implements Runnable {

    private volatile boolean canceled = false;

    @Override
    public void run() {
        int num = 0;
        try {
            while (num <= 100000 && !canceled) {
                if (num % 100 == 0) {
                    System.out.println(num + "是100的倍数。");
                }
                num++;
                Thread.sleep(1);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WrongWayVolatile r = new WrongWayVolatile();
        Thread thread = new Thread(r);
        thread.start();
        Thread.sleep(5000);
        r.canceled = true;
    }
}
  • 使用生产者和消费者模拟
package threadcoreknowledge.stopthreads.volatiledemo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

//描述:     演示用volatile的局限part2 陷入阻塞时,volatile是无法停止线程的 此例中,生产者的生产速度很快,消费者消费速度慢,所以阻塞队列满了以后,生产者会阻塞,等待消费者进一步消费
public class WrongWayVolatileCantStop {

    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue storage = new ArrayBlockingQueue(10);

        Producer producer = new Producer(storage);
        Thread producerThread = new Thread(producer);
        producerThread.start();
        Thread.sleep(1000);

        Consumer consumer = new Consumer(storage);
        while (consumer.needMoreNums()) {
            System.out.println(consumer.storage.take()+"被消费了");
            Thread.sleep(100);
        }
        System.out.println("消费者不需要更多数据了。");

        //一旦消费不需要更多数据了,我们应该让生产者也停下来,但是实际情况是程序并没有结束
        producer.canceled=true;
        System.out.println(producer.canceled);
    }
}

class Producer implements Runnable {

    public volatile boolean canceled = false;

    BlockingQueue storage;

    public Producer(BlockingQueue storage) {
        this.storage = storage;
    }


    @Override
    public void run() {
        int num = 0;
        try {
            while (num <= 100000 && !canceled) {
                if (num % 100 == 0) {
                    storage.put(num);
                    System.out.println(num + "是100的倍数,被放到仓库中了。");
                }
                num++;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("生产者结束运行");
        }
    }
}

class Consumer {

    BlockingQueue storage;

    public Consumer(BlockingQueue storage) {
        this.storage = storage;
    }

    public boolean needMoreNums() {
        if (Math.random() > 0.95) {
            return false;
        }
        return true;
    }
}
  • 改正为正确停止线程:使用interrupt来中断线程
package threadcoreknowledge.stopthreads.volatiledemo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

//描述:     用中断来修复刚才的无尽等待问题
public class WrongWayVolatileFixed {

    public static void main(String[] args) throws InterruptedException {
        WrongWayVolatileFixed body = new WrongWayVolatileFixed();
        ArrayBlockingQueue storage = new ArrayBlockingQueue(10);

        Producer producer = body.new Producer(storage);
        Thread producerThread = new Thread(producer);
        producerThread.start();
        Thread.sleep(1000);

        Consumer consumer = body.new Consumer(storage);
        while (consumer.needMoreNums()) {
            System.out.println(consumer.storage.take() + "被消费了");
            Thread.sleep(100);
        }
        System.out.println("消费者不需要更多数据了。");

		//中断生产者线程
        producerThread.interrupt();
    }


    class Producer implements Runnable {

        BlockingQueue storage;

        public Producer(BlockingQueue storage) {
            this.storage = storage;
        }


        @Override
        public void run() {
            int num = 0;
            try {
                while (num <= 100000 && !Thread.currentThread().isInterrupted()) {
                    if (num % 100 == 0) {
                        storage.put(num);
                        System.out.println(num + "是100的倍数,被放到仓库中了。");
                    }
                    num++;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("生产者结束运行");
            }
        }
    }

    class Consumer {

        BlockingQueue storage;

        public Consumer(BlockingQueue storage) {
            this.storage = storage;
        }

        public boolean needMoreNums() {
            if (Math.random() > 0.95) {
                return false;
            }
            return true;
        }
    }
}
  • 正确的中断方式
package threadcoreknowledge.stopthreads;

//描述:     注意Thread.interrupted()方法的目标对象是“当前线程”,而不管本方法来自于哪个对象
public class RightWayInterrupted {

    public static void main(String[] args) throws InterruptedException {

        Thread threadOne = new Thread(new Runnable() {
            @Override
            public void run() {
                for (; ; ) {
                }
            }
        });

        // 启动线程
        threadOne.start();
        //设置中断标志
        threadOne.interrupt();
        //获取中断标志,true
        System.out.println("isInterrupted: " + threadOne.isInterrupted());
        //获取中断标志并重置,false
        System.out.println("isInterrupted: " + threadOne.interrupted());
        //获取中断标志并重直,false
        System.out.println("isInterrupted: " + Thread.interrupted());
        //获取中断标志,true
        System.out.println("isInterrupted: " + threadOne.isInterrupted());
        threadOne.join();
        System.out.println("Main thread is over.");
    }
}
  • 面试问题:

  • 如何停止线程?

    • 原理:用interrupt来请求,而不是stop,volatile。好处是保证数据安全,把主动权交给被中断的线程
    • 想停止线程,要请求方,被停止方,子方法被调用方相互配合
    • 最后说错误的方法:stop/suspend已经废弃,volatile的boolean无法处理长时间阻塞情况
  • 如何处理不可中断的阻塞?

    没有通用解决方案,针对特定的情况用特定的方法,尽可能让线程做到能够响应中断

线程的生命周期

  • 线程的生命周期有哪六种状态?

    New,Runnable,Blocked,Waiting,Timed Waiting,terminated

8aNlMF.png

  • 演示new,runnable,terminated状态
//描述:     展示线程的NEW、RUNNABLE、Terminated状态。即使是正在运行,也是Runnable状态,而不是Running。
public class NewRunnableTerminated implements Runnable {

    public static void main(String[] args) {
        Thread thread = new Thread(new NewRunnableTerminated());
        //打印出NEW的状态
        System.out.println(thread.getState());
        thread.start();
        System.out.println(thread.getState());
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //打印出RUNNABLE的状态,即使是正在运行,也是RUNNABLE,而不是RUNNING
        System.out.println(thread.getState());
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //打印出TERMINATED状态
        System.out.println(thread.getState());
    }

    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            System.out.println(i);
        }
    }
}

  • 演示blocked,waiting,timed_waitting状态
//描述:     展示Blocked, Waiting, TimedWaiting
public class BlockedWaitingTimedWaiting implements Runnable{
    public static void main(String[] args) {
        BlockedWaitingTimedWaiting runnable = new BlockedWaitingTimedWaiting();
        Thread thread1 = new Thread(runnable);
        thread1.start();
        Thread thread2 = new Thread(runnable);
        thread2.start();
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //打印出Timed_Waiting状态,因为正在执行Thread.sleep(1000);
        System.out.println(thread1.getState());
        //打印出BLOCKED状态,因为thread2想拿得到sync()的锁却拿不到
        System.out.println(thread2.getState());
        try {
            Thread.sleep(1300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //打印出WAITING状态,因为执行了wait()
        System.out.println(thread1.getState());

    }

    @Override
    public void run() {
        syn();
    }

    private synchronized void syn() {
        try {
            Thread.sleep(1000);
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  • 阻塞状态:blocked(被阻塞),waitting(等待),timed_waitting(计时等待),都属于阻塞
  • 线程有哪几种状态,生命周期是什么?

Thread和Object重要方法

  • 为什么线程通信的方法wait(),notify(),notifyAll()被定义在Object中,而sleep方法被定义在Thread类中?
  • 怎样用三种方法实现生产者模式?
  • Java SE 8和Java 1.8 和JDK 8 是什么关系,是否是同一种东西?
  • join和sleep和wait期间线程的状态分别是什么?为什么?

方法概览

84jmH1.png

  • currentThread
package threadcoreknowledge.threadobjectclasscommonmethods;

//描述:     演示打印main, Thread-0, Thread-1
public class CurrentThread implements Runnable {

    public static void main(String[] args) {
        new CurrentThread().run();
        new Thread(new CurrentThread()).start();
        new Thread(new CurrentThread()).start();
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

//main
//Thread-0
//Thread-1

  • wait,notify,notifyAll作用

  • 阻塞阶段:直到以下四种情况之一发生时,才会被唤醒

    另一个线程调用这个对象的notify方法且刚好被唤醒的线程是本线程

    package threadcoreknowledge.threadobjectclasscommonmethods;
      
    //描述:     展示wait和notify的基本用法 1. 研究代码执行顺序 2. 证明wait释放锁
    public class Wait {
      
        public static Object object = new Object();
      
        static class Thread1 extends Thread {
      
            @Override
            public void run() {
                synchronized (object) {
                    System.out.println(Thread.currentThread().getName() + "开始执行了");
                    try {
                        object.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程" + Thread.currentThread().getName() + "获取到了锁。");
                }
            }
        }
      
        static class Thread2 extends Thread {
      
            @Override
            public void run() {
                synchronized (object) {
                    object.notify();
                    System.out.println("线程" + Thread.currentThread().getName() + "调用了notify()");
                }
            }
        }
      
        public static void main(String[] args) throws InterruptedException {
            Thread1 thread1 = new Thread1();
            Thread2 thread2 = new Thread2();
            thread1.start();
            Thread.sleep(200);
            thread2.start();
        }
    }
      
    //线程1开始执行了
    //线程2调用了notify()
    //线程1获取到了锁
      
    

    另一个线程调用这个对象的notifyAll方法

    package threadcoreknowledge.threadobjectclasscommonmethods;
      
    //描述:     3个线程,线程1和线程2首先被阻塞,线程3唤醒它们。notify, notifyAll。 start先执行不代表线程先启动。
    public class WaitNotifyAll implements Runnable {
      
        private static final Object resourceA = new Object();
        public static void main(String[] args) throws InterruptedException {
          Runnable r = new WaitNotifyAll();
          Thread threadA = new Thread(r);
          Thread threadB = new Thread(r);
          Thread threadC = new Thread(new Runnable() {
              @Override
              public void run() {
                  synchronized (resourceA) {
                      resourceA.notifyAll();
                      System.out.println("ThreadC notified.");
                  }
              }
          });
          threadA.start();
          threadB.start();
            
          Thread.sleep(200);
          threadC.start();
      }
      @Override
      public void run() {
          synchronized (resourceA) {
              System.out.println(Thread.currentThread().getName()+" got resourceA lock.");
              try {
                  System.out.println(Thread.currentThread().getName()+" waits to start.");
                  resourceA.wait();
                  System.out.println(Thread.currentThread().getName()+"'s waiting to end.");
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }
     }
      /*
      线程1 got resourceA lock.
      线程1 waits to start.
      线程2 got resourceA lock.
      线程2 waits to start.
      ThreadC notified.
      线程2 's waiting to end.
      线程1 's waiting to end.
      */
    

====

package threadcoreknowledge.threadobjectclasscommonmethods;

  //描述:     证明wait只释放当前的那把锁
  public class WaitNotifyReleaseOwnMonitor {

      private static volatile Object resourceA = new Object();
      private static volatile Object resourceB = new Object();

      public static void main(String[] args) {
          Thread thread1 = new Thread(new Runnable() {
              @Override
              public void run() {
                  synchronized (resourceA) {
                      System.out.println("ThreadA got resourceA lock.");
                      synchronized (resourceB) {
                          System.out.println("ThreadA got resourceB lock.");
                          try {
                              System.out.println("ThreadA releases resourceA lock.");
                              resourceA.wait();

                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                  }
              }
          });

          Thread thread2 = new Thread(new Runnable() {
              @Override
              public void run() {
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  synchronized (resourceA) {
                      System.out.println("ThreadB got resourceA lock.");
                      System.out.println("ThreadB tries to resourceB lock.");

                      synchronized (resourceB) {
                          System.out.println("ThreadB got resourceB lock.");
                      }
                  }
              }
          });

          thread1.start();
          thread2.start();
      }
  }
  /*
  ThreadA got resourceA lock.
  ThreadA got resourceB lock.
  ThreadA releases resourceA lock.
  ThreadB got resourceA lock.
  ThreadB tries to resourceB lock.
  打印结果完毕,程序并没有停止运行
  */

过了wait(long timeout)规定的超时时间,如果传入0就是永久等待

线程自身调用了interrupt()

  • 唤醒阶段:

  • 遇到中断:

  • wait,notify,notifyAll特点和性质

    用必须先拥有monitor

    只能唤醒其中一个

    属于Object类

    类似功能的Condition

生产者消费者模式

85zWYq.png

  • 代码实现
package threadcoreknowledge.threadobjectclasscommonmethods;

import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;

//描述:     用wait/notify来实现生产者消费者模式
public class ProducerConsumerModel {
    public static void main(String[] args) {
        EventStorage eventStorage = new EventStorage();
        Producer producer = new Producer(eventStorage);
        Consumer consumer = new Consumer(eventStorage);
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

class Producer implements Runnable {

    private EventStorage storage;

    public Producer(
            EventStorage storage) {
        this.storage = storage;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            storage.put();
        }
    }
}

class Consumer implements Runnable {

    private EventStorage storage;

    public Consumer(
            EventStorage storage) {
        this.storage = storage;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            storage.take();
        }
    }
}

class EventStorage {

    private int maxSize;
    private LinkedList<Date> storage;

    public EventStorage() {
        maxSize = 10;
        storage = new LinkedList<>();
    }

    public synchronized void put() {
        while (storage.size() == maxSize) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        storage.add(new Date());
        System.out.println("仓库里有了" + storage.size() + "个产品。");
        notify();
    }

    public synchronized void take() {
        while (storage.size() == 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("拿到了" + storage.poll() + ",现在仓库还剩下" + storage.size());
        notify();
    }
}

线程交替打印奇偶数

  • 使用两个线程交替打印0到100的奇偶数
  • 基本思路,使用synchronized实现
package threadcoreknowledge.threadobjectclasscommonmethods;

//描述:     两个线程交替打印0~100的奇偶数,用synchronized关键字实现
public class WaitNotifyPrintOddEvenSyn {

    private static int count;

    private static final Object lock = new Object();

    //新建2个线程
    //1个只处理偶数,第二个只处理奇数(用位运算)
    //用synchronized来通信
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (count < 100) {
                    synchronized (lock) {
                        if ((count & 1) == 0) {
                            System.out.println(Thread.currentThread().getName() + ":" + count++);
                        }
                    }
                }
            }
        }, "偶数").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                while (count < 100) {
                    synchronized (lock) {
                        if ((count & 1) == 1) {
                            System.out.println(Thread.currentThread().getName() + ":" + count++);
                        }
                    }
                }
            }
        }, "奇数").start();
    }
}
//这个程序有许多的费操作,比如偶数线程连续拿到锁,但它只会执行一次if块内容,这与线程的竞争有关
  • 更好的方法是使用wait和notify实现
package threadcoreknowledge.threadobjectclasscommonmethods;


//描述:     两个线程交替打印0~100的奇偶数,用wait和notify
public class WaitNotifyPrintOddEveWait {

    private static int count = 0;
    private static final Object lock = new Object();


    public static void main(String[] args) {
        new Thread(new TurningRunner(), "偶数").start();
        new Thread(new TurningRunner(), "奇数").start();
    }

    //1. 拿到锁,我们就打印
    //2. 打印完,唤醒其他线程,自己就休眠
    static class TurningRunner implements Runnable {

        @Override
        public void run() {
            while (count <= 100) {
                synchronized (lock) {
                    //拿到锁就打印
                    System.out.println(Thread.currentThread().getName() + ":" + count++);
                    lock.notify();
                    if (count <= 100) {
                        try {
                            //如果任务还没结束,就让出当前的锁,并休眠
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
}
  • 为什么wait方法需要在同步代码块中使用,而sleep不需要?

    防止死锁的发生,如果在wait之前跳到了其他线程中,那这个线程再跳回来执行wait之后就没有其他线程唤醒它了

  • 为什么线程通信的方法:wait,notify,notifyAll定义在Object中,而sleep方法定义在Thread中?

sleep详解

  • 作用:只想让线程在预期的时间执行,其他时间不要占用CPU资源

  • 不释放锁(包括synchronized和lock)

    package threadcoreknowledge.threadobjectclasscommonmethods;
      
    import sun.awt.windows.ThemeReader;
      
    //展示线程sleep的时候不释放synchronized的monitor,等sleep时间到了以后,正常结束后才释放锁
    public class SleepDontReleaseMonitor implements Runnable {
      
        public static void main(String[] args) {
          SleepDontReleaseMonitor sleepDontReleaseMonitor = new SleepDontReleaseMonitor();
            new Thread(sleepDontReleaseMonitor).start();
            new Thread(sleepDontReleaseMonitor).start();
        }
      
        @Override
      public void run() {
            syn();
        }
      
        private synchronized void syn() {
          System.out.println("线程" + Thread.currentThread().getName() + "获取到了monitor。");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程" + Thread.currentThread().getName() + "退出了同步代码块");
        }
    }
    /*
    线程Thread-0获取到了monitor
    线程Thread-0退出了同步代码块
    线程thread-1获取到了monitor
    线程Thread-1退出了同步代码块
    */
    

    ```java package threadcoreknowledge.threadobjectclasscommonmethods;

    import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;

    /**描述: 演示sleep不释放lock(lock需要手动释放) */ public class SleepDontReleaseLock implements Runnable { private static final Lock lock = new ReentrantLock(); @Override public void run() { lock.lock(); System.out.println(“线程” + Thread.currentThread().getName() + “获取到了锁”); try { Thread.sleep(5000); System.out.println(“线程” + Thread.currentThread().getName() + “已经苏醒”); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }

    public static void main(String[] args) { SleepDontReleaseLock sleepDontReleaseLock = new SleepDontReleaseLock(); new Thread(sleepDontReleaseLock).start(); new Thread(sleepDontReleaseLock).start(); } } /* 线程Thread-0获取到了锁 线程Thread-0已经苏醒 线程Thread-1获取到了锁 线程Thread-1已经苏醒 */

sleep方法响应中断

​ 抛出InterruptedException

​ 清除中断状态


  package threadcoreknowledge.threadobjectclasscommonmethods;

  import java.util.Date;
  import java.util.concurrent.TimeUnit;

  /**
   * 描述:     每个1秒钟输出当前时间,被中断,观察。
   * Thread.sleep()
   * TimeUnit.SECONDS.sleep()
   */
  public class SleepInterrupted implements Runnable{

      public static void main(String[] args) throws InterruptedException {
          Thread thread = new Thread(new SleepInterrupted());
          thread.start();
          Thread.sleep(6500);
          thread.interrupt();
      }
      @Override
      public void run() {
          for (int i = 0; i < 10; i++) {
              System.out.println(new Date());
              try {
                  TimeUnit.HOURS.sleep(3);
                  TimeUnit.MINUTES.sleep(25);
                  TimeUnit.SECONDS.sleep(1);
              } catch (InterruptedException e) {
                  System.out.println("我被中断了!");
                  e.printStackTrace();
              }
          }
      }
  }

  • sleep方法可以让线程进入waiting状态,并且不占用CPU资源,但是不释放锁,直到规定的时间后再执行,休眠期间如果被中断,会抛出异常并清除中断状态。

join详解

  • 作用:因为新的线程加入了我们,所以我们要等他执行完再出发

  • 用法:main等待thread1执行完毕,注意谁等谁

    package threadcoreknowledge.threadobjectclasscommonmethods;
    
    /**
     * 描述:     演示join,注意语句输出顺序,会变化。
     */
    public class Join {
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "执行完毕");
                }
            });
            Thread thread2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "执行完毕");
                }
            });
    
            thread.start();
            thread2.start();
            System.out.println("开始等待子线程运行完毕");
            thread.join();
            thread2.join();
            System.out.println("所有子线程执行完毕");
        }
    }
    /*
    开始等待子线程执行完毕
    Thread-1执行完毕
    Thread-0执行完毕
    所有子线程执行完毕
    */
    
  • 遇到中断

package threadcoreknowledge.threadobjectclasscommonmethods;

/**
 * 描述:     演示join期间被中断的效果
 */
public class JoinInterrupt {
    public static void main(String[] args) {
        Thread mainThread = Thread.currentThread();
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    mainThread.interrupt();
                    Thread.sleep(5000);
                    System.out.println("Thread1 finished.");
                } catch (InterruptedException e) {
                    System.out.println("子线程中断");
                }
            }
        });
        thread1.start();
        System.out.println("等待子线程运行完毕");
        try {
            thread1.join();
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName()+"主线程中断了");
            thread1.interrupt();
        }
        System.out.println("子线程已运行完毕");
    }

}

  • 在join期间,线程到底是什么状态?waiting

    package threadcoreknowledge.threadobjectclasscommonmethods;
    
    /**
     * 描述:     先join再mainThread.getState()
     * 通过debugger看线程join前后状态的对比
     */
    public class JoinThreadState {
        public static void main(String[] args) throws InterruptedException {
            Thread mainThread = Thread.currentThread();
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                        System.out.println(mainThread.getState());
                        System.out.println("Thread-0运行结束");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            thread.start();
            System.out.println("等待子线程运行完毕");
            thread.join();
            System.out.println("子线程运行完毕");
    
        }
    }
    
    
  • join原理

package threadcoreknowledge.threadobjectclasscommonmethods;

/**
 * 描述:     通过讲解join原理,分析出join的代替写法
 */
public class JoinPrinciple {

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "执行完毕");
            }
        });

        thread.start();
        System.out.println("开始等待子线程运行完毕");
        thread.join();
      //等价代码
//        synchronized (thread) {
//            thread.wait();
//        }
        System.out.println("所有子线程执行完毕");
    }
}

yield详解

  • 作用:释放我的CPU时间片
  • 定位:JVM不保证遵循
  • 和sleep区别:是否随时可能再被调度

线程各属性

属性名称 用途 注意事项
编号(ID) 每个线程有自己的ID,用于标识不同的线程 唯一性,不允许被修改
名称(NAME) 在开发,调试或运行过程中,更容易区分每个不同的线程,定位问题 默认名称
是否是守护线程(isDaemon) true代表是【守护线程】,false代表不是【守护线程】,也就是【用户线程】 二选一;继承父线程;setDaemon
优先级(Priority) 告诉线程调度器,用户希望哪些线程相对多运行,哪些少运行 默认和父线程的优先级相等,共有十个等级,默认值5;不应依赖

线程ID

package threadcoreknowledge.threadobjectclasscommonmethods;

/**
 * 描述:     ID从1开始,JVM运行起来后,我们自己创建的线程的ID早已不是2.
 */
public class Id {

    public static void main(String[] args) {
        Thread thread = new Thread();
        System.out.println("主线程的ID"+Thread.currentThread().getId());
        System.out.println("子线程的ID"+thread.getId());
    }
}

线程名称

  • 默认线程名称使Thread-自增数
  • 可以对线程的名字进行修改

守护线程

  • 作用:给用户线程提供服务
  • 三个特性:
    • 线程类型默认继承自父线程
    • 被谁启动
    • 不影响JVM退出

优先级

  • 一共有十个,默认是5
  • 程序设计不应该依赖于优先级,因为不同操作系统优先级不一样

未捕获异常如何处理?

未捕获异常

  • 未捕获异常:UncaughtException
  • 需要使用全局异常处理器捕获:UncaughtExceptionHandler
    • 因为主线程可以轻松发现异常,子线程不行
    • 子线程异常无法用传统方式捕获
  • 自定义异常处理器
package threadcoreknowledge.uncaughtexception;

import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * 描述:     自己的MyUncaughtExceptionHanlder
 */
public class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {

    private String name;

    public MyUncaughtExceptionHandler(String name) {
        this.name = name;
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        Logger logger = Logger.getAnonymousLogger();
        logger.log(Level.WARNING, "线程异常,终止啦" + t.getName());
        System.out.println(name + "捕获了异常" + t.getName() + "异常");
    }
}

  • 使用异常处理器
package threadcoreknowledge.uncaughtexception;

/**
 * 描述:     使用刚才自己写的UncaughtExceptionHandler
 */
public class UseOwnUncaughtExceptionHandler implements Runnable {

    public static void main(String[] args) throws InterruptedException {
        Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler("捕获器1"));

        new Thread(new UseOwnUncaughtExceptionHandler(), "MyThread-1").start();
        Thread.sleep(300);
        new Thread(new UseOwnUncaughtExceptionHandler(), "MyThread-2").start();
        Thread.sleep(300);
        new Thread(new UseOwnUncaughtExceptionHandler(), "MyThread-3").start();
        Thread.sleep(300);
        new Thread(new UseOwnUncaughtExceptionHandler(), "MyThread-4").start();
    }


    @Override
    public void run() {
        throw new RuntimeException();
    }
}

双刃剑:多线程会导致的问题

*线程安全

  • 什么是线程安全:当多个线程访问一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行,也不需要进行额外的同步,或者在调用方进行任何其他的协调操作,调用这个对象的行为都可以获得正确的结果,那这个对象是线程安全的
  • 什么是线程安全:不管业务中遇到怎样的多个线程访问某对象或某方法的情况,而在编写这个业务逻辑的时候,都不需要做任何额外的处理(也就是可以像单线程编程一样),程序也可以正常运行(不会因为多线程而出错),就可以称为线程安全。
  • 线程不安全:get同时set,额外同步

  • 什么情况下会出现线程安全问题?
a++情况
  • 运行结果错误,a++多线程下出现消失的请求现象

    package background;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 描述:     第一种:运行结果出错。 演示计数不准确(减少),找出具体出错的位置。
     */
    public class MultiThreadsError implements Runnable {
    
        static MultiThreadsError instance = new MultiThreadsError();
        int index = 0;
        static AtomicInteger realIndex = new AtomicInteger();
        static AtomicInteger wrongCount = new AtomicInteger();
        //让线程根据我们的需要在某个位置进行等待,直到所等待的人员都就绪再一起触发
        static volatile CyclicBarrier cyclicBarrier1 = new CyclicBarrier(2);//等待两个线程
        static volatile CyclicBarrier cyclicBarrier2 = new CyclicBarrier(2);
    
       //记录线程发生冲突的错误位置
        final boolean[] marked = new boolean[10000000];
    
        public static void main(String[] args) throws InterruptedException {
    
            Thread thread1 = new Thread(instance);
            Thread thread2 = new Thread(instance);
            thread1.start();
            thread2.start();
            thread1.join();
            thread2.join();
            System.out.println("表面上结果是" + instance.index);//运行结果普遍偏少
            System.out.println("真正运行的次数" + realIndex.get());//正确结果
            System.out.println("错误次数" + wrongCount.get());
    
        }
    
        @Override
        public void run() {
          //特殊情况,如果第一次就发生了线程错误
            marked[0] = true;
            for (int i = 0; i < 10000; i++) {
                try {
                    //重置
                    cyclicBarrier2.reset();
                    //等待
                    cyclicBarrier1.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                index++;
                try {
                    cyclicBarrier1.reset();
                    cyclicBarrier2.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                realIndex.incrementAndGet();
                synchronized (instance) {
                    if (marked[index] && marked[index - 1]) {
                        System.out.println("发生错误" + index);
                        wrongCount.incrementAndGet();
                    }
                    marked[index] = true;
                }
            }
        }
    }
    
    

    image.png

  • 活跃性问题:死锁,活锁,饥饿

死锁

image.png

package background;

/**
 * 描述:     第二章线程安全问题,演示死锁。
 */
public class MultiThreadError implements Runnable {

    int flag = 1;
    static Object o1 = new Object();
    static Object o2 = new Object();

    public static void main(String[] args) {
        MultiThreadError r1 = new MultiThreadError();
        MultiThreadError r2 = new MultiThreadError();
        r1.flag = 1;
        r2.flag = 0;
        new Thread(r1).start();
        new Thread(r2).start();
    }

    @Override
    public void run() {
        System.out.println("flag = " + flag);
        if (flag == 1) {
            synchronized (o1) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (o2) {
                    System.out.println("1");
                }
            }
        }
        if (flag == 0) {
            synchronized (o2) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (o1) {
                    System.out.println("0");
                }
            }
        }
    }
}

对象发布和初始化
  • 对象发布和初始化的时候的安全问题:

    • 什么是逸出:

      • 1.方法返回一个private对象(private的本意是不让外部访问)
      package background;
          
      import com.sun.javafx.geom.Matrix3f;
      import java.util.HashMap;
      import java.util.Map;
          
      /**
       * 描述:     发布逸出
       */
      public class MultiThreadsError3 {
          
          private Map<String, String> states;
          
          public MultiThreadsError3() {
              states = new HashMap<>();
              states.put("1", "周一");
              states.put("2", "周二");
              states.put("3", "周三");
              states.put("4", "周四");
          }
          
          public Map<String, String> getStates() {
              return states;
          }
          
          public Map<String, String> getStatesImproved() {
              return new HashMap<>(states);
          }
          
          public static void main(String[] args) {
              MultiThreadsError3 multiThreadsError3 = new MultiThreadsError3();
              Map<String, String> states = multiThreadsError3.getStates();
      //        System.out.println(states.get("1"));
      //        states.remove("1");map是服务于很多个类的,不能随意篡改map的数据,会出现安全隐患
      //        System.out.println(states.get("1"));
          
              System.out.println(multiThreadsError3.getStatesImproved().get("1"));
              multiThreadsError3.getStatesImproved().remove("1");
              System.out.println(multiThreadsError3.getStatesImproved().get("1"));
          
          }
      }
          
      
      • 2.还未完成初始化(构造函数没完全执行完毕)就把对象提供给外界,比如:
      • 在构造函数中未初始化完毕就this赋值
      package background;
          
      /**
       * 描述:     初始化未完毕,就this赋值
       */
      public class MultiThreadsError4 {
          
          static Point point;
          
          public static void main(String[] args) throws InterruptedException {
              new PointMaker().start();
      //        Thread.sleep(10);
              Thread.sleep(105);
              if (point != null) {
                  System.out.println(point);
              }
          }
      }
          
      class Point {
          
          private final int x, y;
          
          public Point(int x, int y) throws InterruptedException {
              this.x = x;
              MultiThreadsError4.point = this;
              Thread.sleep(100);
              this.y = y;
          }
          
          @Override
          public String toString() {
              return x + "," + y;
          }
      }
          
      class PointMaker extends Thread {
          
          @Override
          public void run() {
              try {
                  new Point(1, 1);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }
      
      • 隐式逸出——注册监听事件
      package background;
          
      /**
       * 描述:     观察者模式
       */
      public class MultiThreadsError5 {
          
          int count;
          
          public MultiThreadsError5(MySource source) {
              source.registerListener(new EventListener() {
                  @Override
                  public void onEvent(Event e) {
                      System.out.println("\n我得到的数字是" + count);
                  }
          
              });
              for (int i = 0; i < 10000; i++) {
                  System.out.print(i);
              }
              count = 100;
          }
          
          public static void main(String[] args) {
              MySource mySource = new MySource();
              new Thread(new Runnable() {
                  @Override
                  public void run() {
                      try {
                          Thread.sleep(10);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      mySource.eventCome(new Event() {
                      });
                  }
              }).start();
              MultiThreadsError5 multiThreadsError5 = new MultiThreadsError5(mySource);
          }
          
          static class MySource {
          
              private EventListener listener;
          
              void registerListener(EventListener eventListener) {
                  this.listener = eventListener;
              }
          
              void eventCome(Event e) {
                  if (listener != null) {
                      listener.onEvent(e);
                  } else {
                      System.out.println("还未初始化完毕");
                  }
              }
          
          }
          
          interface EventListener {
          
              void onEvent(Event e);
          }
          
          interface Event {
          
          }
      }
          
      

      用工厂模式修复刚才的初始化问题

      package background;
      /**
       * 描述:     用工厂模式修复刚才的初始化问题
       */
      public class MultiThreadsError7 {
          
          int count;
          private EventListener listener;
          
          private MultiThreadsError7(MySource source) {
              listener = new EventListener() {
                  @Override
                  public void onEvent(MultiThreadsError5.Event e) {
                      System.out.println("\n我得到的数字是" + count);
                  }
          
              };
              for (int i = 0; i < 10000; i++) {
                  System.out.print(i);
              }
              count = 100;
          }
          
          public static MultiThreadsError7 getInstance(MySource source) {
              MultiThreadsError7 safeListener = new MultiThreadsError7(source);
              source.registerListener(safeListener.listener);
              return safeListener;
          }
          
          public static void main(String[] args) {
              MySource mySource = new MySource();
              new Thread(new Runnable() {
                  @Override
                  public void run() {
                      try {
                          Thread.sleep(10);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      mySource.eventCome(new MultiThreadsError5.Event() {
                      });
                  }
              }).start();
              MultiThreadsError7 multiThreadsError7 = new MultiThreadsError7(mySource);
          }
          
          static class MySource {
          
              private EventListener listener;
          
              void registerListener(EventListener eventListener) {
                  this.listener = eventListener;
              }
          
              void eventCome(MultiThreadsError5.Event e) {
                  if (listener != null) {
                      listener.onEvent(e);
                  } else {
                      System.out.println("还未初始化完毕");
                  }
              }
          
          }
          
          interface EventListener {
          
              void onEvent(MultiThreadsError5.Event e);
          }
          
          interface Event {
          
          }
      }
      

      构造函数中运行线程

      package background;
          
      import java.util.HashMap;
      import java.util.Map;
          
      /**
       * 描述:     构造函数中新建线程
       */
      public class MultiThreadsError6 {
          
          private Map<String, String> states;
          
          public MultiThreadsError6() {
              new Thread(new Runnable() {
                  @Override
                  public void run() {
                      states = new HashMap<>();
                      states.put("1", "周一");
                      states.put("2", "周二");
                      states.put("3", "周三");
                      states.put("4", "周四");
                  }
              }).start();
          }
          
          public Map<String, String> getStates() {
              return states;
          }
          
          public static void main(String[] args) throws InterruptedException {
              MultiThreadsError6 multiThreadsError6 = new MultiThreadsError6();
              Thread.sleep(1000);
              System.out.println(multiThreadsError6.getStates().get("1"));
          }
      }
      
  • 各种需要考虑线程安全的情况

    • 访问共享资源或变量,会有并发风险,比如对象的属性,静态变量,共享缓存,数据库等
    • 所有依赖时序的操作,即使每一步操作都是线程安全的,还是存在并发问题:read-modify-write、check-then-act
    • 不同的数据之间存在捆绑关系的时候
    • 我们使用其他类的时候,如果对方没有声明自己是线程安全的

性能问题

  • 为什么多线程会带来性能问题:
    • 调度:上下文切换
      • 什么是上下文?
      • 缓存开销
      • 何时会导致密集的上下文切换
    • 协作:内存同步

Java内存模型

  • JVM内存结构,和Java虚拟机的运行时区域有关。
  • Java内存模型,和Java的并发编程有关
  • Java对象模型,和Java对象在虚拟机中的表现形式有关。

Java内存结构

image.png

Java对象模型

image.png

  • Java对象自身的存储模型
  • JVM会给这个类创建一个instanceKlass,保存在方法区,用来在JVM层表示该Java类
  • 当我们在Java代码中,使用new创建一个对象的时候,JVM会创建一个instanceOopDesc对象,这个对象中包含了对象头以及实例数据。

Java内存模型

  • JMM(Java memory model):是一组规范,需要各个JVM的实现来遵守JMM规范,以便于开发者可以利用这些规范,更方便地开发多线程程序。如果没有这样的一个JMM内存模型来规范,那么很可能经过了不同JVM的不同规则的重排序之后,导致不同的虚拟机上运行的结果不一样,那是很大的问题。
  • volatile,synchronized,lock等的原理都是JMM
  • 如果没有JMM,那就需要我们自己指定什么时候用内存栅栏等,那是相当麻烦的,幸好有了JMM,让我们只需要用同步工具和关键词就可以开发并发程序。
  • 最重要三点内容:重排序,可见性,原子性。
  • 为什么需要JMM?
    • c语言不存在内存模型的概念
    • 依赖处理器,不同处理器结果不一样
    • 无法保证并发安全
    • 需要一个标准,让多线程运行的结果可预期

重排序

  • 重排序的好处:提高处理速度
  • 重排序的三种情况:编译器优化、CPU指令重排、内存的“重排序”。
package jmm;

import java.util.concurrent.CountDownLatch;

/**
 * 描述:     演示重排序的现象 “直到达到某个条件才停止”,测试小概率事件
 */
public class OutOfOrderExecution {

    private static int x = 0, y = 0;
    private static int a = 0, b = 0;

    public static void main(String[] args) throws InterruptedException {
        int i = 0;
        for (; ; ) {
            i++;
            x = 0;
            y = 0;
            a = 0;
            b = 0;
			//闸门工具类
            CountDownLatch latch = new CountDownLatch(3);

            Thread one = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        latch.countDown();
                        latch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    a = 1;
                    x = b;
                }
            });
            Thread two = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        latch.countDown();
                        latch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    b = 1;
                    y = a;
                }
            });
            two.start();
            one.start();
            latch.countDown();
            one.join();
            two.join();

            String result = "第" + i + "次(" + x + "," + y + ")";
            if (x == 0 && y == 0) {
                System.out.println(result);
                break;
            } else {
                System.out.println(result);
            }
        }
    }


}
/*
程序一共有三种情况:
(1) a=1;x=b(0);b=1;y=a(1),最终结果为x=0,y=1;先执行one,后执行two
(2) b=1;y=a(0);a=1;x=b(1),最终结果为x=1,y=0;先执行two,后执行one
(3) b=1;a=1;x=b(1);y=a(1),最终结果为x=1,y=1;这种情况需要使用闸门,先执行两个线程的第一行代码
*/
  • 是否会出现x=0,y=0的情况?答案是会的,那是因为重排序发生了,四行代码的执行顺序的其中一种可能是:y=a; a=1; x=b; b=1;
  • 什么是重排序:在线程1内部的两行代码的实际执行顺序和代码在Java文件中的顺序不一致,代码指令并不是严格按照代码语句顺序执行的,他们的顺序被改变了,这就是重排序,这里被颠倒的是y=a和b=1这两行语句。

image.png

  • 编译器优化:包括JVM,JIT编译器等。
  • CPU指令重排:就算编译器不发生重排,CPU也可能对指令进行重排
  • 内存的“重排序”:线程A的修改操作对于线程B来说是看不到的,引出可见性问题

可见性

  • 线程之间的通讯有时间延迟,可能会发生数据读取错误问题

image.png

  • 首先修改线程去修改了x的值,这时还没有同步到主内存,而读取线程就去读取主内存中的x值,显然读取到的值是错误的
  • 为什么会有可见性问题?
    • CPU有多级缓存,导致读取的数据过期;
    • 高速缓存的容量比主内存小,但是速度仅次于寄存器,所以在CPU和主存之间就多了cache层;
    • 线程间的对于共享变量的可见性问题不是直接由多核引起的,而是由多缓存引起的。
    • 如果所有个核心都只用一个缓存,那么也就不存在内存可见性问题了;
    • 每个核心都会将自己需要的数据读取到独占缓存中,数据修改后也是写入到缓存中,然后等待刷入到主存中。所以会导致有些核心读取的值是一个过期的值。
package jmm;

/**
 * 描述:     演示可见性带来的问题
 */
public class FieldVisibility {

  //会出现可见性问题
     int a = 1;
     int b = 2;

    private void change() {
        a = 3;
        b = a;
    }


    private void print() {
        System.out.println("b=" + b + ";a=" + a);
    }

    public static void main(String[] args) {
        while (true) {
            FieldVisibility test = new FieldVisibility();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    test.change();
                }
            }).start();

            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    test.print();
                }
            }).start();
        }

    }


}
/*
a=3,b=2
a=1,b=2
a=3,b=3
a=1,b=3(出现了可见性问题,使用volatile来解决)
*/
  • 给b加volatile关键字来解决可见性问题:对b进行读操作时一定可见对b的写操作前的所有操作。
/**
 * 描述:     使用volatile解决可见性带来的问题
 */
public class FieldVisibility {

  //会出现可见性问题
     int a = 1;
     volatile int b = 2;

    private void change() {
        a = 3;
        b = a;
    }


    private void print() {
        System.out.println("b=" + b + ";a=" + a);
    }

    public static void main(String[] args) {
        while (true) {
            FieldVisibility test = new FieldVisibility();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    test.change();
                }
            }).start();

            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    test.print();
                }
            }).start();
        }

    }


}
/*
a=3,b=2
a=1,b=2
a=3,b=3
a=1,b=3(这种情况就不会出现了)
*/

主内存和本地内存

  • Java作为高级语言,屏蔽了这些底层细节,用JMM定义了一套读写内存数据的规范,虽然我们不再需要关心一级缓存和二级缓存的问题,但是,JMM抽象了主内存和本地内存的概念。
  • 这里说的本地内存并不是真的是一块给每个线程分配的内存,而是JMM的一个抽象,是对于寄存器、一级缓存、二级缓存等的抽象。

image.png

  • JMM有以下规定:
    • 所有的变量都存储在主内存中,同时每个线程也有自己独立的工作内存,工作内存中的变量内容是主内存中的拷贝
    • 线程不能直接读写主内存中的变量,而是只能操作自己工作内存中的变量,然后再同步到主内存中
    • 主内存是多个线程共享的,但线程间不共享工作内存,如果线程间需要通信,必须借助主内存中转来完成。
  • 所有的共享变量存在于主内存中,每个线程有自己的本地内存,而且线程读写共享数据也是通过本地内存交换的,所以才导致了可见性问题。

Happens-Before规则

单线程规则
  • 在单线程中,后面的操作一定知道前面的操作

image.png

锁操作
  • 锁包括synchronized和lock

image.png

image.png

volatile变量
  • 如果A是对volatile变量的写操作,B是对同一变量的读操作,那么hb(A,B)

image.png

线程启动

image.png

线程join

image.png

传递性
  • 如果hb(A,B)而且hb(B,C),那么可以推出hb(A,C)
中断
  • 一个线程被其他线程interrupt时,那么检测中断(isInterrupted)或者抛出InterruptedException一定能看到。
构造方法
  • 对象构造方法的最后一行指令happens-before于finalize()方法的第一条指令。
工具类
  • 线程安全的容器get一定能看到在此之前的put等存入动作
  • CountDownLatch
  • Semaphore
  • Future
  • 线程池
  • CyclicBarrier

volatile关键字

  • volatile是什么:volatile是一种同步机制,比synchronized或者lock相关类更轻量,因为使用volatile并不会发生上下文切换等开销很大的行为
  • 如果一个变量被修饰成volatile,那么JVM就会知道这个变量可能会被并发修改。
  • 但是开销小,相应的能力也小,虽然说volatile是用来同步的保证线程安全的,但是volatile做不到synchronized那样的原子保护,volatile仅在很有限的场景下才能发挥作用。
  • volatile不适用于a++场景
package jmm;


import java.util.concurrent.atomic.AtomicInteger;

/**
 * 描述:     不适用于volatile的场景
 */
public class NoVolatile implements Runnable {

    volatile int a;
  //原子变量,记录正确的结果
    AtomicInteger realA = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        Runnable r =  new NoVolatile();
        Thread thread1 = new Thread(r);
        Thread thread2 = new Thread(r);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(((NoVolatile) r).a);
        System.out.println(((NoVolatile) r).realA.get());
    }
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            a++;
          //增加
            realA.incrementAndGet();
        }
    }
}

  • volatile适用场合1,也是与synchronized的关系:boolean flag,如果一个共享变量自始至终只被各个线程赋值,而没有其他的操作,那么就可以用volatile来代替synchronized或者代替原子变量,因为赋值自身是有原子性的,而volatile又保证了可见性,所以就足以保证线程安全。
package jmm;

import java.util.concurrent.atomic.AtomicInteger;
import singleton.Singleton8;

/**
 * 描述:     volatile适用的情况1
 */
public class UseVolatile1 implements Runnable {

    volatile boolean done = false;
    AtomicInteger realA = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        Runnable r =  new UseVolatile1();
        Thread thread1 = new Thread(r);
        Thread thread2 = new Thread(r);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(((UseVolatile1) r).done);
        System.out.println(((UseVolatile1) r).realA.get());
    }
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            setDone();
            realA.incrementAndGet();
        }
    }

    private void setDone() {
      //只对变量进行赋值,且不依赖于以前变量的值是什么
        done = true;
    }
}

  • 适用场景2:作为刷新之前变量的触发器

image.png

  • 其中initialized就充当了触发器,当去读取initialized时,对initialized赋值以上的操作都是对当前线程是可见的。
两点作用
  • 可见性:读取一个volatile变量之前,需要先使相应的本地缓存失效,这样就必须到主内存读取最新值,写一个volatile属性会立即刷入到主内存。
  • 禁止指令重排序优化:解决单例双重锁乱序问题
    • OutOfOrderExecution类变量加了volatile后,就不会再出现x=0,y=0的情况了
小结
  • volatile修饰符适用于以下场景:某个属性被多个线程共享,其中有一个线程修改了此属性,其他线程可以立即得到修改后的值,比如boolean flag;或者作为触发器,实现轻量级同步。
  • volatile属性的读写操作都是无锁的,它不能代替synchronized,因为它没有提供原子性和互斥性。因为无锁,不需要花费时间在获取锁和释放锁上,所以说它是低成本的。
  • volatile只能作用于属性,我们用volatile修饰属性,这样compolers就不会对这个属性做指令重排序。
  • volatile提供了可见性,任何一个线程对其的修改将立即对其他线程可见。volatile属性不会被线程缓存,始终从主存中读取。
  • volatile提供了happens-before保证,对volatile变量v的写入happens-before所有其他线程后续对v的读操作。
  • volatile可以使得long和double的赋值是原子的。

synchronized

  • synchronized不仅保证了原子性,还保证了可见性。
  • synchronized不仅让被保护的代码安全,还近朱者赤

image.png

原子性

  • 什么是原子性:一系列的操作,要么全部执行成功,要么全部不执行,不会出现执行一半的情况,是不可分割的。典型案例为ATM取款。
  • i++不是原子性的,可以使用synchronized实现原子性
  • Java中的原子操作有哪些:
    • 除了long和double之外的基本类型(int ,byte,boolean,short,char,float)的赋值操作;
    • 所有引用reference的赋值操作,不管是32位的机器还是64位的机器
    • java.concurrent.Atomic.*包中所有的原子操作
long和double

image.png

  • 在32位上的JVM上,long和double的操作不是原子的,但是在64位的JVM上是原子的。在实际开发商用Java虚拟机中是不会出现的。
  • 原子操作+原子操作!=原子操作
  • 全同步的HashMap也不完全安全

单例模式的8种写法

饿汉式

  • 简单,但是没有懒加载,会造成资源的浪费
package singleton;

/**
 * 描述:     饿汉式(静态常量)(可用)
 */
public class Singleton1 {
	//类在加载的时候就实例化,避免了线程同步问题
    private final static Singleton1 INSTANCE = new Singleton1();

    private Singleton1() {
		//初始化操作
    }
    //发布到外部提供访问
    public static Singleton1 getInstance() {
        return INSTANCE;
    }

}

package singleton;

/**
 * 描述:     饿汉式(静态代码块)(可用)
 */
public class Singleton2 {

    private final static Singleton2 INSTANCE;

    static {
        INSTANCE = new Singleton2();
    }

    private Singleton2() {
    }

    public static Singleton2 getInstance() {
        return INSTANCE;
    }
}

懒汉式

  • 有线程安全问题
package singleton;

/**
 * 描述:     懒汉式(线程不安全)
 */
public class Singleton3 {

    private static Singleton3 instance;

    private Singleton3() {

    }

    public static Singleton3 getInstance() {
      //如果两个线程同时通过了if条件就会创建多个实例,所以线程不安全
        if (instance == null) {
            instance = new Singleton3();
        }
        return instance;
    }
}

package singleton;

/**
 * 描述:     懒汉式(线程安全)(不推荐)
 */
public class Singleton4 {

    private static Singleton4 instance;

    private Singleton4() {

    }
	//加synchronized关键字虽然线程安全了,但是效率变低了
    public synchronized static Singleton4 getInstance() {
        if (instance == null) {
            instance = new Singleton4();
        }
        return instance;
    }
}

package singleton;

/**
 * 描述:     懒汉式(线程不安全)(不推荐)
 */
public class Singleton5 {

    private static Singleton5 instance;

    private Singleton5() {

    }

    public static Singleton5 getInstance() {
        if (instance == null) {
          //只要多个线程同时进入了if代码块,就不能阻止他们创建实例了,只不过是谁先谁后的问题
            synchronized (Singleton5.class) {
                instance = new Singleton5();
            }
        }
        return instance;
    }
}

双重检查*

  • 优点:线程安全;延迟加载;效率较高。
  • 为什么要双重检查?因为要保证线程安全
  • 使用单重检查行不行?行,但是当线程比较多时性能不如双重检查
  • 为什么要用volatile?
    • 重排序会带来NPE:新建对象实际上有三个步骤,如果不加volatile当创建对象实例的时候可能构造函数还没有执行完第二个线程就去判断引用是否为空了,这时显然不是空的,但是对象里的属性却是空的,如果使用这个引用就会发送NPE
    • 防止重排序
package singleton;

/**
 * 描述:     双重检查(推荐面试使用)
 */
public class Singleton6 {

  //加volatile防止重排序,保证可见性
    private volatile static Singleton6 instance;

    private Singleton6() {

    }
	//使用双重锁
    public static Singleton6 getInstance() {
        if (instance == null) {
            synchronized (Singleton6.class) {
                if (instance == null) {
                    instance = new Singleton6();
                }
            }
        }
        return instance;
    }
}

静态内部类

  • 可用
package singleton;

/**
 * 描述:     静态内部类方式,可用
 */
public class Singleton7 {

    private Singleton7() {
    }

    private static class SingletonInstance {

        private static final Singleton7 INSTANCE = new Singleton7();
    }

    public static Singleton7 getInstance() {
        return SingletonInstance.INSTANCE;
    }
}

枚举模式*

  • 最好
  • 写法简单
  • 线程安全有保障
  • 避免反序列化破坏单例,防止反序列化重新创建新的对象
package singleton;

/**
 * 描述:     枚举单例
 */
public enum Singleton8 {
    INSTANCE;

    public void whatever() {

    }
}
//外部调用:Singleton8.INSTANCE.whatever();

死锁

  • 什么是死锁:发生在并发中,当两个或更多线程或进程相互持有对方所需要的资源,又不主动释放,导致所有人都无法继续前进,导致程序陷入无尽的阻塞,这就是死锁。

image.png

  • 如果多个线程之间的依赖关系是环形,存在环路的锁的依赖关系,那么也可能会发生死锁。

image.png

  • 死锁的影响:
    • 死锁的影响在不同系统中是不一样的,这取决于系统对死锁的处理能力
    • 在数据库中:检测并放弃事务
    • JVM中:无法自动处理
  • 必然发生死锁的情况
package deadlock;

/**
 * 描述:     必定发生死锁的情况
 */
public class MustDeadLock implements Runnable {

    int flag = 1;

    static Object o1 = new Object();
    static Object o2 = new Object();

    public static void main(String[] args) {
        MustDeadLock r1 = new MustDeadLock();
        MustDeadLock r2 = new MustDeadLock();
        r1.flag = 1;
        r2.flag = 0;
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
    }

    @Override
    public void run() {
        System.out.println("flag = " + flag);
        if (flag == 1) {
            synchronized (o1) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (o2) {
                    System.out.println("线程1成功拿到两把锁");
                }
            }
        }
        if (flag == 0) {
            synchronized (o2) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (o1) {
                    System.out.println("线程2成功拿到两把锁");
                }
            }
        }
    }
}

  • 模拟多人转账发生死锁问题
package deadlock;

/**
 * 描述:     转账时候遇到死锁,一旦打开注释,便会发生死锁
 */
public class TransferMoney implements Runnable {

    int flag = 1;
    static Account a = new Account(500);
    static Account b = new Account(500);

    public static void main(String[] args) throws InterruptedException {
        TransferMoney r1 = new TransferMoney();
        TransferMoney r2 = new TransferMoney();
        r1.flag = 1;
        r2.flag = 0;
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("a的余额" + a.balance);
        System.out.println("b的余额" + b.balance);
    }

    @Override
    public void run() {
        if (flag == 1) {
            transferMoney(a, b, 200);
        }
        if (flag == 0) {
            transferMoney(b, a, 200);
        }
    }

    public static void transferMoney(Account from, Account to, int amount) {
        synchronized (from) {
          /*try {
            Thread.sleep(500);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }*/
          synchronized (to) {
            if (from.balance - amount < 0) {
              System.out.println("余额不足,转账失败。");
            }
            from.balance -= amount;
            to.balance = to.balance + amount;
            System.out.println("成功转账" + amount + "元");
          }
        }
    }
  
    static class Account {

        public Account(int balance) {
            this.balance = balance;
        }

        int balance;

    }
}

package deadlock;

import deadlock.TransferMoney.Account;
import java.util.Random;

/**
 * 描述:     多人同时转账,依然很危险
 */
public class MultiTransferMoney {

    private static final int NUM_ACCOUNTS = 500;
    private static final int NUM_MONEY = 1000;
    private static final int NUM_ITERATIONS = 1000000;
    private static final int NUM_THREADS = 20;

    public static void main(String[] args) {

        Random rnd = new Random();
        Account[] accounts = new Account[NUM_ACCOUNTS];
        for (int i = 0; i < accounts.length; i++) {
            accounts[i] = new Account(NUM_MONEY);
        }
        class TransferThread extends Thread {

            @Override
            public void run() {
                for (int i = 0; i < NUM_ITERATIONS; i++) {
                    int fromAcct = rnd.nextInt(NUM_ACCOUNTS);
                    int toAcct = rnd.nextInt(NUM_ACCOUNTS);
                    int amount = rnd.nextInt(NUM_MONEY);
                    TransferMoney.transferMoney(accounts[fromAcct], accounts[toAcct], amount);
                }
                System.out.println("运行结束");
            }
        }
        for (int i = 0; i < NUM_THREADS; i++) {
            new TransferThread().start();
        }
    }
}

  • 死锁发生的四个必要条件
    • 互斥条件
    • 请求与保持条件
    • 不可剥夺条件
    • 循环等待条件

定位死锁

  • 使用Java的jstack命令行工具
  • 使用ThreadMXBean
package deadlock;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

/**
 * 描述:     用ThreadMXBean检测死锁
 */
public class ThreadMXBeanDetection implements Runnable {

    int flag = 1;

    static Object o1 = new Object();
    static Object o2 = new Object();

    public static void main(String[] args) throws InterruptedException {
        ThreadMXBeanDetection r1 = new ThreadMXBeanDetection();
        ThreadMXBeanDetection r2 = new ThreadMXBeanDetection();
        r1.flag = 1;
        r2.flag = 0;
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
        Thread.sleep(1000);
      //定位死锁
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
      //如果发生了死锁
        if (deadlockedThreads != null && deadlockedThreads.length > 0) {
            for (int i = 0; i < deadlockedThreads.length; i++) {
                ThreadInfo threadInfo = threadMXBean.getThreadInfo(deadlockedThreads[i]);
                System.out.println("发现死锁" + threadInfo.getThreadName());
            }
        }
    }

    @Override
    public void run() {
        System.out.println("flag = " + flag);
        if (flag == 1) {
            synchronized (o1) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (o2) {
                    System.out.println("线程1成功拿到两把锁");
                }
            }
        }
        if (flag == 0) {
            synchronized (o2) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (o1) {
                    System.out.println("线程2成功拿到两把锁");
                }
            }
        }
    }
}

修复死锁

  • 线上发生死锁应该怎么处理?

    • 保存案发现场然后立即重启服务器
    • 暂时保证线上服务的安全,然后再利用刚才保存的信息,排查死锁,修改代码,重新发布
  • 常见修复策略

    • 避免策略:哲学家就餐的换手方案,转账换序方案。
    package deadlock;
      
    /**
     * 描述:     转账时候遇到死锁,一旦打开注释,便会发生死锁
     */
    public class TransferMoney implements Runnable {
      
        int flag = 1;
        static Account a = new Account(500);
        static Account b = new Account(500);
        static Object lock = new Object();
      
        public static void main(String[] args) throws InterruptedException {
            TransferMoney r1 = new TransferMoney();
            TransferMoney r2 = new TransferMoney();
            r1.flag = 1;
            r2.flag = 0;
            Thread t1 = new Thread(r1);
            Thread t2 = new Thread(r2);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println("a的余额" + a.balance);
            System.out.println("b的余额" + b.balance);
        }
      
        @Override
        public void run() {
            if (flag == 1) {
                transferMoney(a, b, 200);
            }
            if (flag == 0) {
                transferMoney(b, a, 200);
            }
        }
      
        public static void transferMoney(Account from, Account to, int amount) {
            class Helper {
      
                public void transfer() {
                    if (from.balance - amount < 0) {
                        System.out.println("余额不足,转账失败。");
                        return;
                    }
                    from.balance -= amount;
                    to.balance = to.balance + amount;
                    System.out.println("成功转账" + amount + "元");
                }
            }
            //计算哈希值,按照哈希值的大小来获取锁
            //实际开发中使用主键更加方便,因为主键是自增的不重复的
            int fromHash = System.identityHashCode(from);
            int toHash = System.identityHashCode(to);
            if (fromHash < toHash) {
                synchronized (from) {
                    synchronized (to) {
                        new Helper().transfer();
                    }
                }
            }
            else if (fromHash > toHash) {
                synchronized (to) {
                    synchronized (from) {
                        new Helper().transfer();
                    }
                }
            }else  {
              //避免出现哈希冲突时发生死锁
                synchronized (lock) {
                    synchronized (to) {
                        synchronized (from) {
                            new Helper().transfer();
                        }
                    }
                }
            }
      
        }
        static class Account {
      
          public Account(int balance) {
              this.balance = balance;
          }
      
          int balance;
      
      }
        }
    

死锁


  package deadlock;


  /**
   * 描述:     演示哲学家就餐问题导致的死锁(一个圆桌,五个人,五根筷子)
   */
  public class DiningPhilosophers {

      public static class Philosopher implements Runnable {

          //左筷子和右筷子
          private Object leftChopstick;
          private Object rightChopstick;

          public Philosopher(Object leftChopstick, Object rightChopstick) {
              this.leftChopstick = leftChopstick;
              this.rightChopstick = rightChopstick;
          }

          

          @Override
          public void run() {
              try {
                  while (true) {
                    //先思考
                      doAction("Thinking");
                    //获取左筷子
                      synchronized (leftChopstick) {
                          doAction("Picked up left chopstick");
                        //获取右筷子
                          synchronized (rightChopstick) {
                              doAction("Picked up right chopstick - eating");
                            //吃完后放下右筷子
                              doAction("Put down right chopstick");
                          }
                        //放下左筷子
                          doAction("Put down left chopstick");
                      }
                  }
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }

          private void doAction(String action) throws InterruptedException {
              System.out.println(Thread.currentThread().getName() + " " + action);
              Thread.sleep((long) (Math.random() * 10));
          }
      }

      public static void main(String[] args) {
        //新建五个哲学家
          Philosopher[] philosophers = new Philosopher[5];
        //新建筷子,等于哲学家人数
          Object[] chopsticks = new Object[philosophers.length];
          for (int i = 0; i < chopsticks.length; i++) {
              chopsticks[i] = new Object();
          }
          for (int i = 0; i < philosophers.length; i++) {
              Object leftChopstick = chopsticks[i];
              Object rightChopstick = chopsticks[(i + 1) % chopsticks.length];
            //避免策略,改变获取筷子的顺序
              if (i == philosophers.length - 1) {
                  philosophers[i] = new Philosopher(rightChopstick, leftChopstick);
              } else {
                  philosophers[i] = new Philosopher(leftChopstick, rightChopstick);
              }
              new Thread(philosophers[i], "哲学家" + (i + 1) + "号").start();
          }
      }
  }

  • 检测与恢复策略:一段时间检测是否有死锁,如果有就剥夺某一个资源,来打开死锁
  • 鸵鸟策略:如果我们发生死锁的概率极其低,那么我们就直接忽略他,直到死锁发生的时候,再人工修复

避免死锁

  • 设置超时时间
package deadlock;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 描述:     用tryLock来避免死锁
 */
public class TryLockDeadlock implements Runnable {

    int flag = 1;
    static Lock lock1 = new ReentrantLock();
    static Lock lock2 = new ReentrantLock();

    public static void main(String[] args) {
        TryLockDeadlock r1 = new TryLockDeadlock();
        TryLockDeadlock r2 = new TryLockDeadlock();
        r1.flag = 1;
        r2.flag = 0;
        new Thread(r1).start();
        new Thread(r2).start();
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            if (flag == 1) {
                try {
                    if (lock1.tryLock(800, TimeUnit.MILLISECONDS)) {
                        System.out.println("线程1获取到了锁1");
                        Thread.sleep(new Random().nextInt(1000));
                        if (lock2.tryLock(800, TimeUnit.MILLISECONDS)) {
                            System.out.println("线程1获取到了锁2");
                            System.out.println("线程1成功获取到了两把锁");
                            lock2.unlock();
                            lock1.unlock();
                            break;
                        } else {
                            System.out.println("线程1尝试获取锁2失败,已重试");
                            lock1.unlock();
                            Thread.sleep(new Random().nextInt(1000));
                        }
                    } else {
                        System.out.println("线程1获取锁1失败,已重试");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (flag == 0) {
                try {
                    if (lock2.tryLock(3000, TimeUnit.MILLISECONDS)) {
                        System.out.println("线程2获取到了锁2");

                        Thread.sleep(new Random().nextInt(1000));
                        if (lock1.tryLock(3000, TimeUnit.MILLISECONDS)) {
                            System.out.println("线程2获取到了锁1");
                            System.out.println("线程2成功获取到了两把锁");
                            lock1.unlock();
                            lock2.unlock();
                            break;
                        } else {
                            System.out.println("线程2尝试获取锁1失败,已重试");
                            lock2.unlock();
                            Thread.sleep(new Random().nextInt(1000));
                        }
                    } else {
                        System.out.println("线程2获取锁2失败,已重试");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  • 多使用并发类,而不是自己设计锁
  • 尽量降低锁的使用粒度:用不同的锁而不是一个锁
  • 如果能使用同步代码块,就不使用同步方法:自己指定锁对象
  • 给你的线程起一个有意义的名字:debug和排查时事半功倍,框架和JDK都遵循这个最佳实践
  • 避免锁的嵌套:例如MustDeadLock类
  • 分配资源前先看能不能收回来
  • 尽量不要几个功能用同一把锁:专锁专用

活性故障

活锁
  • 虽然线程并没有阻塞,也始终在运行(所以叫做“活”锁,线程是“活”的),但是程序却得不到进展,因为线程始终重复做同样的事。
package deadlock;

import java.util.Random;
import jdk.management.resource.internal.inst.RandomAccessFileRMHooks;

/**
 * 描述:     演示活锁问题
 */
public class LiveLock {

    static class Spoon {

        private Diner owner;

        public Spoon(Diner owner) {
            this.owner = owner;
        }

        public Diner getOwner() {
            return owner;
        }

        public void setOwner(Diner owner) {
            this.owner = owner;
        }

        public synchronized void use() {
            System.out.printf("%s吃完了!", owner.name);


        }
    }

    static class Diner {

        private String name;
        private boolean isHungry;

        public Diner(String name) {
            this.name = name;
            isHungry = true;
        }

        public void eatWith(Spoon spoon, Diner spouse) {
            while (isHungry) {
                if (spoon.owner != this) {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    continue;
                }
              //解决活锁
                Random random = new Random();
                if (spouse.isHungry && random.nextInt(10) < 9) {
                    System.out.println(name + ": 亲爱的" + spouse.name + "你先吃吧");
                    spoon.setOwner(spouse);
                    continue;
                }

                spoon.use();
                isHungry = false;
                System.out.println(name + ": 我吃完了");
                spoon.setOwner(spouse);

            }
        }
    }


    public static void main(String[] args) {
        Diner husband = new Diner("牛郎");
        Diner wife = new Diner("织女");

        Spoon spoon = new Spoon(husband);

        new Thread(new Runnable() {
            @Override
            public void run() {
                husband.eatWith(spoon, wife);
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                wife.eatWith(spoon, husband);
            }
        }).start();
    }
}

饥饿
  • 当线程需要某些资源(例如CPU),但是却始终得不到
  • 线程的优先级设置的过于低,或者有某线程持有锁同时又无限循环而不释放锁,或者某程序始终占用某文件的写锁。
  • 饥饿可能会导致响应性能变差

脑图

线程核心基础

内存模型

死锁

体系

并发工具类

避坑必看

1清楚锁和保护对象的层面

  • 静态字段属于类,类级别的锁才能保护;而非静态字段属于类实例,实例级别的锁就可以保护。

  • 在类 Data 中定义了一个静态的 int 字段 counter 和一个非静态的 wrong 方法,实现 counter 字段的累加操作。

  • class Data {
      @Getter
      private static int counter = 0;
      public static int reset() {
        counter = 0;
        return counter;
      }
      public synchronized void wrong() {
        counter++;
      }
    }
    
  • @RestController
    @RequestMapping("lockscope")
    @Slf4j
    public class LockScopeController {
      
        @GetMapping("wrong")
        public int wrong(@RequestParam(value = "count", defaultValue = "1000000") int count) {
            Data.reset();
            IntStream.rangeClosed(1, count).parallel().forEach(i -> new Data().wrong());
            return Data.getCounter();
        }
      
        @GetMapping("right")
        public int right(@RequestParam(value = "count", defaultValue = "1000000") int count) {
            Data.reset();
            IntStream.rangeClosed(1, count).parallel().forEach(i -> new Data().right());
            return Data.getCounter();
        }
    }
    
  • 因为默认运行 100 万次,所以执行后应该输出 100 万,但页面输出的是 639242。在非静态的 wrong 方法上加锁,只能确保多个线程无法执行同一个实例的 wrong 方法,却不能保证不会执行不同实例的 wrong 方法。而静态的 counter 在多个实例中共享,所以必然会出现线程安全问题。

  • 修正方法就很清晰了:同样在类中定义一个 Object 类型的静态字段,在操作counter 之前对这个字段加锁。

  • class Data {
      @Getter
      private static int counter = 0;
      private static Object locker = new Object();
      public void right() {
        synchronized (locker) {
          counter++;
        }
      }
    }
    
  • 把 wrong 方法定义为静态不就可以了,这个时候锁是类级别的。可以是可以,但我们不可能为了解决线程安全问题改变代码结构,把实例方法改为静态方法。

2加锁要考虑锁的场景和粒度

  • 在方法上加 synchronized 关键字实现加锁确实简单,也因此我曾看到一些业务代码中几乎所有方法都加了 synchronized,但这种滥用 synchronized 的做法:

    • 一是,没必要。通常情况下 60% 的业务代码是三层架构,数据经过无状态的Controller、Service、Repository 流转到数据库,没必要使用 synchronized 来保护什么数据。
    • 二是,可能会极大地降低性能。使用 Spring 框架时,默认情况下 Controller、Service、Repository 是单例的,加上 synchronized 会导致整个程序几乎就只能支持单线程,造成极大的性能问题。
  • 即使我们确实有一些共享资源需要保护,也要尽可能降低锁的粒度,仅对必要的代码块甚至是需要保护的资源本身加锁。

  • 比如,在业务代码中,有一个 ArrayList 因为会被多个线程操作而需要保护,又有一段比较耗时的操作(代码中的 slow 方法)不涉及线程安全问题,应该如何加锁呢?

  • 错误的做法是,给整段业务逻辑加锁,把 slow 方法和操作 ArrayList 的代码同时纳入synchronized 代码块;更合适的做法是,把加锁的粒度降到最低,只在操作 ArrayList 的时候给这个 ArrayList 加锁。

  • @RestController
    @RequestMapping("lockgranularity")
    @Slf4j
    public class LockGranularityController {
      
        private List<Integer> data = new ArrayList<>();
      
        //不涉及共享资源的慢方法
        private void slow() {
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
            }
        }
      
        //错误的加锁方法
        @GetMapping("wrong")
        public int wrong() {
            long begin = System.currentTimeMillis();
            IntStream.rangeClosed(1, 1000).parallel().forEach(i -> {
                //加锁粒度太粗了
                synchronized (this) {
                    slow();
                    data.add(i);
                }
            });
            log.info("took:{}", System.currentTimeMillis() - begin);
            return data.size();
        }
      
        //正确的加锁方法
        @GetMapping("right")
        public int right() {
            long begin = System.currentTimeMillis();
            IntStream.rangeClosed(1, 1000).parallel().forEach(i -> {
                slow();
                //只对List加锁
                synchronized (data) {
                    data.add(i);
                }
            });
            log.info("took:{}", System.currentTimeMillis() - begin);
            return data.size();
        }
      
    }
    
  • 执行这段代码,同样是 1000 次业务操作,正确加锁的版本耗时 1.4 秒,而对整个业务逻辑加锁的话耗时 11 秒。

  • 如果精细化考虑了锁应用范围后,性能还无法满足需求的话,我们就要考虑另一个维度的粒度问题了,即:区分读写场景以及资源的访问冲突,考虑使用悲观方式的锁还是乐观方式的锁。

  • 对于读写比例差异明显的场景,考虑使用 ReentrantReadWriteLock 细化区分读写锁,来提高性能。

  • 如果你的 JDK 版本高于 1.8、共享资源的冲突概率也没那么大的话,考虑使用StampedLock 的乐观读的特性,进一步提高性能。

  • JDK 里 ReentrantLock 和 ReentrantReadWriteLock 都提供了公平锁的版本,在没有明确需求的情况下不要轻易开启公平锁特性,在任务很轻的情况下开启公平锁可能会让性能下降上百倍。

3小心死锁问题

  • 首先,定义一个商品类型,包含商品名、库存剩余和商品的库存锁三个属性,每一种商品默认库存 1000 个;然后,初始化 10 个这样的商品对象来模拟商品清单

  • @Data
    @RequiredArgsConstructor
    static class Item {
      final String name; //商品名
      int remaining = 1000; //库存剩余
      @ToString.Exclude //ToString不包含这个字段
      ReentrantLock lock = new ReentrantLock();
    }
    
  • 创建购物车,写一个方法模拟在购物车进行商品选购,每次从商品清单(items 字段)中随机选购三个商品(为了逻辑简单,我们不考虑每次选购多个同类商品的逻辑,购物车中不体现商品数量)

  • 创建订单,先声明一个 List 来保存所有获得的锁,然后遍历购物车中的商品依次尝试获得商品的锁,最长等待 10 秒,获得全部锁之后再扣减库存;如果有无法获得锁的情况则解锁之前获得的所有锁,返回 false 下单失败。

  • @RestController
    @RequestMapping("deadlock")
    @Slf4j
    public class DeadLockController {
      
        private ConcurrentHashMap<String, Item> items = new ConcurrentHashMap<>();
      
        public DeadLockController() {
            IntStream.range(0, 10).forEach(i -> items.put("item" + i, new Item("item" + i)));
        }
      
        //创建订单
        private boolean createOrder(List<Item> order) {
            //存放所有获得的锁
            List<ReentrantLock> locks = new ArrayList<>();
      
            for (Item item : order) {
                try {
                    //获得锁10秒超时
                    if (item.lock.tryLock(10, TimeUnit.SECONDS)) {
                        locks.add(item.lock);
                    } else {
                        locks.forEach(ReentrantLock::unlock);
                        return false;
                    }
                } catch (InterruptedException e) {
                }
            }
            //锁全部拿到之后执行扣减库存业务逻辑
            try {
                order.forEach(item -> item.remaining--);
            } finally {
                locks.forEach(ReentrantLock::unlock);
            }
            return true;
        }
      
        //创建购物车
        private List<Item> createCart() {
            return IntStream.rangeClosed(1, 3)
                    .mapToObj(i -> "item" + ThreadLocalRandom.current().nextInt(items.size()))
                    .map(name -> items.get(name)).collect(Collectors.toList());
        }
      
        //失败案例,会发生死锁
        /*
        假设一个购物车中的商品是 item1 和 item2,另一个购物车中的商品是 item2 和 item1,一个线程先获取到了item1 的锁,同时另一个线程获取到了 item2 的锁,然后两个线程接下来要分别获取item2 和 item1 的锁,这个时候锁已经被对方获取了,只能相互等待一直到 10 秒超时。
        */
        @GetMapping("wrong")
        public long wrong() {
            long begin = System.currentTimeMillis();
            //并发进行100次下单操作,统计成功次数
            long success = IntStream.rangeClosed(1, 100).parallel()
                    .mapToObj(i -> {
                        List<Item> cart = createCart();
                        return createOrder(cart);
                    })
                    .filter(result -> result)
                    .count();
            log.info("success:{} totalRemaining:{} took:{}ms items:{}",
                    success,
                    items.entrySet().stream().map(item -> item.getValue().remaining).reduce(0, Integer::sum),
                    System.currentTimeMillis() - begin, items);
            return success;
        }
      
        //成功案例
        /*
        为购物车中的商品排一下序,让所有的线程一定是先获取item1 的锁然后获取 item2 的锁,就不会有问题了。
        测试一下 right 方法,不管执行多少次都是 100 次成功下单,而且性能相当高,达到了3000 以上的 TPS
        */
        @GetMapping("right")
        public long right() {
            long begin = System.currentTimeMillis();
            long success = IntStream.rangeClosed(1, 100).parallel()
                    .mapToObj(i -> {
                        List<Item> cart = createCart().stream()
                                .sorted(Comparator.comparing(Item::getName))
                                .collect(Collectors.toList());
                        return createOrder(cart);
                    })
                    .filter(result -> result)
                    .count();
            log.info("success:{} totalRemaining:{} took:{}ms items:{}",
                    success,
                    items.entrySet().stream().map(item -> item.getValue().remaining).reduce(0, Integer::sum),
                    System.currentTimeMillis() - begin, items);
            return success;
        }
      
        //实体类
        @Data
        @RequiredArgsConstructor
        static class Item {
            final String name;
            int remaining = 1000;
            @ToString.Exclude
            ReentrantLock lock = new ReentrantLock();
        }
    }
    
  • 如果业务逻辑中锁的实现比较复杂的话,要仔细看看加锁和释放是否配对,是否有遗漏释放或重复释放的可能性;并且要考虑锁自动超时释放了,而业务逻辑却还在进行的情况下,如果别的线程或进程拿到了相同的锁,可能会导致重复执行。

下一篇 尚硅谷JUC

Comments

Content