博客 / 詳情

返回

用Java來實現BIO和NIO模型的HTTP服務器(二) NIO的實現

翻了一下(一)發現整體還是不大好, 這裏重新再梳理一下

前言

這是一個系列的文章,按照規劃是用Java標準庫、Netty來實現一個非常簡單的HTTP服務器,HTTP服務器我們可以使用Java標準庫提供的api,實現BIO、NIO模型的HTTP服務器,然後再用Netty實現,前一篇我們寫的類在這一篇還可以用到,讓我們回憶一下上一篇我們講了什麼,我們回顧了通信的發展史,從最開始的點對點鏈路,到總線鏈路,再到mac地址,ip地址,最後引出兩台計算機之間的通信事實上是兩台計算機上面進程之間的通信,那麼該數據包到達計算機之後該如何交給哪個進程呢,這也就是端口,運輸層引入了端口的概念,ip+端口構成TCP連接的一端,那麼要通信就首先要建立連接,也就是三次握手,連接建立之後就可以通過連接來傳輸數據了,那麼該如何管理連接呢? 操作系統在連接建立的時候,會將這個消息通知給進程。

我們的程序通過Socket來和操作系統進行交互,這裏的Socket指的是操作系統提供的服務,當一個進程向一個進程發起請求建立連接的請求,這個數據包首先經過操作系統提供的接口向下傳遞,然後通過互聯網中層層設備轉發來到另一個進程所在的計算機上,兩台計算機完成連接建立之後,通知上層的應用程序。

當我們編寫的應用程序需要使用網絡服務的時候,在Java中我們首先要明確自己是客户端還是服務端,客户端是發起請求的一方, 我們客户端的代碼可以這麼寫:

Socket socket = new Socket();
// 代表客户端請求連接ip地址為127.0.0.1,端口為8080的進程
socket.connect(new InetSocketAddress("127.0.0.1",8080));
// 連接之後獲取輸入流
OutputStream outputStream = socket.getOutputStream();
// 寫入hello world
outputStream.write("hello world".getBytes());

客户端在發起連接請求的時候,這個請求會首先到達操作系統,操作系統會為這次調用所需要的一些資源(CPU時間,網絡帶寬、存儲器空間等)分配該應用進程。操作系統為這些資源總和創建一個套接字描述符的好嘛來表示,然後將這個套接字描述返回給應用進程。看到這裏有一個疑問,客户端沒聲明自己在那個端口上,那服務端在給客户端發送消息的時候,這個消息到達操作系統應該給誰呢? 答案是操作系統會從可用的端口分配一個,但是如果你想綁定指定的端口其實也可以,Socket這個類裏面提供了bind方法:

public void bind(SocketAddress bindpoint) throws IOException 

客户端寫完之後我們來寫服務端, TCP協議中我們需要一個服務端,監聽指定的端口。在Java裏面寫服務端的應用程序事實上有兩套API,一套是JDK 1.0引入的以ServerSocket為中心的API,一套是JDK 1.4 引入的以ServerSocketChannel為核心的API。第一套寫監聽的方法如下:

ServerSocket serverSocket = new ServerSocket();
// 綁定在8080端口
serverSocket.bind(new InetSocketAddress(8080));
// 監聽連接,該方法會阻塞到這裏直到有連接建立完成
// 發起系統調用
Socket socket = serverSocket.accept();
while (true){
    InputStream socketInputStream = socket.getInputStream();
    byte[] readByte = new byte[4096];
    // 這裏其實數據不見得立馬可以讀, 因為數據不代表立馬可以讀
    // 發起系統調用
    int readTotalNumber  = socketInputStream.read(readByte);
    String s = new String(readByte,0,readTotalNumber);
    System.out.println(s);
}

這裏我們詳細的解釋一下為什麼數據為什麼不可讀,我們知道現代高級語言程序想要使用網絡服務,必須調用操作系統提供的接口,這種調用也被稱為系統調用,發生系統調用的時候發生了什麼?

系統調用指運行在用户空間(User space)向操作系統內核請求需要更高權限運行的服務。

讓我們回憶一下操作系統的內核空間和用户空間,計算機的內存被切割為兩個部分:

用户空間: 正如同的它的名字一樣,處內核以外所有的用户進程運行在這個空間上。內核的作用是管理在該空間內運行的應用程序,防止他們互相干擾,避免機器出現混亂。

內核空間: 內核的代碼和數據存放在這個位置上,內核也是一個進程,內核運行在這塊內存之上。

與之相對的兩個概念是內核模式(Kernel mode,有資料也稱為System Model 系統模式),是Linux中CPU運行模式之一。另一種是用户模式(user model),是用户程序的非特權模式,也就是內核以外的所有操作模式。當CPU運行在內核模式下面,默認運行的是受信任的程序,因此它可以執行任何指令和訪問任何內存位置。內核(操作系統的核心,對系統中發生的一切擁有完全的控制權)是被信任的軟件,其他程序不受信任,因此所用的用户進程都必須使用系統調用來請求內核執行特權指令,比如創建進程、I/O操作。

術語System Call 和 System Exit是實際彙編語言指令的佔位符,分別用於將CPU從用户模式切換到內核模式,從內核模式切換到用户模式。當用户進程發起一個調用,Linux會為這個調用分配一個系統調用編號,Linux使用系統調用表(System Call Dispatch Table)存儲調用編號和實際執行系統調用對應功能的函數。

實際運行上面的程序會發現 , 會出現下面的異常:

Exception in thread "main" java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.net.SocketInputStream.read(SocketInputStream.java:127)
    at com.example.quicktest.ServerSocketDemo.oldAPI(ServerSocketDemo.java:42)
    at com.example.quicktest.ServerSocketDemo.main(ServerSocketDemo.java:20)

