notes notes
首页
读书笔记
系统设计
项目实战
学习笔记
源码
运维
其它
极客时间 (opens new window)
GitHub (opens new window)
首页
读书笔记
系统设计
项目实战
学习笔记
源码
运维
其它
极客时间 (opens new window)
GitHub (opens new window)
  • 并发编程

    • 并发编程
      • 并发工具类
      • 线程池
        • 线程池的创建
        • 线程添加规则
        • 增减线程的特点
        • newFixedThreadPool
        • newSingleThreadExecutor
        • newCachedThreadPool
        • newScheduledThreadPool
        • 如何设置线程数
        • 线程池的关闭
        • 拒绝策略
        • 钩子方法
        • 线程池结构体系
        • 线程池状态
        • execute源码
      • ThreadLocal
        • 使用场景
        • 主要方法
        • 注意点
      • Lock
        • 主要方法
        • 锁的分类
      • 原子类
        • AtomicInteger
        • AtomicReference
        • AtomicIntegerFieldUpdater
        • LongAdder
        • LongAccumulator
      • CAS
        • 在Java中的实现
        • 缺点
      • final
        • 修饰变量
        • 修饰方法
        • 修饰类
      • 并发容器
        • 古老的并发容器
        • ConcurrentHashMap
        • HashMap缺点
        • 源码分析
        • put
        • get
        • 正确使用ConcurrentHashMap
        • CopyOnWriteArrayList
        • 读写规则
        • 概要
        • 源码分析
        • add
        • get
        • BlockingQueue
        • ArrayBlockingQueue
        • LinkedBlockingQueue
        • PriorityBlockingQueue
        • SynchronousQueue
        • DelayQueue
        • ConcurrentLinkedQueue
      • 并发流程控制
        • Semaphore
        • CyclicBarrier
        • CountDownLatch
        • Condition
      • AQS
        • 是什么
        • 核心部分
        • state
        • 控制线程抢锁和配合的FIFO队列
        • 希望协作工具类去实现获取/释放等重要方法
        • ASQ应用实例
        • CountDownLatch
        • 构造函数
        • getCount
        • countDown
        • await
        • 总结
        • Semaphore
        • acquire
        • 总结
        • ReentrantLock
        • tryRelease
        • acquire
        • 总结
        • 手写一个
      • Future
    • 多线程
    • 高级篇
  • 设计模式

    • 设计模式
  • 网络编程

    • Netty

      • NIO基础
      • Netty入门
      • Netty进阶
      • 优化与源码
  • 源码篇

    • 环境搭建
    • Spring
  • 云原生

    • Kubernetes
    • Helm
  • ElasticSearch

    • ElasticSearch
  • Java 虚拟机

    • 深入拆解 Java 虚拟机
    • JVM与GC调优
  • MQ

    • RabbitMQ

      • RabbitMQ笔记
      • RabbitMQ集群搭建文档
  • Redis

    • Redis进阶
  • ShardingSphere

    • Sharding-JDBC
  • SpringCloud

    • SpringCloud
  • ZooKeeper

    • ZooKeeper
  • 学习笔记
  • 并发编程
starry
2023-08-03
目录

并发编程

# 并发工具类

分类

  1. 为了并发安全:互斥同步、非互斥同步、无同步方案
  2. 管理线程、提高效率
  3. 线程协作

# 线程池

# 线程池的创建

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

参数

  • corePoolSize:核心线程数 线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待任务到来时再创建新线程去执行任务。一般情况下线程池中的数量不会小于核心线程数,就算没有任务了,这些线程也不会被销毁
  • maximumPoolSize:最大线程数 当核心线程处理不过来,就需要额外新创建一些线程,但是这些新增的线程是有上限的,就是这个maximumPoolSize
  • keepAliveTime:保持存活时间 如果线程池当前的线程数量多于corePoolSize,那么多于的线程空闲时间超过keepAliveTime,它们就会被终止
  • unit:时间单位
  • workQueue:任务存储队列 3种常见的队列类型
    • 直接队列:SynchronousQueue
    • 无界队列:LinkedBlockingQueue
    • 有界队列:ArrayBlockingQueue
  • threadFactory:生成新线程的工厂 新的线程是由ThreadFactory创建的,默认使用Executors.defaultThreadFactory(),创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否为守护线程等
  • handler:拒绝策略

# 线程添加规则

  1. 如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务
  2. 如果线程数大于等于corePoolSize但小于maximumPoolSize,则将任务放入队列
  3. 如果队列已满,并且线程数小于maximumPoolSize,则创建一个新线程来运行任务
  4. 如果队列已满,并且线程数大于或等于maximumPoolSize,则拒绝该任务

# 增减线程的特点

  • 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池
  • 线程池希望保持较少的线程数(corePoolSize),并且只有在负载(workQueue)变得很大时才增加它(maximumPoolSize)
  • 通过设置maximumPoolSize为很高的值,比如Integer.MAX_SIZE,可以允许线程池容纳任意数量的并发任务
  • 只有在队列填充满时才创建多于corePoolSize的线程,所以如果我们使用无界队列(LinkedBlockingQueue),那么线程数量就不会超过corePoolSize

# newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

创建一个corePoolSize和maximumPoolSize相同的线程池,就是固定大小的线程池

额外存活时间为0ms,因为固定大小不存在额外线程,这个没有意义

工作队列为LinkedBlockingQueue无界队列,当线程达到上限后,直接放入工作队列,但是这个工作队列无上限,一直放,直到抛出异常。

# newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

创建一个corePoolSize和maximumPoolSize相同的线程池,都是1,就相当于单线程

和上面的类似,只是这个线程数是1,还是有同样的问题,当请求处理不过来,请求对接,占用大量内存,最终OOM

# newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

核心线程为0,最大线程为Integer.MAX_VALUE,存活时间60s,直接队列

来一个任务创建一个任务,也可能造成OOM

# newScheduledThreadPool

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}


public static void main(String[] args) {
    ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(20);
    // 3s执行一次
    threadPool.schedule(new Task(), 3, TimeUnit.SECONDS);
    // 最开始1s执行一次,执行完后3s执行一次
    threadPool.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS);
}

可以指定核心线程数,最大线程数还是Integer.MAX_VALUE

是个定时队列,可以定期执行任务

可以看到Executors的4个创建线程池的方法,都不怎么好用,所以我们需要自己创建线程池。

# 如何设置线程数

  • CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的1~2倍左右
  • IO密集型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU核心数很多倍,以JVM线程监控线上繁忙情况为依据,保证线程空闲可以衔接上,参考Brain Goetz推荐的计算方法 线程数 = CPU核心数*(1 + 平均等待时间 / 平均工作时间) 假设cpu为8核,读写数据库需要等待1000ms,而业务代码执行只需要10ms,所以线程数就是 8*(1+1000/10)=808

# 线程池的关闭

  • shutdown 使用后不是立即关闭线程池,而是等所有的线程都执行完才进行关闭,如果后续还有任务添加,会直接拒绝
  • isShutdown 判断线程是否进入停止状态(上面的状态)
  • isTerminated 判断线程池是否完全停止了(任务都执行完了)
  • awaitTermination (检测)在指定的时间内线程池是否完全关闭
  • shutdownNow 使用interrupt尝试中断真正线程,并返回在队列中等待的线程

