并发工具类
Lock接口
ReentrantLock
可重入锁,比synchronized更灵活。
java
Lock lock = new ReentrantLock();
public void method() {
lock.lock();
try {
// 业务代码
} finally {
lock.unlock(); // 必须在finally中释放锁
}
}
// 尝试获取锁
if (lock.tryLock()) {
try {
// 业务代码
} finally {
lock.unlock();
}
}
// 超时获取锁
if (lock.tryLock(3, TimeUnit.SECONDS)) {
try {
// 业务代码
} finally {
lock.unlock();
}
}ReadWriteLock
读写锁,读读不互斥,读写、写写互斥。
java
ReadWriteLock rwLock = new ReentrantReadWriteLock();
Lock readLock = rwLock.readLock();
Lock writeLock = rwLock.writeLock();
// 读操作
public String read() {
readLock.lock();
try {
return data;
} finally {
readLock.unlock();
}
}
// 写操作
public void write(String newData) {
writeLock.lock();
try {
data = newData;
} finally {
writeLock.unlock();
}
}并发容器
ConcurrentHashMap
线程安全的HashMap。
java
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key", 1);
map.get("key");
map.remove("key");
// 原子操作
map.putIfAbsent("key", 1);
map.computeIfAbsent("key", k -> 1);CopyOnWriteArrayList
写时复制的ArrayList,适合读多写少场景。
java
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item");
list.get(0);
list.remove(0);
// 迭代时不会抛出ConcurrentModificationException
for (String item : list) {
System.out.println(item);
}BlockingQueue
阻塞队列,生产者-消费者模式的核心。
java
// ArrayBlockingQueue:有界队列
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// LinkedBlockingQueue:可选有界队列
BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
// PriorityBlockingQueue:优先级队列
BlockingQueue<String> queue = new PriorityBlockingQueue<>();
// 生产者
queue.put("item"); // 队列满时阻塞
queue.offer("item", 1, TimeUnit.SECONDS); // 超时返回
// 消费者
String item = queue.take(); // 队列空时阻塞
String item = queue.poll(1, TimeUnit.SECONDS); // 超时返回同步工具类
CountDownLatch
倒计时门闩,等待多个线程完成。
java
CountDownLatch latch = new CountDownLatch(3);
// 工作线程
new Thread(() -> {
// 执行任务
latch.countDown(); // 计数减1
}).start();
// 主线程等待
latch.await(); // 等待计数为0
System.out.println("所有任务完成");
// 使用场景:主线程等待所有子线程完成初始化CyclicBarrier
循环屏障,等待所有线程到达屏障点。
java
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有线程到达屏障点");
});
// 工作线程
new Thread(() -> {
// 执行任务
barrier.await(); // 等待其他线程
// 所有线程到达后继续执行
}).start();
// 使用场景:多个线程互相等待,到达同步点后一起继续执行Semaphore
信号量,控制同时访问资源的线程数量。
java
Semaphore semaphore = new Semaphore(3); // 允许3个线程同时访问
public void access() {
try {
semaphore.acquire(); // 获取许可
// 访问资源
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可
}
}
// 使用场景:限流,如数据库连接池Exchanger
线程间交换数据。
java
Exchanger<String> exchanger = new Exchanger<>();
// 线程1
new Thread(() -> {
try {
String data = exchanger.exchange("data1");
System.out.println("收到:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 线程2
new Thread(() -> {
try {
String data = exchanger.exchange("data2");
System.out.println("收到:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();原子类
基本类型原子类
java
AtomicInteger atomicInt = new AtomicInteger(0);
atomicInt.incrementAndGet(); // i++
atomicInt.decrementAndGet(); // i--
atomicInt.addAndGet(5); // i += 5
atomicInt.compareAndSet(0, 1); // CAS操作
AtomicLong atomicLong = new AtomicLong(0);
AtomicBoolean atomicBoolean = new AtomicBoolean(false);数组类型原子类
java
AtomicIntegerArray array = new AtomicIntegerArray(10);
array.set(0, 10);
array.incrementAndGet(0); // 数组[0]位置自增引用类型原子类
java
class User {
String name;
int age;
}
AtomicReference<User> atomicUser = new AtomicReference<>();
User user = new User();
atomicUser.set(user);
atomicUser.compareAndSet(user, new User());字段更新器
java
class Score {
volatile int score;
}
AtomicIntegerFieldUpdater<Score> updater =
AtomicIntegerFieldUpdater.newUpdater(Score.class, "score");
Score score = new Score();
updater.incrementAndGet(score); // 原子更新score字段实战案例
生产者-消费者模式
java
public class ProducerConsumerDemo {
private BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
// 生产者
class Producer implements Runnable {
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
queue.put(i);
System.out.println("生产:" + i);
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 消费者
class Consumer implements Runnable {
@Override
public void run() {
try {
while (true) {
Integer item = queue.take();
System.out.println("消费:" + item);
Thread.sleep(200);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void start() {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
}并行任务执行
java
public class ParallelTaskDemo {
public static void main(String[] args) throws InterruptedException {
int taskCount = 5;
CountDownLatch latch = new CountDownLatch(taskCount);
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
executor.submit(() -> {
try {
System.out.println("任务" + taskId + "开始");
Thread.sleep(2000);
System.out.println("任务" + taskId + "完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
latch.await();
System.out.println("所有任务完成");
executor.shutdown();
}
}💡 提示
这是一个demo文档,欢迎补充更多并发工具相关内容。