原因在於我們上面寫的程序是是在不斷的處理連接的,收到數據之後,再讀收到了RST包,那什麼是RST包,讓我們回憶一下關於TCP的經典面試題三次握手和四次揮手:

聲明圖片來自於參考文檔[16]

我們一邊寫這個一邊將我們的所學聯繫起來,在TCP協議中客户端主動關閉連接,發起系統調用之後,內核發一個TCP數據包,這個TCP數據包的終止位FIN置成1,序號seq = u,它等於前面已傳送的數據的最後一個字節的序號加1,這時Client進入FIN-WAIT-1(終止等待1)狀態,等待服務端的確認。

服務端收到FIN之後,向Client發送確認包ack = u +1,這個u等於前面已經傳送的數據的最後一個序號加1 , Client收到ack之後進入到FIN-WAIT-2,服務端如果沒有數據要發送了就會向客户端發送TCP數據包,數據包中的FIN置為1, 服務端還必須重複上次已發送過的確認好ack = u + 1.這時B就進入LAST-ACK(最後確認)狀態,等待A的確認。

客户端再次收到服務端的連接釋放報文段後,必須對此發出確認。在確認報文段中把ACK置為1、確認號ack = w + 1 ,而自己的序號是seq = u + 1(根據TCP標準, 前面發送給的FIN報文段要消耗一個序號)。然後進入到TIME-WAIT(時間等待)的狀態。請注意,現在TCP連接還沒有釋放掉。必須經過時間等待計時器(TIME-WAIT timer) 設置的時間2MSL後,A才進入到CLOSED狀態

然後我們客户端退出之後,沒有顯式的調用close,也就是客户端沒有走正常流程關閉TCP連接,但對於操作系統來説還是要回收對應的資源,所以進程退出的時候,內核會監測到這個變化,因為這個連接已經是異常了。 在傳輸控制協議(TCP)連接的數據包流中,每個數據包都包含一個TCP包頭。這些包頭中的每一個都包含一個稱為“復位”(RST)標誌的位。在大多數數據包中,該位設置為0,並且無效;但是,如果此位設置為1,則向接收計算機指示該計算機應立即停止使用TCP連接;它不應使用連接的標識號(端口)發送更多數據包,並丟棄接收到的帶有包頭的其他數據包,這些包頭指示它們屬於該連接。

所以上面的代碼客户端完善一點應當是這個樣子:

// 藉助try-with-resources 自動關閉釋放資源
try (Socket socket = new Socket()){
    socket.connect(new InetSocketAddress("127.0.0.1",8080));
    try(OutputStream outputStream = socket.getOutputStream()){
        outputStream.write("hello world".getBytes());
    }
}

accept調用也應該放在whie(true)循環裏面,所以代碼應當改成下面這個樣子:

ServerSocket serverSocket = new ServerSocket();
// 綁定在8080端口
serverSocket.bind(new InetSocketAddress(8080));
// 監聽連接,該方法會阻塞到這裏直到有連接建立完成
// 發起系統調用
while (true) {
    try (Socket socket = serverSocket.accept()) {
        try (InputStream socketInputStream = socket.getInputStream()) {
            byte[] readByte = new byte[4096];
            // 這裏其實數據不見得立馬可以讀, 因為數據不代表立馬可以讀
            // 發起系統調用
            int readTotalNumber = socketInputStream.read(readByte);
            String s = new String(readByte, 0, readTotalNumber);
            System.out.println(s);
        }
    }
}

在這種模型下同時只能處理一下一個連接,因為我們只有一個線程,這個鏈接的讀取邏輯沒處理完畢,下一個得等待在那裏。我們這裏解釋一下,我們調用SocketInputStream的read函數的時候為什麼不是立刻能讀取,一般來説,一台能聯網的計算機首先得有網卡不管是有線網卡還是無線網卡,數據經過路由器之後網線到達網卡,然後將數據包從網卡硬件緩存轉移到內存中,然後通知內核處理,然後經過TCP/IP協議層處理,最後應用程序通過系統調用讀取到發送過來的數據。

上面的寫法還面臨的一個問題就是沒有判斷什麼時候報文結束,如果是短鏈接即傳輸一次消息連接就關閉,那麼read函數返回-1就代表數據結束,如果我們希望TCP連接保活,即保持這個鏈接,我們只是做示例,完整的會在下面構建HTTP服務器中詳細講述。

如果你熟悉網絡編程,還有一個網絡異常會經常碰到:

Exception in thread "main" java.net.SocketException: Connection reset by peer

被對等方重置連接,這是啥意思? 相當於突然掛電話,這比單純的不迴應、讓人等着更有禮貌。但這並不是真正有禮貌的TCP/IP的關閉方式。也就是説連接建立了,某一方突然關閉連接,另一方還在使用這個連接,就會出現這個異常。

在《用Java的BIO和NIO、Netty來實現HTTP服務器(一) 》裏面我們用的是在1.4引入的新API,這套API的優勢就是比較統一,可以通過ServerSockeChannel的configureBlocking來制定使用BIO還是NIO,所以上面服務端的寫法可以等價轉換為:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080));
while (true) {
    try (SocketChannel socketChannel = serverSocketChannel.accept()) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
        socketChannel.read(byteBuffer);
        byteBuffer.flip();
        byte[] readDataArray = new byte[byteBuffer.limit()];
        byteBuffer.get(readDataArray);
        String readData = new String(readDataArray);
        System.out.println(readData);
    }
}

