一、基本说明
1)、只有看过Netty源码,才能说是真正掌握了Netty框架。
2)、在 io.netty.example 包下,有很多Netty源码案例,可以用来分析。
二、启动过程源码剖析
2.1 目的
1)、用源码分析的方式走一下 Netty(服务器)的启动过程,更好的理解 Netty 的整体设计和运行机制。
2)、源码需要剖析到 Netty 调用 doBind 方法,追踪到 NioServerSocketChannel 的 doBind 方法
3)、要 Debug 程序到 NioEventLoop 类的run代码,无限循环,在服务器端运行
2.2 启动类分析
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
1)、在启动类 main 方法中,首先创建了关于 SSL 的配置类。
2)、在启动类 main 方法中,创建了两个 EventLoopGroup 对象:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
1、 这两个对象是整个Netty的核心对象,可以说,整个Netty的运作都依赖于它们bossGroup用于接收Tcp请求,它会将请求交给workerGroup,workerGroup会获取到真正的连接,然后和连接进行通信,比如读写编码解码等等;
2、 EventLoopGroup是事件循环组(线程组)含有多个EventLoop,可以注册channel,用于在事件循环中去进行选择(和选择器相关);
3、 newNioEventLoopGroup(1):这个1表示bossGroup事件组有1个线程,这个值可以指定,如果newNioEventLoopGroup()会含有默认个线程cpu核数*2,即可以充分的利用多核的优势;
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
1、 创建EventExecutor数组,children=newEventExecutor[nThreads];每个元素的类型就是NIOEventLoop,NIOEventLoop实现了EventLoop接口和Executor接口;
2、 在启动类main方法中,try块中创建了一个ServerBootstrap对象,它是一个引导类,用于启动服务器和引导整个程序的初始化它和ServerChannel关联,而ServerChannel继承了Channel,所以它提供了一些方法供我们使用,比如:remoteAddress等随后,变量b(ServerBootstrap)调用了group方法,将两个group放入到自己的字段中,用于后期引导使用;
3、 然后添加了一个channel,其中参数是一个Class对象,引导类将通过这个Class对象反射创建ChannelFactory然后添加了一些TCP的参数【说明:Channel的创建在bind方法,Debug下可以找到:channel=channelFactory.newChannel();】;
4、 添加一个服务器专属的日志处理器;
5、 添加一个SocketChannel(不是ServerSocketChannel)的handler;
6、 绑定端口并阻塞至连接成功;
7、 最后main线程阻塞等待关闭;
8、 finally块中的代码将在服务器关闭时优雅关闭所有资源;
2.3 处理器源码
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
1)、这是一个普通的处理器类,用于处理器客户端发送来的消息。
2.3 EventLoopGroup分析
1)、构造器方法,依次调用(CTRL+D)
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
2)、追踪到源码 抽象类 MultithreadEventExecutorGroup 的构造方法 MultithreadEventExecutorGroup 才是 NioEventLoopGroup 真正的构造方法,这里可以看成是一个模板方法,使用了设计模式的模板模式。
3)、MultithreadEventExecutorGroup
/**
* @param nThreads 使用的线程数,默认为 core*2
* @param executor 执行器,如果传入null,则采用Netty默认的线程工厂和默认的执行器 ThreadPerTaskExecutor
* @param chooserFactory 单例 new DefaultEventExecutorChooserFactory()
* @param args 在创建执行器的时候传入固定参数
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) { // 如果传入的执行器是空,则采用默认的线程工厂和默认的执行器
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads]; // 创建指定线程数的执行器数组
for (int i = 0; i < nThreads; i ++) { // 初始化线程数组
boolean success = false;
try {
// 创建 NioEventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 如果创建失败,优雅关闭
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
// 为每一个单例线程池添加一个关闭监听器
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
// 将所有的单例线程池添加到一个 HashSet 中
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
1)、如果 executor 是 null,创建一个默认的 ThreadPerTaskExecutor,使用 Netty 默认的线程工厂
2)、根据传入的线程数(CPU*2),创建一个线程池(单例线程池)数组
3)、循环填充数组中的元素,如果异常,则关闭所有的单例线程池
4)、根据线程选择工厂创建一个 线程选择器
5)、为每一个单例线程池添加一个关闭监听器
6)、将所有的单例线程池添加到一个 HashSet 中
2.4 ServerBootstrap分析
1)、ServerBootstrap 是个空构造,但是有默认的成员变量
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
2)、基本使用
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoServerHandler());
}
});
1)、链式调用:group 方法,将 bossGroup 和 workerGroup 传入,bossGroup 赋值给 parentGroup 属性,workerGroup 赋值给 childGroup 属性
2)、channel 方法传入 NioServerSocketChannel.class 对象。会根据这个 class 创建 channel 对象
3)、option 方法传入 TCP 参数,放在一个 LinkedHashMap 中
4)、handler 方法传入一个 handler ,这个 handler 只属于 ServerSocketChannel,而不是 SocketChannel
5)、childHandler 方法传入一个 handler ,这个 handler 将会在每个客户端连接的时候调用。供 SocketChannel 使用。
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: