掌握多线程、锁机制、线程池的核心原理
并发编程是 Java 高级开发的必备技能,也是面试的重点难点。本篇系统梳理并发编程的核心知识。
本篇重点围绕并发编程的核心知识,帮助你系统掌握:
主要区别:
六种状态:
Thread thread = new Thread(() -> {
System.out.println("Running");
});
System.out.println(thread.getState()); // NEW
thread.start();
System.out.println(thread.getState()); // RUNNABLE// 方式1:继承 Thread 类
class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread running");
}
}
new MyThread().start();
// 方式2:实现 Runnable 接口(推荐)
Runnable task = () -> System.out.println("Runnable running");
new Thread(task).start();
// 方式3:实现 Callable 接口(有返回值)
Callable callable = () -> "Callable result";
FutureTask futureTask = new FutureTask<>(callable);
new Thread(futureTask).start();
String result = futureTask.get(); // 获取返回值
// 方式4:线程池(推荐)
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.submit(() -> System.out.println("Pool running"));
executor.shutdown(); // sleep:不释放锁
synchronized (lock) {
Thread.sleep(1000); // 持有锁,其他线程无法进入
}
// wait:释放锁
synchronized (lock) {
lock.wait(); // 释放锁,其他线程可以进入
// 被 notify() 唤醒后,重新获取锁
}底层实现:基于 Monitor(监视器锁)
// 字节码层面
monitorenter // 进入同步块
monitorexit // 退出同步块锁升级过程(JDK 1.6+):
三种使用方式:
// 1. 修饰实例方法:锁是当前实例对象
public synchronized void method() { }
// 2. 修饰静态方法:锁是当前类的 Class 对象
public static synchronized void method() { }
// 3. 修饰代码块:锁是指定对象
synchronized (lock) {
// 临界区代码
}// synchronized:自动释放
synchronized (lock) {
// 自动释放锁
}
// ReentrantLock:手动释放
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
// 临界区代码
} finally {
lock.unlock(); // 必须在 finally 中释放
}
// ReentrantLock 高级功能
// 1. 可中断
lock.lockInterruptibly();
// 2. 尝试获取锁
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
// 获取锁成功
} finally {
lock.unlock();
}
}
// 3. 公平锁
ReentrantLock fairLock = new ReentrantLock(true);
// 4. 多个条件变量
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();使用场景:
死锁:两个或多个线程互相持有对方需要的锁,导致永久阻塞。
// 死锁示例
Object lock1 = new Object();
Object lock2 = new Object();
// 线程1
new Thread(() -> {
synchronized (lock1) {
Thread.sleep(100);
synchronized (lock2) { // 等待 lock2
System.out.println("Thread 1");
}
}
}).start();
// 线程2
new Thread(() -> {
synchronized (lock2) {
Thread.sleep(100);
synchronized (lock1) { // 等待 lock1
System.out.println("Thread 2");
}
}
}).start();死锁的四个必要条件:
避免死锁的方法:
// 避免死锁:固定加锁顺序
Object lock1 = new Object();
Object lock2 = new Object();
// 所有线程都按 lock1 -> lock2 的顺序加锁
synchronized (lock1) {
synchronized (lock2) {
// 不会死锁
}
}ThreadPoolExecutor 七大参数:
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 时间单位
BlockingQueue workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
) 四种拒绝策略:
// 自定义线程池(推荐)
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60L, TimeUnit.SECONDS, // 空闲线程存活60秒
new LinkedBlockingQueue<>(100), // 队列容量100
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);参数配置建议:
两大作用:
// 不使用 volatile:可能出现可见性问题
class Task {
private boolean flag = false;
public void stop() {
flag = true; // 线程1修改
}
public void run() {
while (!flag) { // 线程2可能看不到修改
// 可能死循环
}
}
}
// 使用 volatile:保证可见性
class Task {
private volatile boolean flag = false;
public void stop() {
flag = true; // 线程1修改
}
public void run() {
while (!flag) { // 线程2立即可见
// 正常退出
}
}
}使用场景:
count++ 不是原子操作,需要用 AtomicInteger。CAS(Compare And Swap):比较并交换,是一种无锁算法。
// CAS 伪代码
boolean compareAndSwap(int expectedValue, int newValue) {
if (currentValue == expectedValue) {
currentValue = newValue;
return true;
}
return false;
}
// AtomicInteger 使用 CAS
AtomicInteger count = new AtomicInteger(0);
count.incrementAndGet(); // 原子操作,底层使用 CAS
// 手动使用 CAS
int oldValue, newValue;
do {
oldValue = count.get();
newValue = oldValue + 1;
} while (!count.compareAndSet(oldValue, newValue));CAS 的三大问题:
AQS(AbstractQueuedSynchronizer):抽象队列同步器,是 JUC 并发包的基础框架。
核心思想:
state 表示同步状态(0表示未锁定,1表示锁定)基于 AQS 的实现:
// AQS 使用示例:自定义锁
class MyLock {
private final Sync sync = new Sync();
static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease(int arg) {
setState(0);
return true;
}
}
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
}ThreadLocal:线程本地变量,每个线程都有自己的副本。
// ThreadLocal 使用示例
ThreadLocal threadLocal = new ThreadLocal<>();
// 线程1
new Thread(() -> {
threadLocal.set("Thread 1");
System.out.println(threadLocal.get()); // Thread 1
}).start();
// 线程2
new Thread(() -> {
threadLocal.set("Thread 2");
System.out.println(threadLocal.get()); // Thread 2
}).start();
// 使用完后要 remove,避免内存泄漏
threadLocal.remove(); 使用场景:
// CountDownLatch:等待多个任务完成
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
System.out.println("Task done");
latch.countDown(); // 计数减1
}).start();
}
latch.await(); // 等待计数归0
System.out.println("All tasks done");
// CyclicBarrier:多线程到达屏障点后一起执行
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("All threads ready");
});
for (int i = 0; i < 3; i++) {
new Thread(() -> {
System.out.println("Thread ready");
barrier.await(); // 等待其他线程
System.out.println("Thread continue");
}).start();
}使用场景:
Semaphore(信号量):控制同时访问某个资源的线程数量。
// 限制最多3个线程同时访问
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可证
System.out.println("Thread " + Thread.currentThread().getName() + " running");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可证
}
}).start();
}使用场景:
// 方式1:饿汉式(推荐,简单)
class Singleton {
private static final Singleton INSTANCE = new Singleton();
private Singleton() {}
public static Singleton getInstance() {
return INSTANCE;
}
}
// 方式2:双重检查锁定(DCL,推荐)
class Singleton {
private static volatile Singleton instance;
private Singleton() {}
public static Singleton getInstance() {
if (instance == null) { // 第一次检查
synchronized (Singleton.class) {
if (instance == null) { // 第二次检查
instance = new Singleton();
}
}
}
return instance;
}
}
// 方式3:静态内部类(推荐,懒加载)
class Singleton {
private Singleton() {}
private static class Holder {
private static final Singleton INSTANCE = new Singleton();
}
public static Singleton getInstance() {
return Holder.INSTANCE;
}
}
// 方式4:枚举(最安全,防反射和序列化)
enum Singleton {
INSTANCE;
public void doSomething() {
// 业务方法
}
}JMM(Java Memory Model):定义了线程和主内存之间的抽象关系,规定了共享变量的访问规则。
JMM 三大特性:
happens-before 原则:
指令重排序:编译器和 CPU 为了优化性能,可能改变代码执行顺序。
// 原始代码
int a = 1; // 语句1
int b = 2; // 语句2
int c = a + b; // 语句3
// 可能被重排序为
int b = 2; // 语句2
int a = 1; // 语句1
int c = a + b; // 语句3
// DCL 单例中的重排序问题
instance = new Singleton();
// 实际执行可能是:
// 1. 分配内存空间
// 2. 将引用指向内存(此时 instance != null)
// 3. 初始化对象
// 如果 2 和 3 重排序,其他线程可能拿到未初始化的对象!防止重排序的方法:
读写锁:读读共享、读写互斥、写写互斥,适用于读多写少场景。
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
ReadLock readLock = rwLock.readLock();
WriteLock writeLock = rwLock.writeLock();
// 读操作:多线程可同时读
public String read() {
readLock.lock();
try {
return data;
} finally {
readLock.unlock();
}
}
// 写操作:独占访问
public void write(String newData) {
writeLock.lock();
try {
data = newData;
} finally {
writeLock.unlock();
}
}
// 锁降级:写锁 → 读锁(允许)
writeLock.lock();
try {
// 修改数据
readLock.lock(); // 获取读锁
} finally {
writeLock.unlock(); // 释放写锁,保持读锁
}
// 继续持有读锁...// 公平锁:按申请顺序获取锁
ReentrantLock fairLock = new ReentrantLock(true);
// 非公平锁:允许插队(默认)
ReentrantLock unfairLock = new ReentrantLock(false);
// 非公平锁性能更好的原因:
// 1. 减少线程切换开销
// 2. 新线程可能直接获取锁,无需唤醒等待线程可重入锁:同一线程可以多次获取同一把锁,不会死锁。
// 可重入锁示例
ReentrantLock lock = new ReentrantLock();
public void outer() {
lock.lock();
try {
inner(); // 同一线程再次获取锁
} finally {
lock.unlock();
}
}
public void inner() {
lock.lock(); // 可重入,不会死锁
try {
// 业务逻辑
} finally {
lock.unlock();
}
}
// synchronized 也是可重入的
public synchronized void method1() {
method2(); // 可重入
}
public synchronized void method2() {
// 同一线程已持有锁
}为什么需要可重入:
原理:基于 CAS(Compare And Swap)+ volatile 实现无锁并发。
// AtomicInteger 常用方法
AtomicInteger count = new AtomicInteger(0);
count.get(); // 获取值
count.set(10); // 设置值
count.incrementAndGet(); // ++i
count.getAndIncrement(); // i++
count.compareAndSet(10, 20); // CAS 操作
count.addAndGet(5); // 加5并返回
// LongAdder:高并发下性能更好
LongAdder adder = new LongAdder();
adder.increment(); // 递增
adder.add(10); // 加10
long sum = adder.sum(); // 获取总和
// AtomicStampedReference:解决 ABA 问题
AtomicStampedReference ref =
new AtomicStampedReference<>(100, 0);
int stamp = ref.getStamp();
ref.compareAndSet(100, 200, stamp, stamp + 1); // 优雅关闭线程池
executor.shutdown(); // 不再接受新任务,等待已提交任务完成
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 超时后强制关闭
}
} catch (InterruptedException e) {
executor.shutdownNow();
}ForkJoinPool:专为分治算法设计的线程池,适合递归任务。
// ForkJoinPool 使用示例:计算 1+2+...+n
class SumTask extends RecursiveTask {
private long start, end;
private static final long THRESHOLD = 1000;
public SumTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 小任务直接计算
long sum = 0;
for (long i = start; i <= end; i++) sum += i;
return sum;
}
// 大任务拆分
long mid = (start + end) / 2;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid + 1, end);
left.fork(); // 异步执行
right.fork();
return left.join() + right.join(); // 合并结果
}
}
ForkJoinPool pool = new ForkJoinPool();
Long result = pool.invoke(new SumTask(1, 1000000)); # 1. jps 查看 Java 进程 ID
jps -l
# 2. jstack 导出线程堆栈
jstack -l > thread_dump.txt
# 3. 搜索死锁信息
grep -A 50 "Found one Java-level deadlock" thread_dump.txt
# 4. 或使用 jconsole/jvisualvm 图形化工具 // 代码中检测死锁
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
long[] deadlockedThreads = bean.findDeadlockedThreads();
if (deadlockedThreads != null) {
ThreadInfo[] infos = bean.getThreadInfo(deadlockedThreads);
for (ThreadInfo info : infos) {
System.out.println("死锁线程: " + info.getThreadName());
}
}排查步骤:
// 令牌桶算法实现
class TokenBucket {
private long capacity; // 桶容量
private long tokens; // 当前令牌数
private long rate; // 令牌生成速率(个/秒)
private long lastTime; // 上次获取时间
public TokenBucket(long capacity, long rate) {
this.capacity = capacity;
this.rate = rate;
this.tokens = capacity;
this.lastTime = System.currentTimeMillis();
}
public synchronized boolean tryAcquire() {
long now = System.currentTimeMillis();
// 计算新增令牌
tokens = Math.min(capacity,
tokens + (now - lastTime) * rate / 1000);
lastTime = now;
if (tokens >= 1) {
tokens--;
return true;
}
return false;
}
}
// 使用 Guava RateLimiter(推荐)
RateLimiter limiter = RateLimiter.create(100); // 100 QPS
if (limiter.tryAcquire()) {
// 处理请求
}CompletableFuture:JDK8 引入的异步编程工具,支持链式调用和组合操作。
// 1. 创建异步任务
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
// 2. 链式处理
future.thenApply(s -> s + " World") // 转换结果
.thenAccept(System.out::println) // 消费结果
.thenRun(() -> System.out.println("Done")); // 执行后续操作
// 3. 组合多个 Future
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture f2 = CompletableFuture.supplyAsync(() -> "B");
// 等待所有完成
CompletableFuture.allOf(f1, f2).join();
// 等待任一完成
CompletableFuture.anyOf(f1, f2).join();
// 合并两个结果
f1.thenCombine(f2, (a, b) -> a + b);
// 4. 异常处理
future.exceptionally(ex -> "Error: " + ex.getMessage())
.handle((result, ex) -> ex != null ? "Error" : result);
// 5. 超时控制(JDK9+)
future.orTimeout(1, TimeUnit.SECONDS)
.completeOnTimeout("Default", 1, TimeUnit.SECONDS); 应用场景:并行调用多个服务、异步任务编排、非阻塞 IO。