返回文章列表

Java JUC包

M
Moonpeak
| | 30 分钟

Java JUC(java.util.concurrent)包提供了丰富的并发工具类,是现代 Java 并发编程的核心。


Synchronized 锁实现

MonitorEnter & MonitorExit

锁的三种形式

锁类型目标对象字节码标识
实例方法当前实例 thisACC_SYNCHRONIZED
静态方法当前 Class 对象ACC_SYNCHRONIZED
同步代码块括号内指定的对象monitorenter / monitorexit

synchronized 锁原理

  1. 同步代码块:使用 monitorentermonitorexit 指令,依赖 Monitor 监视器(由 JVM 实现,C++ 编写的
    ObjectMonitor)
  2. 同步方法:通过 ACC_SYNCHRONIZED 访问标志,由 JVM 自动隐式调用 Monitor

锁升级过程(JDK 1.6+)

无锁 → 偏向锁 → 轻量级锁 → 重量级锁
CAS: 1. 通过比较期望值与实际值不断自旋,保证修改结果正确
2. 不会无限制自旋,超过次数自动阻塞【1.6以后优化措施】
ABA: 期望值对应的实际值通过修改后匹配,发生了结果符合,过程无法预知
锁状态适用场景特点
偏向锁只有一个线程访问同步块无 CAS,只需 Mark Word 记录线程 ID,约等于无锁
轻量级锁多个线程交替执行(无竞争)CAS 自旋,避免线程阻塞/唤醒开销
重量级锁多个线程同时竞争使用 Monitor,线程阻塞,需要用户态↔内核态切换
Tip

锁只能升级,不能降级(除偏向锁撤销)。偏向锁在 JDK 15 后默认禁用。

Mark Word 结构(64位 JVM)

锁状态62 bit2 bit
无锁hashCode + 分代年龄01
偏向锁线程ID + Epoch + 分代年龄01
轻量级锁指向栈中锁记录的指针00
重量级锁指向互斥量(Monitor)的指针10
GC标记11

锁图示 64 位 JVM Mark Word 位分布总览

| unused:25 | hash:31 | unused:1 | age:4 | biased_lock:0 | lock:01 // 无锁
| threadID:54| epoch:2 | unused:1 | age:4 | biased_lock:1 | lock:01 // 偏向锁
| ptr_to_lock_record:62 | lock:00 // 轻量级锁
| ptr_to_monitor:62 | lock:10 // 重量级锁

AQS 抽象队列同步器

1. 核心组件

┌─────────────────────────────────────┐
│ AQS 抽象队列同步器 │
├─────────────────────────────────────┤
│ state: volatile int │ ← 资源状态
│ head: Node │ ← CLH 变体队列头
│ tail: Node │ ← CLH 变体队列尾
│ exclusiveOwnerThread: Thread │ ← 独占模式持有线程
└─────────────────────────────────────┘

2. 核心字段

字段说明
state同步状态,volatile 修饰,0=未锁定,>0=已锁定(可重入计数)
head / tail等待队列的头尾节点,虚拟头节点设计(头节点不存线程)
Node.waitStatus节点状态:CANCELLED(1)SIGNAL(-1)CONDITION(-2)PROPAGATE(-3)

3. AQS Java结构

static final class Node {
volatile int waitStatus; // 节点状态:SIGNAL/CANCELLED/CONDITION/PROPAGATE
volatile Node prev; // 前驱节点(用于检查状态)
volatile Node next; // 后继节点(用于唤醒)
volatile Thread thread; // 绑定的线程
Node nextWaiter; // 共享/独占模式标记 或 Condition 队列链接
}

关键设计:双向链表 + 状态驱动——前驱节点负责唤醒后继。

4. 两种模式

4.1 独占模式(Exclusive)

// 获取锁(子类需实现 tryAcquire)
acquire(int arg)
tryAcquire(arg)
addWaiter(Node.EXCLUSIVE)
acquireQueued()
// 释放锁(子类需实现 tryRelease)
release(int arg)
tryRelease(arg)
unparkSuccessor()

代表实现ReentrantLockReentrantReadWriteLock.WriteLock

4.2 共享模式(Shared)

// 获取共享锁
acquireShared(int arg)
tryAcquireShared(arg)
doAcquireShared()
// 释放共享锁(可能唤醒多个后继)
releaseShared(int arg)
tryReleaseShared(arg)
doReleaseShared()

代表实现SemaphoreCountDownLatchReentrantReadWriteLock.ReadLock

5. AQS 等待队列图示

┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│ head │ → │ Node │ → │ Node │ → │ Node │ → ...(等待获取锁)
│ 虚拟 │ │ T2 │ │ T3 │ │ T4 │
└──────┘ └──────┘ └──────┘ └──────┘
│ waitStatus = -1 (SIGNAL)
│ 表示释放锁时需唤醒后继

