Skip to content

并发工具类

Lock接口

ReentrantLock

可重入锁,比synchronized更灵活。

java
Lock lock = new ReentrantLock();

public void method() {
    lock.lock();
    try {
        // 业务代码
    } finally {
        lock.unlock();  // 必须在finally中释放锁
    }
}

// 尝试获取锁
if (lock.tryLock()) {
    try {
        // 业务代码
    } finally {
        lock.unlock();
    }
}

// 超时获取锁
if (lock.tryLock(3, TimeUnit.SECONDS)) {
    try {
        // 业务代码
    } finally {
        lock.unlock();
    }
}

ReadWriteLock

读写锁,读读不互斥,读写、写写互斥。

java
ReadWriteLock rwLock = new ReentrantReadWriteLock();
Lock readLock = rwLock.readLock();
Lock writeLock = rwLock.writeLock();

// 读操作
public String read() {
    readLock.lock();
    try {
        return data;
    } finally {
        readLock.unlock();
    }
}

// 写操作
public void write(String newData) {
    writeLock.lock();
    try {
        data = newData;
    } finally {
        writeLock.unlock();
    }
}

并发容器

ConcurrentHashMap

线程安全的HashMap。

java
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

map.put("key", 1);
map.get("key");
map.remove("key");

// 原子操作
map.putIfAbsent("key", 1);
map.computeIfAbsent("key", k -> 1);

CopyOnWriteArrayList

写时复制的ArrayList,适合读多写少场景。

java
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

list.add("item");
list.get(0);
list.remove(0);

// 迭代时不会抛出ConcurrentModificationException
for (String item : list) {
    System.out.println(item);
}

BlockingQueue

阻塞队列,生产者-消费者模式的核心。

java
// ArrayBlockingQueue:有界队列
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

// LinkedBlockingQueue:可选有界队列
BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);

// PriorityBlockingQueue:优先级队列
BlockingQueue<String> queue = new PriorityBlockingQueue<>();

// 生产者
queue.put("item");  // 队列满时阻塞
queue.offer("item", 1, TimeUnit.SECONDS);  // 超时返回

// 消费者
String item = queue.take();  // 队列空时阻塞
String item = queue.poll(1, TimeUnit.SECONDS);  // 超时返回

同步工具类

CountDownLatch

倒计时门闩,等待多个线程完成。

java
CountDownLatch latch = new CountDownLatch(3);

// 工作线程
new Thread(() -> {
    // 执行任务
    latch.countDown();  // 计数减1
}).start();

// 主线程等待
latch.await();  // 等待计数为0
System.out.println("所有任务完成");

// 使用场景:主线程等待所有子线程完成初始化

CyclicBarrier

循环屏障,等待所有线程到达屏障点。

java
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("所有线程到达屏障点");
});

// 工作线程
new Thread(() -> {
    // 执行任务
    barrier.await();  // 等待其他线程
    // 所有线程到达后继续执行
}).start();

// 使用场景:多个线程互相等待,到达同步点后一起继续执行

Semaphore

信号量,控制同时访问资源的线程数量。

java
Semaphore semaphore = new Semaphore(3);  // 允许3个线程同时访问

public void access() {
    try {
        semaphore.acquire();  // 获取许可
        // 访问资源
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        semaphore.release();  // 释放许可
    }
}

// 使用场景:限流,如数据库连接池

Exchanger

线程间交换数据。

java
Exchanger<String> exchanger = new Exchanger<>();

// 线程1
new Thread(() -> {
    try {
        String data = exchanger.exchange("data1");
        System.out.println("收到:" + data);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

// 线程2
new Thread(() -> {
    try {
        String data = exchanger.exchange("data2");
        System.out.println("收到:" + data);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

原子类

基本类型原子类

java
AtomicInteger atomicInt = new AtomicInteger(0);
atomicInt.incrementAndGet();  // i++
atomicInt.decrementAndGet();  // i--
atomicInt.addAndGet(5);       // i += 5
atomicInt.compareAndSet(0, 1);  // CAS操作

AtomicLong atomicLong = new AtomicLong(0);
AtomicBoolean atomicBoolean = new AtomicBoolean(false);

数组类型原子类

java
AtomicIntegerArray array = new AtomicIntegerArray(10);
array.set(0, 10);
array.incrementAndGet(0);  // 数组[0]位置自增

引用类型原子类

java
class User {
    String name;
    int age;
}

AtomicReference<User> atomicUser = new AtomicReference<>();
User user = new User();
atomicUser.set(user);
atomicUser.compareAndSet(user, new User());

字段更新器

java
class Score {
    volatile int score;
}

AtomicIntegerFieldUpdater<Score> updater = 
    AtomicIntegerFieldUpdater.newUpdater(Score.class, "score");

Score score = new Score();
updater.incrementAndGet(score);  // 原子更新score字段

实战案例

生产者-消费者模式

java
public class ProducerConsumerDemo {
    private BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
    
    // 生产者
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < 100; i++) {
                    queue.put(i);
                    System.out.println("生产:" + i);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    // 消费者
    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    Integer item = queue.take();
                    System.out.println("消费:" + item);
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public void start() {
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
    }
}

并行任务执行

java
public class ParallelTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        int taskCount = 5;
        CountDownLatch latch = new CountDownLatch(taskCount);
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    System.out.println("任务" + taskId + "开始");
                    Thread.sleep(2000);
                    System.out.println("任务" + taskId + "完成");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await();
        System.out.println("所有任务完成");
        executor.shutdown();
    }
}

💡 提示

这是一个demo文档,欢迎补充更多并发工具相关内容。

相关链接

基于 VitePress 构建