ByteBuffer基於byte數組封裝了一些常見的操作,可以理解為一個容器,put存,get取,但是在取之前我們需要知道取到哪個位置,調用flip方法之後,調用limit方法之後就能知道ByteBuffer中放了多少元素。

前面面臨的問題

(一)是七月份寫的,這裏我們在複習一下里面的內容, 我們希望構建用Java標準庫,以及Java的NIO框架Netty實現一個簡單的HTTP服務器,(一)是BIO模型,不管是NIO還是BIO,還是用框架構建,基於TCP協議的網絡編程都會面臨這樣一個問題,首先是管理連接,也就是連接建立,連接建立之後我們讀取數據,那麼HTTP報文不固定大小,我們需要根據報文結束標誌來判斷是否讀取結束,然後讀取完整之後交給下一層去處理。但是NIO還是BIO他們具備共性,所以我們用繼承來實現,我們首先抽象了一個Server的基礎類:

public abstract class Server {

    protected ServerSocketChannel serverSocketChannel;

    protected SSLContext sslContext;

    static int port = 8000;

    static int backlog = 1024;

    static boolean secure = false;

    public Server(int port, int backlog, boolean secure) throws IOException {
        // 創建一個ServerSocketChannel
        serverSocketChannel =  ServerSocketChannel.open();
        /**
         * 可以重用ip+端口這個鏈接,TCP以鏈接為單位,當TCP鏈接要關閉的時候,會等待一段時間再進行關閉,
         * 如果我想要重用端口,那麼channel就無法綁定,在綁定到對應地址之前,設定重用地址。即使在這個端口上的tcp連接
         * 處於處於TIME_WAIT狀態,我們仍然可以使用
         */
        serverSocketChannel.socket().setReuseAddress(true);
        // 綁定端口和backlog
        serverSocketChannel.bind(new InetSocketAddress(port),backlog);

    }

    /**
     * 這裏是抽象方法,我們後面要用NIO再實現一遍
     * 所以這裏交給子類來實現
     */
    protected  abstract void runServer();

    /**
     * 我們要寫的是一個簡單的HTTP服務器,
     * 這個服務器可以從命令行方式啓動的時候接收參數
     * 我們可以選擇從main函數
     * @param args
     */
    public static void main(String[] args) throws IOException {
        Server server = null;
        if (args.length == 0){
            System.out.println("http server running default model");
            server = new BlockingServer(port,backlog,secure);
            server.runServer();
        }
        // 端口目前先固定死, 我們目前只讀一個參數
        if ("B".equals(args[0])){
            server = new BlockingServer(port,backlog,secure);
        }else if ("N".equals(args[0])){
            server = new NonBlockingServer(port,backlog,secure);
        }else{
            System.out.println("input args error only support B OR N");
            return;
        }
        server.runServer();
    }
}

public class BlockingServer extends Server {
    
    public BlockingServer(int port, int backlog, boolean secure) throws IOException {
        super(port, backlog, secure);
    }
    
    @Override
    protected void runServer() throws IOException {
        for (;;){
            SocketChannel socketChannel = serverSocketChannel.accept();
            ChannelIO channelIO = ChannelIO.getInstance(socketChannel, true);
            RequestServicer requestServicer = new RequestServicer(channelIO);
            requestServicer.run();
        }
    }
}

我們用ServerSocketChannel這個為核心來構建HTTP服務器,原因是實現上更為統一,我們最終可以做成一個jar,所以我們根據命令行參數來決定是BIO還是NIO模型的服務器。啓動之後應當是一個無限循環,不斷接收連接,不斷處理請求。連接建立之後,我們需要不斷的讀數據,這是NIO和BIO共同的特徵,所以我們寫了一個ChannelIO工具類,來實現對數據的讀取:

public class ChannelIO {

    private SocketChannel socketChannel;

    private ByteBuffer requestBuffer;

    int defaultByteBufferSize = 4096;

    private ChannelIO(SocketChannel socketChannel, boolean blocking) throws IOException {
        this.socketChannel = socketChannel;
        this.requestBuffer = ByteBuffer.allocate(4096);
        this.socketChannel.configureBlocking(blocking);
    }


    public static ChannelIO getInstance(SocketChannel socketChannel, boolean blocking) throws IOException {
        return  new ChannelIO(socketChannel,blocking);
    }

    public int read() throws IOException {
        // 剩餘的小於百分之五自動擴容
        resizeByteBuffer(defaultByteBufferSize / 20);
        return socketChannel.read(requestBuffer);
    }

    private void resizeByteBuffer(int remaining) {
        if (requestBuffer.remaining() < remaining){
            // 擴容一倍
            ByteBuffer newRequestBuffer = ByteBuffer.allocate(requestBuffer.capacity() * 2);
            // 轉為讀模式
            requestBuffer.flip();
            //  將舊的buffer放入到新的buffer中
            newRequestBuffer.put(requestBuffer);
            requestBuffer = newRequestBuffer;
        }
    }

    public ByteBuffer getReadBuf(){
        return this.requestBuffer;
    }

    public int write(ByteBuffer byteBuffer) throws IOException{
        return socketChannel.write(byteBuffer);
    }

    public void close() throws IOException{
        socketChannel.close();
    }
}

ChannelIO主要的幾個作用就是讀和寫,默認的ByteBuffer為4096,但是報文大小有可能超過,所以這裏我們讀之前看看需不需要自動擴容,這個類被請求處理者所處理, 請求處理者要負責解析HTTP報文,HTTP報文有請求方式,有結束標誌,有uri,這個解析的任務我們放在Request這個類來處理:

public class Request {

    private Action action;

    private URI uri;

    private String version;

    public Request(Action action, URI uri, String version) {
        this.action = action;
        this.uri = uri;
        this.version = version;
    }