6. AQS 入队、出队

6.1 入队流程

// 简化流程:
1. 调用 tryAcquire() 尝试获取(子类实现)
2. 失败 → 创建 Node 节点
3. CAS 入队(加到队尾,可能重试)
4. 找到前驱,将前驱 waitStatus 设为 SIGNAL(表示"我后面有人,记得唤醒我"
5. LockSupport.park(this) 挂起自己

6.2 释放资源 → 唤醒

// 简化流程:
1. 调用 tryRelease() 释放资源(子类实现)
2. state 修改成功
3. 检查队头节点 waitStatus
4. 如果是 SIGNAL → unpark 后继线程
5. 被唤醒线程从 park 处恢复,再次 tryAcquire

ReentrantLock

1. 与 synchronized 对比

特性ReentrantLocksynchronized
锁类型API 级JVM 内置
可重入✅ 支持✅ 支持
公平性✅ 支持公平/非公平❌ 非公平
可中断lockInterruptibly()❌ 不可中断
超时获取tryLock(timeout)❌ 不支持
条件变量✅ 多个 Condition❌ 只有一个 wait/notify
性能JDK6+ 相近自动优化
释放要求必须手动 unlock()自动释放(JVM 保证)

可重入关键:通过 exclusiveOwnerThread 记录持有线程,同一线程再次获取时只需增加 state 计数。

2. 公平锁 vs 非公平锁

// 非公平锁(默认)- 吞吐量更高
ReentrantLock lock = new ReentrantLock();
// 公平锁 - 按请求顺序获取,避免饥饿
ReentrantLock fairLock = new ReentrantLock(true);

非公平锁优势

  • 刚释放锁时,新来的线程可以直接 CAS 尝试获取(减少上下文切换)
  • 吞吐量比公平锁高约 5-10 倍

公平锁适用场景

  • 需要避免线程饥饿
  • 持有锁时间较长的场景

3. 使用示例

class Counter {
private final ReentrantLock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock();
try {
count++;
// 可重入:同一线程可再次获取锁
nestedMethod();
} finally {
lock.unlock(); // 必须在 finally 中释放
}
}
// 带超时的获取
public boolean tryIncrement(long timeout, TimeUnit unit) {
try {
if (lock.tryLock(timeout, unit)) {
try {
count++;
return true;
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
private void nestedMethod() {
lock.lock(); // 可重入,不会死锁
try {
// do something
} finally {
lock.unlock();
}
}
}

Condition 条件队列

1. 与 Object.wait/notify 对比

特性ConditionObject.wait/notify
条件队列数量多个(每个 Condition 一个)只有一个
唤醒精准度signal() 唤醒指定条件队列notifyAll() 唤醒所有等待线程
阻塞位置await() 释放锁并阻塞wait() 释放锁并阻塞
等待位置条件队列对象监视器队列
组合使用必须与 Lock 配合使用必须与 synchronized 配合使用

2. 核心方法

方法说明
await()释放锁,进入等待状态,直到被 signal/interrupt
awaitNanos(long)带超时的等待,返回剩余纳秒数
awaitUntil(Date)等待到指定截止时间
awaitUninterruptibly()不可中断的等待
signal()唤醒一个等待线程(转移到 AQS 同步队列)
signalAll()唤醒所有等待线程

3. 经典用法:生产者-消费者

class BoundedBuffer<T> {
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Object[] items;
private int putIndex, takeIndex, count;
public BoundedBuffer(int capacity) {
items = new Object[capacity];
}
public void put(T x) throws InterruptedException {
lock.lock();
try {
// 必须用 while,防止虚假唤醒
while (count == items.length) {
notFull.await(); // 满了,等待"非满"条件
}
items[putIndex] = x;
putIndex = (putIndex + 1) % items.length;
count++;
notEmpty.signal(); // 通知消费者可以取了
} finally {
lock.unlock();
}
}
@SuppressWarnings("unchecked")
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await(); // 空了,等待"非空"条件
}
T x = (T) items[takeIndex];
takeIndex = (takeIndex + 1) % items.length;
count--;
notFull.signal(); // 通知生产者可以放
return x;
} finally {
lock.unlock();
}
}
}

虚假唤醒:Java 的 Object.wait() 和 Condition.await() 底层都依赖操作系统的条件变量机制,又因多线程下可能会出现竞态信号从而导致莫名唤醒。

4. 线程状态流转

┌─────────────┐
│ 运行状态 │
└──────┬──────┘
│ lock.lock()
┌─────────────────────────────┐
│ 获取锁成功,执行业务 │
└─────────────┬───────────────┘
│ condition.await()
┌─────────────────────────────┐
│ 释放锁,加入条件等待队列 │ ← 线程状态:WAITING
└─────────────┬───────────────┘
│ condition.signal()
┌─────────────────────────────┐
│ 转移到 AQS 同步队列,竞争锁 │ ← 线程状态:BLOCKED
└─────────────┬───────────────┘
│ 获取锁成功
┌─────────────┐
│ 继续执行 │
└─────────────┘

常用同步工具类

1. Semaphore(信号量)- 限流控制

原理:控制同时访问某个资源的线程数量,基于 AQS 共享模式。

// 创建信号量,允许 10 个线程同时访问
Semaphore semaphore = new Semaphore(10);
// 获取许可(state - 1)
semaphore.acquire();
// 释放许可(state + 1)
semaphore.release();

使用场景

场景示例
数据库连接池限制最大连接数
API 限流控制并发请求数量
资源池管理对象池、线程池入口控制

代码示例

// 数据库连接池限流
class ConnectionPool {
private final Semaphore semaphore;
private final List<Connection> connections;
public ConnectionPool(int maxConnections) {
semaphore = new Semaphore(maxConnections);
connections = new ArrayList<>(maxConnections);
// 初始化连接...
}
public Connection borrow() throws InterruptedException {
semaphore.acquire(); // 获取许可
return connections.remove(connections.size() - 1);
}
public void release(Connection conn) {
connections.add(conn);
semaphore.release(); // 归还许可
}
}

2. CountDownLatch(倒计时门闩)- 等待多任务完成

原理:初始化一个计数器,countDown() 减一,直到为 0 时唤醒所有等待线程。

// 创建,计数器 = 3
CountDownLatch latch = new CountDownLatch(3);
// 每个任务完成时调用
latch.countDown(); // 计数器 - 1
// 主线程等待
latch.await(); // 阻塞,直到计数器 = 0

使用场景

场景说明
多线程任务汇总启动多个线程并行处理,等待全部完成
服务启动检查等待所有依赖服务就绪后才启动主服务
压测场景等待所有线程就绪后同时开始

代码示例

// 主服务等待多个依赖服务启动
public void startService() throws InterruptedException {
CountDownLatch readyLatch = new CountDownLatch(3);
// 启动数据库连接池
new Thread(() -> {
initDbPool();
readyLatch.countDown();
}).start();
// 启动缓存服务
new Thread(() -> {
initCache();
readyLatch.countDown();
}).start();
// 启动消息队列
new Thread(() -> {
initMQ();
readyLatch.countDown();
}).start();
// 等待所有依赖就绪
readyLatch.await();
System.out.println("所有依赖服务已就绪,启动主服务...");
}
Warning

CountDownLatch 不可重用,计数器到 0 后不能重置。如需重复使用,请用 CyclicBarrier 或 Phaser。


3. CyclicBarrier(循环屏障)- 多线程互相等待

原理:设置一个屏障点,线程到达后阻塞,直到所有线程都到达后才继续执行,可循环使用

// 创建屏障,3 个线程到达后触发
CyclicBarrier barrier = new CyclicBarrier(3);
// 每个线程到达屏障
barrier.await(); // 阻塞,等待其他线程
// 带超时的等待
barrier.await(10, TimeUnit.SECONDS);

CountDownLatch vs CyclicBarrier

特性CountDownLatchCyclicBarrier
等待方向一个/多个线程等待其他线程线程之间互相等待
可重用❌ 一次性✅ 自动重置
计数器操作其他线程调用 countDown()线程自己调用 await()
回调功能可设置 Runnable 在屏障触发时执行
异常处理线程中断/超时会破坏屏障

使用场景

场景示例
分阶段计算多线程分片处理,每阶段结束后汇总
并行测试所有测试线程准备就绪后同时开始
游戏回合制所有玩家操作完成后进入下一回合

代码示例

// 多线程分阶段计算
class ParallelCompute {
private final CyclicBarrier barrier;
private int[] data;
private int[] partialSums;
public ParallelCompute(int[] data, int threads) {
this.data = data;
this.partialSums = new int[threads];
// 屏障触发时,执行汇总操作
this.barrier = new CyclicBarrier(threads, () -> {
int total = 0;
for (int sum : partialSums) {
total += sum;
}
System.out.println("阶段完成,当前总和: " + total);
});
}
public void compute() {
int threadId = (int) Thread.currentThread().getId() % partialSums.length;
int chunkSize = data.length / partialSums.length;
int start = threadId * chunkSize;
int end = (threadId == partialSums.length - 1) ? data.length : start + chunkSize;
// 阶段1:计算部分和
for (int i = start; i < end; i++) {
partialSums[threadId] += data[i];
}
try {
barrier.await(); // 等待其他线程
// 阶段2:数据归一化(所有线程同步进入)
for (int i = start; i < end; i++) {
data[i] = data[i] * 2; // 示例操作
}
barrier.await(); // 再次等待
} catch (Exception e) {
e.printStackTrace();
}
}
}

其他重要工具类

ReadWriteLock(读写锁)

ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
Lock readLock = rwLock.readLock();
Lock writeLock = rwLock.writeLock();
// 读锁:共享,多个线程可同时获取
readLock.lock();
try {
// 读取操作
} finally {
readLock.unlock();
}
// 写锁:独占,与其他读写锁互斥
writeLock.lock();
try {
// 写入操作
} finally {
writeLock.unlock();
}

特点

  • 读读共享:多个线程可同时持有读锁
  • 读写互斥:读锁与写锁互斥
  • 写写互斥:写锁与写锁互斥
  • 锁降级:持有写锁的线程可以获取读锁(防止看到不一致数据)

StampedLock(戳记锁,JDK 8)

StampedLock lock = new StampedLock();
// 乐观读
long stamp = lock.tryOptimisticRead();
// 读取数据...
if (!lock.validate(stamp)) {
// 乐观读失败,转为悲观读
stamp = lock.readLock();
try {
// 重新读取
} finally {
lock.unlockRead(stamp);
}
}
// 写锁
stamp = lock.writeLock();
try {
// 写入操作
} finally {
lock.unlockWrite(stamp);
}

优势:比 ReadWriteLock 性能更好,支持乐观读(无锁读取)。

CompletableFuture(异步编程)

原理:基于 Future + CompletionStage,支持函数式编程风格的异步任务编排。

1. 创建异步任务

// 使用默认线程池(ForkJoinPool.commonPool)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "result";
});
// 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "result";
}, executor);
// 无返回值
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("异步执行");
});

