Netty source code analysis - ChannelPipeline mechanism and reading and writing process

This article continues the walk through the Netty source code. It analyzes how ChannelPipeline propagates events and how Netty's read and write paths work.
The analysis is based on Netty 4.1.

ChannelPipeline

The ChannelPipeline in Netty can be understood as an interceptor chain that maintains a linked list of ChannelHandlers. Each ChannelHandler is a concrete interceptor that can process data during reading and writing.
ChannelHandlers fall into two categories.
ChannelInboundHandler listens for Channel state changes, such as channelActive and channelRegistered, and usually processes incoming data by overriding the ChannelInboundHandler#channelRead method. For example, HttpObjectDecoder parses the bytes read into (Netty) HttpRequest objects.
ChannelOutboundHandler intercepts IO operations, such as bind, connect, read, and write, and usually processes the data about to be written to the Channel by overriding the ChannelOutboundHandler#write method. For example, HttpResponseEncoder converts the data to be written into HTTP format.

The default implementation class of ChannelPipeline is DefaultChannelPipeline, which maintains two special ChannelHandlers - HeadContext and TailContext at the beginning and end of the ChannelHandler linked list.
HeadContext is responsible for forwarding IO operations to the corresponding Unsafe for processing, such as the register, bind, and read operations mentioned in the previous article.
TailContext mainly provides fallback handling. For example, its channelRead method releases the ByteBuf if no earlier handler has consumed it.
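
To make the list structure concrete, here is a minimal toy model, not Netty's actual code. It assumes a doubly linked list with fixed head and tail sentinel nodes, where addLast splices a new node in just before tail. The names ToyPipeline, Node, and names() are hypothetical and exist only for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: mimics the shape of the handler list, not Netty's implementation.
class ToyPipeline {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    // Sentinel nodes that always sit at both ends of the list.
    final Node head = new Node("head");
    final Node tail = new Node("tail");

    ToyPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // Insert the new node between the current last node and tail.
    ToyPipeline addLast(String name) {
        Node node = new Node(name);
        Node last = tail.prev;
        node.prev = last;
        node.next = tail;
        last.next = node;
        tail.prev = node;
        return this;
    }

    // Walk from head to tail, the direction in which inbound events travel.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        ToyPipeline p = new ToyPipeline().addLast("decoder").addLast("encoder");
        System.out.println(p.names()); // [head, decoder, encoder, tail]
    }
}
```

Because head and tail are always present, addLast never has to handle an empty list as a special case.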

Event propagation

ChannelOutboundInvoker is responsible for triggering the methods of ChannelOutboundHandler. The method names are the same, but the ChannelOutboundInvoker methods lack the ChannelHandlerContext parameter.
Similarly, ChannelInboundInvoker is responsible for triggering the methods of ChannelInboundHandler, but its method names carry a fire prefix. For example, ChannelInboundInvoker#fireChannelRead triggers ChannelInboundHandler#channelRead.
Both ChannelPipeline and ChannelHandlerContext extend these two interfaces, but they play different roles. ChannelPipeline is the interceptor chain, and it delegates the actual work to ChannelHandlerContext.
The ChannelHandlerContext interface (that is, the ChannelHandler's context) maintains the previous and next nodes of the linked list. It is passed as a parameter to the ChannelHandler methods and is responsible for interacting with the ChannelPipeline and other ChannelHandlers. Through it, you can dynamically modify the properties of the Channel, submit tasks to the EventLoop, and propagate events to the next (or previous) ChannelHandler.
For example, after ChannelInboundHandler#channelRead processes the data, it can write data to the Channel through ChannelHandlerContext#write.
The ChannelHandlerContext#handler method returns the real ChannelHandler, which performs the actual operations.
When a ChannelHandler is added through methods such as DefaultChannelPipeline#addLast, Netty constructs a DefaultChannelHandlerContext for it, and the handler method returns that ChannelHandler.
HeadContext and TailContext also extend AbstractChannelHandlerContext, and their handler methods return this.

We can also submit asynchronous tasks to the EventLoop through ChannelHandlerContext:

ctx.channel().eventLoop().execute(new Runnable() {
	public void run() {
		...
	}
});
 

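Submitting a task this way defers the work instead of running it inline. Note that the task still runs on the Channel's EventLoop thread, so operations that block for a long time are better handed to a separate executor, for example an EventExecutorGroup passed to ChannelPipeline#addLast.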

Let's take DefaultChannelPipeline#fireChannelRead as an example to see how events propagate.
DefaultChannelPipeline

public final ChannelPipeline fireChannelRead(Object msg) {
	AbstractChannelHandlerContext.invokeChannelRead(head, msg);
	return this;
}
 

With HeadContext as the starting node, it calls AbstractChannelHandlerContext#invokeChannelRead to start walking the interceptor list.

AbstractChannelHandlerContext

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
	final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
	EventExecutor executor = next.executor();
	if (executor.inEventLoop()) {
		next.invokeChannelRead(m);
	} else {
		...
	}
}

private void invokeChannelRead(Object msg) {
	if (invokeHandler()) {
		try {
			//#1
			((ChannelInboundHandler) handler()).channelRead(this, msg);
		} catch (Throwable t) {
			notifyHandlerException(t);
		}
	} else {
		fireChannelRead(msg);
	}
}
 

#1 The handler method returns the real handler of this AbstractChannelHandlerContext, and its ChannelInboundHandler#channelRead method is then invoked.
Since invokeChannelRead is executing on HeadContext here, handler() returns the HeadContext itself, so HeadContext#channelRead is triggered.

HeadContext#channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
	ctx.fireChannelRead(msg);
}
 

HeadContext calls ctx.fireChannelRead(msg) to propagate the event to the next ChannelInboundHandler.

AbstractChannelHandlerContext#fireChannelRead

public ChannelHandlerContext fireChannelRead(final Object msg) {
	invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
	return this;
}
 

AbstractChannelHandlerContext#fireChannelRead(final Object msg) is mainly responsible for finding the next ChannelInboundHandler and triggering its channelRead method.

The DefaultChannelPipeline#fireChannelRead method shows the complete call chain:
#1 DefaultChannelPipeline starts the call through HeadContext.
#2 The ChannelInboundHandler processes its own logic, then calls ctx.fireChannelRead(msg) to propagate the event onward.
#3 AbstractChannelHandlerContext finds the next ChannelInboundHandler and triggers its channelRead, so the interceptor chain keeps executing.
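
The three steps can be sketched with a small toy model. This is a simplified illustration, not Netty's implementation: Ctx, ReadHandler, and run() are hypothetical names, and the bit masks only imitate the idea behind findContextInbound(MASK_CHANNEL_READ), which skips nodes that do not handle channelRead.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of inbound propagation; all names are hypothetical, not Netty's API.
class InboundChainSketch {
    static final int MASK_CHANNEL_READ = 1;  // node handles channelRead
    static final int MASK_WRITE = 1 << 1;    // node handles write (outbound only)

    interface ReadHandler {
        void channelRead(Ctx ctx, Object msg);
    }

    static final class Ctx {
        final String name;
        final int mask;
        final ReadHandler handler;
        Ctx next;

        Ctx(String name, int mask, ReadHandler handler) {
            this.name = name;
            this.mask = mask;
            this.handler = handler;
        }

        // Skip forward until a node that handles the wanted event is found.
        Ctx findContextInbound(int wanted) {
            Ctx ctx = this;
            do {
                ctx = ctx.next;
            } while (ctx != null && (ctx.mask & wanted) == 0);
            return ctx;
        }

        // Step #3: hand the message to the next inbound node, if any.
        void fireChannelRead(Object msg) {
            Ctx target = findContextInbound(MASK_CHANNEL_READ);
            if (target != null) {
                target.handler.channelRead(target, msg);
            }
        }
    }

    // Builds head -> encoder -> echo -> tail and returns the nodes that saw the message.
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        // Step #2: do the local work, then propagate the event onward.
        ReadHandler passThrough = (ctx, msg) -> {
            trace.add(ctx.name);
            ctx.fireChannelRead(msg);
        };
        Ctx head = new Ctx("head", MASK_CHANNEL_READ, passThrough);
        Ctx encoder = new Ctx("encoder", MASK_WRITE, null); // outbound only, skipped
        Ctx echo = new Ctx("echo", MASK_CHANNEL_READ, passThrough);
        Ctx tail = new Ctx("tail", MASK_CHANNEL_READ, (ctx, msg) -> trace.add(ctx.name));
        head.next = encoder;
        encoder.next = echo;
        echo.next = tail;
        // Step #1: the pipeline starts the call at head itself.
        head.handler.channelRead(head, "hello");
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [head, echo, tail]
    }
}
```

If the echo handler in this sketch did not call ctx.fireChannelRead, the trace would stop there and tail would never see the message.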

Note: For the methods of ChannelOutboundHandler, DefaultChannelPipeline starts the call from TailContext and propagates the event toward the head, the opposite direction from ChannelInboundHandler.
When reading the Netty source code, for each DefaultChannelPipeline method, pay attention to whether it ultimately calls a ChannelInboundHandler or a ChannelOutboundHandler method, and therefore in which direction the event propagates.

Suppose we define an HTTP echo program as follows:

new ServerBootstrap().group(parentGroup, childGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new HttpRequestDecoder());
                        p.addLast(new HttpResponseEncoder());
                        p.addLast(new LoggingHandler(LogLevel.INFO));
                        p.addLast(new HttpEchoHandler());
                    }
                });
 

Here, HttpEchoHandler implements ChannelInboundHandler and calls ChannelHandlerContext#write in its channelRead method to send the data back.
The data flow is then as follows:

Socket.read() -> head#channelRead  -> HttpRequestDecoder#channelRead -> LoggingHandler#channelRead -> HttpEchoHandler#channelRead
                                                                                                                 |
                                                                                                              /|/
Socket.write() <-   head#write     <- HttpResponseEncoder#write     <-     LoggingHandler#write   <-  ChannelHandlerContext#write
 

ChannelHandlerContext#write differs from DefaultChannelPipeline#write. The former starts from the current node and looks toward the head for the next ChannelOutboundHandler, while the latter starts from tail.
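
The difference only becomes visible when an outbound handler sits after the handler that writes. The toy model below is a sketch under that assumption, not Netty code: it adds a hypothetical outbound handler named audit after echo, and writePath, contextWrite, and pipelineWrite are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model: which outbound handlers see a write, depending on where it starts.
class WriteStartSketch {
    static final class Node {
        final String name;
        final boolean outbound;

        Node(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    // Walk toward head from the node before startIndex, collecting outbound handlers.
    static List<String> writePath(List<Node> pipeline, int startIndex) {
        List<String> path = new ArrayList<>();
        for (int i = startIndex - 1; i >= 0; i--) {
            if (pipeline.get(i).outbound) {
                path.add(pipeline.get(i).name);
            }
        }
        return path;
    }

    static List<Node> samplePipeline() {
        return Arrays.asList(
                new Node("head", true),
                new Node("decoder", false),
                new Node("encoder", true),
                new Node("logging", true),
                new Node("echo", false),
                new Node("audit", true),  // hypothetical outbound handler after echo
                new Node("tail", false));
    }

    // Models a write issued through the context of the echo handler (index 4).
    static List<String> contextWrite() {
        return writePath(samplePipeline(), 4);
    }

    // Models a write issued on the pipeline, which starts from tail (index 6).
    static List<String> pipelineWrite() {
        return writePath(samplePipeline(), 6);
    }

    public static void main(String[] args) {
        System.out.println(contextWrite());  // [logging, encoder, head]
        System.out.println(pipelineWrite()); // [audit, logging, encoder, head]
    }
}
```

In this sketch the audit handler is skipped by the context write because it sits closer to tail than the handler that issued the write.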

Read

As mentioned in the previous article "Principle of Event Loop Mechanism Implementation", NioEventLoop#processSelectedKey handles accept and read events through the NioUnsafe#read method. Let's look at how read events are processed.
NioByteUnsafe#read

public final void read() {
	final ChannelConfig config = config();
	if (shouldBreakReadReady(config)) {
		clearReadPending();
		return;
	}
	final ChannelPipeline pipeline = pipeline();
	final ByteBufAllocator allocator = config.getAllocator();
	final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
	allocHandle.reset(config);

	ByteBuf byteBuf = null;
	boolean close = false;
	try {
		do {
			//#1
			byteBuf = allocHandle.allocate(allocator);
			//#2
			allocHandle.lastBytesRead(doReadBytes(byteBuf));
			//#3
			if (allocHandle.lastBytesRead() <= 0) {
				byteBuf.release();
				byteBuf = null;
				close = allocHandle.lastBytesRead() < 0;
				if (close) {
					readPending = false;
				}
				break;
			}

			allocHandle.incMessagesRead(1);
			readPending = false;
			//#4
			pipeline.fireChannelRead(byteBuf);
			byteBuf = null;
			//#5
		} while (allocHandle.continueReading());
		//#6
		allocHandle.readComplete();
		//#7
		pipeline.fireChannelReadComplete();

		if (close) {
			//#8
			closeOnRead(pipeline);
		}
	} catch (Throwable t) {
		handleReadException(pipeline, byteBuf, t, close, allocHandle);
	} finally {
		...
	}
}
 

#1 Allocate memory for the ByteBuf.
#2 Read Socket data into the ByteBuf. By default, the first attempt reads up to 1024 bytes.
#3 If the lastBytesRead method returns -1, the Channel has been closed. In that case, release the current ByteBuf and prepare to close the Channel. If it returns 0, nothing was read, so the ByteBuf is released and the loop ends.
#4 Hand the data that was read to ChannelPipeline#fireChannelRead. This is usually where we process the data.
#5 Determine whether to continue reading.
By default, reading continues if the number of bytes just read equals the number of bytes the read attempted (1024 here), since a full buffer suggests that more data is pending.
#6 A hook that lets the RecvByteBufAllocator handle do extra work once the read loop finishes.
#7 Trigger ChannelPipeline#fireChannelReadComplete. A handler can use it, for example, to combine the data from several reads into one object.
#8 Close the Channel.
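
The loop condition in #5 is easier to follow in a stripped-down form. The sketch below is a toy model, not Netty's code: a java.nio.ByteBuffer stands in for the socket, an exhausted buffer is treated as a closed Channel, and the maxMessages cap is an assumption added here so the loop cannot run unbounded. The names ReadLoopSketch, doReadBytes, and readOnce are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Toy model of the read loop; a ByteBuffer stands in for the socket.
class ReadLoopSketch {
    // Copies up to buf.length bytes; returns -1 when the toy socket is exhausted.
    static int doReadBytes(ByteBuffer socket, byte[] buf) {
        if (!socket.hasRemaining()) {
            return -1;
        }
        int n = Math.min(buf.length, socket.remaining());
        socket.get(buf, 0, n);
        return n;
    }

    // Runs one read pass and returns the size of each chunk that would be
    // handed to fireChannelRead.
    static List<Integer> readOnce(ByteBuffer socket, int bufferSize, int maxMessages) {
        List<Integer> chunks = new ArrayList<>();
        int messages = 0;
        int lastBytesRead;
        do {
            byte[] buf = new byte[bufferSize];        // #1 allocate
            lastBytesRead = doReadBytes(socket, buf); // #2 read
            if (lastBytesRead <= 0) {                 // #3 nothing read, or closed
                break;
            }
            messages++;
            chunks.add(lastBytesRead);                // #4 stands in for fireChannelRead
            // #5 a full buffer suggests more data is pending, so keep reading
        } while (lastBytesRead == bufferSize && messages < maxMessages);
        return chunks;                                // #7 would fire readComplete here
    }

    public static void main(String[] args) {
        ByteBuffer socket = ByteBuffer.wrap(new byte[2500]);
        System.out.println(readOnce(socket, 1024, 16)); // [1024, 1024, 452]
    }
}
```

With 2500 pending bytes, the third read fills only part of the buffer, so the loop stops without attempting a fourth read.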

Note that if a handler does not keep propagating the channelRead event started by ChannelPipeline#fireChannelRead, TailContext#channelRead is never reached, in which case we must release the ByteBuf ourselves.
One way is to extend SimpleChannelInboundHandler: SimpleChannelInboundHandler#channelRead guarantees that the ByteBuf is released in the end.
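
The release guarantee comes down to a try/finally around the user callback. Here is a minimal sketch of that pattern with a toy reference-counted buffer. ToyBuf, AutoReleaseHandler, and refCntAfterRead are hypothetical names, and the real class also checks the message type and can pass messages on, which this sketch omits.

```java
// Toy model of the "always release" pattern; not Netty's classes.
class AutoReleaseSketch {
    static final class ToyBuf {
        private int refCnt = 1;

        int refCnt() {
            return refCnt;
        }

        void release() {
            if (refCnt <= 0) {
                throw new IllegalStateException("already released");
            }
            refCnt--;
        }
    }

    static abstract class AutoReleaseHandler {
        // Template method: run the user logic, then release no matter what happened.
        final void channelRead(ToyBuf msg) {
            try {
                channelRead0(msg);
            } finally {
                msg.release();
            }
        }

        protected abstract void channelRead0(ToyBuf msg);
    }

    // Returns the buffer's reference count after the handler ran,
    // optionally making the user logic fail.
    static int refCntAfterRead(final boolean fail) {
        ToyBuf buf = new ToyBuf();
        AutoReleaseHandler handler = new AutoReleaseHandler() {
            @Override
            protected void channelRead0(ToyBuf msg) {
                if (fail) {
                    throw new IllegalStateException("handler failed");
                }
            }
        };
        try {
            handler.channelRead(buf);
        } catch (IllegalStateException expected) {
            // The buffer has still been released by the finally block.
        }
        return buf.refCnt();
    }

    public static void main(String[] args) {
        System.out.println(refCntAfterRead(false)); // 0
        System.out.println(refCntAfterRead(true));  // 0
    }
}
```

The design choice is that subclasses only implement channelRead0 and never touch the reference count, so a forgotten release or an exception cannot leak the buffer.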

Write

We need to call the ChannelHandlerContext#write method to trigger the write operation.
ChannelHandlerContext#write -> HeadContext#write -> AbstractUnsafe#write

public final void write(Object msg, ChannelPromise promise) {
	assertEventLoop();
	//#1
	ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
	...

	int size;
	try {
		//#2
		msg = filterOutboundMessage(msg);
		//#3
		size = pipeline.estimatorHandle().size(msg);
		if (size < 0) {
			size = 0;
		}
	} catch (Throwable t) {
		safeSetFailure(promise, t);
		ReferenceCountUtil.release(msg);
		return;
	}
	//#4
	outboundBuffer.addMessage(msg, size, promise);
}
 

#1 Get the ChannelOutboundBuffer maintained by AbstractUnsafe. This class caches the written data, which is only actually written out on flush.
#2 An extension point that AbstractChannel provides for subclasses, where they can check or convert the ByteBuf.
#3 Estimate the size of the data to be written.
#4 Add the data to the ChannelOutboundBuffer.
As you can see, write does not actually write any data; it only puts the data into the ChannelOutboundBuffer.
The data in the ChannelOutboundBuffer is written out when ChannelHandlerContext#flush is called.
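
The split between write and flush can be illustrated with a small toy buffer. This is a sketch of the idea only, not ChannelOutboundBuffer itself: OutboundBufferSketch, its socket list, and pendingSize are hypothetical, and the string length stands in for the estimated message size.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model: write only queues messages, flush moves them to the "socket".
class OutboundBufferSketch {
    private final Deque<String> pending = new ArrayDeque<>();
    private long pendingSize;

    // Stands in for the real Channel; records what was actually written out.
    final List<String> socket = new ArrayList<>();

    // Queue the message and track its estimated size; nothing reaches the socket yet.
    void write(String msg) {
        pending.addLast(msg);
        pendingSize += msg.length();
    }

    // Drain the queue in order and hand every message to the socket.
    void flush() {
        String msg;
        while ((msg = pending.pollFirst()) != null) {
            socket.add(msg);
        }
        pendingSize = 0;
    }

    long pendingSize() {
        return pendingSize;
    }

    public static void main(String[] args) {
        OutboundBufferSketch buffer = new OutboundBufferSketch();
        buffer.write("HTTP/1.1 200 OK");
        buffer.write("hello");
        System.out.println(buffer.socket); // [] because nothing was flushed yet
        buffer.flush();
        System.out.println(buffer.socket); // [HTTP/1.1 200 OK, hello]
    }
}
```

Batching several writes before a single flush means the data can go out in fewer socket operations, which is the motivation for separating the two steps.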

ByteBuf is Netty's memory buffer for exchanging data with the Channel, while ByteBufAllocator and RecvByteBufAllocator are mainly responsible for allocating memory for ByteBuf. A later article will analyze them.
ChannelOutboundBuffer buffers written data and writes it to the Channel on flush. A later article will analyze it as well.