# 拒绝策略

当线程关闭后,再添加任务,会直接拒绝

当线程数达到最大线程数,且工作队列也满了的话,会使用指定的拒绝策略拒绝任务

4种拒绝策略

  • AbortPolicy 直接抛出异常
  • DiscardPolicy 直接丢弃提交的任务,不会抛出异常
  • DiscardOldestPolicy 丢弃队列中最老的任务
  • CallerRunsPolicy 直接让传入任务的线程执行

# 钩子方法

在每个线程执行前后都能进行额外操作

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }

使用钩子方法实现线程池暂停和恢复

package com.starry.concurrent;

import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author starry
 * @version 1.0
 * @date 2021/12/30 20:56
 * @Description 可暂停的线程池
 */
public class PauseableThreadPool extends ThreadPoolExecutor {

    private boolean isPaused;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    /**
     * 加锁 暂停
     */
    public void pause() {
        lock.lock();
        try {
            isPaused = true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * 加锁 恢复
     */
    public void resume() {
        lock.lock();
        try {
            isPaused = false;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 每个任务线程执行前都会执行此方法
     * @param t
     * @param r
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        lock.lock();
        try {
            // 如果是暂停当前线程就等待
            while (isPaused) {
                condition.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        super.beforeExecute(t, r);
    }

    public static void main(String[] args) throws InterruptedException {
        PauseableThreadPool threadPool = new PauseableThreadPool(1, 2, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000));
        for (int i = 0; i < 1000; i++) {
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(2000);

        // 暂停线程池
        threadPool.pause();
        System.out.println("thread paused");
        // 恢复线程池
        Thread.sleep(5000);
        threadPool.resume();
        System.out.println("thread resumed");


    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

}

# 线程池结构体系

Executor是个接口,只有一个execute()方法

ExecutorService实现了Executor接口,并且提供了关闭方法

AbstractExecutorService实现了ExecutorService接口,提供了默认方法实现,是个抽象方法

ThreadPoolExecutor继承AbstractExecutorService,提供具体实现,以及额外方法

Executors和上面的类或接口,没有直接实现或继承关系,是个工具类,只是帮我创建线程的。

# 线程池状态

  • RUNNING 接收新任务,并且处理排队任务
  • SHUTDOWN 不接收新任务,但处理排队任务
  • STOP 不接受新任务,也不处理排队任务,并中断真正进行的任务
  • TIDYING 整洁的意思;所有任务都已终止,workerCount为零时,线程会转到TIDYING状态,并运行terminate()钩子方法
  • TERMINATED terminate()允许完成

# execute源码

public void execute(Runnable command) {
	// command就是我们传入要执行的任务
    if (command == null)
        throw new NullPointerException();
    // 获取正在执行的任务数量
    int c = ctl.get();
    // 如果数量小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 就新增一个工作线程去执行我们传入的任务,true为核心线程
        if (addWorker(command, true))
            return;
        // 再获取一下线程运行数量
        c = ctl.get();
    }
    
    /*任务数量大于等于核心数量*/
    
    // 如果线程池是运行状态,就添加任务到工作队列
    if (isRunning(c) && workQueue.offer(command)) {
        // 再检测一遍线程数量,因为此时可能线程被终止了
        int recheck = ctl.get();
        // 如果是终止状态,就把当前任务从工作队列移除
        if (! isRunning(recheck) && remove(command))
            // 并拒绝任务
            reject(command);
        // 如果线程数量为0(可能抛出异常,导致线程数量减少),也要创建一个新的工作线程,false不是核心线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    
    /*大于等于核心线程,并且工作队列也满了*/
    
    // 创建非核心线程去执行任务,如果创建失败就拒绝
    else if (!addWorker(command, false))
        reject(command);
}

# ThreadLocal

# 使用场景

主要是下面两大使用场景

  • 每个线程需要有独享的对象(通常是工具类,比如SimpleDateFormat和Random)
/**
 * @author starry
 * @version 1.0
 * @date 2022/1/1 16:10
 * @Description 使用ThreadLocal保证线程安全
 */
public class ThreadLocalTest {

    public static void main(String[] args) {
        // 只有100个SimpleDateFormat对象,而不是每个线程都创建一个对象
        ExecutorService threadPool = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            threadPool.execute(() -> {
                String s = printTime(finalI);
                System.out.println(s);
            });
        }
        threadPool.shutdown();

    }

    public static String printTime(int seconds) {
        Date date = new Date(1000L * seconds);
        // 每个线程持有不同的SimpleDateFormat
        SimpleDateFormat formatThreadLocal = ThreadSafeFormatter.dateFormatThreadLocal.get();
        return formatThreadLocal.format(date);
    }
}


class ThreadSafeFormatter{
    public static ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
}
  • 每个线程内需要保存全局变量(例如在拦截器中获取用户信息),可以让不用方法直接使用,避免层层传参的麻烦
/**
 * @author starry
 * @version 1.0
 * @date 2022/1/1 16:29
 * @Description 使用ThreadLocal避免层次传参,同个线程,变量共享
 */
public class ThreadLocalTest02 {

    public static void main(String[] args) {
        new Service1().fun1();
    }


}


class User{
    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

class UserContextHolder{
    public static ThreadLocal<User> holder = new ThreadLocal<>();
}

class Service1{
    public void fun1(){
        User user = new User();
        user.setName("starry");
        UserContextHolder.holder.set(user);
        // 服务1执行完,调用服务2
        new Service2().fun2();
    }
}

class Service2{
    public void fun2(){
        User user = UserContextHolder.holder.get();
        System.out.println("fun2"+user.getName());
        // 服务2执行完,调用服务3
        new Service3().fun3();
    }
}


class Service3{
    public void fun3(){
        ThreadLocal<User> holder = UserContextHolder.holder;
        System.out.println("fun3"+holder.get().getName());
        holder.remove();
    }
}

# 主要方法

initialValue() 初始化

  • initialValue() 返回当前线程的初始值,这是一个延迟加载的方法,只有在调用get()的时候,才会触发
  • 线程第一次调用get()方法访问变量时,调用此方法,触发线程向前调用了set()方法,在这种情况下,不会为线程调用initialValue()方法
  • 通常,每个线程最多只调用一次此方法,但是如果已经调用了remove()后,再调用get(),就可以再次调用次方法
  • 如果不重写此方法,这个方法会返回null。一般使用匿名内部类的方法来重写initialValue()方法,以便再后续使用中可以初始化副本对象

set() 为这个线程设置一个新值

get() 得到这个线程对应的value

remove() 删除对应这个线程的值

get()

取出当前线程的ThreadLocalMap,然后调用map.getEntr()方法,把本ThreadLocal的引用作为参数传入,取出map中属于ThreadLocal的value

这里的key和value都是保存在线程(ThreadLocalMap)中的,而不是保存在ThreadLocal中

public T get() {
    // 获取当前线程
    Thread t = Thread.currentThread();
    // 获取当前线程的ThreadLocalMap
    ThreadLocalMap map = getMap(t);
    // 如果ThreadLocalMap不为空
    if (map != null) {
        /*
        * 获取当前线程中的ThreadLocalMap对象,相当于map,里面存放多个ThreadLocal
        * 传入ThreadLocal,获取对应的Entry
        * 再获取Entry的value,即真正存放的值
        */
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    // 如果为空就set初始化的值,懒加载
    return setInitialValue();
}

// 获取当前线程的ThreadLocalMap对象
ThreadLocalMap getMap(Thread t) {
    // ThreadLocal.ThreadLocalMap threadLocals = null;
    return t.threadLocals;
}

// 设置初始化值
private T setInitialValue() {
    // 返回初始值
    T value = initialValue();
    // 获取当前线程
    Thread t = Thread.currentThread();
    // 获取当前线程的ThreadLocalMap对象
    ThreadLocalMap map = getMap(t);
    // 如果ThreadLocalMap不为空,就设置一个ThreadLocal对象,key就是当前的ThreadLocal,value就是初始值
    if (map != null)
        map.set(this, value);
    else
        // 如果为空(默认为null),就创建一个ThreadLocalMap,再去设置一个ThreadLocal对象,key就是当前的ThreadLocal,value就是初始值
        createMap(t, value);
    // 最后返回默认值
    return value;
}

// 返回初始值,默认为null,可进行重写
protected T initialValue() {
    return null;
}

set()

public void set(T value) {
    // 获取当前线程
    Thread t = Thread.currentThread();
    // 获取当前线程的ThreadLocalMap对象
    ThreadLocalMap map = getMap(t);
    // 不为空就设置
    if (map != null)
        map.set(this, value);
    else
        // 为空就创建 再设置
        createMap(t, value);
}

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

remove()

public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        // 根据key去删除Entry对象
        m.remove(this);
}

# 注意点

  • 内存泄漏
  • static对象,共享
  • 尽量使用框架提供的ThreadLocal对象,防止忘记remove 比如:DateTimeContextHolader、RequestContextHolder

内存泄漏:某个对象不再使用,但是内存却不能回收

// Entry 继承 弱引用
static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        // 将k置为弱引用
        super(k);
        // 但是v还是强引用
        value = v;
    }
}

弱引用,如果这个对象只被弱引用关联(没有任何强引用关联),那么这个对象就可以被回收

  • ThreadLocalMap的每个Entry都是一个key的弱引用,同时,每个Entry都包含一个对value的强引用
  • 正常情况下,当线程终止,保存在ThreadLocalMap里的value会被垃圾回收,因为没有任何强引用了
  • 但是,如果线程不终止(比如线程池的线程重复使用),那么key对应的value就不能被回收 Thread->ThreadLocalMap->Entry(key 为 null)->value有值
    • 因为value和Thread之间还存在强引用,所以导致value无法回收,可能最终导致OOM
    • 作者已经考虑了这个问题,所以在set、remove、rehash返回中会扫描key为null的Entry,并把对应的value设置为null,这样value就可以被回收了
    • 但是如果一个ThreadLocal不被使用,同时线程又不停止,那么调用链就一直存在,就导致了value的泄漏
    • 调用remove方法,就会删除对应的Entry对象,可以避免内存泄漏,所以使用完ThreadLocal后,应该调用remove方法

# Lock

有synchronized为什么还要lock?

  • 效率低:锁的释放情况少,试图获得锁时不能超时,不能中断一个正在试图获得锁的线程
  • 不够灵活:加锁和释放的时机单一,每个锁仅有单一的条件(某个对象),可能是不够的
  • 无法知道是否成功获取到锁

# 主要方法

lock()

  • lock()就是最普通的获取锁。如果锁已经被其他线程获取,则进行等待
  • Lock不会像synchronized一样在异常时自动释放锁
  • 因此,我们需要在finally块中释放锁,以保证发生异常时锁的释放
  • lock()方法不能被中断,一旦陷入死锁,lock()就会永久等待

tryLock()

  • tryLock()用来尝试获取锁,如果当前锁没有被其他线程占用,则获取锁成功,返回true,否则返回false,获取锁失败
  • 相比于lock(),我们可以根据是否能获取到锁来决定后续程序的行为
  • 该方法会立即返回,即便在拿不到锁时也不会一直等待

tryLock(long time, TimeUnit unit) 超时就放弃

lockInterruptibly() 相当于tryLock(long time,TimeUnit unit)把超时时间设置为无限。在等待锁的过程中,线程可以被中断

unlock() 解锁

可见性保证

拥有_happens-before_原则

lock的加解锁和synchronized有同样的内存语义,也就是说,下一个线程加锁后可以看到所有之前线程解锁前发生的所有操作

# 锁的分类

线程要不要锁住同步资源 悲观锁 乐观锁
多个线程能否共享一把锁 共享锁 独占锁
多线程竞争时,是否需要排队 公平锁 非公平锁
同一个线程是否可以重复获得同一把锁 可重入锁 不可重入锁
是否可中断 可中断锁 不可中断锁
等待锁的过程 自旋锁 非自旋锁

ReentrantReadWriteLock

读锁之间不互斥,多个线程可以同时持有读锁

读写之间互斥,如果一个线程持有读锁,其他线程想要申请写锁,必须等读锁释放,只能有一个线程拥有写锁。

如果是公平锁,获取读写锁的顺序和等待顺序一致

static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    // 获取写锁是否阻塞
    final boolean writerShouldBlock() {
        // 如果队列前面还有线程要获取锁就阻塞
        return hasQueuedPredecessors();
    }
    // 获取写锁是否阻塞
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

如果是非公平锁

  • 写锁可以随时插队
  • 读锁只有在等待队列的第一个获取的锁的不是写锁时才能插队
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    // 写锁是否阻塞,返回false
    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }
    // 读锁是否阻塞
    final boolean readerShouldBlock() {
        /* As a heuristic to avoid indefinite writer starvation,
         * block if the thread that momentarily appears to be head
         * of queue, if one exists, is a waiting writer.  This is
         * only a probabilistic effect since a new reader will not
         * block if there is a waiting writer behind other enabled
         * readers that have not yet drained from the queue.
         */
        // 判断队列的第一个要获取的锁是不是排他锁(写锁),是就进行阻塞(排队),否则就插队
        return apparentlyFirstQueuedIsExclusive();
    }
}

支持锁的降级(写锁至读锁),不支持升级(防止死锁)

# 原子类

六类原子类:

Atomic*基本类型原子类 AtomicInteger

AtomicLong AtomicBoolean | | AtomicArrray数组类型原子类 | AtomicIntegerArray AtomicLongArray AtomicReferenceArray | | AtomicReference引用类型原子类 | AtomicReference AtomicStampedReference AtomicMarkableReference | | Atomic*FieldUpdater升级类型原子类 | AtomicIntegerFieldUpdater AtomicLongFieldUpdater AtomicReferenceFieldUpdater | | Adder累加器 | LongAdder DoubleAdder | | Accumulator累加器 | LongAccumulator DoubleAccumulator |

# AtomicInteger

AtomicInteger常用方法

  • public final int get() 获取当前值
  • public final int getAndSet(int newValue) 获取当前值,并设置新值
  • public final int getAndIncrement() 获取当前值,并自增1
  • public final int getAndDecrement() 获取当前值,并自减1
  • public final int getAndAdd(int delta) 获取当前值,并加上指定的值
  • public final boolean compareAndSet(int expect, int update) 如果当前的数组等于预期(expect)的值,才以原子方式将该值设置为输入值(update)

# AtomicReference

使用AtomicReference实现自旋锁

public class AtomicReferenceTest {

    /**
     * 标志位 标志哪个线程拿到锁
     */
    private AtomicReference<Thread> sign = new AtomicReference<>();

    public void lock() {
        Thread thread = Thread.currentThread();
        // 只有标志为null时才加锁成功,并且设置标志为当前线程
        while (!sign.compareAndSet(null,thread)){
            try {
                System.out.println("获取锁失败,睡眠500ms再试");
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void unlock() {
        Thread thread = Thread.currentThread();
        // 只有标志是当前线程,才解锁成功,并且设置标志为null
        sign.compareAndSet(thread, null);
    }

    public static void main(String[] args) {
        AtomicReferenceTest lock = new AtomicReferenceTest();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName()+"获取到自旋锁");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        };
        new Thread(runnable).start();
        new Thread(runnable).start();
    }
}

# AtomicIntegerFieldUpdater

变量升级为原子类型

升级是基于反射的,要注意

变量必须被volatile修改

变量不能被private修饰

变量不能被static修饰

package com.starry.concurrent.atomictest;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
 * @author starry
 * @version 1.0
 * @date 2022/1/2 19:12
 * @Description
 */
public class AtomicIntegerFieldUpdaterTest implements Runnable{

    static User u1;
    static User u2;

    static AtomicIntegerFieldUpdater<User> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(User.class,"score");

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            u1.score++;
            fieldUpdater.getAndIncrement(u2);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        u1 = new User();
        u2 = new User();
        AtomicIntegerFieldUpdaterTest r = new AtomicIntegerFieldUpdaterTest();
        Thread thread1 = new Thread(r);
        Thread thread2 = new Thread(r);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(u1.score);
        System.out.println(u2.score);

    }

    public static class User {
        volatile int score;
    }
}

# LongAdder

AtomicLong由于每次操作,都要flush和refresh数据到共享内存,以保证其他线程看到的都是最新值,非常耗费资源。

LongAdder的是实现原理和刚才的AtomicLong是有不同的,LongAdder每个线程会有自己的一个计算器,仅用来在自己线程内部计数,这样依赖就不会有和其他线程的计数器干扰。由于是每个线程都有一个计数器,所以不存在竞争关系,所以在累加过程中,不需要同步机制就,也不要flush和refresh。也没有一个公共的counter来给所有线程统一计数。

LongAdder引入了分段累加的概念,内部有一个base变量和一个Cell[]数组共同参与计算

  • base变量:竞争不激烈时,直接累加到该变量上
  • 竞争激烈时,各个线程分散累加到自己的槽Cell[i]中

最后通过sum()统计各个槽或者base的的值得到最终结果

这个sum方法不是加锁的,也就是说如果调用sum时,还有线程在进行累加,这个结果就不是准确的

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
 	// 如果数组是空的话,就直接返回base的值
    if (as != null) {
        // 否则累加每个槽上的值到sum上
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    // 最后返回sum
    return sum;
}

# LongAccumulator

也是累加,但是是自定义的累加函数。不仅仅可以进行累加,也可以进行乘,取最大最小值等,只需要自己实现函数即可。

accumulatorFunction为累加函数,identity为累加函数的初始值

public LongAccumulator(LongBinaryOperator accumulatorFunction,
                       long identity)

# CAS

# 在Java中的实现

以AtomicInteger为例:

  • AtomicInteger加载Unsafe工具,用来直接操作内存数据
  • 用Unsafe来实现底层操作
  • 用Volatile修饰value字段,保证可见性
  • getAndAddInt

Unsafe类是CAS的核心类。Java无法直接访问底层操作系统,而是通过本地Native方法来访问。

valueOffset标识变量值在内存中的偏移位置,因为Unsafe就是根据内存偏移地址获取数据的原值的,这样我们就能通过unsafe来实现CAS了。

静态代码块,类加载时,就加载Unsafe工具类,并且使用Unsafe来获取value字段的内存地址值

getAndSet方法调用unsafe.getAndSetInt

这里是do...while循环,如果失败就进行重试,和乐观锁类似;如果成功就返回旧值

最后的compareAndSwapInt就是本地方法了

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

# 缺点

  • ABA问题
  • 自旋时间长

# final

# 修饰变量

被final修饰的变量,意味着值不能被修改。如果是对象,那么对象的引用不能变,但是对象自身的内容仍然可以变化。

package com.starry.concurrent.finaltest;

/**
 * @author starry
 * @version 1.0
 * @date 2022/1/7 20:36
 * @Description
 */
public class FinalTest {

    public static void main(String[] args) {
        final Person person = new Person();
        // 编译期间就报错
//        person.age = 1;
        // 对象内容可变
        person.address = "beijing";
        person.address = "shanghai";

    }
}

class Person {
    final int age = 0;
    final String name = "starry";
    String address;
}

属性被声明为final后,该变量则只能被赋值一次。且一旦被赋值,final的变量就不能再被改变。

修饰3种变量

  • 类中的final属性
    1. 声明变量时直接在等号右边赋值
    2. 构造函数中赋值
    3. 在类的初始代码块中赋值

如果不采用第一种方法赋值,那么就必须在2、3中选一种方法来赋值,不能不赋值。

class FinalVariable {
    //方法1
//    private int i = 1;
    private int i;

    // 方法2
//    public FinalVariable(int i) {
//        this.i = i;
//    }

    // 方法3
    {
        i = 1;
    }
}
  • 类中的static final属性
    1. 声明变量时直接在等号右边赋值
    2. 在类的static代码块中赋值
class FinalVariable {
    //方法1
//    private static int i = 1;
    private static int i;

    // 方法2
    static  {
        i = 1;
    }
}
  • 方法中的final属性 方法中没有构造函数,也没有初始代码块 可以等号右边赋值,只要在使用之前赋值就可以了
void testFinal() {
    final int t;
    System.out.println("其他代码");
    System.out.println("需要使用final变量");
    t = 1;
    System.out.println(t);
}

# 修饰方法

  • 构造方法不能被final修饰
public class FinalMethodTest {

    int i;

    // 编译报错
//    public final FinalMethodTest(int i) {
//        this.i = i;
//    }
    
}
  • final修饰的方法不能被重写,即override
public class FinalMethodTest {

    int i;

    // 编译报错
//    public final FinalMethodTest(int i) {
//        this.i = i;
//    }

    void drink() {}

    final void eat() {}

    static void sleep(){}

}

class SubClass extends FinalMethodTest{

    @Override
    void drink() {
        super.drink();
    }

    // 编译报错
    //void eat() {}

    // 不是重写,只是长得和父类一样,但是没有任何关系
    // 在类加载时就被绑定到两个不同的类
    public static void sleep() {
        
    }


}

# 修饰类

类不能被继承,比如String类

public final class String
    implements java.io.Serializable, Comparable<String>, CharSequence {}

测试1

static void testFinalString01() {
    String s1 = "wuKong2";
    final String s2 = "wuKong";
    String s3 = "wuKong";
    String s4 = s2 + 2;
    String s5 = s3 + 2;
    // true
    System.out.println( s1 == s4);
    // false
    System.out.println( s1 == s5);
}

s1和s4相等。因为s2被final修饰,在编译时就知道值,相当于常量,字面量2也是常量,所以编译时就知道s4的值,即知道s4是“wuKong2”,运行时会存放放在常量池中

s1是字符串(“wuKong2”)没被final修饰,运行时放入常量池,s4也要放入常量池,但是发现常量池已经有“wuKong2”了,就直接把s4的地址指向常量池的“wuKong2”。s1和s4都指向常量池的“wuKong2”,所以相等。


s1和s5不等。虽然s3在常量池,但是s3+2是在运行期间确定的,在堆中生产对象。s1是直接在常量池中的,所以不等。

测试2

static void testFinalString02() {
    String s1 = "wuKong2";
    final String s2 = returnString();
    String s3 = s2 + 2;
    // false
    System.out.println(s1 == s3);
}

不相等。因为s2的值是方法返回的,方法的返回是运行时才知道的结果,即在堆上。s1是常量池中的,所以不等。如果方法直接返回“wuKong2”并进行比较就是相等的,因为生成返回结果是字符串,先去常量池看看有没有这个字符串,有就用常量池的。

# 并发容器

# 古老的并发容器

Vector直接在方法上加上synchronized

Hashtable也是直接在方法上加上synchronized

Collections.synchronizedList(new ArrayList());将一个集合升级为安全的

public static <T> List<T> synchronizedList(List<T> list) {
    // ArrayList实现了RandomAccess(随机访问)接口,走new SynchronizedRandomAccessList<>(list)
    return (list instanceof RandomAccess ?
            new SynchronizedRandomAccessList<>(list) :
            new SynchronizedList<>(list));
}

继承自SynchronizedList

static class SynchronizedRandomAccessList<E>
    extends SynchronizedList<E>
    implements RandomAccess {

    SynchronizedRandomAccessList(List<E> list) {
        super(list);
    }
}

查看SynchronizedList,虽然不是直接在方法上加锁,但是在同一个对象上加锁,和Vector差不多。而且listIterator方法没有加锁,不是线程安全的,如果想要保证线程安全,需要我们在调用此方法时自行加锁。

由于以上的都是直接使用synchronized,效率低,所以出现ConcurrentHashMap和CopyOnWriteArrayList取代了他们

# ConcurrentHashMap

# HashMap缺点

HashMap的不安全体现在哪里?

  • 同时put时hash碰撞(值一样),导致数据丢失(只有一个对象put成功)
  • 同时put时需要扩容导致数据丢失(只有一个数组被应用)
  • 死循环造成CPU100%(JDK1.7及以前) 并不是HashMap的bug,HashMap不是线程安全的,如果想要使用线程安全的Map应该使用ConcurrentHashMap
// 扩容操作,从一个数组转移到另一个数组
void transfer(Entry[] newTable) {undefined
    Entry[] src = table;
    int newCapacity = newTable.length;
    for (int j = 0; j < src.length; j++) {undefined
        Entry<K,V> e = src[j];
        if (e != null) {undefined
          src[j] = null;
          do {undefined
            Entry<K,V> next = e.next; //多个线程执行可能会形成循环链表
            int i = indexFor(e.hash, newCapacity);
            e.next = newTable[i]; 
            newTable[i] = e;
            e = next;
         } while (e != null); // 可能导致死循环
       }
    }
 }

# 源码分析

JDK1.7的ConcurrentHashMap实现和分析 2.png

JDK1.8的ConcurrentHashMap实现和分析 1.png

# put
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 不允许key和value为空
    if (key == null || value == null) throw new NullPointerException();
    // hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    // 放入key value
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果节点数组为空就进行初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 如果要插入的位置节点为空就直接原子操作cas的放入
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                // 添加完成,就退出循环
                break;                   // no lock when adding to empty bin
        }
        // 如果要插入的节点位置正在进行扩容,就帮助扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 这里就是要放入的节点有值 锁住要插入的节点
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        // 链表操作
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // key存在 进行替换
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // key不存在,创建新节点,加在最后(尾插法)
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 红黑树操作
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        // 添加到红黑树中
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 添加完成
            if (binCount != 0) {
                // 如果当前节点上的元素数量 >= 8 尝试转为红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    // 还要满足条件数组长度>= 64
                    treeifyBin(tab, i);
                if (oldVal != null)
                    // 返回旧值
                    return oldVal;
                break;
            }
        }
    }
    // 元素数量+1
    addCount(1L, binCount);
    return null;
}

