簡介
在我的心中,JDK有兩個經典版本,第一個就是現在大部分公司都在使用的JDK8,這個版本引入了Stream、lambda表達式和泛型,讓JAVA程序的編寫變得更加流暢,減少了大量的冗餘代碼。
另外一個版本要早點,還是JAVA 1.X的時代,我們稱之為JDK1.5,這個版本引入了java.util.concurrent併發包,從此在JAVA中可以愉快的使用異步編程。
雖然先JDK已經發展到了17版本,但是併發這一塊的變動並不是很大。受限於JDK要保持穩定的需求,所以concurrent併發包提供的功能並不能完全滿足某些業務場景。所以依賴於JDK的包自行研發了屬於自己的併發包。
當然,netty也不例外,一起來看看netty併發包都有那些優勢吧。
JDK異步緣起
怎麼在java中創建一個異步任務,或者開啓一個異步的線程,每個人可能都有屬於自己的回答。
大家第一時間可能想到的是創建一個實現Runnable接口的類,然後將其封裝到Thread中運行,如下所示:
new Thread(new(RunnableTask())).start()
每次都需要new一個Thread是JDK大神們不可接受的,於是他們產生了一個將thread調用進行封裝的想法,而這個封裝類就叫做Executor.
Executor是一個interface,首先看一下這個interface的定義:
public interface Executor {
void execute(Runnable command);
}
接口很簡單,就是定義了一個execute方法來執行傳入的Runnable命令。
於是我們可以這樣來異步開啓任務:
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
看到這裏,聰明的小夥伴可能就要問了,好像不對呀,Executor自定義了execute接口,好像跟異步和多線程並沒有太大的關係呀?
別急,因為Executor是一個接口,所以我們可以有很多實現。比如下面的直接執行Runnable,讓Runnable在當前線程中執行:
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
又比如下面的在一個新的線程中執行Runnable:
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
又比如下面的將多個任務存放在一個Queue中,執行完一個任務再執行下一個任務的序列執行:
class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}
這些Executor都非常完美。但是他們都只能提交任務,提交任務之後就什麼都不知道了。這對於好奇的寶寶們是不可忍受的,因為我們需要知道執行的結果,或者對執行任務進行管控。
於是就有了ExecutorService。ExecutorService也是一個接口,不過他提供了shutdown方法來停止接受新的任務,和isShutdown來判斷關閉的狀態。
除此之外,它還提供了單獨調用任務的submit方法和批量調用任務的invokeAll和invokeAny方法。
既然有了execute方法,submit雖然和execute方法基本上執行了相同的操作,但是在方法參數和返回值上有稍許區別。
首先是返回值,submit返回的是Future,Future表示異步計算的結果。 它提供了檢查計算是否完成、等待其完成以及檢索計算結果的方法。 Future提供了get方法,用來獲取計算結果。但是如果調用get方法的同時,計算結果並沒有準備好,則會發生阻塞。
其次是submit的參數,一般來説只有Callable才會有返回值,所以我們常用的調用方式是這樣的:
<T> Future<T> submit(Callable<T> task);
如果我們傳入Runnable,那麼雖然也返回一個Future,但是返回的值是null:
Future<?> submit(Runnable task);
如果我又想傳入Runnable,又想Future有返回值怎麼辦呢?
古人告訴我們,魚和熊掌不可兼得!但是現在是2021年了,有些事情是可以發生改變了:
<T> Future<T> submit(Runnable task, T result);
上面我們可以傳入一個result,當Future中的任務執行完畢之後直接將result返回。
既然ExecutorService這麼強大,如何創建ExecutorService呢?
最簡單的辦法就是用new去創建對應的實例。但是這樣不夠優雅,於是JDK提供了一個Executors工具類,他提供了多種創建不同ExecutorService的靜態方法,非常好用。
netty中的Executor
為了兼容JDK的併發框架,雖然netty中也有Executor,但是netty中的Executor都是從JDK的併發包中衍生出來的。
具體而言,netty中的Executor叫做EventExecutor,他繼承自EventExecutorGroup:
public interface EventExecutor extends EventExecutorGroup
而EventExecutorGroup又繼承自JDK的ScheduledExecutorService:
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor>
為什麼叫做Group呢?這個Group的意思是它裏面包含了一個EventExecutor的集合。這些結合中的EventExecutor通過Iterable的next方法來進行遍歷的。
這也就是為什麼EventExecutorGroup同時繼承了Iterable類。
然後netty中的其他具體Executor的實現再在EventExecutor的基礎之上進行擴展。從而得到了netty自己的EventExecutor實現。
Future的困境和netty的實現
那麼JDK中的Future會有什麼問題呢?前面我們也提到了JDK中的Future雖然保存了計算結果,但是我們要獲取的時候還是需要通過調用get方法來獲取。
但是如果當前計算結果還沒出來的話,get方法會造成當前線程的阻塞。
別怕,這個問題在netty中被解決了。
先看下netty中Future的定義:
public interface Future<V> extends java.util.concurrent.Future<V>
可以看到netty中的Future是繼承自JDK的Future。同時添加了addListener和removeListener,以及sync和await方法。
先講一下sync和await方法,兩者都是等待Future執行結束。不同之處在於,如果在執行過程中,如果future失敗了,則會拋出異常。而await方法不會。
那麼如果不想同步調用Future的get方法來獲得計算結果。則可以給Future添加listener。
這樣當Future執行結束之後,會自動通知listener中的方法,從而實現異步通知的效果,其使用代碼如下:
EventExecutorGroup group = new DefaultEventExecutorGroup(4); // 4 threads
Future<?> f = group.submit(new Runnable() { ... });
f.addListener(new FutureListener<?> {
public void operationComplete(Future<?> f) {
..
}
});
還有一個問題,每次我們提交任務的時候,都需要創建一個EventExecutorGroup,有沒有不需要創建就可以提交任務的方法呢?
有的!
netty為那些沒有時間創建新的EventExecutorGroup的同志們,特意創建一個全局的GlobalEventExecutor,這是可以直接使用的:
GlobalEventExecutor.INSTANCE.execute(new Runnable() { ... });
GlobalEventExecutor是一個單線程的任務執行器,每隔一秒鐘回去檢測有沒有新的任務,有的話就提交到executor執行。
總結
netty為JDK的併發包提供了非常有用的擴展。大家可以直接使用。
本文已收錄於 http://www.flydean.com/46-netty-future-executor/
最通俗的解讀,最深刻的乾貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!
歡迎關注我的公眾號:「程序那些事」,懂技術,更懂你!