    static class Action{
        private String name;

        static Action GET = new Action("GET");

        static Action POST = new Action("POST");

        static Action PUT = new Action("PUT");

        static Action HEAD = new Action("HEAD");

        public Action(String name) {
            this.name = name;
        }
        public String toString(){
            return this.name;
        }
        static Action parse(String s){
            if ("GET".equals(s)){
                return GET;
            }
            if ("POST".equals(s)){
                return POST;
            }
            if ("PUT".equals(s)){
                return PUT;
            }
            if ("HEAD".equals(s)){
                return HEAD;
            }
            // 參數不合法
            throw new IllegalArgumentException(s);
        }
    }


    public  static boolean isComplete(ByteBuffer byteBuffer){
        int position = byteBuffer.position() - 4;
        if (position < 0){
            return false;
        }
        return byteBuffer.get(position + 0) == '\r'
                && byteBuffer.get(position + 1) == '\n'
                && byteBuffer.get(position + 2) == '\r'
                && byteBuffer.get(position + 3) == '\n';
    }

    private static Charset ascii = StandardCharsets.US_ASCII;

    /**
     * 正則表達式 用來分割請求報文
     * http 請求的報文是: GET /dir/file HTTP/1.1
     *  Host: hostname
     *  被正則表達式分割以後:
     *      group[1] = "GET"
     *      group[2] = "/dir/file"
     *      group[3] = "1.1"
     *      group[4] = "hostname"
     */
    private static Pattern requestPattern
            = Pattern.compile("\\A([A-Z]+) +([^ ]+) +HTTP/([0-9\\.]+)$"
                    + ".*^Host: ([^ ]+)$.*\r\n\r\n\\z",
            Pattern.MULTILINE | Pattern.DOTALL);

    public static  Request parse(ByteBuffer byteBuffer) throws RequestException {
        // byte to char
        CharBuffer charBuffer = ascii.decode(byteBuffer);
        Matcher matcher = requestPattern.matcher(charBuffer);
        // 未匹配
        if (!matcher.matches()){
            throw new  RequestException();
        }
        Action a;
        try {
            a = Action.parse(matcher.group(1));
        }catch (IllegalArgumentException  e){
            throw new RequestException();
        }
        URI u = null;
        try {
            u = new URI("http://" + matcher.group(4) + matcher.group(2));
        }catch (URISyntaxException e){
           throw new RequestException(e);
        }
        return new Request(a,u,matcher.group(3));
    }
}

這個類主要封裝HTTP報文的請求方式、版本、URI。有Request就有Reply,一個HTTP響應通常情況下會有狀態碼和內容,這裏我們的HTTP服務器將來要擴展到各種類型:

public interface Sendable {
    // 做轉碼
    void prepare() throws IOException;
    // 發送動作
    boolean send(ChannelIO channelIO);
    
    void release();
}
public interface Content extends Sendable {
    // 發送類型
    String type();
    
    // 長度
    long length();
}
public class StringContent implements Content{

    private String type;    // MIME type

    private String content;

    private ByteBuffer byteBuffer;

    private static final Charset ascii = StandardCharsets.US_ASCII;

    StringContent(CharSequence c, String t) {
        content = c.toString();
        type = t + "; charset=iso-8859-1";
    }

    StringContent(CharSequence c) {
        this(c, "text/plain");
    }

    StringContent(Exception x) {
        StringWriter sw = new StringWriter();
        x.printStackTrace(new PrintWriter(sw));
        type = "text/plain; charset=iso-8859-1";
        content = sw.toString();
    }

    @Override
    public String type() {
        return type;
    }

    @Override
    public long length() {
        return byteBuffer.remaining();
    }

    @Override
    public void prepare() throws IOException {
        encode();
        // 在寫入之前就需要調用一下rewind方法
        byteBuffer.rewind();
    }

    private void encode() {
        if (byteBuffer == null){
            byteBuffer =  ascii.encode(CharBuffer.wrap(content));
        }
    }

    @Override
    public boolean send(ChannelIO channelIO) throws IOException {
        if (byteBuffer == null)
            throw new IllegalStateException();
        // 寫的時候 不見得一次寫完
        channelIO.write(byteBuffer);
        // hasRemaining 代表是否還有剩餘
        // 如果有剩餘就可以寫着寫
        return  byteBuffer.hasRemaining();
    }

    /**
     * 這是個空方法
     * 後面只是為了統一調用
     */
    @Override
    public void release() {

    }
}
public class Reply implements Sendable {

    static class Code{

        private int number;

        private String description;

        public Code(int number, String description) {
            this.number = number;
            this.description = description;
        }

        static Code OK = new Code(200,"OK");

        static Code BAD_REQUEST = new Code(400,"Bad Request");

        static Code NOT_FOUND = new Code(404,"Not Found");

        static Code METHOD_NOT_ALLOWED = new Code(405,"Method Not Allowed");
    }

    private Code code;

    private Content content;

    private ByteBuffer headerBuffer;


    private static String CRLF = "\r\n";

    private static Charset ascii = Charset.forName("US-ASCII");

    public Reply(Code code, Content content) {
        this.code = code;
        this.content = content;
    }

    /**
     * 這個方法負責添加請求頭
     * @return
     */
    private ByteBuffer headers(){
        CharBuffer cb = CharBuffer.allocate(1024);
        cb.put("HTTP/1.0 ").put(code.toString()).put(CRLF);
        cb.put("Server: niossl/0.1").put(CRLF);
        cb.put("Content-type: ").put(content.type()).put(CRLF);
        cb.put("Content-length: ")
                .put(Long.toString(content.length())).put(CRLF);
        cb.put(CRLF);
        cb.flip();
        return ascii.encode(cb);
    }