putVal流程

  • 判断key,value不为空
  • 计算hash值
  • 根据对应位置节点的类型,进行赋值,或者helpTransfer,或者增长链表,或者给红黑树增加节点
  • 检测满足阈值就树化
  • 返回oldVal
# get
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // hash值
    int h = spread(key.hashCode());
    // 如果节点数组为空就直接返回null,还没初始化
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 如果hash值相等 并且key相等 直接返回节点的值
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 如果hash值小于0,说明是红黑树或则是转义节点(扩容中、临时hash)
        else if (eh < 0)
            // 就去寻找有没有对应的 有就返回旧值 否则返回null
            return (p = e.find(h, key)) != null ? p.val : null;
        // 这里就是链表
        while ((e = e.next) != null) {
            // 遍历链表查找是否有相等的hash和key
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get流程

  • 计算hash值
  • 找到对应的位置,根据情况进行查找
    • 直接取值
    • 红黑树里找值
    • 遍历链表取值
  • 返回找到的结果

# 正确使用ConcurrentHashMap

反例

public class OptionsNotSafe implements Runnable{

    private static ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<>();

    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            Integer score = scores.get("starry");
            // 线程不安全
            Integer newScore = score + 1;
            scores.put("starry", newScore);

        }
    }

    public static void main(String[] args) throws InterruptedException {
        scores.put("starry", 0);
        Thread thread1 = new Thread(new OptionsNotSafe());
        Thread thread2 = new Thread(new OptionsNotSafe());
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(scores.get("starry"));

    }
}

