博客 / 詳情

返回

9、PipedInputStream和PipedOutputStream的源碼分析和使用方法詳細分析

  在多線程編程中,線程間的數據交換是一個常見需求。Java IO包中的PipedInputStream和PipedOutputStream提供了一種高效的線程間通信機制,允許一批(多個)線程向PipedOutputStream寫入數據,另一批(多個)線程從PipedInputStream讀取數據。
  但是,同一批(多個)線程相互之間會存在競爭,比如,同一批向PipedOutputStream寫入數據的線程會存在競爭,同一批從PipedInputStream讀取數據的線程也會存在競爭。因此PipedInputStream和PipedOutputStream中的線程安全需要通過synchronized關鍵字和wait()/notifyAll()機制實現。不建議在一個線程中同時使用PipedInputStream和PipedOutputStream,因為這樣可能會導致這個線程陷入死鎖狀態。
  PipedInputStream和PipedOutputStream之間的通信本質上是一個生產者-消費者模型,其中PipedOutputStream作為生產者,PipedInputStream作為消費者。兩者通過一個循環緩衝區(byte[]數組)進行數據交換,PipedOutputStream將數據緩存在PipedInputStream的數組當中,等待PipedInputStream的讀取。
  PipedInputStream和PipedOutputStream的UML關係圖,如下所示:
clipboard

一、PipedOutputStream(生產者)源碼——向PipedInputStream(消費者)中的緩衝區(byte[]數組)寫入字節數據的輸出Stream(生產者)

package java.io;

import java.io.*;

public
class PipedOutputStream extends OutputStream {
    //與這個PipedOutputStream(生產者)相關聯的 PipedInputStream (消費者)
    private PipedInputStream sink;
    
    //構造函數
    public PipedOutputStream(PipedInputStream snk)  throws IOException {
        connect(snk);//調用connect()函數,來改變PipedInputStream (消費者)中一些變量的值
    }
    
    //構造函數
    public PipedOutputStream() {
    }
    
    //線程同步函數:用來改變將要關聯的PipedInputStream (消費者)中一些變量的值
    public synchronized void connect(PipedInputStream snk) throws IOException {
        if (snk == null) {
            throw new NullPointerException();//如果將要關聯的PipedInputStream (消費者)為null,拋出NullPointerException
        } else if (sink != null || snk.connected) {
            //如果與這個PipedOutputStream(生產者)相關聯的 PipedInputStream (消費者)!=null或者將要關聯的PipedInputStream (消費者)的boolean connected變量為true,則拋出IOException
            throw new IOException("Already connected");
        }
        sink = snk;//將這個PipedOutputStream(生產者)與這個PipedInputStream (消費者)相關聯
        snk.in = -1;//改變PipedInputStream (消費者)中的變量int in=-1
        snk.out = 0;//改變PipedInputStream (消費者)中的變量int out=0
        snk.connected = true;//改變PipedInputStream (消費者)中的變量boolean connected=true
    }
    
    //向與這個PipedOutputStream(生產者)相關聯的 PipedInputStream (消費者)的緩衝區(byte[]數組)寫入1個字節
    public void write(int b)  throws IOException {
        if (sink == null) {
             //如果與這個PipedOutputStream(生產者)相關聯的 PipedInputStream (消費者)== null,拋出IOException
            throw new IOException("Pipe not connected");
        }
        sink.receive(b);//最終調用的是這個相關聯的 PipedInputStream (消費者)的receive(int b)函數
    }
    
    //向與這個PipedOutputStream(生產者)相關聯的 PipedInputStream (消費者)的緩衝區(byte[]數組)寫入byte[]數組b的[off,off+len)(左閉右開,不包括off+len)索引位置的字節
    public void write(byte b[], int off, int len) throws IOException {
        if (sink == null) {
            //如果與這個PipedOutputStream(生產者)相關聯的 PipedInputStream (消費者)== null,拋出IOException
            throw new IOException("Pipe not connected");
        } else if (b == null) {
            throw new NullPointerException();//如果byte[]數組b==null,拋出一個NullPointerException
        } else if ((off < 0) || (off > b.length) || (len < 0) ||
                   ((off + len) > b.length) || ((off + len) < 0)) {//byte[]數組b的[off,off+len)(左閉右開)索引位置是否有越界的檢查
            throw new IndexOutOfBoundsException();//越界的話,拋出一個IndexOutOfBoundsException
        } else if (len == 0) {
            return;//如果len==0,結束本次函數調用
        }
        sink.receive(b, off, len);//最終調用的是這個相關聯的 PipedInputStream (消費者)的receive(byte b[], int off, int len)函數
    }
    
    //線程同步函數:使用notifyAll()函數喚醒所有與這個PipedOutputStream(生產者)相關聯的 PipedInputStream (消費者)線程(這個消費者可以綁定1~多個線程)
    public synchronized void flush() throws IOException {
        if (sink != null) {
            synchronized (sink) {
                sink.notifyAll();
            }
        }
    }
    //關閉這個PipedOutputStream(生產者),這個PipedOutputStream(生產者)不能再向與它相關聯的PipedInputStream(消費者)中的緩衝區(byte[]數組)寫入字節數據
    public void close()  throws IOException {
        if (sink != null) {
            sink.receivedLast();
        }
    }
}