2. 任务链式编排

CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World") // 转换
.thenApply(String::toUpperCase) // 链式转换
.thenAccept(System.out::println); // 消费结果
// 异步版本(不阻塞主线程)
CompletableFuture.supplyAsync(() -> "Hello")
.thenApplyAsync(s -> s + " World")
.thenAcceptAsync(System.out::println);

3. 组合多个任务

// thenCompose: 扁平化嵌套 Future(用于依赖关系)
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> "userId")
.thenCompose(userId -> fetchUser(userId)) // 返回 CompletableFuture<User>
// thenCombine: 合并两个独立的 Future
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Integer> combined = f1.thenCombine(f2, Integer::sum);
// allOf: 等待所有任务完成
CompletableFuture<Void> all = CompletableFuture.allOf(
CompletableFuture.runAsync(() -> task1()),
CompletableFuture.runAsync(() -> task2()),
CompletableFuture.runAsync(() -> task3())
);
// anyOf: 任意一个完成即可
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);

4. 异常处理

CompletableFuture.supplyAsync(() -> "result")
.thenApply(s -> Integer.parseInt(s)) // 可能抛出异常
.exceptionally(ex -> {
System.err.println("异常: " + ex.getMessage());
return 0; // 默认值
})
.thenApply(n -> n * 2)
.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getMessage();
}
return "Success: " + result;
});