修改代码使多个组合操作线程安全

@Override
public void run() {
    for (int i = 0; i < 1000; i++) {
        while (true) {
            Integer score = scores.get("starry");
            // 线程不安全
            Integer newScore = score + 1;
            /*
             * 虽然get和put线程安全的,但是业务代码是不安全的
             * 此时就要使用replace,来保证线程安全,替换先比较旧值是否相等
             */
            boolean flag = scores.replace("starry", score, newScore);
            if (flag) {
                break;
            }
        }
    }
}

# CopyOnWriteArrayList

ArrayList缺点

  • 遍历时不能修改
  • 线程不安全

# 读写规则

回顾读写锁:读读共享、读写互斥、写写互斥

CopyOnWriteArrayList读写规则:读取不加锁,写入也不会阻塞。只有写入和写入之间需要进行同步等待

# 概要

CopyOnWrite容器是写时生成一个对象的副本,在副本上进行操作,操作完后把之前的对象指针指到副本上,以此来保证线程安全。读写分离

缺点:

  • 数据一致性问题:CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。如果我们希望写入的数据,马上能读到,就不要使用CopyOnWrite容器
  • 内存占用问题:因为CopyOnWrite是写时复制,所以在进行写操作时,内存里会同时存在两个对象的内存

# 源码分析

