动态

详情 返回 返回

Java 多線程核心技術:線程間通信三種經典方式詳解與實戰 - 动态 详情

一、為什麼需要線程間通信?

大家好!今天我們來聊聊多線程編程中的一個核心問題:線程間通信

想象一下這個場景:你開發了一個電商系統,一個線程負責接收用户下單請求,另一個線程負責庫存扣減,還有一個線程負責發送通知。這些線程之間如果無法協作,就像各自為戰的士兵,無法完成統一的任務。

線程間通信解決的核心問題是:

  • 線程協作:多個線程按照預定的順序執行任務
  • 數據共享:一個線程產生的數據,需要被另一個線程使用
  • 狀態同步:一個線程的狀態變化需要通知其他線程

Java 提供了多種線程間通信機制,今天我們重點介紹三種經典方式:

graph TD
    A[Java線程間通信機制] --> B[wait/notify機制]
    A --> C[Condition條件機制]
    A --> D[管道通信]
    A --> E[volatile變量通信]
    B --> F[Object類的方法]
    C --> G[ReentrantLock的配套工具]
    D --> H[PipedInputStream/PipedOutputStream]
    E --> I[可見性保證]

二、第一種:wait/notify 機制

2.1 核心原理

wait/notify 是 Java 最基礎的線程間通信機制,它們是 Object 類的方法,而不是 Thread 類的方法。這意味着任何對象都可以作為線程間通信的媒介

基本工作原理如下:

sequenceDiagram
    participant 線程A
    participant 共享對象
    participant 線程B
    線程A->>共享對象: 獲取鎖(synchronized)
    線程A->>共享對象: 檢查條件(while循環)
    線程A->>共享對象: wait()釋放鎖並進入等待狀態
    Note over 線程A,共享對象: 線程A釋放鎖,進入對象的等待隊列
    線程B->>共享對象: 獲取鎖(synchronized)
    線程B->>共享對象: 修改狀態
    線程B->>共享對象: notify()/notifyAll()通知等待線程
    Note over 線程B,共享對象: 線程B通知後繼續持有鎖直到同步塊結束
    線程B->>共享對象: 釋放鎖
    Note over 線程A,共享對象: 線程A被喚醒,重新獲取鎖
    共享對象-->>線程A: 重新獲取鎖
    線程A->>共享對象: 再次檢查條件(防止虛假喚醒)
    線程A->>共享對象: 條件滿足,繼續執行

2.2 核心方法説明

  • wait(): 讓當前線程進入等待狀態,並釋放對象鎖
  • wait(long timeout): 帶超時時間的等待
  • wait(long timeout, int nanos): 更精細的超時控制
  • notify(): 隨機喚醒一個在該對象上等待的線程
  • notifyAll(): 喚醒所有在該對象上等待的線程

2.3 使用規則

使用 wait/notify 有一些必須遵守的規則,否則會拋出 IllegalMonitorStateException 異常:

  1. 必須在 synchronized 同步塊或方法中調用
  2. 必須是同一個監視器對象
  3. wait 後必須使用循環檢查等待條件(避免虛假喚醒)

關於虛假喚醒,Java 官方文檔明確指出:

"線程可能在沒有被通知、中斷或超時的情況下被喚醒,這被稱為虛假喚醒。雖然這在實際中很少發生,但應用程序必須通過測試應該導致線程被喚醒的條件來防範它,並且如果條件不滿足則繼續等待。換句話説,等待應該總是發生在循環中。"

這是由操作系統線程調度機制決定的,不是 Java 的 bug。

2.4 生產者-消費者示例

下面是一個典型的生產者-消費者模式示例,通過 wait/notify 實現線程間協作:

public class WaitNotifyExample {
    private final Queue<String> queue = new LinkedList<>();
    private final int MAX_SIZE = 5;

    public synchronized void produce(String data) throws InterruptedException {
        // 使用while循環檢查條件,防止虛假喚醒
        while (queue.size() == MAX_SIZE) {
            System.out.println("隊列已滿,生產者等待...");
            this.wait(); // 隊列滿了,生產者線程等待
        }

        queue.add(data);
        System.out.println("生產數據: " + data + ", 當前隊列大小: " + queue.size());

        // 只通知消費者線程,避免不必要的喚醒
        this.notify(); // 在單生產者單消費者的情況下可以用notify提高效率
    }