5. 获取结果

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "result");
// 阻塞等待(不推荐)
String result = future.get(); // 阻塞,等待结果或异常
String result = future.get(5, TimeUnit.SECONDS); // 带超时
// 非阻塞(推荐)
future.thenAccept(result -> System.out.println(result));
// join vs get: join 不抛出受检异常
String result = future.join();

使用场景

场景示例
异步调用异步调用第三方 API,避免阻塞
任务编排多个任务串联/并联执行
异步链支付流程:下单 → 扣库存 → 发消息
并行查询多个数据源并行查询后合并结果
Tip
  • 默认使用 ForkJoinPool.commonPool(),适合 CPU 密集型任务
  • IO 密集型任务建议使用自定义线程池
  • 记得最后添加异常处理,避免异常丢失

JUC 并发工具速查表

工具类核心方法适用场景注意事项
ReentrantLocklock/unlock需要更灵活锁控制必须在 finally 释放
Conditionawait/signal多条件等待唤醒与 Lock 配合使用
Semaphoreacquire/release限流、资源池注意释放次数
CountDownLatchawait/countDown等待多任务完成不可重用
CyclicBarrierawait多线程分阶段协作可重用,注意异常处理
ReadWriteLockreadLock/writeLock读多写少场景支持锁降级
StampedLocktryOptimisticRead高并发读场景不支持重入
CompletableFuturesupplyAsync/thenApply异步编程记得异常处理

Note

本文档持续更新,建议结合 JDK 源码深入学习 AQS 实现细节。