# add
public boolean add(E e) {
 	// 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取旧数组
        Object[] elements = getArray();
        // 旧数组长度
        int len = elements.length;
        // 复制旧数组 -> 生成新数组,新数组长度为旧数组长度+1
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        // 要添加的元素放到新数组的最后一个位置
        newElements[len] = e;
        // 新数组(副本)替换旧数组
        setArray(newElements);
        return true;
    } finally {
        // 解锁
        lock.unlock();
    }
}
# get
// 通过索引获取数据
public E get(int index) {
    return get(getArray(), index);
}

// 直接根据索引在数组中查找并返回
private E get(Object[] a, int index) {
    return (E) a[index];
}

# BlockingQueue

一个接口,表示阻塞队列,常用于数据共享的通道

主要方法:

  1. take:获取并移除队列的头结点,如果队列没有数据就一阻塞,直到有数据 put:插入元素。如果队列已满,一直阻塞,直到队列有空闲空间
  2. add:添加元素,如果队列满了,抛出异常 remove:异常元素,如果队列没有元素,抛出异常 element:返回队列的头元素,如果是空,也抛出异常
  3. offer:添加元素,返回boolean值,是否添加成功 poll:移除元素,返回boolean值,是否添加成功 peek:返回队列的头元素,如果是空,返回null

