作者:京東物流 張弓言
一、背景
Netty 是一款優秀的高性能網絡框架,內部通過 NIO 的方式來處理網絡請求,在高負載下也能可靠和高效地處理 I/O 操作
作為較底層的網絡通信框架,其被廣泛應用在各種中間件的開發中,比如 RPC框架、MQ、Elasticsearch等,這些中間件框架的底層網絡通信模塊大都利用到了 Netty 強大的網絡抽象
下面這篇文章將主要對 Netty 中的各個組件進行分析,並在介紹完了各個組件之後,通過 JSF 這個 RPC 框架為例來分析 Netty 的使用,希望讓大家對 Netty 能有一個清晰的瞭解
二、Netty Server
通過 Netty 來構建一個簡易服務端是比較簡單的,代碼如下:
public class NettyServer {
public static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ChannelFuture channelFuture = serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("Handler Added");
}
})
.childHandler(new ServerChannelInitializer())
.bind(8100);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
LOGGER.info("Netty Server Start !");
}
}
});
try {
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
上面代碼的主要邏輯如下:
- 新建服務端引導啓動類 ServerBootstrap,內部封裝了各個組件,用來進行服務端的啓動
- 新建了兩個 EventLoopGroup 用來進行連接處理,此時可以簡單的將 EventLoopGroup 理解為多個線程的集合。bossGroup 中的線程用來處理新連接的建立,當新連接建立後,workerGroup 中的每個線程則都會和唯一的客户端 Channel 連接進行綁定,用來處理該 Channel 上的讀、寫事件
- 指定服務端創建的 Channel 類型為 NioServerSocketChannel
- childOption 用來配置客户端連接的 NioSocketChannel 底層網絡參數
- handler 用來指定針對服務端 Channel 的處理器,內部定義了一系列的回調方法,會在服務端 Channel 發生指定事件時進行回調
- childHandler 用來指定客户端 Channel 的處理器,當客户端 Channel 中發生指定事件時,會進行回調
- bind 指定服務端監聽端口號
三、Netty Client
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 1. 啓動類
ChannelFuture channelFuture = new Bootstrap()
// 2. 添加 EventLoop
.group(workGroup)
// 3. 選擇客户端 channel 實現
.channel(NioSocketChannel.class)
// 4. 添加處理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在連接建立後被調用
protected void initChannel(NioSocketChannel ch) throws Exception {
ZAS ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new StringEncoder());
}
})
// 5. 連接到服務器
.connect(new InetSocketAddress("localhost", 8100));
channelFuture.addListener(future -> {
if (future.isSuccess()) {
((ChannelFuture) future).channel().writeAndFlush("hello");
}
});
channelFuture.channel().closeFuture().sync();
} finally {
workGroup.shutdownGracefully();
}
}
}
上面代碼的主要邏輯如下:
- 新建 Bootstrap 用來進行客户端啓動
- group() 指定一個 NioEventLoopGroup 實例,用來處理客户端連接的建立和後續事件處理
- handler() 指定 Channel 處理器,
- 當將客户端啓動類中的各個屬性都設置完畢後,調用 connect() 方法進行服務端連接
從上面的的兩個例子可以看出,如果想通過 Netty 實現一個簡易的服務器其實是非常簡單的,只需要在啓動引導類中設置好對應屬性,然後完成端口綁定就可以實現。但也正是因為這種簡易的實現方式,導致很多人在學習 Netty 的過程中,發現代碼是寫的出來,但是對內部的組件有什麼作用以及為什麼這麼寫可能就不是很清楚了,因此希望通過這一系列文章來加深大家對 Netty 的理解
四、Netty 基本組件
Channel
Netty 中的 Channel 可以看成網絡編程中的 Socket,其提供了一系列 IO 操作的 API,比如 read、write、bind、connect 等,大大降低了直接使用 Socket 類的複雜性
整體類繼承關係如下:
從上面的繼承關係可以看出,NioSocketChannel 和 NioServerSocketChannel 分別對應客户端和服務端的 Channel,兩者的直接父類不一致,因此對外提供的功能也是不相同的。比如當發生 read 事件時,NioServerSocketChannel 的主要邏輯就是建立新的連接,而 NioSocketChannel 則是讀取傳輸的字節進行業務處理
下面就以 NioServerSocketChannel 為例,帶大家瞭解下該類的初始化過程,整體流程如下:
- 啓動引導類中通過 channel() 指定底層創建的 Channel 類型
- 根據指定的 Channel 類型創建出 ChannelFactory,後續通過該工廠類進行 Channel 的實例化
- 實例化 Channel
channel() 指定 ChannelFactory 類型
在上面的服務端啓動過程中,ServerBootstrap 調用 channel() 方法並傳入 NioServerSocketChannel,其底層代碼邏輯為:
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
// ReflectiveChannelFactory 構造方法
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
整體邏輯很簡單,通過傳入的 Class 對象指定一個 Channel 反射工廠,後續調用工廠方法獲取指定類型的 Channel 對象
channel 實例化
當服務端啓動引導類 ServerBootstrap 調用 bind() 方法之後,內部會走到 Channel 的實例化過程,代碼精簡如下:
// channel 初始化流程,內部通過 channelFactory 構造
final ChannelFuture initAndRegister() {
channel = channelFactory.newChannel();
}
// channelFactory 的 newChannel 方法邏輯
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
ChannelFactory 的整體邏輯就是通過反射的方式新建 Channel 對象,而 Channel 對象的類型則是在啓動引導類中通過 channel() 方法進行指定的
在實例化 Channel 的過程中,會對其內部的一些屬性進行初始化,而對這些屬性的瞭解,可以使我們對 Netty 中各個組件的作用範圍有一個更加清晰的理解,下面看下 NioServerSocketChannel 的構造函數源碼
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
上述源碼就是一層一層的父類構造,可以對照前面的類關係圖進行閲讀
NioServerSocketChannel 實例化過程中主要完成了以下內部屬性的初始化:
- unsafe 屬性進行賦值為 NioMessageUnsafe,後續 Channel 上事件處理的主要邏輯都是由該類完成
- pipeline 屬性進行初始化賦值,pipeline 是 Channel 中特別重要的一個屬性,後續的所有業務處理器都是通過該 pipeline 組織的
- 指定當前 Channel 的 readInterestOp 屬性為 SelectionKey.OP_ACCEPT,用於後續綁定到 Selector 時指定當前 Channel 監聽的事件類型
- 指定當前 Channel 非阻塞,ch.configureBlocking(false)
總結
對於 Channel 的實例化流程可以總結如下:
- 啓動引導類中通過 channel() 方法指定生成的 ChannelFactory 類型
- 通過 ChannelFactory 來構造對應 Channel,並在實例化的過程中初始化了一些重要屬性,比如 pipeline
ChannelPipeline
ChannelPipeline 也是 Netty 中的一個比較重要的組件,從上面的 Channel 實例化過程可以看出,每一個 Channel 實例中都會包含一個對應的 ChannelPipeline 屬性
ChannelPipeline 初始化
ChannelPipeline 底層初始化源碼:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
從 ChannelPipeline 的構造函數可以看出,每一個 ChannelPipeline 底層都是一個雙向鏈表結構,默認會包含 head 和 tail 頭尾節點,用來進行一些默認的邏輯處理,處理細節會在後續文章中展現
addLast()
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 回調 ChannelHandler 中的 handlerAdded() 方法
callHandlerAdded0(newCtx);
return this;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
addLast() 方法是向 ChannelPipeline 中添加 ChannelHandler 用來進行業務處理
整個方法的邏輯為:
- 判斷當前 ChannelHandler 是否已經添加
- 將當前 ChannelHandler 包裝成 ChannelHandlerContext,並將其添加到 ChannelPipeline 的雙向鏈表中
- 回調添加的 ChannelHandler 中的 handlerAdded() 方法
Channel、ChannelPipeline、ChannelHandler 關係
Channel、ChannelPipeline和 ChannelHandler 三者的關係如圖所示:
- 每一個 Channel 中都會包含一個 ChannelPipeline 屬性
- ChannelPipeline 是一個雙向鏈表結構,默認會包含 HeadContext 和 TailContext 兩個節點
- 當向 ChannelPipeline 中添加 ChannelHandler 時,會包裝成 ChannelContext 插入到 ChannelPipeline 鏈表中
- 當 Channel 中發生指定事件時,該事件就會在 ChannelPipeline 中沿着雙向鏈表進行傳播,調用各個 ChannelHandler 中的指定方法,完成相應的業務處理
Netty 正是通過 ChannelPipeline 這一結構為用户提供了自定義業務邏輯的擴展點,用户只需要向 ChannelPipeline 中添加處理對應業務邏輯的 ChannelHandler,之後當指定事件發生時,該 ChannelHandler 中的對應方法就會進行回調,實現業務的處理
ChannelHandler
ChannelHandler 是 Netty 中業務處理的核心類,當有 IO 事件發生時,該事件會在 ChannelPipeline 中進行傳播,並依次調用到 ChannelHandler 中的指定方法
ChannelHandler 的類繼承關係如下:
從上面的類繼承關係可以看出,ChannelHandler 大致可以分為 ChannelInboundHandler 和 ChannelOutboundHandler,分別用來處理讀、寫事件
ChannInboundHandler
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
在 ChannelInboundHandler 中定義了一系列的回調方法,用户可以實現該接口並重寫相應的方法來自定義的業務邏輯。
重寫方法邏輯是簡單的,但很多人其實不清楚的是這些回調方法到底在什麼場景下會被調用,如何調用,只有瞭解了這些回調方法的調用時機,才能在更適宜的地方完成相應功能
channelRegistered
channelRegistered() 從方法名理解是當 Channel 完成註冊之後會被調用,那麼何為 Channel 註冊?
下面就以 Netty 服務端啓動過程中的部分源碼為例(詳細源碼分析會在後續文章中),看下 channelRegistered() 的調用時機
在 Netty 服務端啓動時,會調用到 io.netty.channel.AbstractChannel.AbstractUnsafe#register 方法,精簡代碼如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// neverRegistered 初始值為 true
boolean firstRegistration = neverRegistered;
// 將 Channel 綁定到對應 eventLoop 中的 Selector 上
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// 調用 ChannelHandler 中的 ChannelRegistered()
pipeline.fireChannelRegistered();
}
}
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
從 Netty 底層的 register() 方法可以看出,ChannelHandler 中的 ChannelRegistered() 調用時機是在調用 pipeline.fireChannelRegistered() 時觸發的,此時已經完成的邏輯為:
- 通過傳入的 EventLoopGroup 得到了該 Channel 對應的 EventLoop,並與Channel 中的對應屬性完成了綁定;AbstractChannel.this.eventLoop = eventLoop 邏輯
- 當前 Channel 已經綁定到了對應 EventLoop 中的 Selector 上;doRegister() 邏輯
- ChannelHandler 中的 handlerAdded() 方法已經完成了回調;pipeline.invokeHandlerAddedIfNeeded() 邏輯
因此當 Channel 和對應的 Selector 完成了綁定,Channel 中 pipeline 上綁定的 ChannelHandler 的channelRegisted() 方法就會進行回調
channelActive
上面已經分析了channelRegistered() 方法的調用時機,也就是當 Channel 綁定到了對應 Selector 上之後就會進行回調,下面開始分析 channelActive() 方法的調用時機
對於服務端 Channel,前面還只是將 Channel 註冊到了 Selector 上,還沒有調用到 bind() 方法完成真正的底層端口綁定,那麼有沒有可能當服務端 Channel 完成端口監聽之後,就會調用到 channelActive() 方法呢?
下面繼續分析,在上面完成了 Channel 和 Selector 的註冊之後,Netty 服務端啓動過程中會繼續調用到 io.netty.channel.AbstractChannel.AbstractUnsafe#bind 邏輯:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
在該方法中完成了以下邏輯:
- 完成了 Channel 和本地端口的綁定
- 綁定成功後,isActive() 方法返回 true,此時發佈 ChannelActive 事件,進行方法回調
- safeSetSuccess() 中會回調到服務端啓動過程中添加的 listener 方法,表明當前 Channel 完成了端口綁定
總結:
當 Channel 調用了 bind() 方法完成端口綁定之後,channelActive() 方法會進行回調
channelRead
該方法的調用時機,服務端和客户端是不一致的
服務端 channelRead
服務端 Channel 綁定到 Selector 上時監聽的是 Accept 事件,當客户端有新連接接入時,會回調 channelRead() 方法,完成新連接的接入
Netty 在服務端啓動過程中,會默認添加一個 ChannelHandler io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor 來處理新連接的接入
客户端 channelRead
當服務端處理完 Accept 事件後,會生成一個和客户端通信的 Channel,該 Channel 也會註冊到對應的 Selector 上,並監聽 read 事件
當客户端向該 Channel 中發送數據時就會觸發 read 事件,調用到 channelRead() 方法(Netty 內部的源碼處理會在後續的文章中進行分析)
exceptionCaught
當前 ChannelHandler 中各回調方法處理過程中如果發生了異常就會回調該方法