二、PipedInputStream(消費者)源碼——從自己的緩衝區(byte[]數組)讀取字節數據的輸入Stream(消費者)

package java.io;

public class PipedInputStream extends InputStream {
    //標記符:true表示與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)已經關閉,反之,反之
    boolean closedByWriter = false;
    //標記符:true表示當前這個 PipedInputStream (消費者)已經關閉了,反之,反之
    volatile boolean closedByReader = false;
    //標記符:true表示與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)已經持有了這個PipedInputStream (消費者)對象(或者叫已經連接上了),反之,反之
    boolean connected = false;

    Thread readSide;//當前消費的線程
    Thread writeSide;//當前生產者的線程
    
    //默認的PipedInputStream (消費者)的緩衝區(byte[]數組)的長度
    private static final int DEFAULT_PIPE_SIZE = 1024;

    //PipedInputStream (消費者)的緩衝區(byte[]數組)
    protected byte buffer[];
    //緩衝區(byte[]數組)的寫指針
    protected int in = -1;
    //緩衝區(byte[]數組)的讀指針
    protected int out = 0;
    //構造函數
    public PipedInputStream(PipedOutputStream src) throws IOException {
        this(src, DEFAULT_PIPE_SIZE);//緩衝區(byte[]數組)的長度使用默認值1024
    }

    //構造函數
    public PipedInputStream(PipedOutputStream src, int pipeSize)
            throws IOException {
         initPipe(pipeSize);//緩衝區(byte[]數組)的長度使用指定的長度
         //最終還是調用PipedOutputStream(生產者)的connect()函數,並把自身對象this傳遞進去,然後在PipedOutputStream(生產者)的connect()函數中,改變自己的3個變量int in=-1、int out=0、boolean connected=true
         connect(src);
    }
    
    //構造函數,緩衝區(byte[]數組)的長度使用默認值1024
    public PipedInputStream() {
        initPipe(DEFAULT_PIPE_SIZE);
    }

    //構造函數,緩衝區(byte[]數組)的長度使用指定的長度
    public PipedInputStream(int pipeSize) {
        initPipe(pipeSize);
    }
    
    //初始化緩衝區(byte[]數組)
    private void initPipe(int pipeSize) {
         if (pipeSize <= 0) {
            throw new IllegalArgumentException("Pipe Size <= 0");
         }
         buffer = new byte[pipeSize];
    }

    public void connect(PipedOutputStream src) throws IOException {
        src.connect(this); //最終還是調用PipedOutputStream(生產者)的connect()函數,並把自身對象this傳遞進去,然後在PipedOutputStream(生產者)的connect()函數中,改變自己的3個變量int in=-1、int out=0、boolean connected=true
    }
    
    //線程同步函數:該函數只被PipedOutputStream(生產者)的write(int b)函數調用
    protected synchronized void receive(int b) throws IOException {
        checkStateForReceive();//檢查PipedInputStream (消費者)的狀態
        writeSide = Thread.currentThread();//當前執行該函數的線程,就是生產者線程
        if (in == out)
            //如果緩衝區(byte[]數組)的讀指針==緩衝區(byte[]數組)的寫指針,喚醒所有消費者線程,自己這個生產者線程調用wait(1000)函數
            awaitSpace();
        if (in < 0) {//緩衝區(byte[]數組)的寫指針<0時,設置緩衝區(byte[]數組)的寫指針=0,緩衝區(byte[]數組)的讀指針=0
            in = 0;
            out = 0;
        }
        buffer[in++] = (byte)(b & 0xFF);//向緩衝區的寫指針位置寫入1個字節
        if (in >= buffer.length) {
            in = 0;//如果緩衝區滿了,設置緩衝區的寫指針=0
        }
    }