# ArrayBlockingQueue

public void put(E e) throws InterruptedException {
    // 非空判断
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 加可中断锁
    lock.lockInterruptibly();
    try {
        // 如果当前队列长度=队列总长度 就阻塞(等待)
        while (count == items.length)
            notFull.await();
        // 队列没满就添加
        enqueue(e);
    } finally {
        // 解锁
        lock.unlock();
    }
}

# LinkedBlockingQueue

// 两把锁
private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 如果当前节点数量=最大节点数量就等待(默认为Integer.MAX_VALUE)
            while (count.get() == capacity) {
                notFull.await();
            }
            // 添加元素
            enqueue(node);
            c = count.getAndIncrement();
            // 添加完后小于最大数量
            if (c + 1 < capacity)
                // 唤醒
                notFull.signal();
        } finally {
            // 解锁
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

# PriorityBlockingQueue

  • 支持优先级
  • 自然顺序(不是先进先出)
  • 无界队列

# SynchronousQueue

  • 容量为0,直接传递
  • peek方法没有意义直接返回null,因为没有存储元素
  • Executors.newCachedThreadPool()使用的阻塞队列

# DelayQueue

  • 延迟队列,根据延迟时间排序
  • 元素需要实现Delayed接口,规定排序规则

# ConcurrentLinkedQueue

高效的非阻塞并发队列,使用链表实现。可以看作一个线程安全的LinkedList。

并发包中的非阻塞队列只有ConcurrentLinkedQueue这一种。

顾名思义ConcurrentLinkedQueue是使用链表作为其数据结构的,使用CAS非阻塞算法来实现线程安全(不具备阻塞功能),适合用在对性能要求较高的并发场景。用的相对比较少一些

# 并发流程控制

# Semaphore

package com.starry.concurrent.semaphoretest;

import java.util.concurrent.*;

/**
 * @author starry
 * @version 1.0
 * @date 2022/1/9 16:45
 * @Description
 */
public class SemaphoreTest {

    static Semaphore semaphore = new Semaphore(3);

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 0, TimeUnit.SECONDS, new SynchronousQueue<>(true), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 100; i++) {
            executor.execute(new Task());
        }
        executor.shutdown();
    }

     static class Task implements Runnable {

        @Override
        public void run() {
            try {
                // 获取信号量
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+ "拿到了许可证");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 释放信号量
                System.out.println(Thread.currentThread().getName()+ "释放了许可证");
                semaphore.release();
            }
        }
    }
}

# CyclicBarrier

CyclicBarrier和CountDownLatch区别

作用不同:CyclicBarrier要等固定数量的线程都到达了栅栏位置才能继续执行,而CountDownLatch只需等待数字到0 ,也就是说CountDownLatch用于事件(等待数字0这个事件),但是CyclicBarrier是用于线程的(等待指定的线程数量)。

可重用性不同:CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用了,除非新建新的实例;而CyclicBarrier可以重复使用。

public class CyclicBarrierTest {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
            System.out.println("人员到齐,准备出发~~~");
        });

        for (int i = 0; i < 10; i++) {
            new Thread(new Task(i,cyclicBarrier)).start();
        }

    }

    static class Task implements Runnable {

        private int id;
        private CyclicBarrier cyclicBarrier;

        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            try {
                Thread.sleep((long) (Math.random()*10000));
                System.out.println("人员" + id + "到达集合地点");
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }
}

# CountDownLatch

用法一:一个线程等待多个线程都执行完毕,再继续自己的工作

public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(5);
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
    for (int i = 0; i < 5; i++) {
        int finalI = i;
        Runnable runnable = () -> {
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println(finalI + "执行完毕");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        };
        executor.execute(runnable);
    }
    System.out.println("等待所有任务执行~~~");
    countDownLatch.await();
    System.out.println("所有任务执行完成");
    executor.shutdown();

}

用法二:多个线程等待某一个线程的信号,同时开始执行