    @Override
    public void prepare() throws IOException {
        content.prepare();
        headerBuffer = headers();
    }

    @Override
    public boolean send(ChannelIO channelIO) throws IOException {
         // 先寫請求頭
        if (headerBuffer.hasRemaining()){
            if (channelIO.write(headerBuffer) <= 0)
                return true;
        }
        // 再寫響應內容
        if (content.send(channelIO))
            return true;
        return false;
    }

    @Override
    public void release() {
        content.release();
    }
}

連接建立之後開始提取數據:

public class RequestServicer implements Runnable {


    private ChannelIO channelIO;

    public RequestServicer(ChannelIO channelIO) {
        this.channelIO = channelIO;
    }

    private void service() throws IOException {
        ByteBuffer byteBuffer = receive(); // 接收數據
        Request request = null;
        Reply reply = null;
        try {
            request = Request.parse(byteBuffer);
        } catch (RequestException e) {
            reply = new Reply(Reply.Code.BAD_REQUEST, new StringContent(e));
        }
        // 説明正常解析
        if (reply == null) {
            reply  = build(request); // 構建回覆
        }
        reply.prepare();
        do {} while (reply.send(channelIO));         // Send
    }

    @Override
    public void run() {
        try {
            service();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    
    ByteBuffer receive() throws IOException {
        for (; ; ) {
            int read = channelIO.read();
            ByteBuffer bb = channelIO.getReadBuf();
            if ((read < 0) || (Request.isComplete(bb))) {
                bb.flip();
                return bb;
            }
        }
    }

    Reply build(Request rq) throws IOException {

        Reply rp = null;
        Request.Action action = rq.action();
        if ((action != Request.Action.GET)) {
            rp = new Reply(Reply.Code.METHOD_NOT_ALLOWED,
                    new StringContent(rq.toString()));
            rp.prepare();
            return rp;
        }
        rp = new Reply(Reply.Code.OK,
                new StringContent("hello world"));
        rp.prepare();
        return rp;
    }
}

NIO簡介

上面模型也被稱為BIO模型,也就是Blocking Input/Output, 其實上面已經分析出來在哪裏了,也就是讀數據的時候未必可以讀,但是我們的read調用就被阻塞在那裏,我們自然能夠想到能否讓操作系統為我們提供一個非阻塞的read函數, 這個 read 函數的效果是,如果沒有數據到達時(到達網卡並拷貝到了內核緩衝區),立刻返回一個錯誤值(-1),而不是阻塞地等待。操作系統提供了這樣的功能,只需要在調用 read 前,將文件描述符設置為非阻塞即可。這樣我們在線程裏面調用read函數,直到返回值不為-1的,再開始處理業務。 但是在數據到達內核緩衝區,這個階段仍然是阻塞的,需要等待數據從內核緩衝區拷貝到用户緩衝區,才能返回。

這裏可以為每個連接準備一個線程來處理,這其實也是解決問題的方案,一些連接請求不多的HTTP服務器現在還是這麼處理的,那麼對於連接過多的,多線程就有些乏力了,當然也可以有聰明的方法,我們可以每accept一個連接之後,將這個文件描述符(可以理解為Socket的引用)放在一個數組裏面,然後弄一個新的線程去不斷遍歷這個數組,調用每一個元素的非阻塞 read 方法,這樣,我們就成功用一個線程處理了多個客户端連接。

這看起來就有多路複用的意思了,但這和我們用多線程去將阻塞 IO 改造成看起來是非阻塞 IO 一樣,這種遍歷方式也只是我們用户自己想出的小把戲,每次遍歷遇到 read 返回 -1 時仍然是一次浪費資源的系統調用。所以,還是得懇請操作系統老大,提供給我們一個有這樣效果的函數,我們將一批文件描述符通過一次系統調用傳給內核,由內核層去遍歷,才能真正解決這個問題。

select 是操作系統提供的系統調用函數,通過它,我們可以把一個文件描述符的數組發給操作系統, 讓操作系統去遍歷,確定哪個文件描述符可以讀寫, 然後告訴我們去處理。

但是這個函數仍然不完美,原因在於:

  1. select 調用需要傳入 fd 數組,需要拷貝一份到內核,高併發場景下這樣的拷貝消耗的資源是驚人的。(可優化為不復制)
  2. select 在內核層仍然是通過遍歷的方式檢查文件描述符的就緒狀態,是個同步過程,只不過無系統調用切換上下文的開銷。(內核層可優化為異步事件通知)
  3. select 僅僅返回可讀文件描述符的個數,具體哪個可讀還是要用户自己遍歷。(可優化為只返回給用户就緒的文件描述符,無需用户做無效的遍歷)

但也不是不能用,但select還有限制,這個限制就是select 只能監聽 1024 個文件描述符的限制,後面的poll去掉了這個限制。最終解決select函數的大boss叫epoll,針對select函數的三個不完美的點進行了修復:

  1. 內核中保存一份文件描述符集合,無需用户每次都重新傳入,只需告訴內核修改(添加、修改、監控的文件描述符)的部分即可。
  2. 內核不再通過輪詢的方式找到就緒的文件描述符,而是通過異步 IO 事件喚醒。
  3. 內核僅會將有 IO 事件的文件描述符返回給用户,用户也無需遍歷整個文件描述符集合。

重回NIO

“內核僅會將有IO事件的文件描述符返回給用户”,仔細讀這一句話,我以為是select函數的返回值是一個集合,但是我去看了一下這個函數:

int select(int maxfd, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);

fd是一個集合類型,從參數名字上我們來推斷readfds傳入需要監視讀事件的文件描述符,writefds是需要監視讀事件的文件描述符,exceptfds是異常事件的文件描述符,這裏我們提到了文件描述符,這個文件描述符是用來代表一個打開的文件、或者socket、或者其他數據源。定義了能對該文件做的操作。當select函數有所返回的時候,會修改傳入的集合。select函數是系統調用,Java層面對應的抽象也就是Selector,使用起來倒是簡單:

Selector selector = Selector.open();
selector.select();

默認選擇當前操作系統的實現,我們看下Open的實現,我的電腦裝的操作系統是Windows,select的實現在Oracle 的hotspot VM中是閉源的,觀察他的實現要在OpenJDK上,我這裏隨手選了一個JDK 11版本的實現:

那怎麼讓這個選擇器知道我對某個事件感興趣 , 讀事件就緒、寫事件就緒其實通道(通道也就是對連接的抽象)上發生的事件,按照我之前的想法Selector這個類裏面應該會有一個register之類的方法,但是沒找到,不在Selector就在ServerSocketChannel裏面,果然我在ServerSocketChannel找到了register方法, 這個register來自AbstractSelectableChannel。

// ops是一個枚舉值,att是對應的事件觸發之後,交付給哪個對象處理
public final SelectionKey register(Selector sel, int ops, Object att)

所以我們可以寫成下面這樣:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080),1024);
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
Runnable runnable = ()-> {
    System.out.println("hello world");
};
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT,runnable);
while (true){
        selector.select();
        Set<SelectionKey> selectKeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = selectKeys.iterator();
        while(iterator.hasNext()) {
            SelectionKey selectionKey = iterator.next();
            Runnable handle = (Runnable)selectionKey.attachment();
            handle.run();
            iterator.remove();
        }
}