    //線程同步函數:該函數只被PipedOutputStream(生產者)的write(byte b[], int off, int len)函數調用
    synchronized void receive(byte b[], int off, int len)  throws IOException {
        checkStateForReceive();//檢查PipedInputStream (消費者)的狀態
        writeSide = Thread.currentThread();//當前執行該函數的線程,就是生產者線程
        int bytesToTransfer = len;//生產者線程要寫入到緩衝區(byte[]數組)中的字節總量
        while (bytesToTransfer > 0) {
            if (in == out)
                //如果緩衝區(byte[]數組)的讀指針==緩衝區(byte[]數組)的寫指針,喚醒所有消費者線程,自己這個生產者線程調用wait(1000)函數
                awaitSpace();
            int nextTransferAmount = 0;//本次生產者線程要寫入到緩衝區(byte[]數組)中的字節數量
            if (out < in) {
                //如果緩衝區的讀指針<緩衝區的寫指針,本次要寫入到緩衝區(byte[]數組)中的字節數量=緩衝區的長度-緩衝區的寫指針
                nextTransferAmount = buffer.length - in;
            } else if (in < out) {
                if (in == -1) {
                    in = out = 0;
                    //如果緩衝區的讀指針(out)> 緩衝區的寫指針(in)並且緩衝區的寫指針(in)=-1,先設置緩衝區的讀(out)、寫(in)指針=0,本次要寫入到緩衝區(byte[]數組)中的字節數量=緩衝區的長度
                    nextTransferAmount = buffer.length - in;
                } else {
                    //如果緩衝區的讀指針(out)> 緩衝區的寫指針(in)並且緩衝區的寫指針(in)=-1,本次要寫入到緩衝區(byte[]數組)中的字節數量=讀指針(out)-寫指針(in)
                    nextTransferAmount = out - in;
                }
            }
            //本次生產者線程要寫入到緩衝區(byte[]數組)中的字節數量最多為len,下次為len-本次寫入到緩衝區(byte[]數組)中的字節數量,也就是每次寫入的基於len個字節循環遞減上一次寫入的
            if (nextTransferAmount > bytesToTransfer)
                nextTransferAmount = bytesToTransfer;
            assert(nextTransferAmount > 0);
            System.arraycopy(b, off, buffer, in, nextTransferAmount);//向緩衝區(byte[]數組)的[in,in+nextTransferAmount)索引位置寫入byte[]數組b中[off,off+nextTransferAmount)索引位置的字節,都是左閉右開。
            bytesToTransfer -= nextTransferAmount;//每一次都基於len個字節循環遞減本次寫入到緩衝區(byte[]數組)中的字節數量nextTransferAmount
            off += nextTransferAmount;//將下次要從byte[]數組b中取字節的起始索引的位置(偏移量)+本次寫入到緩衝區(byte[]數組)中的字節數量nextTransferAmount
            in += nextTransferAmount;//將緩衝區的寫指針(in)+本次寫入到緩衝區(byte[]數組)中的字節數量nextTransferAmount
            if (in >= buffer.length) {
                in = 0;//如果緩衝區的寫指針(in)> 緩衝區(byte[]數組)的長度,設置緩衝區的寫指針(in)=0
            }
        }
    }