public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
    for (int i = 0; i < 5; i++) {
        int finalI = i;
        Runnable runnable = () -> {
            try {
                System.out.println(finalI + "准备就绪");
                countDownLatch.await();
                System.out.println(finalI + "开始执行任务");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        executor.execute(runnable);
    }
    Thread.sleep(5000);
    System.out.println("准备开始~~~");
    countDownLatch.countDown();
    executor.shutdown();

}

结合用法一和用法二

public static void main(String[] args) throws InterruptedException {
    CountDownLatch begin = new CountDownLatch(1);
    CountDownLatch end = new CountDownLatch(5);
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
    for (int i = 0; i < 5; i++) {
        int finalI = i;
        Runnable runnable = () -> {
            try {
                System.out.println(finalI + "准备就绪");
                begin.await();
                System.out.println(finalI + "开始执行任务");
                Thread.sleep((long) (Math.random()*10000));
                System.out.println(finalI + "完成任务");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                end.countDown();
            }
        };
        executor.execute(runnable);
    }
    Thread.sleep(5000);
    System.out.println("准备开始~~~");
    begin.countDown();

    end.await();
    System.out.println("所有任务都执行完成");
    executor.shutdown();

}

# Condition

public class ConditionTest {

    private int queueSize = 10;
    private Queue<Integer> queue = new PriorityQueue<>(queueSize);
    private Lock lock = new ReentrantLock(true);
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    class Consumer extends Thread {
        @Override
        public void run() {
            consume();
        }

        private void consume() {
            // 一直消费数据
            while (true) {
                lock.lock();
                try {
                    while (queue.isEmpty()) {
                        System.out.println("队列空,等待数据");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // 消费数据
                    queue.poll();
                    System.out.println("从队列取出一个数据,当前剩余"+queue.size());
                    // 唤醒线程
                    notFull.signalAll();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Producer extends Thread {
        @Override
        public void run() {
            produce();
        }

        private void produce() {
            // 一直生产
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("队列满了,等待消费数据");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // 添加数据
                    queue.offer(1);
                    System.out.println("向队列添加一个元素,队列剩余空间:" + (queueSize - queue.size()));
                    // 唤醒线程
                    notEmpty.signalAll();
                } finally {
                    lock.unlock();
                }
            }
        }
    }


    public static void main(String[] args) throws InterruptedException {
        ConditionTest conditionTest = new ConditionTest();
        Consumer consumer = conditionTest.new Consumer();
        Producer producer = conditionTest.new Producer();
        consumer.start();
        producer.start();

    }
}

# AQS

# 是什么

即AbstractQueuedSynchronizer,用于构建锁、同步器、协作工具类的攻击类(框架)。有了AQS后,更多的协作工具都可以很方便的被写出来。

总结:有了AQS,构建线程协作类就容易多了。

# 核心部分

  • state
  • 控制线程抢锁和配合的FIFO队列
  • 希望协作工具类去实现获取/释放等重要方法

# state

private volatile int state;

state是volatile修饰的,会被并发的修改,所有所有修改state的方法都需要保证线程安全,比如getState、setState以及compareAndSetState操作来读取和更新这个状态。

state的具体含义,会根据实现类的不同而不同。比如在Semaphore里,它表示”剩余的许可证数量“;而在CountDownLatch里,它表示”还需要倒数的数量“;在ReentrantLock中,state用来表示锁的占用情况,包括可重入计数。当state为0时,标识该Lock不被任何线程所占有

在ReentrantLock中,state用来表示锁的占有情况,包括可重入计数。当state的值为0的时候,标识该Lock不被任何线程占有。

# 控制线程抢锁和配合的FIFO队列

  • 这个队列用来存放等待的线程,AQS就是排队管理器,当多个线程争用同一个把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁
  • AQS会维护一个等待的线程队列,把线程都放到这个队列里
  • 这是一个双向链表的队列

# 希望协作工具类去实现获取/释放等重要方法

这里的获取和释放方法,是利用AQS的协作工具类里最重要的方法,是由协作类自己去实现的,并且含义各不相同

获取方法

获取操作会依赖state变量,经常会阻塞(比如获取不到锁的时候)

  • 在Semaphore中,获取就是acquire方法,作用是获取一个许可证
  • 在CountDownLatch中,获取就是await方法,作用是等待,直到倒数结束

释放方法

释放操作不会阻塞

  • 在Semaphore中,释放就是release方法,作用是释放一个许可证
  • 在CountDownLatch中,释放就是countDown方法,作用是倒数一个数

tryAcquire和tryRelease等方法需要自己的线程协作类去实现

# ASQ应用实例

  1. 写一个类,想好协作的逻辑,实现获取/释放方法
  2. 内部写一个Sync类继承AbstractQueuedSynchronizer
  3. 根据是否独占来重写tryAcquire/tryRelease 或 tryAcquireShared(int acuires)和tryReleaseShared(int releases)等方法,在之前写的获取/释放方法中调用AQS的acquire/release或则Shared方法

# CountDownLatch

# 构造函数

将传入的参数赋值给Sync内部类,Sync再设置到AQS的state上

private static final class Sync extends AbstractQueuedSynchronizer {
    
    Sync(int count) {
        setState(count);
    }
    int getCount() {
        return getState();
    }
}

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
# getCount

就是返回state的值

public long getCount() {
    return sync.getCount();
}
# countDown
public void countDown() {
    sync.releaseShared(1);
}

来到AbstractQueuedSynchronizer.java

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 唤醒等待的全部线程
        doReleaseShared();
        return true;
    }
    return false;
}

调用CountDownLatch重写的tryReleaseShared方法

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        // 获取当前state值
        int c = getState();
        if (c == 0)
            return false;
        // 计数-1
        int nextc = c-1;
        // cas设置值
        if (compareAndSetState(c, nextc))
            // 减一后的值为0才返回true
            return nextc == 0;
    }
}

tryReleaseShared 返回 true 后,就会执行 doReleaseShared(); 来唤醒所有等待的线程

# await
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

来到 AbstractQueuedSynchronizer.java

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

CountDownLatch重写tryAcquireShared方法

当前数值是否为0,是返回1,不是返回-1

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

得到的结果是否小于0,小于0,进入等待队列;否则,继续执行

小于0,执行doAcquireSharedInterruptibly,当前线程进入等待队列

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 把当前线程包装成一个 Node
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

主要方法是parkAndCheckInterrupt(),挂起当前线程,进入阻塞状态

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

LockSupport.java

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

Unsafe.class

public native void park(boolean var1, long var2);
# 总结
  • 调用CountDownLatch的await方法时,便会尝试获取“共享锁”,不过一开始是获取不到该锁的,于是线程被阻塞
  • 而”共享锁“可获取到的条件,就是”锁计数器“的值为0
  • 而”锁计数器“的初始值为count,每个线程调用该CountDownLatch对象的countDown()方法时,才将”锁计数器“-1
  • count个线程调用countDown()之后,”锁计数器“才为0,而前面提到的等待获取共享锁的线程才能继续允许

# Semaphore

# acquire

公平锁

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

AbstractQueuedSynchronizer.java

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        // 小于0,进入队列阻塞
        doAcquireSharedInterruptibly(arg);
}

调用Semaphore的实现

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                // 数量
                int available = getState();
                // 计算后的数量
                int remaining = available - acquires;
                // 如果计算后的数量小于0 或者 cas设置成功都返回计算后的数量
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

然后根据返回的结果判断是否进入等待,小于0等待;否则进行允许

# 总结
  • 在Semaphore中,state表示许可证的剩余数量
  • 看tryAcquire方法,判断nonfairTryAcquireShared大于等于0的话,表示成功
  • 这里会先检查剩余许可证数量够不够,用减法来计算。如果不够,返回负数,代表失败;如果够了,就自旋cas来设置state的值,直到改变成功就返回计算后的值;或者期间被其他线程修改导致剩余数量不够,那也返回负数代表获取失败。

# ReentrantLock

非公平锁

# tryRelease
protected final boolean tryRelease(int releases) {
    // 释放一次锁后的state值
    int c = getState() - releases;
    // 是否当前线程持有锁
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 本次释放完锁后state为0
    if (c == 0) {
        free = true;
        // 当前持有锁的线程设置为null
        setExclusiveOwnerThread(null);
    }
    // 不用cas,因为只有当前线程才能释放
    setState(c);
    // 只有重入的锁都释放完了才返回true,否则返回false
    return free;
}

AbstractQueuedSynchronizer.java

