并发编程
# 并发工具类
分类
- 为了并发安全:互斥同步、非互斥同步、无同步方案
- 管理线程、提高效率
- 线程协作
# 线程池
# 线程池的创建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数
- corePoolSize:核心线程数 线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待任务到来时再创建新线程去执行任务。一般情况下线程池中的数量不会小于核心线程数,就算没有任务了,这些线程也不会被销毁
- maximumPoolSize:最大线程数 当核心线程处理不过来,就需要额外新创建一些线程,但是这些新增的线程是有上限的,就是这个maximumPoolSize
- keepAliveTime:保持存活时间 如果线程池当前的线程数量多于corePoolSize,那么多于的线程空闲时间超过keepAliveTime,它们就会被终止
- unit:时间单位
- workQueue:任务存储队列
3种常见的队列类型
- 直接队列:SynchronousQueue
- 无界队列:LinkedBlockingQueue
- 有界队列:ArrayBlockingQueue
- threadFactory:生成新线程的工厂 新的线程是由ThreadFactory创建的,默认使用Executors.defaultThreadFactory(),创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否为守护线程等
- handler:拒绝策略
# 线程添加规则
- 如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务
- 如果线程数大于等于corePoolSize但小于maximumPoolSize,则将任务放入队列
- 如果队列已满,并且线程数小于maximumPoolSize,则创建一个新线程来运行任务
- 如果队列已满,并且线程数大于或等于maximumPoolSize,则拒绝该任务
# 增减线程的特点
- 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池
- 线程池希望保持较少的线程数(corePoolSize),并且只有在负载(workQueue)变得很大时才增加它(maximumPoolSize)
- 通过设置maximumPoolSize为很高的值,比如Integer.MAX_SIZE,可以允许线程池容纳任意数量的并发任务
- 只有在队列填充满时才创建多于corePoolSize的线程,所以如果我们使用无界队列(LinkedBlockingQueue),那么线程数量就不会超过corePoolSize
# newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
创建一个corePoolSize和maximumPoolSize相同的线程池,就是固定大小的线程池
额外存活时间为0ms,因为固定大小不存在额外线程,这个没有意义
工作队列为LinkedBlockingQueue无界队列,当线程达到上限后,直接放入工作队列,但是这个工作队列无上限,一直放,直到抛出异常。
# newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
创建一个corePoolSize和maximumPoolSize相同的线程池,都是1,就相当于单线程
和上面的类似,只是这个线程数是1,还是有同样的问题,当请求处理不过来,请求对接,占用大量内存,最终OOM
# newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程为0,最大线程为Integer.MAX_VALUE,存活时间60s,直接队列
来一个任务创建一个任务,也可能造成OOM
# newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public static void main(String[] args) {
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(20);
// 3s执行一次
threadPool.schedule(new Task(), 3, TimeUnit.SECONDS);
// 最开始1s执行一次,执行完后3s执行一次
threadPool.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS);
}
可以指定核心线程数,最大线程数还是Integer.MAX_VALUE
是个定时队列,可以定期执行任务
可以看到Executors的4个创建线程池的方法,都不怎么好用,所以我们需要自己创建线程池。
# 如何设置线程数
- CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的1~2倍左右
- IO密集型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU核心数很多倍,以JVM线程监控线上繁忙情况为依据,保证线程空闲可以衔接上,参考Brain Goetz推荐的计算方法 线程数 = CPU核心数*(1 + 平均等待时间 / 平均工作时间) 假设cpu为8核,读写数据库需要等待1000ms,而业务代码执行只需要10ms,所以线程数就是 8*(1+1000/10)=808
# 线程池的关闭
- shutdown 使用后不是立即关闭线程池,而是等所有的线程都执行完才进行关闭,如果后续还有任务添加,会直接拒绝
- isShutdown 判断线程是否进入停止状态(上面的状态)
- isTerminated 判断线程池是否完全停止了(任务都执行完了)
- awaitTermination (检测)在指定的时间内线程池是否完全关闭
- shutdownNow 使用interrupt尝试中断真正线程,并返回在队列中等待的线程
# 拒绝策略
当线程关闭后,再添加任务,会直接拒绝
当线程数达到最大线程数,且工作队列也满了的话,会使用指定的拒绝策略拒绝任务
4种拒绝策略
- AbortPolicy 直接抛出异常
- DiscardPolicy 直接丢弃提交的任务,不会抛出异常
- DiscardOldestPolicy 丢弃队列中最老的任务
- CallerRunsPolicy 直接让传入任务的线程执行
# 钩子方法
在每个线程执行前后都能进行额外操作
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
使用钩子方法实现线程池暂停和恢复
package com.starry.concurrent;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author starry
* @version 1.0
* @date 2021/12/30 20:56
* @Description 可暂停的线程池
*/
public class PauseableThreadPool extends ThreadPoolExecutor {
private boolean isPaused;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
/**
* 加锁 暂停
*/
public void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
/**
* 加锁 恢复
*/
public void resume() {
lock.lock();
try {
isPaused = false;
condition.signalAll();
} finally {
lock.unlock();
}
}
/**
* 每个任务线程执行前都会执行此方法
* @param t
* @param r
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
lock.lock();
try {
// 如果是暂停当前线程就等待
while (isPaused) {
condition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
super.beforeExecute(t, r);
}
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool threadPool = new PauseableThreadPool(1, 2, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000));
for (int i = 0; i < 1000; i++) {
threadPool.execute(() -> {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
Thread.sleep(2000);
// 暂停线程池
threadPool.pause();
System.out.println("thread paused");
// 恢复线程池
Thread.sleep(5000);
threadPool.resume();
System.out.println("thread resumed");
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
}
# 线程池结构体系
Executor是个接口,只有一个execute()
方法
ExecutorService实现了Executor接口,并且提供了关闭方法
AbstractExecutorService实现了ExecutorService接口,提供了默认方法实现,是个抽象方法
ThreadPoolExecutor继承AbstractExecutorService,提供具体实现,以及额外方法
Executors和上面的类或接口,没有直接实现或继承关系,是个工具类,只是帮我创建线程的。
# 线程池状态
- RUNNING 接收新任务,并且处理排队任务
- SHUTDOWN 不接收新任务,但处理排队任务
- STOP 不接受新任务,也不处理排队任务,并中断真正进行的任务
- TIDYING 整洁的意思;所有任务都已终止,
workerCount
为零时,线程会转到TIDYING状态,并运行terminate()
钩子方法 - TERMINATED
terminate()
允许完成
# execute源码
public void execute(Runnable command) {
// command就是我们传入要执行的任务
if (command == null)
throw new NullPointerException();
// 获取正在执行的任务数量
int c = ctl.get();
// 如果数量小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 就新增一个工作线程去执行我们传入的任务,true为核心线程
if (addWorker(command, true))
return;
// 再获取一下线程运行数量
c = ctl.get();
}
/*任务数量大于等于核心数量*/
// 如果线程池是运行状态,就添加任务到工作队列
if (isRunning(c) && workQueue.offer(command)) {
// 再检测一遍线程数量,因为此时可能线程被终止了
int recheck = ctl.get();
// 如果是终止状态,就把当前任务从工作队列移除
if (! isRunning(recheck) && remove(command))
// 并拒绝任务
reject(command);
// 如果线程数量为0(可能抛出异常,导致线程数量减少),也要创建一个新的工作线程,false不是核心线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*大于等于核心线程,并且工作队列也满了*/
// 创建非核心线程去执行任务,如果创建失败就拒绝
else if (!addWorker(command, false))
reject(command);
}
# ThreadLocal
# 使用场景
主要是下面两大使用场景
- 每个线程需要有独享的对象(通常是工具类,比如SimpleDateFormat和Random)
/**
* @author starry
* @version 1.0
* @date 2022/1/1 16:10
* @Description 使用ThreadLocal保证线程安全
*/
public class ThreadLocalTest {
public static void main(String[] args) {
// 只有100个SimpleDateFormat对象,而不是每个线程都创建一个对象
ExecutorService threadPool = Executors.newFixedThreadPool(100);
for (int i = 0; i < 1000; i++) {
int finalI = i;
threadPool.execute(() -> {
String s = printTime(finalI);
System.out.println(s);
});
}
threadPool.shutdown();
}
public static String printTime(int seconds) {
Date date = new Date(1000L * seconds);
// 每个线程持有不同的SimpleDateFormat
SimpleDateFormat formatThreadLocal = ThreadSafeFormatter.dateFormatThreadLocal.get();
return formatThreadLocal.format(date);
}
}
class ThreadSafeFormatter{
public static ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
}
- 每个线程内需要保存全局变量(例如在拦截器中获取用户信息),可以让不用方法直接使用,避免层层传参的麻烦
/**
* @author starry
* @version 1.0
* @date 2022/1/1 16:29
* @Description 使用ThreadLocal避免层次传参,同个线程,变量共享
*/
public class ThreadLocalTest02 {
public static void main(String[] args) {
new Service1().fun1();
}
}
class User{
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
class UserContextHolder{
public static ThreadLocal<User> holder = new ThreadLocal<>();
}
class Service1{
public void fun1(){
User user = new User();
user.setName("starry");
UserContextHolder.holder.set(user);
// 服务1执行完,调用服务2
new Service2().fun2();
}
}
class Service2{
public void fun2(){
User user = UserContextHolder.holder.get();
System.out.println("fun2"+user.getName());
// 服务2执行完,调用服务3
new Service3().fun3();
}
}
class Service3{
public void fun3(){
ThreadLocal<User> holder = UserContextHolder.holder;
System.out.println("fun3"+holder.get().getName());
holder.remove();
}
}
# 主要方法
initialValue() 初始化
- initialValue() 返回当前线程的初始值,这是一个延迟加载的方法,只有在调用get()的时候,才会触发
- 线程第一次调用get()方法访问变量时,调用此方法,触发线程向前调用了set()方法,在这种情况下,不会为线程调用initialValue()方法
- 通常,每个线程最多只调用一次此方法,但是如果已经调用了remove()后,再调用get(),就可以再次调用次方法
- 如果不重写此方法,这个方法会返回null。一般使用匿名内部类的方法来重写initialValue()方法,以便再后续使用中可以初始化副本对象
set() 为这个线程设置一个新值
get() 得到这个线程对应的value
remove() 删除对应这个线程的值
get()
取出当前线程的ThreadLocalMap
,然后调用map.getEntr()
方法,把本ThreadLocal
的引用作为参数传入,取出map
中属于ThreadLocal
的value
这里的key和value都是保存在线程(ThreadLocalMap)中的,而不是保存在ThreadLocal中
public T get() {
// 获取当前线程
Thread t = Thread.currentThread();
// 获取当前线程的ThreadLocalMap
ThreadLocalMap map = getMap(t);
// 如果ThreadLocalMap不为空
if (map != null) {
/*
* 获取当前线程中的ThreadLocalMap对象,相当于map,里面存放多个ThreadLocal
* 传入ThreadLocal,获取对应的Entry
* 再获取Entry的value,即真正存放的值
*/
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 如果为空就set初始化的值,懒加载
return setInitialValue();
}
// 获取当前线程的ThreadLocalMap对象
ThreadLocalMap getMap(Thread t) {
// ThreadLocal.ThreadLocalMap threadLocals = null;
return t.threadLocals;
}
// 设置初始化值
private T setInitialValue() {
// 返回初始值
T value = initialValue();
// 获取当前线程
Thread t = Thread.currentThread();
// 获取当前线程的ThreadLocalMap对象
ThreadLocalMap map = getMap(t);
// 如果ThreadLocalMap不为空,就设置一个ThreadLocal对象,key就是当前的ThreadLocal,value就是初始值
if (map != null)
map.set(this, value);
else
// 如果为空(默认为null),就创建一个ThreadLocalMap,再去设置一个ThreadLocal对象,key就是当前的ThreadLocal,value就是初始值
createMap(t, value);
// 最后返回默认值
return value;
}
// 返回初始值,默认为null,可进行重写
protected T initialValue() {
return null;
}
set()
public void set(T value) {
// 获取当前线程
Thread t = Thread.currentThread();
// 获取当前线程的ThreadLocalMap对象
ThreadLocalMap map = getMap(t);
// 不为空就设置
if (map != null)
map.set(this, value);
else
// 为空就创建 再设置
createMap(t, value);
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
remove()
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
// 根据key去删除Entry对象
m.remove(this);
}
# 注意点
- 内存泄漏
- static对象,共享
- 尽量使用框架提供的ThreadLocal对象,防止忘记remove 比如:DateTimeContextHolader、RequestContextHolder
内存泄漏:某个对象不再使用,但是内存却不能回收
// Entry 继承 弱引用
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
// 将k置为弱引用
super(k);
// 但是v还是强引用
value = v;
}
}
弱引用,如果这个对象只被弱引用关联(没有任何强引用关联),那么这个对象就可以被回收
- ThreadLocalMap的每个Entry都是一个key的弱引用,同时,每个Entry都包含一个对value的强引用
- 正常情况下,当线程终止,保存在ThreadLocalMap里的value会被垃圾回收,因为没有任何强引用了
- 但是,如果线程不终止(比如线程池的线程重复使用),那么key对应的value就不能被回收
Thread->ThreadLocalMap->Entry(key 为 null)->value有值
- 因为value和Thread之间还存在强引用,所以导致value无法回收,可能最终导致OOM
- 作者已经考虑了这个问题,所以在set、remove、rehash返回中会扫描key为null的Entry,并把对应的value设置为null,这样value就可以被回收了
- 但是如果一个ThreadLocal不被使用,同时线程又不停止,那么调用链就一直存在,就导致了value的泄漏
- 调用remove方法,就会删除对应的Entry对象,可以避免内存泄漏,所以使用完ThreadLocal后,应该调用remove方法
# Lock
有synchronized为什么还要lock?
- 效率低:锁的释放情况少,试图获得锁时不能超时,不能中断一个正在试图获得锁的线程
- 不够灵活:加锁和释放的时机单一,每个锁仅有单一的条件(某个对象),可能是不够的
- 无法知道是否成功获取到锁
# 主要方法
lock()
- lock()就是最普通的获取锁。如果锁已经被其他线程获取,则进行等待
- Lock不会像synchronized一样在异常时自动释放锁
- 因此,我们需要在finally块中释放锁,以保证发生异常时锁的释放
- lock()方法不能被中断,一旦陷入死锁,lock()就会永久等待
tryLock()
- tryLock()用来尝试获取锁,如果当前锁没有被其他线程占用,则获取锁成功,返回true,否则返回false,获取锁失败
- 相比于lock(),我们可以根据是否能获取到锁来决定后续程序的行为
- 该方法会立即返回,即便在拿不到锁时也不会一直等待
tryLock(long time, TimeUnit unit) 超时就放弃
lockInterruptibly() 相当于tryLock(long time,TimeUnit unit)把超时时间设置为无限。在等待锁的过程中,线程可以被中断
unlock() 解锁
可见性保证
拥有_happens-before_原则
lock的加解锁和synchronized有同样的内存语义,也就是说,下一个线程加锁后可以看到所有之前线程解锁前发生的所有操作
# 锁的分类
线程要不要锁住同步资源 | 悲观锁 乐观锁 |
多个线程能否共享一把锁 | 共享锁 独占锁 |
多线程竞争时,是否需要排队 | 公平锁 非公平锁 |
同一个线程是否可以重复获得同一把锁 | 可重入锁 不可重入锁 |
是否可中断 | 可中断锁 不可中断锁 |
等待锁的过程 | 自旋锁 非自旋锁 |
ReentrantReadWriteLock
读锁之间不互斥,多个线程可以同时持有读锁
读写之间互斥,如果一个线程持有读锁,其他线程想要申请写锁,必须等读锁释放,只能有一个线程拥有写锁。
如果是公平锁,获取读写锁的顺序和等待顺序一致
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
// 获取写锁是否阻塞
final boolean writerShouldBlock() {
// 如果队列前面还有线程要获取锁就阻塞
return hasQueuedPredecessors();
}
// 获取写锁是否阻塞
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
如果是非公平锁
- 写锁可以随时插队
- 读锁只有在等待队列的第一个获取的锁的不是写锁时才能插队
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
// 写锁是否阻塞,返回false
final boolean writerShouldBlock() {
return false; // writers can always barge
}
// 读锁是否阻塞
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
// 判断队列的第一个要获取的锁是不是排他锁(写锁),是就进行阻塞(排队),否则就插队
return apparentlyFirstQueuedIsExclusive();
}
}
支持锁的降级(写锁至读锁),不支持升级(防止死锁)
# 原子类
六类原子类:
Atomic*基本类型原子类 | AtomicInteger |
AtomicLong AtomicBoolean | | AtomicArrray数组类型原子类 | AtomicIntegerArray AtomicLongArray AtomicReferenceArray | | AtomicReference引用类型原子类 | AtomicReference AtomicStampedReference AtomicMarkableReference | | Atomic*FieldUpdater升级类型原子类 | AtomicIntegerFieldUpdater AtomicLongFieldUpdater AtomicReferenceFieldUpdater | | Adder累加器 | LongAdder DoubleAdder | | Accumulator累加器 | LongAccumulator DoubleAccumulator |
# AtomicInteger
AtomicInteger常用方法
- public final int get() 获取当前值
- public final int getAndSet(int newValue) 获取当前值,并设置新值
- public final int getAndIncrement() 获取当前值,并自增1
- public final int getAndDecrement() 获取当前值,并自减1
- public final int getAndAdd(int delta) 获取当前值,并加上指定的值
- public final boolean compareAndSet(int expect, int update) 如果当前的数组等于预期(expect)的值,才以原子方式将该值设置为输入值(update)
# AtomicReference
使用AtomicReference实现自旋锁
public class AtomicReferenceTest {
/**
* 标志位 标志哪个线程拿到锁
*/
private AtomicReference<Thread> sign = new AtomicReference<>();
public void lock() {
Thread thread = Thread.currentThread();
// 只有标志为null时才加锁成功,并且设置标志为当前线程
while (!sign.compareAndSet(null,thread)){
try {
System.out.println("获取锁失败,睡眠500ms再试");
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void unlock() {
Thread thread = Thread.currentThread();
// 只有标志是当前线程,才解锁成功,并且设置标志为null
sign.compareAndSet(thread, null);
}
public static void main(String[] args) {
AtomicReferenceTest lock = new AtomicReferenceTest();
Runnable runnable = new Runnable() {
@Override
public void run() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"获取到自旋锁");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
};
new Thread(runnable).start();
new Thread(runnable).start();
}
}
# AtomicIntegerFieldUpdater
变量升级为原子类型
升级是基于反射的,要注意
变量必须被volatile修改
变量不能被private修饰
变量不能被static修饰
package com.starry.concurrent.atomictest;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
* @author starry
* @version 1.0
* @date 2022/1/2 19:12
* @Description
*/
public class AtomicIntegerFieldUpdaterTest implements Runnable{
static User u1;
static User u2;
static AtomicIntegerFieldUpdater<User> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(User.class,"score");
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
u1.score++;
fieldUpdater.getAndIncrement(u2);
}
}
public static void main(String[] args) throws InterruptedException {
u1 = new User();
u2 = new User();
AtomicIntegerFieldUpdaterTest r = new AtomicIntegerFieldUpdaterTest();
Thread thread1 = new Thread(r);
Thread thread2 = new Thread(r);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(u1.score);
System.out.println(u2.score);
}
public static class User {
volatile int score;
}
}
# LongAdder
AtomicLong由于每次操作,都要flush和refresh数据到共享内存,以保证其他线程看到的都是最新值,非常耗费资源。
LongAdder的是实现原理和刚才的AtomicLong是有不同的,LongAdder每个线程会有自己的一个计算器,仅用来在自己线程内部计数,这样依赖就不会有和其他线程的计数器干扰。由于是每个线程都有一个计数器,所以不存在竞争关系,所以在累加过程中,不需要同步机制就,也不要flush和refresh。也没有一个公共的counter来给所有线程统一计数。
LongAdder引入了分段累加的概念,内部有一个base变量和一个Cell[]数组共同参与计算
- base变量:竞争不激烈时,直接累加到该变量上
- 竞争激烈时,各个线程分散累加到自己的槽Cell[i]中
最后通过sum()统计各个槽或者base的的值得到最终结果
这个sum方法不是加锁的,也就是说如果调用sum时,还有线程在进行累加,这个结果就不是准确的
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
// 如果数组是空的话,就直接返回base的值
if (as != null) {
// 否则累加每个槽上的值到sum上
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
// 最后返回sum
return sum;
}
# LongAccumulator
也是累加,但是是自定义的累加函数。不仅仅可以进行累加,也可以进行乘,取最大最小值等,只需要自己实现函数即可。
accumulatorFunction为累加函数,identity为累加函数的初始值
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity)
# CAS
# 在Java中的实现
以AtomicInteger为例:
- AtomicInteger加载Unsafe工具,用来直接操作内存数据
- 用Unsafe来实现底层操作
- 用Volatile修饰value字段,保证可见性
- getAndAddInt
Unsafe类是CAS的核心类。Java无法直接访问底层操作系统,而是通过本地Native方法来访问。
valueOffset标识变量值在内存中的偏移位置,因为Unsafe就是根据内存偏移地址获取数据的原值的,这样我们就能通过unsafe来实现CAS了。
静态代码块,类加载时,就加载Unsafe
工具类,并且使用Unsafe
来获取value
字段的内存地址值
getAndSet
方法调用unsafe.getAndSetInt
这里是do...while
循环,如果失败就进行重试,和乐观锁类似;如果成功就返回旧值
最后的compareAndSwapInt
就是本地方法了
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
# 缺点
- ABA问题
- 自旋时间长
# final
# 修饰变量
被final修饰的变量,意味着值不能被修改。如果是对象,那么对象的引用不能变,但是对象自身的内容仍然可以变化。
package com.starry.concurrent.finaltest;
/**
* @author starry
* @version 1.0
* @date 2022/1/7 20:36
* @Description
*/
public class FinalTest {
public static void main(String[] args) {
final Person person = new Person();
// 编译期间就报错
// person.age = 1;
// 对象内容可变
person.address = "beijing";
person.address = "shanghai";
}
}
class Person {
final int age = 0;
final String name = "starry";
String address;
}
属性被声明为final后,该变量则只能被赋值一次。且一旦被赋值,final的变量就不能再被改变。
修饰3种变量
- 类中的final属性
- 声明变量时直接在等号右边赋值
- 构造函数中赋值
- 在类的初始代码块中赋值
如果不采用第一种方法赋值,那么就必须在2、3中选一种方法来赋值,不能不赋值。
class FinalVariable {
//方法1
// private int i = 1;
private int i;
// 方法2
// public FinalVariable(int i) {
// this.i = i;
// }
// 方法3
{
i = 1;
}
}
- 类中的static final属性
- 声明变量时直接在等号右边赋值
- 在类的static代码块中赋值
class FinalVariable {
//方法1
// private static int i = 1;
private static int i;
// 方法2
static {
i = 1;
}
}
- 方法中的final属性 方法中没有构造函数,也没有初始代码块 可以等号右边赋值,只要在使用之前赋值就可以了
void testFinal() {
final int t;
System.out.println("其他代码");
System.out.println("需要使用final变量");
t = 1;
System.out.println(t);
}
# 修饰方法
- 构造方法不能被final修饰
public class FinalMethodTest {
int i;
// 编译报错
// public final FinalMethodTest(int i) {
// this.i = i;
// }
}
- final修饰的方法不能被重写,即override
public class FinalMethodTest {
int i;
// 编译报错
// public final FinalMethodTest(int i) {
// this.i = i;
// }
void drink() {}
final void eat() {}
static void sleep(){}
}
class SubClass extends FinalMethodTest{
@Override
void drink() {
super.drink();
}
// 编译报错
//void eat() {}
// 不是重写,只是长得和父类一样,但是没有任何关系
// 在类加载时就被绑定到两个不同的类
public static void sleep() {
}
}
# 修饰类
类不能被继承,比如String类
public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {}
测试1
static void testFinalString01() {
String s1 = "wuKong2";
final String s2 = "wuKong";
String s3 = "wuKong";
String s4 = s2 + 2;
String s5 = s3 + 2;
// true
System.out.println( s1 == s4);
// false
System.out.println( s1 == s5);
}
s1和s4相等。因为s2被final修饰,在编译时就知道值,相当于常量,字面量2也是常量,所以编译时就知道s4的值,即知道s4是“wuKong2”,运行时会存放放在常量池中
s1是字符串(“wuKong2”)没被final修饰,运行时放入常量池,s4也要放入常量池,但是发现常量池已经有“wuKong2”了,就直接把s4的地址指向常量池的“wuKong2”。s1和s4都指向常量池的“wuKong2”,所以相等。
s1和s5不等。虽然s3在常量池,但是s3+2是在运行期间确定的,在堆中生产对象。s1是直接在常量池中的,所以不等。
测试2
static void testFinalString02() {
String s1 = "wuKong2";
final String s2 = returnString();
String s3 = s2 + 2;
// false
System.out.println(s1 == s3);
}
不相等。因为s2的值是方法返回的,方法的返回是运行时才知道的结果,即在堆上。s1是常量池中的,所以不等。如果方法直接返回“wuKong2”并进行比较就是相等的,因为生成返回结果是字符串,先去常量池看看有没有这个字符串,有就用常量池的。
# 并发容器
# 古老的并发容器
Vector
直接在方法上加上synchronized
Hashtable
也是直接在方法上加上synchronized
Collections.synchronizedList(new ArrayList());
将一个集合升级为安全的
public static <T> List<T> synchronizedList(List<T> list) {
// ArrayList实现了RandomAccess(随机访问)接口,走new SynchronizedRandomAccessList<>(list)
return (list instanceof RandomAccess ?
new SynchronizedRandomAccessList<>(list) :
new SynchronizedList<>(list));
}
继承自SynchronizedList
static class SynchronizedRandomAccessList<E>
extends SynchronizedList<E>
implements RandomAccess {
SynchronizedRandomAccessList(List<E> list) {
super(list);
}
}
查看SynchronizedList
,虽然不是直接在方法上加锁,但是在同一个对象上加锁,和Vector
差不多。而且listIterator
方法没有加锁,不是线程安全的,如果想要保证线程安全,需要我们在调用此方法时自行加锁。
由于以上的都是直接使用synchronized
,效率低,所以出现ConcurrentHashMap
和CopyOnWriteArrayList
取代了他们
# ConcurrentHashMap
# HashMap缺点
HashMap的不安全体现在哪里?
- 同时put时hash碰撞(值一样),导致数据丢失(只有一个对象put成功)
- 同时put时需要扩容导致数据丢失(只有一个数组被应用)
- 死循环造成CPU100%(JDK1.7及以前) 并不是HashMap的bug,HashMap不是线程安全的,如果想要使用线程安全的Map应该使用ConcurrentHashMap
// 扩容操作,从一个数组转移到另一个数组
void transfer(Entry[] newTable) {undefined
Entry[] src = table;
int newCapacity = newTable.length;
for (int j = 0; j < src.length; j++) {undefined
Entry<K,V> e = src[j];
if (e != null) {undefined
src[j] = null;
do {undefined
Entry<K,V> next = e.next; //多个线程执行可能会形成循环链表
int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];
newTable[i] = e;
e = next;
} while (e != null); // 可能导致死循环
}
}
}
# 源码分析
# put
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 不允许key和value为空
if (key == null || value == null) throw new NullPointerException();
// hash值
int hash = spread(key.hashCode());
int binCount = 0;
// 放入key value
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果节点数组为空就进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果要插入的位置节点为空就直接原子操作cas的放入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
// 添加完成,就退出循环
break; // no lock when adding to empty bin
}
// 如果要插入的节点位置正在进行扩容,就帮助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 这里就是要放入的节点有值 锁住要插入的节点
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
// 链表操作
for (Node<K,V> e = f;; ++binCount) {
K ek;
// key存在 进行替换
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// key不存在,创建新节点,加在最后(尾插法)
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 红黑树操作
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 添加到红黑树中
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 添加完成
if (binCount != 0) {
// 如果当前节点上的元素数量 >= 8 尝试转为红黑树
if (binCount >= TREEIFY_THRESHOLD)
// 还要满足条件数组长度>= 64
treeifyBin(tab, i);
if (oldVal != null)
// 返回旧值
return oldVal;
break;
}
}
}
// 元素数量+1
addCount(1L, binCount);
return null;
}
putVal流程
- 判断key,value不为空
- 计算hash值
- 根据对应位置节点的类型,进行赋值,或者helpTransfer,或者增长链表,或者给红黑树增加节点
- 检测满足阈值就树化
- 返回oldVal
# get
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// hash值
int h = spread(key.hashCode());
// 如果节点数组为空就直接返回null,还没初始化
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果hash值相等 并且key相等 直接返回节点的值
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 如果hash值小于0,说明是红黑树或则是转义节点(扩容中、临时hash)
else if (eh < 0)
// 就去寻找有没有对应的 有就返回旧值 否则返回null
return (p = e.find(h, key)) != null ? p.val : null;
// 这里就是链表
while ((e = e.next) != null) {
// 遍历链表查找是否有相等的hash和key
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get流程
- 计算hash值
- 找到对应的位置,根据情况进行查找
- 直接取值
- 红黑树里找值
- 遍历链表取值
- 返回找到的结果
# 正确使用ConcurrentHashMap
反例
public class OptionsNotSafe implements Runnable{
private static ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<>();
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
Integer score = scores.get("starry");
// 线程不安全
Integer newScore = score + 1;
scores.put("starry", newScore);
}
}
public static void main(String[] args) throws InterruptedException {
scores.put("starry", 0);
Thread thread1 = new Thread(new OptionsNotSafe());
Thread thread2 = new Thread(new OptionsNotSafe());
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(scores.get("starry"));
}
}
修改代码使多个组合操作线程安全
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
while (true) {
Integer score = scores.get("starry");
// 线程不安全
Integer newScore = score + 1;
/*
* 虽然get和put线程安全的,但是业务代码是不安全的
* 此时就要使用replace,来保证线程安全,替换先比较旧值是否相等
*/
boolean flag = scores.replace("starry", score, newScore);
if (flag) {
break;
}
}
}
}
# CopyOnWriteArrayList
ArrayList缺点
- 遍历时不能修改
- 线程不安全
# 读写规则
回顾读写锁:读读共享、读写互斥、写写互斥
CopyOnWriteArrayList读写规则:读取不加锁,写入也不会阻塞。只有写入和写入之间需要进行同步等待
# 概要
CopyOnWrite容器是写时生成一个对象的副本,在副本上进行操作,操作完后把之前的对象指针指到副本上,以此来保证线程安全。读写分离
缺点:
- 数据一致性问题:CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。如果我们希望写入的数据,马上能读到,就不要使用CopyOnWrite容器
- 内存占用问题:因为CopyOnWrite是写时复制,所以在进行写操作时,内存里会同时存在两个对象的内存
# 源码分析
# add
public boolean add(E e) {
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取旧数组
Object[] elements = getArray();
// 旧数组长度
int len = elements.length;
// 复制旧数组 -> 生成新数组,新数组长度为旧数组长度+1
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 要添加的元素放到新数组的最后一个位置
newElements[len] = e;
// 新数组(副本)替换旧数组
setArray(newElements);
return true;
} finally {
// 解锁
lock.unlock();
}
}
# get
// 通过索引获取数据
public E get(int index) {
return get(getArray(), index);
}
// 直接根据索引在数组中查找并返回
private E get(Object[] a, int index) {
return (E) a[index];
}
# BlockingQueue
一个接口,表示阻塞队列,常用于数据共享的通道
主要方法:
- take:获取并移除队列的头结点,如果队列没有数据就一阻塞,直到有数据 put:插入元素。如果队列已满,一直阻塞,直到队列有空闲空间
- add:添加元素,如果队列满了,抛出异常 remove:异常元素,如果队列没有元素,抛出异常 element:返回队列的头元素,如果是空,也抛出异常
- offer:添加元素,返回boolean值,是否添加成功 poll:移除元素,返回boolean值,是否添加成功 peek:返回队列的头元素,如果是空,返回null
# ArrayBlockingQueue
public void put(E e) throws InterruptedException {
// 非空判断
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加可中断锁
lock.lockInterruptibly();
try {
// 如果当前队列长度=队列总长度 就阻塞(等待)
while (count == items.length)
notFull.await();
// 队列没满就添加
enqueue(e);
} finally {
// 解锁
lock.unlock();
}
}
# LinkedBlockingQueue
// 两把锁
private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 如果当前节点数量=最大节点数量就等待(默认为Integer.MAX_VALUE)
while (count.get() == capacity) {
notFull.await();
}
// 添加元素
enqueue(node);
c = count.getAndIncrement();
// 添加完后小于最大数量
if (c + 1 < capacity)
// 唤醒
notFull.signal();
} finally {
// 解锁
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
# PriorityBlockingQueue
- 支持优先级
- 自然顺序(不是先进先出)
- 无界队列
# SynchronousQueue
- 容量为0,直接传递
- peek方法没有意义直接返回null,因为没有存储元素
- Executors.newCachedThreadPool()使用的阻塞队列
# DelayQueue
- 延迟队列,根据延迟时间排序
- 元素需要实现Delayed接口,规定排序规则
# ConcurrentLinkedQueue
高效的非阻塞并发队列,使用链表实现。可以看作一个线程安全的LinkedList。
并发包中的非阻塞队列只有ConcurrentLinkedQueue这一种。
顾名思义ConcurrentLinkedQueue是使用链表作为其数据结构的,使用CAS非阻塞算法来实现线程安全(不具备阻塞功能),适合用在对性能要求较高的并发场景。用的相对比较少一些
# 并发流程控制
# Semaphore
package com.starry.concurrent.semaphoretest;
import java.util.concurrent.*;
/**
* @author starry
* @version 1.0
* @date 2022/1/9 16:45
* @Description
*/
public class SemaphoreTest {
static Semaphore semaphore = new Semaphore(3);
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 0, TimeUnit.SECONDS, new SynchronousQueue<>(true), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 100; i++) {
executor.execute(new Task());
}
executor.shutdown();
}
static class Task implements Runnable {
@Override
public void run() {
try {
// 获取信号量
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+ "拿到了许可证");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放信号量
System.out.println(Thread.currentThread().getName()+ "释放了许可证");
semaphore.release();
}
}
}
}
# CyclicBarrier
CyclicBarrier和CountDownLatch区别
作用不同:CyclicBarrier要等固定数量的线程都到达了栅栏位置才能继续执行,而CountDownLatch只需等待数字到0 ,也就是说CountDownLatch用于事件(等待数字0这个事件),但是CyclicBarrier是用于线程的(等待指定的线程数量)。
可重用性不同:CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用了,除非新建新的实例;而CyclicBarrier可以重复使用。
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
System.out.println("人员到齐,准备出发~~~");
});
for (int i = 0; i < 10; i++) {
new Thread(new Task(i,cyclicBarrier)).start();
}
}
static class Task implements Runnable {
private int id;
private CyclicBarrier cyclicBarrier;
public Task(int id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
Thread.sleep((long) (Math.random()*10000));
System.out.println("人员" + id + "到达集合地点");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
# CountDownLatch
用法一:一个线程等待多个线程都执行完毕,再继续自己的工作
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5);
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 5; i++) {
int finalI = i;
Runnable runnable = () -> {
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println(finalI + "执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
};
executor.execute(runnable);
}
System.out.println("等待所有任务执行~~~");
countDownLatch.await();
System.out.println("所有任务执行完成");
executor.shutdown();
}
用法二:多个线程等待某一个线程的信号,同时开始执行
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 5; i++) {
int finalI = i;
Runnable runnable = () -> {
try {
System.out.println(finalI + "准备就绪");
countDownLatch.await();
System.out.println(finalI + "开始执行任务");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
executor.execute(runnable);
}
Thread.sleep(5000);
System.out.println("准备开始~~~");
countDownLatch.countDown();
executor.shutdown();
}
结合用法一和用法二
public static void main(String[] args) throws InterruptedException {
CountDownLatch begin = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(5);
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 5; i++) {
int finalI = i;
Runnable runnable = () -> {
try {
System.out.println(finalI + "准备就绪");
begin.await();
System.out.println(finalI + "开始执行任务");
Thread.sleep((long) (Math.random()*10000));
System.out.println(finalI + "完成任务");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
end.countDown();
}
};
executor.execute(runnable);
}
Thread.sleep(5000);
System.out.println("准备开始~~~");
begin.countDown();
end.await();
System.out.println("所有任务都执行完成");
executor.shutdown();
}
# Condition
public class ConditionTest {
private int queueSize = 10;
private Queue<Integer> queue = new PriorityQueue<>(queueSize);
private Lock lock = new ReentrantLock(true);
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
class Consumer extends Thread {
@Override
public void run() {
consume();
}
private void consume() {
// 一直消费数据
while (true) {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println("队列空,等待数据");
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消费数据
queue.poll();
System.out.println("从队列取出一个数据,当前剩余"+queue.size());
// 唤醒线程
notFull.signalAll();
} finally {
lock.unlock();
}
}
}
}
class Producer extends Thread {
@Override
public void run() {
produce();
}
private void produce() {
// 一直生产
while (true) {
lock.lock();
try {
while (queue.size() == queueSize) {
System.out.println("队列满了,等待消费数据");
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 添加数据
queue.offer(1);
System.out.println("向队列添加一个元素,队列剩余空间:" + (queueSize - queue.size()));
// 唤醒线程
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
ConditionTest conditionTest = new ConditionTest();
Consumer consumer = conditionTest.new Consumer();
Producer producer = conditionTest.new Producer();
consumer.start();
producer.start();
}
}
# AQS
# 是什么
即AbstractQueuedSynchronizer
,用于构建锁、同步器、协作工具类的攻击类(框架)。有了AQS后,更多的协作工具都可以很方便的被写出来。
总结:有了AQS,构建线程协作类就容易多了。
# 核心部分
- state
- 控制线程抢锁和配合的FIFO队列
- 希望协作工具类去实现获取/释放等重要方法
# state
private volatile int state;
state是volatile修饰的,会被并发的修改,所有所有修改state的方法都需要保证线程安全,比如getState、setState以及compareAndSetState操作来读取和更新这个状态。
state的具体含义,会根据实现类的不同而不同。比如在Semaphore里,它表示”剩余的许可证数量“;而在CountDownLatch里,它表示”还需要倒数的数量“;在ReentrantLock中,state用来表示锁的占用情况,包括可重入计数。当state为0时,标识该Lock不被任何线程所占有
在ReentrantLock中,state用来表示锁的占有情况,包括可重入计数。当state的值为0的时候,标识该Lock不被任何线程占有。
# 控制线程抢锁和配合的FIFO队列
- 这个队列用来存放等待的线程,AQS就是排队管理器,当多个线程争用同一个把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁
- AQS会维护一个等待的线程队列,把线程都放到这个队列里
- 这是一个双向链表的队列
# 希望协作工具类去实现获取/释放等重要方法
这里的获取和释放方法,是利用AQS的协作工具类里最重要的方法,是由协作类自己去实现的,并且含义各不相同
获取方法
获取操作会依赖state变量,经常会阻塞(比如获取不到锁的时候)
- 在Semaphore中,获取就是acquire方法,作用是获取一个许可证
- 在CountDownLatch中,获取就是await方法,作用是等待,直到倒数结束
释放方法
释放操作不会阻塞
- 在Semaphore中,释放就是release方法,作用是释放一个许可证
- 在CountDownLatch中,释放就是countDown方法,作用是倒数一个数
tryAcquire和tryRelease等方法需要自己的线程协作类去实现
# ASQ应用实例
- 写一个类,想好协作的逻辑,实现获取/释放方法
- 内部写一个Sync类继承AbstractQueuedSynchronizer
- 根据是否独占来重写tryAcquire/tryRelease 或 tryAcquireShared(int acuires)和tryReleaseShared(int releases)等方法,在之前写的获取/释放方法中调用AQS的acquire/release或则Shared方法
# CountDownLatch
# 构造函数
将传入的参数赋值给Sync内部类,Sync再设置到AQS的state上
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
}
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
# getCount
就是返回state的值
public long getCount() {
return sync.getCount();
}
# countDown
public void countDown() {
sync.releaseShared(1);
}
来到AbstractQueuedSynchronizer.java
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 唤醒等待的全部线程
doReleaseShared();
return true;
}
return false;
}
调用CountDownLatch重写的tryReleaseShared方法
protected boolean tryReleaseShared(int releases) {
for (;;) {
// 获取当前state值
int c = getState();
if (c == 0)
return false;
// 计数-1
int nextc = c-1;
// cas设置值
if (compareAndSetState(c, nextc))
// 减一后的值为0才返回true
return nextc == 0;
}
}
tryReleaseShared 返回 true 后,就会执行 doReleaseShared(); 来唤醒所有等待的线程
# await
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
来到 AbstractQueuedSynchronizer.java
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
CountDownLatch重写tryAcquireShared方法
当前数值是否为0,是返回1,不是返回-1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
得到的结果是否小于0,小于0,进入等待队列;否则,继续执行
小于0,执行doAcquireSharedInterruptibly,当前线程进入等待队列
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 把当前线程包装成一个 Node
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
主要方法是parkAndCheckInterrupt(),挂起当前线程,进入阻塞状态
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport.java
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
Unsafe.class
public native void park(boolean var1, long var2);
# 总结
- 调用CountDownLatch的await方法时,便会尝试获取“共享锁”,不过一开始是获取不到该锁的,于是线程被阻塞
- 而”共享锁“可获取到的条件,就是”锁计数器“的值为0
- 而”锁计数器“的初始值为count,每个线程调用该CountDownLatch对象的countDown()方法时,才将”锁计数器“-1
- count个线程调用countDown()之后,”锁计数器“才为0,而前面提到的等待获取共享锁的线程才能继续允许
# Semaphore
# acquire
公平锁
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
AbstractQueuedSynchronizer.java
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
// 小于0,进入队列阻塞
doAcquireSharedInterruptibly(arg);
}
调用Semaphore的实现
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 数量
int available = getState();
// 计算后的数量
int remaining = available - acquires;
// 如果计算后的数量小于0 或者 cas设置成功都返回计算后的数量
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
然后根据返回的结果判断是否进入等待,小于0等待;否则进行允许
# 总结
- 在Semaphore中,state表示许可证的剩余数量
- 看tryAcquire方法,判断nonfairTryAcquireShared大于等于0的话,表示成功
- 这里会先检查剩余许可证数量够不够,用减法来计算。如果不够,返回负数,代表失败;如果够了,就自旋cas来设置state的值,直到改变成功就返回计算后的值;或者期间被其他线程修改导致剩余数量不够,那也返回负数代表获取失败。
# ReentrantLock
非公平锁
# tryRelease
protected final boolean tryRelease(int releases) {
// 释放一次锁后的state值
int c = getState() - releases;
// 是否当前线程持有锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 本次释放完锁后state为0
if (c == 0) {
free = true;
// 当前持有锁的线程设置为null
setExclusiveOwnerThread(null);
}
// 不用cas,因为只有当前线程才能释放
setState(c);
// 只有重入的锁都释放完了才返回true,否则返回false
return free;
}
AbstractQueuedSynchronizer.java
public final boolean release(int arg) {
// 上面返回的结果
if (tryRelease(arg)) {
// 释放成功
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒节点的后继者(如果存在)
unparkSuccessor(h);
return true;
}
return false;
}
上面就是unlock的实现。
# acquire
final void lock() {
// cas把state值从0改为1
if (compareAndSetState(0, 1))
// 设置当前拥有独占访问权限的线程
setExclusiveOwnerThread(Thread.currentThread());
else
// cas修改失败
acquire(1);
}
AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
// 尝试获取锁,如果获取失败放入等待队列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
ReentrantLock.java
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// cas 更新
if (compareAndSetState(0, acquires)) {
// 设置锁为当前线程持有
setExclusiveOwnerThread(current);
return true;
}
}
// 重入了
else if (current == getExclusiveOwnerThread()) {
// 当前重入次数+1(默认)
int nextc = c + acquires;
// int最大值+1,发生溢出
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 更新重入次数
setState(nextc);
return true;
}
return false;
}
根据返回结果,返回false取反,true,将当前线程放入等待队列;返回true取反,false,继续运行
public final void acquire(int arg) {
// 尝试获取锁,如果获取失败放入等待队列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
# 总结
加解锁要判断是不是当前线程 和 是否重入。
# 手写一个
模拟一个CountDownLatch
public class OneShotLatch {
/**
* 自定义latch 假设:state 1放行,0等待
*/
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 小于 0 放入阻塞队列
* @param arg
* @return
*/
@Override
protected int tryAcquireShared(int arg) {
return getState() == 1 ? 1 : -1;
}
/**
* true 继续执行,false 继续阻塞
* @param arg
* @return
*/
@Override
protected boolean tryReleaseShared(int arg) {
setState(1);
return true;
}
}
private final Sync sync = new Sync();
/**
* 阻塞当前线程
*/
public void await() {
// 调用父类acquireShared方法,找到我们写的实现类tryAcquireShared方法
sync.acquireShared(0);
}
/**
* 唤醒全部等待线程
*/
public void signal() {
sync.releaseShared(0);
}
public static void main(String[] args) throws InterruptedException {
OneShotLatch latch = new OneShotLatch();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"开始等待"+new Date());
latch.await();
System.out.println(Thread.currentThread().getName()+"继续运行"+new Date());
}).start();
}
Thread.sleep(3000);
// 唤醒全部线程
latch.signal();
// 一次性latch,后续线程都不用等待
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"开始等待"+new Date());
latch.await();
System.out.println(Thread.currentThread().getName()+"继续运行"+new Date());
}).start();
}
}
# Future
Callable和Future的关系
- 我们可以用Future.get来获取Callable接口返回的执行结果,还可以通过Future.isDone()来判断任务是否已经执行完了,以及取消这个任务,限时获取任务的结果等等
- 在call()未执行完毕前,调用get()的线程(假定此时是主线程)会被阻塞,直到call()方法返回了结果后,此时future.get()才会得到该结果,然后主线程才会切换到runnable状态
- 所以Future是一个容器,它存储了call()这个任务的结果,而这个任务的执行时间是无法提取确定的,因为这完全取决于call()方法执行的情况
主要方法
- get():获取结果,get的结果取决于Callable任务的状态
- 任务正常完成,get方法会立刻返回结果
- 任务尚未完成(任务还没开始或进行中),get将阻塞直到任务完成
- 任务执行过程中抛出异常,get方法会抛出ExecutionException:不论call()执行时抛出的异常类型是什么,最后get方法抛出的异常类型都是ExecutionException
- 任务被取消,get方法会抛出CancellationException
- 任务超时,get方法有一个重载方法,是传入一个延迟时间,如果时间到了还没有结果返回,get方法就会抛出TimeoutException get(long timeout, TimeUnit unit)
- cancel:取消任务的执行
- 如果这个任务还是没有执行,任务会被正常取消,未来也不会被执行,方法返回true
- 如果任务已经完成,或者已取消,那么cancel会执行失败,返回false
- 如果这个任务已经开始执行了,那么这个取消方法将不会直接取消该方法,而是根据我们传入的参数_mayInterruptIfRunning_做判断
- true:对执行中的任务发出中断信号,任务可以选择中断或者不中断,主动权在任务,我们只是先它发出信号。
- false:不会对执行中的任务发出中断信号,对任务没有任何影响,任务继续执行
Future.cancel(true):适用于,我们知道任务有能力正确处理中断
Future.cancel(false):适用于
- 任务没有能力处理中断
- 不清楚任务是否支持取消
- isDone:方法是否执行完毕
- isCancelled:任务是否被取消
简单使用
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(5);
Future<Integer> future = pool.submit(() -> {
Thread.sleep(3000);
return new Random().nextInt();
});
System.out.println(future.get());
pool.shutdown();
}
批量接收结果
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
List<Future> list = new ArrayList<>();
for (int i = 0; i < 20; i++) {
Future<Integer> future = pool.submit(() -> {
Thread.sleep(1000);
return new Random().nextInt();
});
list.add(future);
}
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i).get());
}
pool.shutdown();
}
异常演示
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<Integer> future = pool.submit(() -> {
throw new NullPointerException();
});
for (int i = 0; i < 3; i++) {
Thread.sleep(1000);
System.out.println(i);
}
System.out.println("任务执行完成:"+future.isDone());
try {
future.get();
} catch (NullPointerException e1) {
e1.printStackTrace();
System.out.println("NullPointerException异常");
} catch (ExecutionException e2) {
e2.printStackTrace();
System.out.println("ExecutionException异常");
}
pool.shutdown();
}
输出结果
0
1
2
任务执行完成:true
ExecutionException异常
任务一开始就抛出异常,但是没有get线程就能继续执行,只有在get时才会抛出异常,且异常是ExecutionException;isDone只是判断任务是否完成,不论成功还是失败(异常)。
演示cancel
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService pool = Executors.newFixedThreadPool(3);
Future<String> future = pool.submit(() -> {
try {
Thread.sleep(3000);
System.out.println("正常执行");
return "ok";
} catch (InterruptedException e) {
System.out.println("处理中断~~~");
return "被中断了";
}
});
Thread.sleep(1000);
future.cancel(true);
// future.cancel(false);
pool.shutdown();
}
FutureTask
前面都是通过线程池的submit来得到Future,还是可以使用FutureTask来创建Future
FutureTask继承了Runnable和Future
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newCachedThreadPool();
FutureTask<String> futureTask = new FutureTask<>(() -> {
Thread.sleep(3000);
return "ok";
});
pool.submit(futureTask);
System.out.println(futureTask.get());
}