If you haven't read the previous post, I'd suggest starting there. I originally planned to go on to the Linux kernel and even read a few books, but my C has gone rusty and reading that code gives me a headache, so I gave up on it; for writing business code there's no need to go that deep anyway. Stopping at the point where the underlying APIs are called feels about right. I'm not great at weaving source code into a story, so this piece is a bit dry; bear with me~~
demo
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class ServerConnect {

    public static void main(String[] args) {
        selector();
    }

    // accept the incoming connection and register the new channel for reads
    public static void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
        SocketChannel sc = ssChannel.accept();
        sc.configureBlocking(false);
        sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(1024));
    }

    // drain the channel into the attached buffer and echo it to stdout
    public static void handleRead(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        int bytesRead = sc.read(buf);
        while (bytesRead > 0) {
            buf.flip();
            while (buf.hasRemaining()) {
                System.out.print((char) buf.get());
            }
            buf.clear();
            bytesRead = sc.read(buf);
        }
        if (bytesRead == -1) {
            sc.close();
        }
    }

    public static void handleWrite(SelectionKey key) throws IOException {
        ByteBuffer buf = (ByteBuffer) key.attachment();
        buf.flip();
        SocketChannel sc = (SocketChannel) key.channel();
        while (buf.hasRemaining()) {
            sc.write(buf);
        }
        buf.compact();
    }

    public static void selector() {
        Selector selector = null;
        ServerSocketChannel ssc = null;
        try {
            selector = Selector.open();
            ssc = ServerSocketChannel.open();
            ssc.socket().bind(new InetSocketAddress(8080));
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                if (selector.select(3000) == 0) {
                    continue;
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    if (key.isAcceptable()) {
                        handleAccept(key);
                    }
                    if (key.isReadable()) {
                        handleRead(key);
                    }
                    // check isValid() first: handleRead() may have closed the channel
                    if (key.isValid() && key.isWritable()) {
                        handleWrite(key);
                    }
                    if (key.isValid() && key.isConnectable()) {
                        System.out.println("isConnectable = true");
                    }
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (selector != null) {
                    selector.close();
                }
                if (ssc != null) {
                    ssc.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Source code analysis
Selector.open()
Selector.open() goes through SelectorProvider, which acts as a template method: the implementation differs per operating system. That's a pain, because the sources I had on hand don't include EPollSelectorProvider, so I had no choice but to grab a copy of the source from GitHub and mirror it to Gitee. If you're interested in JDK 8 you can download it yourself; for other versions you'll have to hunt down the sources on your own.
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
}
IOUtil.makePipe(false)
IOUtil.makePipe(false) is a JNI method; the corresponding native source lives at /jdk8u_jdk/src/solaris/native/sun/nio/ch/IOUtil.c
- Under the hood it calls int pipe(int pipefd[2]);
pipe() creates a pipe and returns the file descriptors through the pipefd array:
- pipefd[0] is the read end of the pipe
- pipefd[1] is the write end
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)
{
int fd[2];
//call the pipe() function
if (pipe(fd) < 0) {
JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");
return 0;
}
if (blocking == JNI_FALSE) {
if ((configureBlocking(fd[0], JNI_FALSE) < 0)
|| (configureBlocking(fd[1], JNI_FALSE) < 0)) {
JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
close(fd[0]);
close(fd[1]);
return 0;
}
}
//pack the two file descriptors into one long and return it
return ((jlong) fd[0] << 32) | (jlong) fd[1];
}
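To make the packing on the last line concrete, here is a minimal standalone C sketch (my own illustration, not JDK code) of what makePipe boils down to: create a pipe and fold both descriptors into one 64-bit value, which the Java side then splits apart exactly as the EPollSelectorImpl constructor does with pipeFds >>> 32.
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

/* Sketch: create a pipe and pack both descriptors into one 64-bit value,
 * read end in the high 32 bits, write end in the low 32 bits. */
int main(void) {
    int fd[2];
    if (pipe(fd) < 0) {
        perror("pipe");
        return 1;
    }
    uint64_t packed = ((uint64_t) fd[0] << 32) | (uint32_t) fd[1];

    /* the Java side unpacks it the same way EPollSelectorImpl does:
     * fd0 = (int)(pipeFds >>> 32), fd1 = (int) pipeFds */
    int fd0 = (int) (packed >> 32); /* read end  */
    int fd1 = (int) packed;         /* write end */
    printf("read end=%d, write end=%d\n", fd0, fd1);

    close(fd0);
    close(fd1);
    return 0;
}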
new EPollArrayWrapper()
The class-level comment already explains this one well, so here's just a brief overview: this class is what manipulates the epoll-related structures, which is why it consists almost entirely of JNI methods.
EPollArrayWrapper() throws IOException {
// creates the epoll file descriptor
epfd = epollCreate();
// the epoll_event array passed to epoll_wait
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
// eventHigh needed when using file descriptors > 64k
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
/*
* epoll_create expects a size as a hint to the kernel about how to
* dimension internal structures. We can't predict the size in advance.
*/
//create the epoll instance
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
new AllocatedNativeObject(allocationSize, true);
Next comes pollArray = new AllocatedNativeObject(allocationSize, true). The field comment describes pollArray as "the epoll_event array for results from epoll_wait", but looking at the source it isn't really an array; it just allocates a block of native memory.
protected NativeObject(int var1, boolean var2) {
//omitted
if (!var2) {
this.allocationAddress = unsafe.allocateMemory((long)var1);
this.address = this.allocationAddress;
} else {
//get the memory page size
int var3 = pageSize();
//allocate the native memory and take its raw address
long var4 = unsafe.allocateMemory((long)(var1 + var3));
this.allocationAddress = var4;
this.address = var4 + (long)var3 - (var4 & (long)(var3 - 1));
}
}
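The interesting part is the last line: the buffer is over-allocated by one extra page and the usable address is rounded up to the next page boundary. Here's a small standalone C sketch of that arithmetic (my own illustration, with malloc standing in for Unsafe.allocateMemory):
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/* Sketch of the alignment arithmetic in the constructor above: over-allocate
 * by one page, then round the address up to the next page boundary. */
int main(void) {
    size_t size = 8 * 4096;              /* stand-in for allocationSize */
    long page = sysconf(_SC_PAGESIZE);   /* same role as pageSize()     */

    uintptr_t raw = (uintptr_t) malloc(size + page);  /* allocationAddress */
    if (!raw) return 1;

    /* address = raw + page - (raw & (page - 1)), as in the Java code */
    uintptr_t aligned = raw + page - (raw & (uintptr_t) (page - 1));

    printf("raw=%#lx aligned=%#lx (page=%ld)\n",
           (unsigned long) raw, (unsigned long) aligned, page);
    free((void *) raw);
    return 0;
}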
pollWrapper.initInterrupt(fd0, fd1)
fd0 is the file descriptor of the read end of the pipe created earlier.
We covered epoll_ctl() in the previous post, so here's a quick refresher:
epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN)
adds fd0 to epfd with interest in fd0 becoming readable, and that's it~~
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
epollCtl (epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
jint opcode, jint fd, jint events)
{
struct epoll_event event;
int res;
event.events = events;
event.data.fd = fd;
RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
/*
* A channel may be registered with several Selectors. When each Selector
* is polled a EPOLL_CTL_DEL op will be inserted into its pending update
* list to remove the file descriptor from epoll. The "last" Selector will
* close the file descriptor which automatically unregisters it from each
* epoll descriptor. To avoid costly synchronization between Selectors we
* allow pending updates to be processed, ignoring errors. The errors are
* harmless as the last update for the file descriptor is guaranteed to
* be EPOLL_CTL_DEL.
*/
if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
}
}
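Why register the pipe's read end with EPOLLIN at all? It gives the selector a way to be woken up: writing a single byte to the write end makes the read end readable, so a thread blocked in epoll_wait() returns immediately. Below is a minimal C sketch of that self-pipe trick (my own simplification, not the JDK's code); we'll meet the Java side of it again in doSelect().
#include <stdio.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Self-pipe wakeup sketch: register the pipe's read end with EPOLLIN, then
 * write one byte to the write end to make a blocked epoll_wait() return. */
int main(void) {
    int fd[2];
    if (pipe(fd) < 0) return 1;                 /* fd[0] = read, fd[1] = write  */

    int epfd = epoll_create(256);
    struct epoll_event ev = {0};
    ev.events = EPOLLIN;
    ev.data.fd = fd[0];
    epoll_ctl(epfd, EPOLL_CTL_ADD, fd[0], &ev); /* same as initInterrupt()      */

    write(fd[1], "x", 1);                       /* "wakeup": fd[0] becomes readable */

    struct epoll_event out[8];
    int n = epoll_wait(epfd, out, 8, 3000);     /* returns right away with n=1  */
    printf("epoll_wait returned %d event(s)\n", n);

    char buf[8];
    read(fd[0], buf, sizeof(buf));              /* drain, like IOUtil.drain(fd0) */

    close(fd[0]); close(fd[1]); close(epfd);
    return 0;
}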
ServerSocketChannel.open()
Now we come to the channel. Internally this calls sun.nio.ch.ServerSocketChannelImpl#ServerSocketChannelImpl(java.nio.channels.spi.SelectorProvider)
ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
super(var1);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(this.fd);
this.state = 0;
}
Net.serverSocket(true)
The method name alone tells you this is where the socket is created; I'll skip over the various protocol options and socket flags, but they're worth a look if you're curious.
static FileDescriptor serverSocket(boolean var0) {
return IOUtil.newFD(socket0(isIPv6Available(), var0, true, fastLoopback));
}
JNIEXPORT int JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
jboolean stream, jboolean reuse, jboolean ignored)
{
int fd;
int type = (stream ? SOCK_STREAM : SOCK_DGRAM);
#ifdef AF_INET6
int domain = (ipv6_available() && preferIPv6) ? AF_INET6 : AF_INET;
#else
int domain = AF_INET;
#endif
//create the socket
fd = socket(domain, type, 0);
if (fd < 0) {
return handleSocketError(env, errno);
}
#ifdef AF_INET6
/* Disable IPV6_V6ONLY to ensure dual-socket support */
if (domain == AF_INET6) {
int arg = 0;
if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&arg,
sizeof(int)) < 0) {
JNU_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException",
"Unable to set IPV6_V6ONLY");
close(fd);
return -1;
}
}
#endif
if (reuse) {
int arg = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg,
sizeof(arg)) < 0) {
JNU_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException",
"Unable to set SO_REUSEADDR");
close(fd);
return -1;
}
}
#if defined(__linux__)
if (type == SOCK_DGRAM) {
int arg = 0;
int level = (domain == AF_INET6) ? IPPROTO_IPV6 : IPPROTO_IP;
if ((setsockopt(fd, level, IP_MULTICAST_ALL, (char*)&arg, sizeof(arg)) < 0) &&
(errno != ENOPROTOOPT)) {
JNU_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException",
"Unable to set IP_MULTICAST_ALL");
close(fd);
return -1;
}
}
#endif
#if defined(__linux__) && defined(AF_INET6)
/* By default, Linux uses the route default */
if (domain == AF_INET6 && type == SOCK_DGRAM) {
int arg = 1;
if (setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &arg,
sizeof(arg)) < 0) {
JNU_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException",
"Unable to set IPV6_MULTICAST_HOPS");
close(fd);
return -1;
}
}
#endif
return fd;
}
newFD simply creates the corresponding Java FileDescriptor object, and setfdVal() stores the file descriptor value into that object.
public static FileDescriptor newFD(int var0) {
FileDescriptor var1 = new FileDescriptor();
setfdVal(var1, var0);
return var1;
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_setfdVal(JNIEnv *env, jclass clazz, jobject fdo, jint val)
{
(*env)->SetIntField(env, fdo, fd_fdID, val);
}
ssc.socket().bind()
Skipping the boring parts, we eventually land in sun.nio.ch.ServerSocketChannelImpl#bind
public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
synchronized(this.lock) {
if (!this.isOpen()) {
throw new ClosedChannelException();
} else if (this.isBound()) {
throw new AlreadyBoundException();
} else {
//resolve the bind address and port
InetSocketAddress var4 = var1 == null ? new InetSocketAddress(0) : Net.checkAddress(var1);
SecurityManager var5 = System.getSecurityManager();
if (var5 != null) {
var5.checkListen(var4.getPort());
}
NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());
//ultimately calls bind and listen
Net.bind(this.fd, var4.getAddress(), var4.getPort());
Net.listen(this.fd, var2 < 1 ? 50 : var2);
synchronized(this.stateLock) {
this.localAddress = Net.localAddress(this.fd);
}
return this;
}
}
}
bind()
As you can see, it's the fd wrapped inside the channel (the socket file descriptor) that gets bound to the port here.
public static void bind(FileDescriptor var0, InetAddress var1, int var2) throws IOException {
bind(UNSPEC, var0, var1, var2);
}
static void bind(ProtocolFamily var0, FileDescriptor var1, InetAddress var2, int var3) throws IOException {
boolean var4 = isIPv6Available() && var0 != StandardProtocolFamily.INET;
bind0(var1, var4, exclusiveBind, var2, var3);
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_Net_bind0(JNIEnv *env, jclass clazz, jobject fdo, jboolean preferIPv6,
jboolean useExclBind, jobject iao, int port)
{
SOCKADDR sa;
int sa_len = SOCKADDR_LEN;
int rv = 0;
if (NET_InetAddressToSockaddr(env, iao, port, (struct sockaddr *)&sa, &sa_len, preferIPv6) != 0) {
return;
}
//call into NET_Bind, which wraps bind(2)
rv = NET_Bind(fdval(env, fdo), (struct sockaddr *)&sa, sa_len);
if (rv != 0) {
handleSocketError(env, errno);
}
}
int
NET_Bind(int fd, struct sockaddr *him, int len)
{
//omitted
rv = bind(fd, him, len);
//omitted
return rv;
}
listen()
listen() is very straightforward; it's essentially a direct call into the native method.
static native void listen(FileDescriptor var0, int var1) throws IOException;
JNIEXPORT void JNICALL
Java_sun_nio_ch_Net_listen(JNIEnv *env, jclass cl, jobject fdo, jint backlog)
{
if (listen(fdval(env, fdo), backlog) < 0)
handleSocketError(env, errno);
}
ssc.configureBlocking(false)
This switches the file descriptor to non-blocking mode; a quick look at the native code and we'll move on.
static int
configureBlocking(int fd, jboolean blocking)
{
int flags = fcntl(fd, F_GETFL);
int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
}
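Putting socket0/bind0/listen/configureBlocking together, here is a condensed C sketch of what the channel setup amounts to at the syscall level, using the demo's port 8080 and the default backlog of 50 (IPv4 only, my own illustration, not the JDK's code). The last few lines show what O_NONBLOCK buys us: accept() with no pending connection returns -1 with EAGAIN/EWOULDBLOCK instead of blocking, which is what lets one thread keep polling many descriptors.
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);               /* Net.socket0       */

    int reuse = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(8080);
    if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {  /* Net.bind0   */
        perror("bind");
        return 1;
    }
    listen(fd, 50);                                          /* Net.listen       */

    int flags = fcntl(fd, F_GETFL);                          /* configureBlocking */
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);

    int conn = accept(fd, NULL, NULL);
    if (conn < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
        printf("accept() would block; go back to the event loop\n");
    }
    close(fd);
    return 0;
}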
ssc.register(selector, SelectionKey.OP_ACCEPT);
Before going further, a quick recap of where things stand:
- selector: essentially wraps the epoll instance, and the read end of the pipe() created earlier has already been added to it
- channel: wraps the socket it created, holds the socket's file descriptor, is bound to the local port, and is listening for connections
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
//omitted
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
//att is null here; ignore it for now
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
if (!(var1 instanceof SelChImpl)) {
throw new IllegalSelectorException();
} else {
SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
var4.attach(var3);
synchronized(this.publicKeys) {
//implementation differs per operating system
this.implRegister(var4);
}
var4.interestOps(var2);
return var4;
}
}
implRegister()
SelectionKey wraps the channel. As we saw clearly above, the socket created by the channel and the epoll instance have not been explicitly associated yet; this is where that association happens. Below is the epoll implementation.
fdToKey is the Map<Integer, SelectionKeyImpl> created when the selector was initialized; it maps a file descriptor to its SelectionKey. The fd is eventually recorded in EPollArrayWrapper::eventsLow, with events = 0 at this point.
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
pollWrapper.add(fd);
keys.add(ski);
}
//pollWrapper.add(fd) eventually ends up calling
private void setUpdateEvents(int fd, byte events, boolean force) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
The subsequent var4.interestOps(var2) then changes the event for this fd in the eventsLow array from 0 to POLLIN; note that epollCtl() has still not been called to add it to the epoll instance.
selector.select(3000)
We'll start directly from sun.nio.ch.EPollSelectorImpl#doSelect
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
poll()
Here it's easy to see that each call to poll() first takes the sockets that haven't been registered yet and adds them to the epoll instance via epollCtl().
int poll(long timeout) throws IOException {
updateRegistrations();
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
//this is the wakeup path: Selector.wakeup() writes to the pipe's write end, making incomingInterruptFD readable, so epoll_wait returns; doSelect() later drains the pipe and resets the flag to false
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
int fd = updateDescriptors[j];
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
if (isRegistered) {
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
//this is where the fd is added to the epoll instance
epollCtl(epfd, opcode, fd, events);
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
}
epollWait is the corresponding call down into the native layer. This is also where the purpose of the memory block allocated earlier becomes clear: pollArrayAddress is passed in as the pointer to the epoll_event array.
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
jlong address, jint numfds,
jlong timeout, jint epfd)
{
struct epoll_event *events = jlong_to_ptr(address);
int res;
if (timeout <= 0) { /* Indefinite or no wait */
RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
} else { /* Bounded wait; bounded restarts */
res = iepoll(epfd, events, numfds, timeout);
}
if (res < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
}
return res;
}
static int
iepoll(int epfd, struct epoll_event *events, int numfds, jlong timeout)
{
jlong start, now;
int remaining = timeout;
struct timeval t;
int diff;
gettimeofday(&t, NULL);
start = t.tv_sec * 1000 + t.tv_usec / 1000;
for (;;) {
int res = epoll_wait(epfd, events, numfds, remaining);
if (res < 0 && errno == EINTR) {
if (remaining >= 0) {
gettimeofday(&t, NULL);
now = t.tv_sec * 1000 + t.tv_usec / 1000;
diff = now - start;
remaining -= diff;
if (diff < 0 || remaining <= 0) {
return 0;
}
start = now;
}
} else {
return res;
}
}
}
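One thing these native excerpts use but never define is the RESTARTABLE macro. In the OpenJDK native sources it is essentially a retry-on-EINTR wrapper; roughly the following (paraphrased from memory, so check the actual headers), with a tiny usage example. For the bounded wait, iepoll() above handles EINTR itself; for the indefinite wait, RESTARTABLE does it.
#include <errno.h>
#include <stdio.h>
#include <unistd.h>

/* Roughly what RESTARTABLE does: re-issue the call as long as it keeps
 * failing with EINTR (interrupted by a signal), keeping the final result. */
#define RESTARTABLE(_cmd, _result) do {                \
    do {                                               \
        _result = _cmd;                                \
    } while ((_result == -1) && (errno == EINTR));     \
} while (0)

int main(void) {
    char buf[16];
    ssize_t n;
    /* read on an invalid fd fails with EBADF (not EINTR), so the loop exits */
    RESTARTABLE(read(-1, buf, sizeof(buf)), n);
    printf("read returned %zd, errno=%d\n", n, errno);
    return 0;
}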
//as above: this flags a wakeup delivered through the interrupt pipe; doSelect() below drains the pipe and resets interruptTriggered to false
//sun.nio.ch.EPollArrayWrapper#poll
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
//sun.nio.ch.EPollSelectorImpl#doSelect
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
updateSelectedKeys()
private int updateSelectedKeys() {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
//read the epoll_event entry at index i from pollArrayAddress
int nextFD = pollWrapper.getDescriptor(i);
//look up the corresponding SelectionKey
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the case of an interrupt
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
//not present on the first pass
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
//add it to the selectedKeys set
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
The rest
By now the overall flow should be clear and there's no need to analyze what's left; it maps almost one-to-one onto the previous post. You readers are sharp enough, so I won't waste your time.
if(key.isReadable()){
handleRead(key);
}
if(key.isValid() && key.isWritable()){
handleWrite(key);
}
if(key.isValid() && key.isConnectable()){
System.out.println("isConnectable = true");
}
//below is the C side
// if it is a new connection, add the new socket to efd
if (ep[i].data.fd == listenfd )
{
connfd = accept(listenfd,(struct sockaddr*)&cliaddr,&clilen);
tep.events = EPOLLIN;
tep.data.fd = connfd;
ret = epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&tep);
}
// otherwise, read the data
else
{
connfd = ep[i].data.fd;
int bytes = read(connfd,buf,MAXLEN);
// the client closed the connection
if (bytes == 0){
ret =epoll_ctl(efd,EPOLL_CTL_DEL,connfd,NULL);
close(connfd);
printf("client[%d] closed\n", i);
}
else
{
for (int j = 0; j < bytes; ++j)
{
buf[j] = toupper(buf[j]);
}
// send the data back to the client
write(connfd,buf,bytes);
}
}
Summary
- Channel: a wrapper around the socket
- Selector: a wrapper around the two structures, the epoll instance and the epoll_event array
- SelectionKey: ties the two together, so a file descriptor can quickly be mapped back to its socket
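As a closing illustration, here is how those three bullet points map onto raw syscalls, in one short C sketch (my own illustration, not JDK code): the socket fd plays the role of the Channel, the epoll fd plus its epoll_event result array plays the role of the Selector, and registering the fd via epoll_ctl is what the SelectionKey/register() step boils down to.
#include <netinet/in.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);         /* Channel        */
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8080);                             /* INADDR_ANY:8080 */
    bind(listen_fd, (struct sockaddr *) &addr, sizeof(addr));
    listen(listen_fd, 50);

    int epfd = epoll_create(256);                            /* Selector       */

    struct epoll_event ev = {0};
    ev.events = EPOLLIN;
    ev.data.fd = listen_fd;                                  /* SelectionKey   */
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);          /* register()     */

    struct epoll_event ready[64];
    int n = epoll_wait(epfd, ready, 64, 3000);               /* select(3000)   */
    printf("%d descriptor(s) ready\n", n);

    close(epfd);
    close(listen_fd);
    return 0;
}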
References
epoll:https://zh.wikipedia.org/wiki...
Linux man pages: https://man7.org/linux/man-pa...