public final boolean release(int arg) {
    // 上面返回的结果
    if (tryRelease(arg)) {
        // 释放成功
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 唤醒节点的后继者(如果存在)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

上面就是unlock的实现。

# acquire
final void lock() {
    // cas把state值从0改为1
    if (compareAndSetState(0, 1))
        // 设置当前拥有独占访问权限的线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // cas修改失败
        acquire(1);
}

AbstractQueuedSynchronizer.java

public final void acquire(int arg) {
    // 尝试获取锁,如果获取失败放入等待队列
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

ReentrantLock.java

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // cas 更新
        if (compareAndSetState(0, acquires)) {
            // 设置锁为当前线程持有
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 重入了
    else if (current == getExclusiveOwnerThread()) {
        // 当前重入次数+1(默认)
        int nextc = c + acquires;
        // int最大值+1,发生溢出
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 更新重入次数
        setState(nextc);
        return true;
    }
    return false;
}

根据返回结果,返回false取反,true,将当前线程放入等待队列;返回true取反,false,继续运行

public final void acquire(int arg) {
    // 尝试获取锁,如果获取失败放入等待队列
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
# 总结

加解锁要判断是不是当前线程 和 是否重入。

# 手写一个

模拟一个CountDownLatch

public class OneShotLatch {

    /**
     * 自定义latch 假设:state 1放行,0等待
     */
    private static class Sync extends AbstractQueuedSynchronizer {
        /**
         * 小于 0 放入阻塞队列
         * @param arg
         * @return
         */
        @Override
        protected int tryAcquireShared(int arg) {
            return getState() == 1 ? 1 : -1;
        }

        /**
         * true 继续执行,false 继续阻塞
         * @param arg
         * @return
         */
        @Override
        protected boolean tryReleaseShared(int arg) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();

    /**
     * 阻塞当前线程
     */
    public void await() {
        // 调用父类acquireShared方法,找到我们写的实现类tryAcquireShared方法
        sync.acquireShared(0);
    }

    /**
     * 唤醒全部等待线程
     */
    public void signal() {
        sync.releaseShared(0);
    }

    public static void main(String[] args) throws InterruptedException {
        OneShotLatch latch = new OneShotLatch();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName()+"开始等待"+new Date());
                latch.await();
                System.out.println(Thread.currentThread().getName()+"继续运行"+new Date());
            }).start();
        }
        Thread.sleep(3000);
        // 唤醒全部线程
        latch.signal();
        // 一次性latch,后续线程都不用等待

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+"开始等待"+new Date());
            latch.await();
            System.out.println(Thread.currentThread().getName()+"继续运行"+new Date());
        }).start();
    }
}

# Future

Callable和Future的关系

  • 我们可以用Future.get来获取Callable接口返回的执行结果,还可以通过Future.isDone()来判断任务是否已经执行完了,以及取消这个任务,限时获取任务的结果等等
  • 在call()未执行完毕前,调用get()的线程(假定此时是主线程)会被阻塞,直到call()方法返回了结果后,此时future.get()才会得到该结果,然后主线程才会切换到runnable状态
  • 所以Future是一个容器,它存储了call()这个任务的结果,而这个任务的执行时间是无法提取确定的,因为这完全取决于call()方法执行的情况

主要方法

  • get():获取结果,get的结果取决于Callable任务的状态
    • 任务正常完成,get方法会立刻返回结果
    • 任务尚未完成(任务还没开始或进行中),get将阻塞直到任务完成
    • 任务执行过程中抛出异常,get方法会抛出ExecutionException:不论call()执行时抛出的异常类型是什么,最后get方法抛出的异常类型都是ExecutionException
    • 任务被取消,get方法会抛出CancellationException
    • 任务超时,get方法有一个重载方法,是传入一个延迟时间,如果时间到了还没有结果返回,get方法就会抛出TimeoutException get(long timeout, TimeUnit unit)
  • cancel:取消任务的执行
    • 如果这个任务还是没有执行,任务会被正常取消,未来也不会被执行,方法返回true
    • 如果任务已经完成,或者已取消,那么cancel会执行失败,返回false
    • 如果这个任务已经开始执行了,那么这个取消方法将不会直接取消该方法,而是根据我们传入的参数_mayInterruptIfRunning_做判断
      • true:对执行中的任务发出中断信号,任务可以选择中断或者不中断,主动权在任务,我们只是先它发出信号。
      • false:不会对执行中的任务发出中断信号,对任务没有任何影响,任务继续执行

Future.cancel(true):适用于,我们知道任务有能力正确处理中断

Future.cancel(false):适用于

  • 任务没有能力处理中断
  • 不清楚任务是否支持取消
  • isDone:方法是否执行完毕
  • isCancelled:任务是否被取消

简单使用

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    Future<Integer> future = pool.submit(() -> {
        Thread.sleep(3000);
        return new Random().nextInt();
    });
    System.out.println(future.get());
    pool.shutdown();
}

批量接收结果

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Future> list = new ArrayList<>();
    for (int i = 0; i < 20; i++) {
        Future<Integer> future = pool.submit(() -> {
            Thread.sleep(1000);
            return new Random().nextInt();
        });
        list.add(future);
    }
    for (int i = 0; i < list.size(); i++) {
        System.out.println(list.get(i).get());
    }
    pool.shutdown();
}

异常演示

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    Future<Integer> future = pool.submit(() -> {
        throw new NullPointerException();
    });

    for (int i = 0; i < 3; i++) {
        Thread.sleep(1000);
        System.out.println(i);
    }
    System.out.println("任务执行完成:"+future.isDone());
    try {
        future.get();
    } catch (NullPointerException e1) {
        e1.printStackTrace();
        System.out.println("NullPointerException异常");
    } catch (ExecutionException e2) {
        e2.printStackTrace();
        System.out.println("ExecutionException异常");
    }

    pool.shutdown();
}

输出结果

0
1
2
任务执行完成:true
ExecutionException异常

任务一开始就抛出异常,但是没有get线程就能继续执行,只有在get时才会抛出异常,且异常是ExecutionException;isDone只是判断任务是否完成,不论成功还是失败(异常)。

演示cancel

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService pool = Executors.newFixedThreadPool(3);
        Future<String> future = pool.submit(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("正常执行");
                return "ok";
            } catch (InterruptedException e) {
                System.out.println("处理中断~~~");
                return "被中断了";
            }

        });
        Thread.sleep(1000);
        future.cancel(true);
//        future.cancel(false);
        pool.shutdown();

    }

FutureTask

前面都是通过线程池的submit来得到Future,还是可以使用FutureTask来创建Future

FutureTask继承了Runnable和Future

public static void main(String[] args) throws ExecutionException, InterruptedException {

    ExecutorService pool = Executors.newCachedThreadPool();

    FutureTask<String> futureTask = new FutureTask<>(() -> {
        Thread.sleep(3000);
        return "ok";
    });
    pool.submit(futureTask);
    System.out.println(futureTask.get());
}
上次更新: 2024/03/03, 08:36:37
多线程

多线程→

Theme by Vdoing | Copyright © 2023-2024 Starry | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式