解釋一下這段代碼,我們首先將ServerSocketChannel變成非阻塞模式 , 然後綁定監聽的端口,打開一個選擇器,然後通過register方法將ServerSocketChannel納入Selector的管轄範圍之內之內,告知選擇器這個Channel對連接就緒這個事件感興趣,然後調用Selector的select方法,這個方法上直到有就緒事件之前一直會阻塞,然後從選擇器上獲取SelectionKey集合,那麼該如何理解這個SelectionKey呢? 我們翻下SelectionKey的註釋:

A token representing the registration of a SelectableChannel with a Selector.

表示Selector和SelectableChannel註冊的標記

這個SelectableChannel是何方神聖,我們翻閲一下:

SocketChannel和ServerSocketChannel都繼承了這個類,上面我們使用的register就來自於這個類, 現在我們大致能理清楚Selector和這兩個Channel之間的關係:

連接建立之後我們可以通過ServerSocketChannel的accept方法拿到SocketChannel,通過SocketChannel可以讀寫數據,但是在NIO下面讀未必是就緒的,在SelectableChannel裏面提供了向選擇器註冊感興趣事件的方法,而ServerSocketChannel和SocketChannel又繼承SelectableChannel。

A selection key is created each time a channel is registered with a selector.

每次通道註冊到Selector的時候都會創建一個SelectionKey

SelectionKey關聯了Channel,Selector向我們返回連接就緒,可讀可寫的時候,返回一個SelectionKey,然後我們通過這個SelectionKey去拿註冊的對象和通道,然後接着向選擇器註冊可讀、可寫事件。

那麼對於TCP協議來説,讀和連接就緒相對不可控一些,需要客户端主動發起,那麼寫就緒呢,一個Socket創建之後,操作系統會為他分配資源,相對寫比較可控,那什麼時候寫呢,原則上我們在連接建立之後就可以馬上寫,如果寫到內核的緩衝區滿了不能寫,我們再向選擇器註冊寫事件,注意寫完資源要馬上取消對這個事件感興趣,要不然選擇器上就一直會有就緒的寫事件返回出來。我們回想一下HTTP是收完客户端發完的報文,再寫,那麼對於WebSocket來説連接建立,雙方就能隨時寫。本篇我們寫是一個HTTP服務器,所以我們設計的是在讀完之後再註冊對寫事件感興趣。

A key remains valid until it is cancelled by invoking its cancel method, by closing its channel, or by closing its selector.

這個Key會一直有效,直到調用cancel方法,關閉其通道,關閉選擇器。

Cancelling a key does not immediately remove it from its selector; it is instead added to the selector's cancelled-key set for removal during the next selection operation.

當你選擇從Selector中取消一個鍵的時候,這個鍵不會立刻從它所屬的Selector中被移除,相反,這個取消的鍵會被加入到已取消的集合中,下一次選擇操作的時候,才會被移除。

The validity of a key may be tested by invoking its isValid method.

可以通過調用鍵的 isValid 方法來測試鍵的有效性。

上面我們給的示例,事實上會一直輸出Hello World,原因在於我們沒有處理這個連接,也就是調用對應的accept方法,所以Selector上一直都會有就緒的事件。理解了這一步,我們就能給出NIO的實現了,首先向Selector註冊對連接就緒事件感興趣,連接就緒之後,我們向Selector註冊對讀事件感興趣,在接收玩HTTP報文之後,我們就可以寫,如果寫不完,我們可以向選擇器上註冊對寫事件就緒,就緒的時候調用對應的處理者:

// 這裏負責註冊
public interface Dispatcher extends Runnable {
    void register(SelectableChannel selectableChannel, int ops, HttpHandler httpHandler) throws ClosedChannelException;
}

public class Dispatcher1 implements Dispatcher{
    private Selector selector;
    
    public Dispatcher1() throws IOException {
        selector = Selector.open();
    }