    //檢查PipedInputStream (消費者)的狀態
    private void checkStateForReceive() throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
            throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");
        }
    }
    
    //如果緩衝區(byte[]數組)的讀指針==緩衝區(byte[]數組)的寫指針,喚醒所有消費者線程,自己這個生產者線程調用wait(1000)函數
    private void awaitSpace() throws IOException {
        while (in == out) {
            checkStateForReceive();

            /* full: kick any waiting readers */
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
    }
    //關閉與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)
    synchronized void receivedLast() {
        closedByWriter = true;
        notifyAll();//喚醒所有消費者線程
    }
    //線程同步函數:消費者線程每次從緩衝區(byte[]數組)中讀取1個字節
    public synchronized int read()  throws IOException {
        if (!connected) {//檢查標記符connected,如果為false,拋出IOException
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {//檢查標記符closedByReader,如果為true,拋出IOException
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
           //檢查當前這個PipedInputStream (消費者)對象中引用的生產者線程和生產者線程的狀態,如果和標記符closedByWriter還有緩衝區(byte[]數組)的寫指針(in)不能對應的話,拋出一個IOException
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();//當前執行該函數的線程,就是消費者線程
        int trials = 2;//這是一個多次檢測的策略變量,防止生產者線程沒有關閉了與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)時便拋出IOException
        //in=-1的情況有種:
        //①、生產者線程還沒有向緩衝區(byte[]數組)中寫任何字節
        //②、消費者線程從緩衝區(byte[]數組)中讀完字節(byte)數據以後讀指針(out)=寫指針(in),那麼,當前消費者線程會設置寫指針(in)=-1
        //③、消費者線程執行PipedInputStream 的close()函數後,關閉了這個 PipedInputStream (消費者)
        while (in < 0) {
            if (closedByWriter) {
                /* closed by writer, return EOF */
                return -1;
            }
            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
                //多個消費者線程從緩衝區(byte[]數組)中讀的時候,並且前一個消費者線程已經把緩衝區(byte[]數組)中寫入的字節讀完了,並且前一個線程設置了寫指針(in)=-1,生產者線程也關閉了與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)時,拋出一個IOException
                throw new IOException("Pipe broken");
            }
            /* might be a writer waiting */
            notifyAll();//此處的目的是為了喚醒所有生產者線程
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
        int ret = buffer[out++] & 0xFF;//獲取緩衝區(byte[]數組)中讀指針(out)索引位置的字節,並且將讀指針(out)+1
        if (out >= buffer.length) {
            out = 0;//如果讀指針(out)>=緩衝區(byte[]數組)的長度,設置讀指針(out)=0
        }
        if (in == out) {
            /* now empty */
            in = -1;//如果消費者線程從緩衝區(byte[]數組)中讀完字節(byte)數據以後讀指針(out)=寫指針(in),那麼,當前消費者線程會設置寫指針(in)=-1
        }

        return ret;
    }

    //線程同步函數:如果緩衝區(byte[]數組)中有足夠多的字節的話(數量>len),消費者線程每次從緩衝區(byte[]數組)中讀取len個字節放到byte[]數組b的[off, off+len)索引位置(左閉右開,不包括off+len)
    //如果緩衝區(byte[]數組)中字節的數量<len個(比如有in(寫指針)-out(讀指針)個),消費者線程每次從緩衝區(byte[]數組)中讀取(in-out)個字節放到byte[]數組b的[off, off+in-out)索引位置(左閉右開,不包括off+in-out)
    public synchronized int read(byte b[], int off, int len)  throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {//byte[]數組b的[off,off+len)(左閉右開)索引位置是否有越界的檢查
            throw new IndexOutOfBoundsException();//越界的話,拋出一個IndexOutOfBoundsException
        } else if (len == 0) {
            return 0;//如果len==0,返回0
        }

        /* possibly wait on the first character */
        int c = read();//先調用read()函數試探性從緩衝區(byte[]數組)中讀1個字節
        if (c < 0) {
            return -1;//如果試探性的從緩衝區(byte[]數組)中都讀不到1個字節,返回-1
        }
        b[off] = (byte) c;//把試探性從緩衝區(byte[]數組)中讀到的第1個字節放到byte[]數組b的off索引位置
        int rlen = 1;//累計從緩衝區(byte[]數組)中讀到的所有字節數量
        while ((in >= 0) && (len > 1)) {

            int available;//本次執行System.arraycopy()函數可以從緩衝區(byte[]數組)中讀到byte[]數組b中的字節數量

            if (in > out) {
                available = Math.min((buffer.length - out), (in - out));
            } else {
                available = buffer.length - out;
            }

            // A byte is read beforehand outside the loop
            if (available > (len - 1)) {//減掉試探性從緩衝區(byte[]數組)中讀到的第1個字節
                available = len - 1;
            }
            System.arraycopy(buffer, out, b, off + rlen, available);
            out += available;//讀指針(out)+System.arraycopy()函數從緩衝區(byte[]數組)中讀到byte[]數組b中的字節數量
            rlen += available;//累計從緩衝區(byte[]數組)中讀到的所有字節數量 + System.arraycopy()函數從緩衝區(byte[]數組)中讀到byte[]數組b中的字節數量
            len -= available;//len - System.arraycopy()函數從緩衝區(byte[]數組)中讀到byte[]數組b中的字節數量

            if (out >= buffer.length) {
                out = 0;//如果讀指針(out)>=緩衝區(byte[]數組)的長度,設置讀指針(out)=0
            }
            if (in == out) {
                /* now empty */
                in = -1;//如果消費者線程從緩衝區(byte[]數組)中讀完字節(byte)數據以後讀指針(out)=寫指針(in),那麼,當前消費者線程會設置寫指針(in)=-1
            }
        }
        return rlen;//返回累計從緩衝區(byte[]數組)中讀到的所有字節數量
    }
    
    //線程同步函數:返回緩衝區(byte[]數組)中可以被消費者線程讀取的字節數量
    public synchronized int available() throws IOException {
        if(in < 0)
            return 0;
        else if(in == out)
            return buffer.length;
        else if (in > out)
            return in - out;
        else
            return in + buffer.length - out;
    }
    
    //關閉這個 PipedInputStream (消費者),其實就是設置標記符closedByReader=true, 設置寫指針(in)=-1
    public void close()  throws IOException {
        closedByReader = true;
        synchronized (this) {
            in = -1;
        }
    }
}

三、1個線程向PipedOutputStream(生產者)寫字節數據,1個線程從PipedInputStream(消費者)讀取字節數據的過程

3.1、非循環直接寫和非循環直接讀
package com.chelong.StreamAndReader;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipedTest {
   public static void main(String[] args) throws IOException {
      final PipedOutputStream output = new PipedOutputStream();
      final PipedInputStream input = new PipedInputStream(output);
      Thread thread1 = new Thread(new Runnable() {
         @Override
         public void run() {
            try {
               output.write("Hello world, pipe!".getBytes());//write()函數是阻塞的
            } catch (IOException e) {
            }
         }
      });

      Thread thread2 = new Thread(new Runnable() {
         @Override
         public void run() {
            try {
               int data = -1;
               while ((data = input.read()) != -1) {//read()函數是阻塞的
                  System.out.print((char) data);
               }
            } catch (IOException e) {
            }
         }
      });

      thread1.start();
      thread2.start();
   }
}

程序運行結果,如下所示:
clipboard

  main線程構造PipedOutputStream(生產者)和PipedInputStream(消費者)的過程如下:
clipboard

  向PipedOutputStream(生產者)寫字節數據的生產者線程的執行過程如下:

clipboard
clipboard

  從PipedInputStream(消費者)讀取字節數據的消費者線程的執行過程如下:
clipboard

3.1.1、非循環直接寫和非循環直接讀時1個生產者線程和1個消費者線程處理數據的過程

  Java 語言定義了 6 種線程狀態, 在任意一個時間點, 一個線程只能有且只有其中的一種狀態, 這 6 種狀態分別如下:
clipboard

這 6 種線程狀態的簡單介紹,如下所示
clipboard

  JVM運行時內存結構主要包含了五個部分:程序計數器 (PC寄存器)、 JVM棧、Native方法棧、堆、 方法區。如下圖所示:
clipboard

圖中紅色部分是線程私有區域,進入這個區域的數據不會出現線程競爭的關係。而綠色區域中的數據則被所有線程共享,其中Java堆中存放的是大量對象,方法區中存放class信息、常量、靜態變量等數據。
  每個線程的線程棧中會存放函數(方法)的描述符,成員(本地)變量等,函數(方法)在線程棧中會通過壓棧和彈棧來執行,除了8種(byte、short、int、long、float、double、boolean、char)基本的數據類型存儲在線程棧中以外,其餘的引用數據類型(對象)都存儲在堆中,然後通過引用將堆中的對象和線程棧中的變量關聯起來(也可以叫線程棧中的引用指向堆中的對象)。
clipboard

那麼,當使用者執行3.1中的代碼時,1個生產者線程和1個消費者線程處理數據的過程如下:
①、main線程初始化一個緩衝區(byte[]數組),長度為1024(默認值),然後生產者線程通過不斷的壓棧來完成函數之間的調用,最終執行PipedInputStream.class::receive(byte b[], int off, int len)函數來對緩衝區(byte[]數組)進行填充,如下所示:
clipboard

②、當生產者線程填充完緩衝區之後,寫指針變量int in=17,讀指針變量int out=0,Thread writeSide = 當前這個生產者線程(Thread)對象,生產者線程會把自己線程棧中修改的變量最終刷新到堆中PipedInputStream對象中,以確保其它消費者線程的線程棧從堆中讀取這3個變量時,這3個變量已經為修改後的值,如下所示:
clipboard

③、消費者線程讀緩衝區(byte[]數組)的過程中會不斷地執行out++(讀指針)以讀取緩衝區(byte[]數組)中的可用字節並返回,直到out(讀指針)==in(寫指針),修改in(寫指針)=-1,並且每次同步執行PipedInputStream.class::read()函數時,都會更新Thread readSide = 當前這個消費者線程(Thread)對象,消費者線程也會把自己線程棧中修改的變量最終刷新到堆中PipedInputStream對象中,以確保其它消費者線程的線程棧從堆中讀取這3個變量時,這3個變量已經為修改後的值,如下所示:
clipboard

④、更新in(寫指針)=-1後,消費者線程再次同步執行PipedInputStream.class::read()函數時,如果PipedInputStream::boolean closedByWriter變量為true,則會返回-1

3.2、加鎖循環寫和非加鎖循環讀到byte[]數組b中再處理
package com.chelong.pipe;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeForTransferInThread {
   public static void main(String[] args) throws IOException, InterruptedException {
      final PipedOutputStream output = new PipedOutputStream();
      final PipedInputStream input = new PipedInputStream(output);
      //生產者線程
      Thread producer = new Thread(new Runnable() {
         @Override
         public void run() {
            for (int i = 0; i < 3; i++) {
               synchronized (input) {
                  try {
//                    input.wait();
                     output.write("Hello world, pipe!".getBytes());
                     input.wait();//釋放鎖並無限等待,直到消費者線程consumer 執行notifyAll()函數來喚醒當前阻塞
                  } catch (Exception e) {
                     e.printStackTrace();
                  }
               }
            }
         }
      },"生產者線程");
      
      //消費者線程
      Thread consumer = new Thread(new Runnable() {
         @Override
         public void run() {
            try {
               byte[] b = new byte[1024];//1KB
               int readBytes = -1;
               long lastTime = System.currentTimeMillis();
               while ((readBytes = input.read(b, 0, b.length)) != -1) {
                  long curTime = System.currentTimeMillis();
                  System.out.print(Thread.currentThread().getName()+"本次讀取花費時間:" + (curTime - lastTime) + "ms,讀到的數據是:");
                  lastTime = curTime;
                  for (int i = 0; i < readBytes; i++) {
                     System.out.print((char) b[i]);//模擬處理字節數據
                  }
                  System.out.println();
               }
            } catch (IOException e) {
               e.printStackTrace();
            }
         }
      },"消費者線程");
      producer.start();//生產者線程啓動
      consumer.start();//消費者線程啓動
   }
}

程序運行結果,如下所示:
clipboard

  main線程構造PipedOutputStream(生產者)和PipedInputStream(消費者)的過程可以參考3.1;
  向PipedOutputStream(生產者)寫字節數據的生產者線程的執行過程可以參考3.1;
  從PipedInputStream(消費者)讀取字節數據的消費者線程的執行過程如下:
clipboard
clipboard

3.2.1、加鎖循環寫和非加鎖循環讀到byte[]數組b中再處理時1個生產者線程和1個消費者線程處理數據的過程

  標題3.2中的代碼的整個執行過程如下:
①、main線程初始化一個緩衝區(byte[]數組),長度為1024(默認值),如下所示:
clipboard

②、然後生產者線程通過不斷的壓棧來完成函數之間的調用,最終執行PipedInputStream.class::receive(byte b[], int off, int len)函數來對緩衝區(byte[]數組)進行填充,並且先在自己的線程棧中更新in(寫指針)=17,out(讀指針)=0,writeSide=當前這個生產者線程(Thread)對象 如下所示:
clipboard
clipboard

當生產者線程對緩衝區(byte[]數組)填充完成之後,再執行標題3.2中的代碼

input.wait();

這行代碼會釋放鎖並讓生產者線程進入無限等待,直到消費者線程consumer執行notifyAll()函數來喚醒當前這個生產者線程。在這之前,生產者線程會將自己線程棧中的in(寫指針)=17,out(讀指針)=0,writeSide=當前這個生產者線程,這3個變量更新到主內存(也就是堆)中的PipedInputStream對象中。
clipboard

③、消費者線程讀緩衝區(byte[]數組)的過程也是通過不斷的壓棧來完成函數之間的調用,最終執行PipedInputStream::read()函數(試探性的讀取1個字節)和PipedInputStream::read(byte b[], int off, int len)函數(讀取剩餘其它的字節)將步驟②中生產者線程寫入到緩衝區(byte[]數組)中的17個字節讀取出來
clipboard
clipboard
clipboard

附言:最終消費者線程也會將自己線程棧中的in(寫指針)= -1,out(讀指針)= 17,writeSide=當前這個消費者線程,這3個變量更新到主內存(也就是堆)中的PipedInputStream對象中。

因此,本次消費者線程從緩衝區(byte[]數組)中讀數據的過程中沒有執行read()函數中的wait(1000)這一行代碼,如下:
clipboard

所以,本次消費者線程從緩衝區(byte[]數組)中讀取數據到消費者線程中自己創建的byte[]數組中時,只花費了0ms:
clipboard

接下來,當消費者線程將步驟②中生產者線程寫入到緩衝區(byte[]數組)中的17個字節讀取出來以後(通過System.arraycopy()函數複製到了消費者線程中自己創建的byte[]數組中),消費者線程會遍歷從緩衝區讀到的這個byte[]數組,來處理這些數據,如下所示(標題3.2中的代碼片段):

                   //標題3.2中的代碼片段
                   for (int i = 0; i < readBytes; i++) {
                     System.out.print((char) b[i]);//模擬處理字節數據
                  }

然後,當消費者線程再次執行

//標題3.2中的代碼片段
input.read(b, 0, b.length)

從緩衝區(byte[]數組)中讀數據到自己創建的byte[]數組中時,由於此時in(寫指針)=-1,並且當下圖中的其它5個條件都不成立時,喚醒執行了

input.wait()

的生產者線程,然後當前這個正在從緩衝區(byte數組)中讀數據的消費者線程執行wait 1000ms ,如下:
clipboard
clipboard

④、當生產者線程被消費者線程執行的

notifyAll();

喚醒之後,會再次通過不斷的壓棧來完成函數之間的調用,再次執行PipedInputStream.class::receive(byte b[], int off, int len)函數來對緩衝區(byte[]數組)進行填充,並且先在自己的線程棧中先更新in(寫指針)=17,out(讀指針)=0,writeSide=當前這個生產者線程(Thread)對象 如下所示:
clipboard
clipboard

當生產者線程對緩衝區(byte[]數組)填充完成之後,再執行標題3.2中的代碼

input.wait();

這行代碼會釋放鎖並讓生產者線程進入無限等待,直到消費者線程consumer執行notifyAll()函數來喚醒當前這個生產者線程。在這之前,生產者線程會將自己線程棧中的in(寫指針)=17,out(讀指針)=0,writeSide=當前這個生產者線程,這3個變量更新到主內存(也就是堆)中的PipedInputStream對象中。如下所示:
clipboard

⑤、消費者線程在第③步執行了

wait(1000);

在等待了1000ms之後,消費者線程會自動喚醒繼續執行,此時自己線程棧中的in(寫指針)= -1,out(讀指針)= 17已經被第④步中的生產者線程修改為in(寫指針)=17,out(讀指針)=0(生產者線程不會直接修改消費者線程棧中的變量,生產者線程會先將自己線程棧中in(寫指針),out(讀指針)變量的值修改到主內存中,然後消費者線程會自己將主內存中的這2個變量值刷新到消費者自己的線程棧中),如下所示:
clipboard

然後執行PipedInputStream::read()函數(試探性的讀取1個字節)和PipedInputStream::read(byte b[], int off, int len)函數(讀取剩餘其它的字節)將步驟④中生產者線程寫入到緩衝區(byte[]數組)中的17個字節讀取出來
clipboard
clipboard
clipboard

附言:最終消費者線程也會將自己線程棧中的in(寫指針)= -1,out(讀指針)= 17,writeSide=當前這個消費者線程,這3個變量更新到主內存(也就是堆)中的PipedInputStream對象中。

由於,本次消費者線程從緩衝區(byte[]數組)中讀數據的過程是從步驟③中自動喚醒繼續執行的,所以,本次消費者線程從緩衝區(byte[]數組)中讀取數據到消費者線程中自己創建的byte[]數組中時,花費了1015ms:
clipboard

接下來,當消費者線程將步驟④中生產者線程寫入到緩衝區(byte[]數組)中的17個字節讀取出來以後(通過System.arraycopy()函數複製到了消費者線程中自己創建的byte[]數組中),消費者線程會遍歷從緩衝區讀到的這個byte[]數組,來處理這些數據,如下所示(標題3.2中的代碼片段):

                   //標題3.2中的代碼片段
                   for (int i = 0; i < readBytes; i++) {
                     System.out.print((char) b[i]);//模擬處理字節數據
                  }

然後,當消費者線程再次執行

//標題3.2中的代碼片段
input.read(b, 0, b.length)

從緩衝區(byte[]數組)中讀數據到自己創建的byte[]數組中時,由於此時in(寫指針)=-1,並且當下圖中的其它5個條件都不成立時,喚醒執行了

input.wait()

的生產者線程,然後當前這個正在從緩衝區(byte[]數組)中讀數據的消費者線程執行wait 1000ms ,如下:
clipboard
clipboard

⑥、當生產者線程被消費者線程執行的

notifyAll();

喚醒之後,會再次通過不斷的壓棧來完成函數之間的調用,再次執行PipedInputStream.class::receive(byte b[], int off, int len)函數來對緩衝區(byte[]數組)進行填充,並且先在自己的線程棧中先更新in(寫指針)=17,out(讀指針)=0,writeSide=當前這個生產者線程(Thread)對象 如下所示:
clipboard
clipboard

當生產者線程對緩衝區(byte[]數組)填充完成之後,再執行標題3.2中的代碼

input.wait();

這行代碼會釋放鎖並讓生產者線程進入無限等待,直到消費者線程consumer執行notifyAll()函數來喚醒當前這個生產者線程。在這之前,生產者線程會將自己線程棧中的in(寫指針)=17,out(讀指針)=0,writeSide=當前這個生產者線程,這3個變量更新到主內存(也就是堆)中的PipedInputStream對象中。如下所示:
clipboard

⑦、消費者線程在第⑤步執行了

wait(1000);

在等待了1000ms之後,消費者線程會自動喚醒繼續執行,此時自己線程棧中的in(寫指針)= -1,out(讀指針)= 17已經被第⑥步中的生產者線程修改為in(寫指針)=17,out(讀指針)=0(生產者線程不會直接修改消費者線程棧中的變量,生產者線程會先將自己線程棧中in(寫指針),out(讀指針)變量的值修改到主內存中,然後消費者線程會自己將主內存中的這2個變量值刷新到消費者自己的線程棧中),然後執行PipedInputStream::read()函數(試探性的讀取1個字節)和PipedInputStream::read(byte b[], int off, int len)函數(讀取剩餘其它的字節)將步驟⑥中生產者線程寫入到緩衝區(byte[]數組)中的17個字節讀取出來
clipboard
clipboard
clipboard

附言:最終消費者線程也會將自己線程棧中的in(寫指針)= -1,out(讀指針)= 17,writeSide=當前這個消費者線程,這3個變量更新到主內存(也就是堆)中的PipedInputStream對象中。

由於,本次消費者線程從緩衝區(byte[]數組)中讀數據的過程是從步驟⑤中自動喚醒繼續執行的,所以,本次消費者線程從緩衝區(byte[]數組)中讀取數據到消費者線程中自己創建的byte[]數組中時,花費了1017ms:
clipboard

接下來,當消費者線程將步驟⑥中生產者線程寫入到緩衝區(byte[]數組)中的17個字節讀取出來以後(通過System.arraycopy()函數複製到了消費者線程中自己創建的byte[]數組中),消費者線程會遍歷從緩衝區讀到的這個byte[]數組,來處理這些數據,如下所示(標題3.2中的代碼片段):

                   //標題3.2中的代碼片段
                   for (int i = 0; i < readBytes; i++) {
                     System.out.print((char) b[i]);//模擬處理字節數據
                  }

然後,當消費者線程再次執行

//標題3.2中的代碼片段
input.read(b, 0, b.length)

從緩衝區(byte[]數組)中讀數據到自己創建的byte[]數組中時,由於此時in(寫指針)=-1,並且當下圖中的其它5個條件都不成立時,喚醒執行了

input.wait()

的生產者線程,然後當前這個正在從緩衝區(byte[]數組)中讀數據的消費者線程執行wait 1000ms ,如下:
clipboard
clipboard

⑧、當生產者線程被消費者線程執行的

notifyAll();

喚醒之後,會跳出for循環,結束生產者線程的生命週期,之後,該線程對象會被操作系統回收。
⑨、消費者線程在第⑦步執行了

wait(1000);

在等待了1000ms之後,消費者線程會自動喚醒繼續執行,此時自己線程棧中的in(寫指針)= -1,out(讀指針)= 17,並且從

wait(1000);

的代碼之後,繼續執行,執行過程如下(從下圖的紫色流程繼續執行):
clipboard

在執行了2個循環後,直到int trials = 0時,執行到判斷(writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)這個條件時就會為true(下圖的紅色流程)
clipboard

然後,拋出了一個IOException("Pipe broken"),因此,可以得出int trials變量的含義:這個變量是一個多次檢測的策略變量,當生產者線程沒有關閉了與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)時,並且writeSide變量指向的當前生產者線程已經被操作系統回收時(此時當前生產者線程對象的isAlive()函數會返回false),消費者線程會拋出1個IOException("Pipe broken"),並結束while循環,進而結束消費者線程的生命週期。之後,該線程對象也會被操作系統回收。如下圖所示:
clipboard