    public synchronized String consume() throws InterruptedException {
        // 使用while循環檢查條件,防止虛假喚醒
        while (queue.isEmpty()) {
            System.out.println("隊列為空,消費者等待...");
            this.wait(); // 隊列空了,消費者線程等待
        }

        String data = queue.poll();
        System.out.println("消費數據: " + data + ", 當前隊列大小: " + queue.size());

        // 只通知生產者線程,避免不必要的喚醒
        this.notify(); // 在單生產者單消費者的情況下可以用notify提高效率
        return data;
    }

    // 對於多生產者多消費者的場景,應改用notifyAll避免線程飢餓
    public synchronized void produceMulti(String data) throws InterruptedException {
        while (queue.size() == MAX_SIZE) {
            System.out.println(Thread.currentThread().getName() + ": 隊列已滿,生產者等待...");
            this.wait();
        }

        queue.add(data);
        System.out.println(Thread.currentThread().getName() + ": 生產數據: " + data + ", 當前隊列大小: " + queue.size());

        // 當有多個生產者和消費者時,必須用notifyAll確保正確喚醒
        this.notifyAll();
    }

    public static void main(String[] args) {
        WaitNotifyExample example = new WaitNotifyExample();

        // 創建生產者線程
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    example.produce("數據-" + i);
                    Thread.sleep(new Random().nextInt(1000));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // 創建消費者線程
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    example.consume();
                    Thread.sleep(new Random().nextInt(1000));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

2.5 notify()與 notifyAll()的選擇策略

何時使用 notify(),何時使用 notifyAll()?這是線程間通信中的重要決策:

  1. 使用 notify()的情況

    • 所有等待線程都是同質的(做相同任務)
    • 單一消費者/生產者模式
    • 性能敏感,且能確保不會導致線程飢餓
  2. 使用 notifyAll()的情況

    • 有多種不同類型的等待線程
    • 多生產者/多消費者模式
    • 安全性要求高於性能要求
    • 當不確定使用哪個更合適時(默認選擇)

2.6 常見問題與解決方案

  1. 虛假喚醒問題

    問題:線程可能在沒有 notify/notifyAll 調用的情況下被喚醒

    解決:始終使用 while 循環檢查等待條件,而不是 if 語句

  2. 死鎖風險

    如果生產者和消費者互相等待對方的通知,且都沒有收到通知,就會發生死鎖。可以考慮使用帶超時參數的 wait(timeout)方法,例如:

    // 超時等待,避免永久死鎖
    if (!condition) {
        this.wait(1000); // 最多等待1秒
    }
  3. 異常處理

    wait()方法會拋出 InterruptedException,需要適當處理:

    try {
        while (!condition) {
            object.wait();
        }
    } catch (InterruptedException e) {
        // 恢復中斷狀態,不吞掉中斷
        Thread.currentThread().interrupt();
        // 或者進行資源清理並提前返回
        return;
    }

三、第二種:Condition 條件變量

3.1 基本概念

Condition 是在 Java 5 引入的,它提供了比 wait/notify 更加靈活和精確的線程間控制機制。Condition 對象總是與 Lock 對象一起使用。

graph TD
    A[ReentrantLock] -->|創建| B[Condition]
    B -->|提供| C[await方法:等待]
    B -->|提供| D[signal方法:通知]
    B -->|提供| E[signalAll方法:通知所有]
    F[等待隊列1] --- B
    G[等待隊列2] --- B
    H[等待隊列3] --- B
    F --- I[精確喚醒]
    G --- I
    H --- I

3.2 Condition 接口的核心方法

  • await(): 類似於 wait(),釋放鎖並等待
  • await(long time, TimeUnit unit): 帶超時的等待
  • awaitUninterruptibly(): 不可中斷的等待
  • awaitUntil(Date deadline): 等待到指定的時間點
  • signal(): 類似於 notify(),喚醒一個等待線程
  • signalAll(): 類似於 notifyAll(),喚醒所有等待線程

3.3 分組喚醒原理

Condition 的核心優勢在於實現"精確通知"。與 wait/notify 使用同一個等待隊列不同,每個 Condition 對象管理着各自獨立的等待隊列

wait/notify: 所有線程在同一個等待隊列
┌─────────────────────┐
│ 對象監視器等待隊列    │
├─────────┬───────────┤
│ 線程A    │ 線程B     │
└─────────┴───────────┘

Condition: 每個Condition維護獨立的等待隊列
┌─────────────────────┐  ┌─────────────────────┐
│ Condition1等待隊列   │  │ Condition2等待隊列   │
├─────────┬───────────┤  ├─────────┬───────────┤
│ 線程A    │ 線程C     │  │ 線程B    │ 線程D     │
└─────────┴───────────┘  └─────────┴───────────┘

這種機制使得:

  • 生產者可以只喚醒消費者(而不是所有等待線程)
  • 清空操作可以只喚醒生產者(而不是消費者)
  • 不同類型的等待可以使用不同的 Condition

3.4 相比 wait/notify 的優勢

  1. 可以精確喚醒指定線程組:一個 Lock 可以創建多個 Condition 對象,實現分組喚醒
  2. 有更好的中斷控制:提供可中斷和不可中斷的等待
  3. 可以設置超時時間:更靈活的超時機制(支持時間單位)
  4. 可以實現公平鎖:使用 ReentrantLock 的公平性特性
  5. 通過獨立等待隊列實現精準喚醒:僅通知目標線程組,避免喚醒無關線程(如生產者不喚醒其他生產者),從而減少 CPU 資源浪費

3.5 精確通知示例

下面是一個使用 Condition 實現的生產者-消費者模式,支持精確通知:

public class ConditionExample {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // 隊列不滿條件
    private final Condition notEmpty = lock.newCondition(); // 隊列不空條件

    private final Queue<String> queue = new LinkedList<>();
    private final int MAX_SIZE = 5;

    public void produce(String data) throws InterruptedException {
        lock.lock();
        try {
            // 隊列已滿,等待不滿條件
            while (queue.size() == MAX_SIZE) {
                System.out.println("隊列已滿,生產者等待...");
                notFull.await(); // 生產者在notFull條件上等待
            }

            queue.add(data);
            System.out.println("生產數據: " + data + ", 當前隊列大小: " + queue.size());

            // 通知消費者隊列不為空 - 精確通知,只喚醒消費者線程
            notEmpty.signal();
        } finally {
            // 必須在finally中釋放鎖,確保鎖一定被釋放
            lock.unlock();
        }
    }

    public String consume() throws InterruptedException {
        lock.lock();
        try {
            // 隊列為空,等待不空條件
            while (queue.isEmpty()) {
                System.out.println("隊列為空,消費者等待...");
                notEmpty.await(); // 消費者在notEmpty條件上等待
            }

            String data = queue.poll();
            System.out.println("消費數據: " + data + ", 當前隊列大小: " + queue.size());

            // 通知生產者隊列不滿 - 精確通知,只喚醒生產者線程
            notFull.signal();
            return data;
        } finally {
            lock.unlock();
        }
    }

    // 使用可中斷鎖嘗試獲取數據,帶超時控制
    public String consumeWithTimeout(long timeout, TimeUnit unit) throws InterruptedException {
        // 嘗試獲取鎖,可設置超時
        if (!lock.tryLock(timeout, unit)) {
            System.out.println("獲取鎖超時,放棄消費");
            return null;
        }

        try {
            // 使用超時等待
            if (queue.isEmpty() && !notEmpty.await(timeout, unit)) {
                System.out.println("等待數據超時,放棄消費");
                return null;
            }

            if (!queue.isEmpty()) {
                String data = queue.poll();
                System.out.println("消費數據: " + data + ", 當前隊列大小: " + queue.size());
                notFull.signal();
                return data;
            }
            return null;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ConditionExample example = new ConditionExample();

        // 創建生產者線程
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    example.produce("數據-" + i);
                    Thread.sleep(new Random().nextInt(1000));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // 創建消費者線程
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    example.consume();
                    Thread.sleep(new Random().nextInt(1000));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

3.6 使用 Condition 實現多條件協作

我們可以使用多個 Condition 實現更復雜的場景,比如一個緩衝區,有讀者、寫者和清理者三種角色:

public class MultiConditionExample {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition writerCondition = lock.newCondition();   // 寫入條件
    private final Condition readerCondition = lock.newCondition();   // 讀取條件
    private final Condition cleanerCondition = lock.newCondition();  // 清理條件

    private boolean hasData = false;
    private boolean needClean = false;
    private String data;

    // 寫入數據
    public void write(String data) throws InterruptedException {
        lock.lock();
        try {
            // 已有數據或需要清理,等待
            while (hasData || needClean) {
                System.out.println(Thread.currentThread().getName() + " 等待寫入條件...");
                writerCondition.await();
            }

            this.data = data;
            hasData = true;
            System.out.println(Thread.currentThread().getName() + " 寫入數據: " + data);

            // 通知讀者可以讀取數據
            readerCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    // 讀取數據
    public String read() throws InterruptedException {
        lock.lock();
        try {
            // 沒有數據或需要清理,等待
            while (!hasData || needClean) {
                System.out.println(Thread.currentThread().getName() + " 等待讀取條件...");
                readerCondition.await();
            }

            String result = this.data;
            hasData = false;
            needClean = true;
            System.out.println(Thread.currentThread().getName() + " 讀取數據: " + result);

            // 通知清理者可以清理
            cleanerCondition.signal();
            return result;
        } finally {
            lock.unlock();
        }
    }

    // 清理操作
    public void clean() throws InterruptedException {
        lock.lock();
        try {
            // 不需要清理,等待
            while (!needClean) {
                System.out.println(Thread.currentThread().getName() + " 等待清理條件...");
                cleanerCondition.await();
            }

            this.data = null;
            needClean = false;
            System.out.println(Thread.currentThread().getName() + " 清理完成");

            // 通知寫者可以寫入數據
            writerCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    // 測試方法
    public static void main(String[] args) {
        MultiConditionExample example = new MultiConditionExample();

        // 寫入線程
        new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    example.write("數據-" + i);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "寫入線程").start();

        // 讀取線程
        new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    example.read();
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "讀取線程").start();

        // 清理線程
        new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    example.clean();
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "清理線程").start();
    }
}

3.7 Condition 與虛假喚醒

需要注意的是,Condition 的 await()方法同樣會發生虛假喚醒,與 wait()方法類似。虛假喚醒是所有線程等待機制的固有特性,而不是特定通知機制的缺陷。即使使用 Condition 的精確通知機制,仍然需要使用循環檢查等待條件:

// 正確使用Condition的方式
lock.lock();
try {
    while (!condition) {  // 使用循環檢查條件
        condition.await();
    }
    // 處理條件滿足的情況
} finally {
    lock.unlock();
}

Condition 雖然通過獨立的等待隊列減少了"無效喚醒"(非目標線程的喚醒),但無法消除操作系統層面的虛假喚醒可能性。

3.8 wait/notify 與 Condition 性能對比

雖然 Condition 在功能上更加強大,但實際性能與 wait/notify 非常接近,因為兩者底層都依賴於操作系統的線程阻塞機制。兩者的性能差異在高併發場景下可忽略,功能匹配度是首要考慮因素。主要區別在於:

  • Condition 需要顯式管理鎖:Lock.lock()和 lock.unlock(),增加了代碼複雜度
  • Condition 提供更多控制能力:超時、中斷、多條件等
  • Lock 支持非阻塞嘗試獲取鎖:tryLock()可避免長時間阻塞

選擇標準:功能需求應優先於性能考慮,複雜的線程間協作場景下首選 Condition。

四、第三種:管道通信

4.1 管道通信基本概念

Java IO 提供了管道流,專門用於線程間的數據傳輸,適用於單 JVM 內的線程間通信。主要涉及以下幾個類:

  • PipedOutputStream 和 PipedInputStream
  • PipedWriter 和 PipedReader

這些類構成了線程之間的管道通信通道,一個線程向管道寫入數據,另一個線程從管道讀取數據。

graph LR
    A[線程A] -->|寫入| B[PipedOutputStream]
    B -->|連接| C[PipedInputStream]
    C -->|讀取| D[線程B]
    E[線程C] -->|寫入| F[PipedWriter]
    F -->|連接| G[PipedReader]
    G -->|讀取| H[線程D]

4.2 管道通信的核心特性

  1. 阻塞機制

    • 管道緩衝區滿時,write()操作會阻塞
    • 管道緩衝區空時,read()操作會阻塞
    • 這一特性自動實現了生產者-消費者模式的流控制
  2. 字節流與字符流

    • 字節流:PipedInputStream/PipedOutputStream - 處理二進制數據
    • 字符流:PipedReader/PipedWriter - 處理文本數據(帶字符編碼)
  3. 內部緩衝區

    • 默認大小為 1024 字節
    • 可以在構造函數中指定緩衝區大小

4.3 管道通信的使用場景

管道通信特別適合於:

  • 需要傳輸原始數據或字符流的場景
  • 生產者-消費者模式中的數據傳輸
  • 多個處理階段之間的流水線處理
  • 日誌記錄器、數據過濾、實時數據處理

4.4 字節管道示例

下面是一個使用 PipedInputStream 和 PipedOutputStream 的示例:

public class PipedStreamExample {
    public static void main(String[] args) throws Exception {
        // 創建管道輸出流和輸入流
        PipedOutputStream output = new PipedOutputStream();
        PipedInputStream input = new PipedInputStream(output); // 直接在構造器中連接

        // 創建寫入線程
        Thread writerThread = new Thread(() -> {
            try {
                System.out.println("寫入線程啓動");
                for (int i = 1; i <= 10; i++) {
                    String message = "數據-" + i;
                    output.write(message.getBytes());
                    System.out.println("寫入: " + message);

                    // 如果註釋掉sleep,可能會因為管道緩衝區滿而阻塞
                    Thread.sleep(500);
                }
                // 關閉輸出流,表示不再寫入數據
                output.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // 創建讀取線程
        Thread readerThread = new Thread(() -> {
            try {
                System.out.println("讀取線程啓動");
                byte[] buffer = new byte[100]; // 小於完整消息長度,演示多次讀取
                int len;

                // read方法在管道沒有數據時會阻塞,直到有數據或管道關閉
                while ((len = input.read(buffer)) != -1) {
                    String message = new String(buffer, 0, len);
                    System.out.println("讀取: " + message);

                    // 如果註釋掉sleep,可能會因為消費太快而導致管道經常為空
                    Thread.sleep(1000);
                }
                input.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // 啓動線程
        writerThread.start();
        readerThread.start();
    }
}

4.5 字符管道示例

下面是使用 PipedWriter 和 PipedReader 的字符流管道示例:

public class PipedReaderWriterExample {
    public static void main(String[] args) throws Exception {
        // 創建管道寫入器和讀取器
        PipedWriter writer = new PipedWriter();
        PipedReader reader = new PipedReader(writer, 1024); // 指定緩衝區大小

        // 創建寫入線程
        Thread writerThread = new Thread(() -> {
            try {
                System.out.println("寫入線程啓動");
                for (int i = 1; i <= 10; i++) {
                    String message = "字符數據-" + i + "\n";
                    writer.write(message);
                    writer.flush(); // 確保數據立即寫入管道
                    System.out.println("寫入: " + message);
                    Thread.sleep(500);
                }
                writer.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // 創建讀取線程
        Thread readerThread = new Thread(() -> {
            try {
                System.out.println("讀取線程啓動");
                char[] buffer = new char[1024];
                int len;

                // 演示按行讀取
                BufferedReader bufferedReader = new BufferedReader(reader);
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    System.out.println("讀取一行: " + line);
                    Thread.sleep(700);
                }

                reader.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // 啓動線程
        writerThread.start();
        readerThread.start();
    }
}

4.6 管道通信的注意事項

  1. 管道流容量有限

    • 默認容量為 1024 字節
    • 寫入過多數據而沒有及時讀取,寫入方會阻塞
    • 讀取空管道時,讀取方會阻塞
  2. 連接機制

    • 使用前必須先調用 connect()方法連接兩個流,或在構造時指定
    • 多次 connect 會拋出異常
  3. 單向通信

    • 管道是單向的,需要雙向通信時需要創建兩對管道
    • 分清楚誰是生產者(寫入方),誰是消費者(讀取方)
  4. 關閉管理

    • 必須正確關閉管道流(在 finally 塊中)
    • 一端關閉後另一端會收到-1 或 null,表示流結束
  5. 線程安全性

    • 單個管道的寫入/讀取操作是線程安全的(即單個寫入線程和單個讀取線程無需額外同步)
    • 多個線程同時寫入/讀取同一個管道仍需外部同步
    • 管道不支持多寫多讀模式,設計上就是一個線程寫,一個線程讀

五、輔助通信方式:volatile 變量

5.1 volatile 基本原理

volatile 是 Java 提供的輕量級線程間通信機制,它保證了變量的可見性有序性,但不保證原子性。

graph LR
    A[線程A] -->|寫入| B[volatile變量]
    B -->|立即刷新到主內存| C[主內存]
    C -->|其他線程立即可見| D[線程B]
    D -->|讀取最新值| B

5.2 volatile 的實現原理

  1. 可見性保證

    • volatile 變量的寫操作會強制刷新到主內存
    • volatile 變量的讀操作會強制從主內存獲取最新值
    • 保證一個線程對變量的修改對其他線程立即可見
  2. 內存屏障

    • volatile 變量的讀寫操作會插入內存屏障指令,禁止指令重排序
    • 保證程序執行的有序性,防止編譯器和 CPU 的優化破壞併發安全
    • 在 x86 架構上,寫操作會生成鎖前綴指令(LOCK prefix)
  3. 無鎖機制

    • 不會導致線程阻塞
    • 比 synchronized 更輕量級,性能更好
    • 適合一寫多讀場景

5.3 volatile 與原子性

volatile 不保證原子性,這意味着:

// 以下操作在多線程環境中不安全,即使counter是volatile
volatile int counter = 0;
counter++; // 非原子操作:讀取-修改-寫入

對於需要原子性的場景,可以結合原子類使用:

// 使用原子類保證原子性
AtomicInteger counter = new AtomicInteger(0);
counter.incrementAndGet(); // 原子操作

或者使用 synchronized:

volatile int counter = 0;
synchronized void increment() {
    counter++; // 在同步塊中安全
}

5.4 使用 volatile 實現線程間通信示例

下面是一個使用 volatile 變量實現線程間通信的簡單例子:

public class VolatileCommunicationExample {
    // 使用volatile標記共享變量
    private static volatile boolean flag = false;
    private static volatile int counter = 0;

    public static void main(String[] args) throws InterruptedException {
        // 創建Writer線程
        Thread writerThread = new Thread(() -> {
            System.out.println("寫入線程開始運行");
            try {
                Thread.sleep(1000); // 模擬耗時操作
                counter = 100;     // 更新數據

                // volatile變量的寫操作會強制刷新到主內存
                // 其他線程的讀取會從主內存獲取最新值
                // 內存屏障確保以下操作不會被重排序到上面操作之前
                flag = true;       // 設置標誌位

                System.out.println("寫入線程完成數據更新: counter = " + counter);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // 創建Reader線程
        Thread readerThread = new Thread(() -> {
            System.out.println("讀取線程開始運行");

            // 不使用sleep等待,volatile的可見性使其工作
            while (!flag) {
                // 自旋等待標誌位變化
                Thread.yield(); // 減少CPU佔用
            }

            // 由於volatile的可見性保證,這裏讀取到的一定是最新值100
            System.out.println("讀取線程讀取到數據: counter = " + counter);
        });

        // 啓動線程
        readerThread.start();
        writerThread.start();

        // 等待線程結束
        writerThread.join();
        readerThread.join();
    }
}

5.5 volatile 的適用場景與侷限性

適用場景:

  • 狀態標誌:如開關變量、完成標誌等
  • 一寫多讀:一個線程寫,多個線程讀的場景
  • 無原子操作:變量操作不需要保證原子性的場景
  • 雙重檢查鎖定模式:在單例模式中用於安全發佈

侷限性:

  • 不保證原子性:對於 i++ 這樣的複合操作無法保證
  • 不能替代鎖:對於複雜共享狀態的控制還是需要鎖
  • 性能考慮:頻繁修改的變量使用 volatile 可能導致總線流量增加

六、三種通信方式的對比與選擇

不同的線程間通信方式有各自的特點和適用場景,下表對比了它們的關鍵特性:

特性 wait/notify Condition 管道通信 volatile
線程安全級別 高(內置鎖) 高(顯式鎖) 中(緩衝區) 低(僅可見性)
數據傳輸能力 通過共享對象 通過共享對象 流式傳輸 單個變量
適用場景 線程間協作 複雜線程間協作 數據流傳輸 狀態標誌
實現複雜度 簡單 中等 中等 簡單
控制精度 一般 不適用 不適用
阻塞特性 阻塞 阻塞 阻塞 非阻塞
鎖機制 synchronized ReentrantLock 內部同步 無鎖
通信方向 多向 多向 單向 多向
通知精確性 不精確 精確 不適用 不適用
適用數據類型 任意對象 任意對象 支持連續的二進制數據或文本數據,適合流式處理(如日誌、文件內容),不適合離散的對象傳輸 基本類型/對象引用

七、線程間通信實戰案例:日誌收集器

下面是一個綜合應用案例,實現一個簡單的日誌收集器:

public class LogCollector {
    // 日誌隊列 - 內部已實現線程安全
    private final BlockingQueue<String> logQueue = new LinkedBlockingQueue<>(1000);
    // 停止標誌
    private volatile boolean stopped = false;
    // 用於管道通信的字符寫入器與讀取器
    private PipedWriter logWriter;
    private PipedReader logReader;

    // 線程管理
    private Thread collectorThread;  // 日誌收集線程
    private Thread processorThread;  // 日誌處理線程
    private Thread outputThread;     // 日誌輸出線程

    public LogCollector() throws IOException {
        // 初始化管道
        this.logWriter = new PipedWriter();
        this.logReader = new PipedReader(logWriter);
    }

    public void start() {
        // 創建日誌收集線程
        collectorThread = new Thread(() -> {
            System.out.println("日誌收集線程啓動");
            try {
                while (!stopped) {
                    // 模擬生成日誌
                    String log = "INFO " + new Date() + ": " + "系統運行正常,內存使用率: "
                        + new Random().nextInt(100) + "%";

                    // BlockingQueue的put方法在隊列滿時會自動阻塞
                    logQueue.put(log);
                    System.out.println("收集日誌: " + log);

                    // 控制日誌生成速度
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("日誌收集線程結束");
        });

        // 創建日誌處理線程
        processorThread = new Thread(() -> {
            System.out.println("日誌處理線程啓動");
            try {
                while (!stopped || !logQueue.isEmpty()) {
                    // BlockingQueue的poll方法在隊列空時會阻塞指定時間
                    String log = logQueue.poll(500, TimeUnit.MILLISECONDS);
                    if (log != null) {
                        // 處理日誌(這裏簡單加上處理標記)
                        String processedLog = "已處理: " + log + "\n";

                        // 通過管道發送到輸出線程
                        logWriter.write(processedLog);
                        logWriter.flush();
                    }
                }

                // 處理完所有日誌後關閉寫入器
                logWriter.close();
            } catch (InterruptedException | IOException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("日誌處理線程結束");
        });

        // 創建日誌輸出線程
        outputThread = new Thread(() -> {
            try {
                System.out.println("日誌輸出線程啓動");
                BufferedReader reader = new BufferedReader(logReader);
                String line;

                // 從管道中讀取處理後的日誌並輸出
                while ((line = reader.readLine()) != null) {
                    System.out.println("輸出處理後的日誌: " + line);
                }

                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("日誌輸出線程結束");
        });

        // 啓動線程
        collectorThread.start();
        processorThread.start();
        outputThread.start();
    }

    public void stop() {
        stopped = true;  // volatile保證可見性
        System.out.println("日誌收集器停止中...");
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        LogCollector collector = new LogCollector();
        collector.start();

        // 運行10秒後停止
        Thread.sleep(10000);
        collector.stop();
    }
}

這個案例綜合使用了多種線程間通信方式:

  1. BlockingQueue: 作為線程安全的日誌隊列,自動實現生產者-消費者模式
  2. Piped 流: 將處理後的日誌傳輸到輸出線程
  3. volatile 變量: 用作停止標誌控制線程終止

設計説明:這個實例展示瞭如何組合不同的線程間通信機制實現複雜功能:

  • BlockingQueue 處理生產者-消費者數據傳遞(收集線程與處理線程)
  • 管道通信處理字符流傳輸(處理線程與輸出線程)
  • volatile 變量處理狀態同步(停止信號)

八、總結與常見問題

8.1 線程間通信方式總結

通信方式 核心 API 使用場景 注意事項
wait/notify Object.wait()
Object.notify()
Object.notifyAll()
簡單同步
生產者-消費者
必須在 synchronized 中使用
使用 while 循環檢查條件
防止虛假喚醒
Condition lock.newCondition()
condition.await()
condition.signal()
複雜多條件
精確通知
必須與 Lock 配合使用
需手動加解鎖
使用 try/finally 保證鎖釋放
同樣需防範虛假喚醒
管道通信 PipedInputStream
PipedOutputStream
PipedReader
PipedWriter
數據傳輸
流處理
需要 connect 連接
單向通信
注意關閉資源
一寫一讀模式
volatile volatile 關鍵字 狀態標誌
一寫多讀
不保證原子性
適合簡單狀態同步

8.2 常見問題解答

  1. wait()和 sleep()的區別是什麼?

    • wait()釋放鎖,sleep()不釋放鎖
    • wait()需要在 synchronized 塊中調用,sleep()不需要
    • wait()需要被 notify()/notifyAll()喚醒,sleep()時間到自動恢復
    • wait()是 Object 類方法,sleep()是 Thread 類方法
  2. 為什麼 wait()需要在 synchronized 塊中調用?

    • 確保線程在檢查條件和調用 wait()期間持有鎖,避免競態條件
    • 調用 wait()前必須獲得對象的監視器鎖,這是 JVM 層面的要求
    • 確保線程放棄鎖並進入等待狀態的操作是原子的
  3. 如何處理虛假喚醒問題?

    // 正確做法:使用while循環
    synchronized (obj) {
        while (!condition) {  // 循環檢查
            obj.wait();
        }
        // 處理條件滿足情況
    }
    
    // 錯誤做法:使用if語句
    synchronized (obj) {
        if (!condition) {     // 只檢查一次
            obj.wait();
        }
        // 可能在條件仍不滿足時執行
    }
  4. Condition 相比 wait/notify 的優勢在哪裏?

    • 可以創建多個等待隊列,實現精確通知
    • 可以實現不可中斷的等待(awaitUninterruptibly)
    • 支持更靈活的超時控制(可指定時間單位)
    • 與 ReentrantLock 結合可實現公平鎖
  5. 如何選擇合適的線程間通信方式?

    • 簡單狀態同步:volatile 變量
    • 一個等待條件:wait/notify
    • 多個等待條件:Condition
    • 數據流傳輸:管道通信
    • 隊列操作:BlockingQueue
  6. volatile 與 AtomicInteger 的區別?

    • volatile 只保證可見性和有序性,不保證原子性
    • AtomicInteger 通過 CAS(Compare-And-Swap)操作保證原子性
    • 對於 i++ 這樣的操作,需要使用 AtomicInteger 而非 volatile
    • 兩者結合使用可以實現高效的線程安全代碼
  7. 管道通信與消息隊列有什麼區別?

    • 管道是 Java 內置的線程間通信機制,限於單 JVM 內
    • 消息隊列通常指分佈式消息系統(如 Kafka),可跨進程/服務器
    • 管道適合輕量級的線程間流數據傳輸
    • 消息隊列適合更大規模的分佈式系統組件間通信

感謝您耐心閲讀到這裏!如果覺得本文對您有幫助,歡迎點贊 👍、收藏 ⭐、分享給需要的朋友,您的支持是我持續輸出技術乾貨的最大動力!

如果想獲取更多 Java 技術深度解析,歡迎點擊頭像關注我,後續會每日更新高質量技術文章,陪您一起進階成長~

user avatar soroqer 头像 bssj 头像 tdengine 头像 ninedata 头像 sean_5efd514dcd979 头像
点赞 5 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.