    @Override
    public void register(SelectableChannel selectableChannel, int ops, HttpHandler httpHandler) throws ClosedChannelException {
        selectableChannel.register(selector,ops,httpHandler);
    }

    @Override
    public void run() {
        // 無限循環分發事件
        for(;;){
            try {
                dispatch();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatch() throws IOException {
       // 阻塞到至少有一個就緒事件
        selector.select();
        for (Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); iterator.hasNext();){
            SelectionKey selectionKey = iterator.next();
            iterator.remove();
            // 獲取對應的register方法
            HttpHandler httpHandler = (HttpHandler)selectionKey.attachment();
            // 交給對應的處理者處理
            httpHandler.handler(selectionKey);
        }
    }
}
// 這裏用接口是為了統一處理
public interface HttpHandler {
    void handler(SelectionKey selectionKey);
}
public class AcceptHandler implements HttpHandler{

    private ServerSocketChannel serverSocketChannel;

    private Dispatcher dispatcher;

    public AcceptHandler(ServerSocketChannel serverSocketChannel, Dispatcher dispatcher) {
        this.serverSocketChannel = serverSocketChannel;
        this.dispatcher = dispatcher;
    }

    @Override
    public void handler(SelectionKey selectionKey) throws IOException {    
        SocketChannel socketChannel = serverSocketChannel.accept();     
        // 註冊對讀事件感興趣
            dispatcher.register(socketChannel,SelectionKey.OP_READ,new questHandler(dispatcher,ChannelIO.getInstance(socketChannel,false)));

    }
}
public class NonBlockingServer extends Server{
    public NonBlockingServer(int port, int backlog, boolean secure) throws IOException {
        super(port, backlog, secure);
        serverSocketChannel.configureBlocking(false);
    }

    @Override
    protected void runServer() throws IOException {
        Dispatcher dispatcher = new Dispatcher1();
        AcceptHandler acceptHandler = new AcceptHandler(serverSocketChannel,dispatcher);
        dispatcher.register(serverSocketChannel, SelectionKey.OP_ACCEPT,acceptHandler);
        dispatcher.run();
    }
}

接下來讓我們來想想這個RequestHandler的邏輯應該是什麼,我們做的是一個HTTP Server,所以理應是先接收報文,等報文接收完畢,再解析報文,報文解析之後構建響應,寫回復。

public class RequestHandler implements HttpHandler{

    private Dispatcher dispatcher;

    private ChannelIO channelIO;

    private Request request = null;

    private boolean requestReceived;

    private ByteBuffer rbb = null;

    private Reply reply;

    public RequestHandler(Dispatcher dispatcher, ChannelIO channelIO) {
        this.dispatcher = dispatcher;
        this.channelIO = channelIO;
    }

    /**
     * 先接收數據,後解析, 然後是發送
     * @param selectionKey
     */
    @Override
    public void handler(SelectionKey selectionKey) throws IOException {
        SelectableChannel  selectableChannel  = selectionKey.channel();
        if (request == null){
            if (!received()){
                return;
            }
            rbb = channelIO.getReadBuf();
            rbb.flip();

            if (requestParse(rbb)){
                reply = replyBuild();
            }
            reply.prepare();
            // 開始構建回覆
            if (replySend()){
                dispatcher.register(selectableChannel,SelectionKey.OP_WRITE,new ResponseHandler());
            }
            channelIO.close();
            if (reply != null){
                reply.release();
            }

        }
    }
    private boolean replySend() {
        try {
            return  reply.send(channelIO);
        } catch (IOException e) {
            System.out.println("處理異常");
            return false;
        }
    }

    private Reply replyBuild() throws IOException {
        Reply rp = null;
        Request.Action action = request.action();
        if ((action != Request.Action.GET)) {
            rp = new Reply(Reply.Code.METHOD_NOT_ALLOWED,
                    new StringContent(request.toString()));
            rp.prepare();
            return rp;
        }
        rp = new Reply(Reply.Code.OK,
                new StringContent("hello world"));
        rp.prepare();
        return rp;

    }

    private boolean requestParse(ByteBuffer rbb) {
        try {
            request =  Request.parse(rbb);
            return true;
        } catch (RequestException e) {
            reply = new Reply(Reply.Code.BAD_REQUEST,
                    new StringContent("解析異常"));
        }
        return false;
    }

    private boolean received() throws IOException {

        if (requestReceived) {
            return true;
        }
        // 讀取完畢,將對應
        if ((channelIO.read() < 0) || Request.isComplete(channelIO.getReadBuf())){
            return requestReceived = true;
        }
        return false;
    }
}
public class ResponseHandler implements HttpHandler{

    /**
     *  只是完善這個模型,我們的寫一次就到位了
     * @param selectionKey
     * @throws IOException
     */
    @Override
    public void handler(SelectionKey selectionKey) throws IOException {
        System.out.println();
        selectionKey.cancel();
    }
}

到現在為止我們用Java 的Socket API 已經基本構建了一個NIO模型的HTTP服務器,這裏沒有一股腦將相關API全部列在這裏,我選擇的風格是我喜歡教科書的風格,用到什麼講什麼,一個應用服務器整體是不斷處理請求的,所以我們做了一個Dispatch,用來分發Selector上的就緒事件,先是註冊對連接就緒這個事件感興趣的處理器我們調用的是SelectableChannel中的register方法:

register(Selector sel, int ops, Object att)

第三個參數在事件被觸發的時候,我們可以用SelectionKey的attachment拿到這個對象,為了統一調用我們讓對應都實現了HttpHandler接口。注意到現在為止我們只有一個線程在處理所有的事件,這可以做的很快,如果是純內存操作的話,但是如果寫入的內容有些大導致,其他連接就可能就要排隊,這無疑是我們不想看到的,所以上面的模型我們可以對其進行改造,將接受連接的處理器,單獨啓用一個線程來處理,後面讀寫的掛一個線程池來處理。這就是單線程的Reactor模式和多線程的Reactor模式的雛形。

NIO解析

所謂的BIO,Blocking在read上面,讀數據的時候未必還可以讀,數據的流轉需要一個過程,但是我們不希望被阻塞在這裏,我們自然能夠想到,能否讓操作系統為我們提供一個非阻塞的read函數,這個read函數的效果是,如果沒有數據到達時,立刻返回一個值,而不是立刻等待。

當然我們可以為每一個連接準備一個線程來處理,這也不是不能用,現在來説一些應用服務器還有停留在BIO模式的,但對於連接過多的,我們就需要考慮別的解決方案了,當然也有聰明的方法,我們可以每accept一個連接之後,將這個文件描述符放在一個數組裏面,然後弄一個新的線程去不斷遍歷這個數組,調用每一個元素的非阻塞read方法,這樣我們就可以用一個線程來處理多個客户端的連接。

這看起來就有點多路複用的意思了,但是這只是用户態的NIO,我們仍然需要發起系統調用,系統調用相當昂貴,所以我們得懇求操作系統老大,提供給我們一個這樣的函數,我們將一批文件描述符通過一次調用傳給內核,由內核層去遍歷,才真正去解決這個問題。

select是Linux提供的系統調用函數,通過它,我們可以把一個文件描述符的數組發給操作系統,讓操作系統去遍歷,確定哪個文件描述可以讀寫,然後告訴我們去處理。但是這個函數仍然不完美,原因在於:

  1. select 調用需要傳入 fd 數組,需要拷貝一份到內核,高併發場景下這樣的拷貝消耗的資源是驚人的。(可優化為不復制)
  2. select 在內核層仍然是通過遍歷的方式檢查文件描述符的就緒狀態,是個同步過程,只不過無系統調用切換上下文的開銷。(內核層可優化為異步事件通知)
  3. select 僅僅返回可讀文件描述符的個數,具體哪個可讀還是要用户自己遍歷。(可優化為只返回給用户就緒的文件描述符,無需用户做無效的遍歷)

但也不是不能用,但select還有限制,這個限制就是select 只能監聽 1024 個文件描述符的限制,後面的poll去掉了這個限制。最終解決select函數的大boss叫epoll,針對select函數的三個不完美的點進行了修復:

  1. 內核中保存一份文件描述符集合,無需用户每次都重新傳入,只需告訴內核修改(添加、修改、監控的文件描述符)的部分即可。
  2. 內核不再通過輪詢的方式找到就緒的文件描述符,而是通過異步 IO 事件喚醒。
  3. 內核僅會將有 IO 事件的文件描述符返回給用户,用户也無需遍歷整個文件描述符集合。

總結一下

我們總結一下構建一個應用服務器的要點,也就是管理連接、讀取數據、構建回覆,這是三個最核心的問題,那麼我們自然能夠想到能不能將這些封裝起來,我們只用關心處理的邏輯就行,這也就是我們將在下一篇講的Netty,上面我們我們沒有提到拆包的概念,原因我認為這是應用層報文需要考慮的問題,報文的結束標誌位是什麼。這篇文章寫了一段時間,倒是有點超出意料,每次寫感覺都對網絡的理解更深刻了一些,然後看不清的東西又多了一些。

參考資料

[1] what happens after read is called for a Linux socket https://stackoverflow.com/questions/10226294/what-happens-after-read-is-called-for-a-linux-socket

[2] What is the difference between the kernel space and the user space? https://stackoverflow.com/questions/5957570/what-is-the-difference-between-the-kernel-space-and-the-user-space

[3] What is difference between User space and Kernel space? https://unix.stackexchange.com/questions/87625/what-is-differ...

[4] Linux網絡數據包接受過程 https://simonzgx.github.io/2020/08/17/Linux%E7%BD%91%E7%BB%9C...

[5] Kernel Mode Definition https://www.linfo.org/kernel_mode.html

[6] What are high memory and low memory on Linux? https://unix.stackexchange.com/questions/4929/what-are-high-m...

[7] Implementing System Calls https://www.cs.swarthmore.edu/~kwebb/cs45/s19/labs/lab2.html

[8] LinuxSystemCalls.pdf http://comet.lehman.cuny.edu/jung/cmp426697/LinuxSystemCalls.pdf

[9] The Operating System https://www.cs.swarthmore.edu/~kwebb/cs31/s15/bucs/system_cal...

[10] Difference between System call and System call service routines https://stackoverflow.com/questions/70410917/difference-between-system-call-and-system-call-service-routines

[11] what is “java.net.SocketException: Connection reset” https://learn.redhat.com/t5/General/what-is-java-net-SocketEx...

[12] What does "connection reset by peer" mean? https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean

[13] TCP: Differences Between FIN and RST https://www.baeldung.com/cs/tcp-fin-vs-rst

[14] FIN vs RST in TCP connections https://stackoverflow.com/questions/13049828/fin-vs-rst-in-tcp-connections

[15] TCP學習筆記(二) 相識篇 https://juejin.cn/post/7103092974841511950#heading-2

[16] TCP-4-times-close https://wiki.wireshark.org/TCP-4-times-close.md

[18] What does "connection reset by peer" mean? https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean

[19] linux select函數解析以及事例 https://zhuanlan.zhihu.com/p/57518857

user avatar tracy_5cb7dfc1f3f67 頭像 u_16213560 頭像
2 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.