3.2.2、怎樣防止3.2.1中第⑨步的生產者線程拋出IOException("Pipe broken")

  回顧3.2.1中第⑨步中的消費者線程拋出IOException("Pipe broken")的產生過程:當執行到判斷(writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)這個條件時就會為true(下圖的紅色流程)
clipboard

那麼,使用者就可以將上圖中紅色流程的前一步變成true即可,如下代碼所示(只修改了生產者線程中的代碼,消費者線程中的代碼沒有變化):

package com.chelong.pipe;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
   public static void main(String[] args) throws IOException, InterruptedException {
      final PipedOutputStream output = new PipedOutputStream();
      final PipedInputStream input = new PipedInputStream(output);
      //生產者線程
      Thread producer = new Thread(new Runnable() {
         @Override
         public void run() {
            try {
               for (int i = 0; i < 3; i++) {
                  synchronized (input) {
//                    input.wait();
                     output.write("Hello world, pipe!".getBytes());
                     input.wait();//釋放鎖並無限等待,直到消費者線程thread2執行notifyAll()函數來喚醒當前阻塞
                  }
               }
            } catch (Exception e) {
               e.printStackTrace();
            } finally {
               try {
                  if (output != null) output.close();//調用close()函數關閉生產者對象
               } catch (IOException e) {
                  e.printStackTrace();
               }
            }
         }
      }, "生產者線程");

      //消費者線程
      Thread consumer = new Thread(new Runnable() {
         @Override
         public void run() {
            try {
               byte[] b = new byte[1024];//1KB
               int readBytes = -1;
               long lastTime = System.currentTimeMillis();
               while ((readBytes = input.read(b, 0, b.length)) != -1) {
                  long curTime = System.currentTimeMillis();
                  System.out.print(Thread.currentThread().getName() + "本次讀取花費時間:" + (curTime - lastTime) + "ms,讀到的數據是:");
                  lastTime = curTime;
                  for (int i = 0; i < readBytes; i++) {
                     System.out.print((char) b[i]);//模擬處理字節數據
                  }
                  System.out.println();
               }
            } catch (IOException e) {
               e.printStackTrace();
            }
         }
      }, "消費者線程");
      producer.start();//生產者線程啓動
      consumer.start();//消費者線程啓動
   }
}

