1. 生产者和消费者模式概述
生产者和消费者模式是一种经典的并发设计模式,用于解决生产者和消费者之间数据共享问题。它主要涉及三个角色:
-
生产者 (Producer) :负责生产数据,例如将数据写入文件、读取数据库数据等。
-
消费者 (Consumer) :负责消费数据,例如将数据从文件中读取出来、对数据进行处理等。
-
缓冲区 (Buffer) :用于存放生产者生产的数据,消费者可以从缓冲区中获取数据。
2. 生产者和消费者案例
案例描述:模拟一个仓库管理系统,生产者负责生产商品,消费者负责消费商品。
代码示例:
```java
// 商品类
class Product {
private String name;
private int count;
public Product(String name, int count) {
this.name = name;
this.count = count;
}
public String getName() {
return name;
}
public int getCount() {
return count;
}
}
// 生产者
class Producer implements Runnable {
private final List buffer; // 缓冲区
private final int capacity; // 缓冲区容量
public Producer(List buffer, int capacity) {
this.buffer = buffer;
this.capacity = capacity;
}
@Override
public void run() {
while (true) {
// 生产商品
Product product = new Product("商品", 10);
// 等待缓冲区有空位
synchronized (buffer) { // 使用同步块保证线程安全
while (buffer.size() == capacity) {
try {
buffer.wait(); // 等待缓冲区有空位
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生产商品入库
buffer.add(product);
System.out.println("生产者生产了商品:" + product.getName() + ", 库存数量:" + product.getCount());
buffer.notifyAll(); // 唤醒所有等待的消费者线程
}
try {
Thread.sleep(1000); // 模拟生产时间
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 消费者
class Consumer implements Runnable {
private final List buffer; // 缓冲区
public Consumer(List buffer) {
this.buffer = buffer;
}
@Override
public void run() {
while (true) {
// 等待缓冲区有商品
synchronized (buffer) { // 使用同步块保证线程安全
while (buffer.isEmpty()) {
try {
buffer.wait(); // 等待缓冲区有商品
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消费商品
Product product = buffer.remove(0);
System.out.println("消费者消费了商品:" + product.getName() + ", 库存数量:" + product.getCount());
buffer.notifyAll(); // 唤醒所有等待的生产者线程
}
try {
Thread.sleep(1000); // 模拟消费时间
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
// 初始化缓冲区
List buffer = new ArrayList<>();
// 设置缓冲区容量
int capacity = 5;
// 创建生产者和消费者线程
Producer producer = new Producer(buffer, capacity);
Consumer consumer = new Consumer(buffer);
// 启动线程
new Thread(producer).start();
new Thread(consumer).start();
}
}
```
解释:
-
synchronized 块: synchronized 块用于保证线程安全,确保同一时刻只有一个线程可以访问共享资源 buffer。
-
wait() 方法: 当生产者发现缓冲区已满,或消费者发现缓冲区为空时,它们会调用 wait() 方法进入等待状态。 进入等待状态的线程会释放锁,允许其他线程访问共享资源。
-
notifyAll() 方法: 当生产者生产完商品,或消费者消费完商品后,它们会调用 notifyAll() 方法唤醒所有在 buffer 上等待的线程。 唤醒后,等待线程会尝试再次获取锁,并继续执行。
3. 生产者和消费者案例优化
问题: 上面的案例中,生产者和消费者都使用了 wait() 和 notifyAll() 方法进行同步,可能会导致虚假唤醒问题。 当线程被 notifyAll() 唤醒后,它并不能保证唤醒的原因是缓冲区状态发生了变化。
解决方案:使用条件变量来解决虚假唤醒问题。 条件变量允许线程等待特定条件的满足,避免了虚假唤醒问题。
代码示例:
```java
// 生产者
class Producer {
private final List buffer; // 缓冲区
private final int capacity; // 缓冲区容量
private final Condition notFull; // 缓冲区未满条件
private final Condition notEmpty; // 缓冲区非空条件
public Producer(List buffer, int capacity, Condition notFull, Condition notEmpty) {
this.buffer = buffer;
this.capacity = capacity;
this.notFull = notFull;
this.notEmpty = notEmpty;
}
public void produce() {
// 生产商品
Product product = new Product("商品", 10);
// 等待缓冲区有空位
synchronized (buffer) { // 使用同步块保证线程安全
while (buffer.size() == capacity) {
try {
notFull.await(); // 等待缓冲区有空位
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生产商品入库
buffer.add(product);
System.out.println("生产者生产了商品:" + product.getName() + ", 库存数量:" + product.getCount());
notEmpty.signalAll(); // 唤醒所有等待的消费者线程
}
}
}
// 消费者
class Consumer {
private final List buffer; // 缓冲区
private final Condition notFull; // 缓冲区未满条件
private final Condition notEmpty; // 缓冲区非空条件
public Consumer(List buffer, Condition notFull, Condition notEmpty) {
this.buffer = buffer;
this.notFull = notFull;
this.notEmpty = notEmpty;
}
public void consume() {
// 等待缓冲区有商品
synchronized (buffer) { // 使用同步块保证线程安全
while (buffer.isEmpty()) {
try {
notEmpty.await(); // 等待缓冲区有商品
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消费商品
Product product = buffer.remove(0);
System.out.println("消费者消费了商品:" + product.getName() + ", 库存数量:" + product.getCount());
notFull.signalAll(); // 唤醒所有等待的生产者线程
}
}
}
public class ProducerConsumerOptimized {
public static void main(String[] args) {
// 初始化缓冲区
List buffer = new ArrayList<>();
// 设置缓冲区容量
int capacity = 5;
// 创建锁
Lock lock = new ReentrantLock();
// 创建条件变量
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();
// 创建生产者和消费者线程
Producer producer = new Producer(buffer, capacity, notFull, notEmpty);
Consumer consumer = new Consumer(buffer, notFull, notEmpty);
// 启动线程
new Thread(producer::produce).start();
new Thread(consumer::consume).start();
}
}
```
解释:
-
ReentrantLock 类: ReentrantLock 类提供了更灵活的锁机制,可以替代传统的 synchronized 块。
-
Condition 类: Condition 类用于创建条件变量,它与 ReentrantLock 关联。 每个条件变量对应一个特定的条件,线程可以通过 await() 方法等待该条件满足,并通过 signalAll() 方法唤醒所有等待该条件的线程。
4. 阻塞队列的基本使用
阻塞队列是一种线程安全的队列,它可以实现生产者和消费者之间的同步。 阻塞队列内部使用锁和条件变量来实现线程安全和等待唤醒机制。
代码示例:
```java
import java.util.concurrent.ArrayBlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) {
// 创建一个容量为 5 的阻塞队列
ArrayBlockingQueue queue = new ArrayBlockingQueue<>(5);
// 生产者线程
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
// 将元素添加到队列
queue.put("元素 " + i); // 如果队列已满,则会阻塞直到有空间
System.out.println("生产者添加了元素:" + "元素 " + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 消费者线程
new Thread(() -> {
while (true) {
try {
// 从队列获取元素
String element = queue.take(); // 如果队列为空,则会阻塞直到有元素
System.out.println("消费者消费了元素:" + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
```
解释:
-
ArrayBlockingQueue 类: ArrayBlockingQueue 类是一个基于数组实现的阻塞队列,它提供了 put() 和 take() 方法来实现生产者和消费者之间的同步。
-
put() 方法: 如果队列已满,put() 方法会阻塞生产者线程,直到队列中有空位。
-
take() 方法: 如果队列为空,take() 方法会阻塞消费者线程,直到队列中有元素。
5. 阻塞队列实现等待唤醒机制
阻塞队列内部使用锁和条件变量来实现等待唤醒机制,保证线程之间的安全同步。
-
put() 方法 : 当队列已满时,线程会调用 await() 方法,阻塞在条件变量上。当有元素被消费后,消费者会调用 signalAll() 方法,唤醒所有在条件变量上等待的生产者线程。
-
take() 方法 : 当队列为空时,线程会调用 await() 方法,阻塞在条件变量上。当有元素被生产后,生产者会调用 signalAll() 方法,唤醒所有在条件变量上等待的消费者线程。
代码示例:
```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
// 生产者
class ProducerBlockingQueue {
private final BlockingQueue queue; // 阻塞队列
public ProducerBlockingQueue(BlockingQueue queue) {
this.queue = queue;
}
public void produce() {
while (true) {
try {
// 生产商品
String product = "商品";
// 将元素添加到队列
queue.put(product); // 如果队列已满,则会阻塞生产者线程
System.out.println("生产者生产了商品:" + product);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 消费者
class ConsumerBlockingQueue {
private final BlockingQueue queue; // 阻塞队列
public ConsumerBlockingQueue(BlockingQueue queue) {
this.queue = queue;
}
public void consume() {
while (true) {
try {
// 消费商品
String product = queue.take(); // 如果队列为空,则会阻塞消费者线程
System.out.println("消费者消费了商品:" + product);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class BlockingQueueWaitNotify {
public static void main(String[] args) {
// 创建一个阻塞队列
BlockingQueue queue = new LinkedBlockingQueue<>();
// 创建生产者和消费者线程
ProducerBlockingQueue producer = new ProducerBlockingQueue(queue);
ConsumerBlockingQueue consumer = new ConsumerBlockingQueue(queue);
// 启动线程
new Thread(producer::produce).start();
new Thread(consumer::consume).start();
}
}
```
解释:
-
LinkedBlockingQueue 类: LinkedBlockingQueue 类是一个基于链表实现的阻塞队列,它提供了 put() 和 take() 方法来实现生产者和消费者之间的同步。
-
put() 方法: 如果队列已满,put() 方法会阻塞生产者线程,直到队列中有空位。
-
take() 方法: 如果队列为空,take() 方法会阻塞消费者线程,直到队列中有元素。
总结:
生产者和消费者模式是一种常用的并发设计模式,它可以有效地解决生产者和消费者之间的数据共享问题。 阻塞队列是实现生产者和消费者模式的便捷工具,它可以简化代码编写,提高代码可读性。 通过使用阻塞队列,我们可以避免使用复杂的同步机制,从而降低开发成本和维护成本。希望对各位看官有所帮助,感谢各位看官的观看,下期见,谢谢~
文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/5278.html