十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
本篇内容介绍了“Netty是如何绑定端口和启动服务的”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

阿拉山口ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为创新互联公司的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:028-86922220(备注:SSL证书合作)期待与您的合作!
服务端启动DEMO
先从一个简单的服务端启动`DEMO`开始,以下是一个标准的`Netty`服务端代码
public final class NettyServer {public static void main(String[] args) throws Exception {        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();try {            ServerBootstrap serverBootstrap = new ServerBootstrap();            serverBootstrap.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ChannelInitializer() {@Overridepublic void initChannel(SocketChannel channel) {                            ChannelPipeline channelPipeline = channel.pipeline();                            channelPipeline.addLast("decoder", new StringDecoder());                            channelPipeline.addLast("encoder", new StringEncoder());                            channelPipeline.addLast("handler", new ServerHandler());                        }                    });            ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();            channelFuture.channel().closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}br  注:ServerBootstrap.childHandler()用于指定处理新连接数据的读写处理逻辑,同时ServerBootstrap还提供handler()用于指定在服务端启动过程中的一些逻辑,通常情况下我们用不着这个方法
`ServerHandler`代码如下:
public class ServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) {        System.out.println("channelActive");    }    @Override    public void channelRegistered(ChannelHandlerContext ctx) {        System.out.println("channelRegistered");    }    @Override    public void handlerAdded(ChannelHandlerContext ctx) {        System.out.println("handlerAdded");    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        System.out.println("channelReadComplete");    }    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        System.out.println("channelInactive");    }    @Override    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("service receive msg:" + msg);    }}br 当有新连接接入时,控制台打印出
reStructuredTexthandlerAddedchannelRegisteredchannelActivebr
但接收到新消息时,控制台打印出
reStructuredTextservice receive msg:xxxchannelReadCompletebr
本文主要分析服务端的启动过程,而新连接接入 新消息的读取会在后续章节中说明
服务端启动源码分析
`ServerBootstrap`是`Netty`为方便开发者使用而设计的一个启动类,其核心代码入口在`bind()`,代码如下
public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));}//通过端口号创建一个`InetSocketAddress`,然后继续调用重载的`bind()`public ChannelFuture bind(SocketAddress localAddress) {// ...return doBind(localAddress);}br 由于博客篇幅有限所以有些健壮性分支会以`// ...`略过,健壮性分支不会影响对`Netty`主流程的理解。
`Netty`服务端启动最后会调用到`doBind()`方法,代码如下
private ChannelFuture doBind(final SocketAddress localAddress) {//...final ChannelFuture regFuture = initAndRegister();//...final Channel channel = regFuture.channel();//...    doBind0(regFuture, channel, localAddress, promise);//...return promise;br 在`doBind()`中我们关注两个核心方法`initAndRegister()`以及`doBind0()`
服务端`Channel`创建
final ChannelFuture initAndRegister() {    Channel channel = null;// 新建Channel    channel = channelFactory.newChannel();// 初始化Channelinit(channel);// 将这个Channel Register到某个对象    ChannelFuture regFuture = config().group().register(channel);return regFuture;}br `Channel`是`Netty`的核心概念之一,它是`Netty`网络通信的主体由它负责同对端进行网络通信、注册和数据操作等功能。
`Channel`的创建是由`channelFactory.newChannel()`完成的,接下来跟踪`channelFactory`是在何时被初始化,我们层层回溯最终发现是在这个函数中
public B channel(Class extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");    }return channelFactory(new ReflectiveChannelFactory(channelClass));}br  在`Demo`程序调用`.channel()`方法并传入`NioServerSocketChannel.class`
public class ReflectiveChannelFactoryimplements ChannelFactory {private final Class extends T> clazz;public ReflectiveChannelFactory(Class extends T> clazz) {if (clazz == null) {throw new NullPointerException("clazz"); }this.clazz = clazz; }@Overridepublic T newChannel() {try {return clazz.newInstance(); } catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + clazz, t); } }}br 
即在`Netty`服务端启动的时候通过反射方式(调用默认构造函数)来创建一个`NioServerSocketChannel`对象,
加下来我们继续跟进`NioServerSocketChannel`的默认构造函数
public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));}private static ServerSocketChannel newSocket(SelectorProvider provider) {try {// 利用SelectorProvider产生一个ServerSocketChannel对象return provider.openServerSocketChannel();    } catch (IOException e) {throw new ChannelException("Failed to open a server socket.", e);    }}br 通过`newSocket(DEFAULT_SELECTOR_PROVIDER)`创建一条`server`端`channel`,然后进入到以下方法
public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT);    config = new NioServerSocketChannelConfig(this, javaChannel().socket());}br 该方法主要完成两个功能,首先是调用父类的构造方法然后初始化`NioServerSocketChannelConfig`属性
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp);}protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;// 设置SelectionKey.OP_ACCEPT事件this.readInterestOp = readInterestOp;// 设置ServerSocketChannel为非阻塞的    ch.configureBlocking(false);}br 这里将前面通过`provider.openServerSocketChannel()`创建出来的`ServerSocketChannel`保存到成员变量,然后调用将该`channel`为非阻塞模式,这是个标准的`JDK NIO`编程的玩法。这里的`readInterestOp`即前面层层传入的`SelectionKey.OP_ACCEPT`,接下来继续跟进`super(parent)`
protected AbstractChannel(Channel parent) {this.parent = parent;    id = newId();unsafe = newUnsafe();    pipeline = newChannelPipeline();}br 在`AbstractChannel`的构造方法中主要是初始化了`id`,`unsafe`,`pipeline`属性
服务端`Channel`初始化
在创建完`Channel`后,我们在`init`方法中对`Channel`进行初始化操作,代码如下
void init(Channel channel) throws Exception {// 给channel设置optionfinal Map, Object> options = options0();synchronized (options) {        channel.config().setOptions(options);    }// 给channel设置attrfinal Map, Object> attrs = attrs0();synchronized (attrs) {for (Entry, Object> e: attrs.entrySet()) {            AttributeKey    以上代码主要完成如下功能:
ChannelFuture regFuture = config().group().register(channel);```此处`config().group()`返回的对象为`NioEventLoopGroup````java@Overridepublic ChannelFuture register(Channel channel) {// 调用了NioEventLoop对象中的register方法// NioEventLoop extends SingleThreadEventLoopreturn next().register(channel);}br public ChannelFuture register(Channel channel) {    return register(new DefaultChannelPromise(channel, this));}public ChannelFuture register(final ChannelPromise promise) {    ObjectUtil.checkNotNull(promise, "promise");    promise.channel().unsafe().register(this, promise);    return promise;}br public final void register(EventLoop eventLoop, final ChannelPromise promise) {// ...    AbstractChannel.this.eventLoop = eventLoop;// ...    register0(promise);}br private void register0(ChannelPromise promise) {    try {        boolean firstRegistration = neverRegistered;        doRegister();        neverRegistered = false;        registered = true;        pipeline.invokeHandlerAddedIfNeeded();        safeSetSuccess(promise);        pipeline.fireChannelRegistered();        if (isActive()) {            if (firstRegistration) {                pipeline.fireChannelActive();            } else if (config().isAutoRead()) {                beginRead();            }        }    } catch (Throwable t) {        closeForcibly();        closeFuture.setClosed();        safeSetFailure(promise, t);    }}//这一段其实也很清晰,先调用`doRegister()`进行注册protected void doRegister() throws Exception {    boolean selected = false;    for (;;) {        try {            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);            return;        } catch (CancelledKeyException e) {            // ...        }    }}br private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {    channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);            } else {                promise.setFailure(regFuture.cause());            }        }    });}//在`dobind0()`方法中通过`EventLoop`执行一个任务,接下来我们进入到`channel.bind()`方法public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);}public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);}br public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {unsafe.bind(localAddress, promise);}//这里的`unsafe`就是前面提到的`AbstractUnsafe`, 准确点应该是`NioMessageUnsafe`@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {// ...    boolean wasActive = isActive();// ...    doBind(localAddress);if (!wasActive && isActive()) {        invokeLater(new Runnable() {            @Overridepublic void run() {                pipeline.fireChannelActive();            }        });    }    safeSetSuccess(promise);}//在`doBind`方法中完成绑定操作,代码如下protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {        javaChannel().bind(localAddress, config.getBacklog());    } else {        javaChannel().socket().bind(localAddress, config.getBacklog());    }}br 最终调用到了`JDK`里面的`bind`方法真正进行了端口的绑定。按照正常流程我们前面已经分析到`isActive()`方法返回`false`,进入到`doBind()`之后如果`channel`被激活了,就发起`pipeline.fireChannelActive()`调用
public void channelActive(ChannelHandlerContext ctx) throws Exception {    ctx.fireChannelActive();    readIfIsAutoRead();}//`pipeline.channelActive`会逐一调用`pipeline`中每一个节点的`channelActive`方法,所以`HeadContext`的`channelActive`将会被调用,即`readIfIsAutoRead`方法将会被调用public void channelActive(ChannelHandlerContext ctx) throws Exception {    ctx.fireChannelActive();    readIfIsAutoRead();}private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {        channel.read();    }}//最终这个方法会调用到`AbstractNioChannel`的`doBeginRead`方法protected void doBeginRead() throws Exception {final SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;    }    readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {        selectionKey.interestOps(interestOps | readInterestOp);    }}br ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 建NioServerSocketChannelserverSocketChannel.configureBlocking(false);//AbstractNioChannel中ch.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));// NioServerSocketChannel.doBind()Selector selector = Selector.open();// NioEventLoop.openSelector()serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT) //  AbstractNioChannel.doBeginRead()br “Netty是如何绑定端口和启动服务的”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!