程序運行結果,如下所示:
clipboard

  通過PipedOutputStream.class::close()的源碼可以看到這樣修改後消費者線程不再拋出IOException("Pipe broken")原因:
PipedOutputStream.class(生產者類)的源碼

package java.io;

import java.io.*;

public
class PipedOutputStream extends OutputStream {
    ...省略部分代碼...
    //關閉這個PipedOutputStream(生產者),這個PipedOutputStream(生產者)不能再向與它相關聯的PipedInputStream(消費者)中的緩衝區(byte[]數組)寫入字節數據
    public void close()  throws IOException {
        if (sink != null) {
            sink.receivedLast();//調用PipedInputStream.class::receivedLast()函數
        }
    }
}

PipedInputStream .class(消費者類)的源碼

package java.io;

public class PipedInputStream extends InputStream {
    //標記符:true表示與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)已經關閉,反之,反之
    boolean closedByWriter = false;
    ...省略部分代碼...
    //關閉與這個 PipedInputStream (消費者)相關聯的PipedOutputStream(生產者)
    synchronized void receivedLast() {
        closedByWriter = true;//關閉後消費者再從緩衝區(byte[])數組中讀取字節數據時,會返回-1,不會拋出IOException了
        notifyAll();//喚醒所有消費者線程
    }
    ...省略部分代碼...

四、多個線程向PipedOutputStream(生產者)寫字節數據,多個線程從PipedInputStream(消費者)讀取字節數據的過程

  略(待補充)

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.