RocketMQ NettyRemotingServer、NettyRemotingClient 实例化、初始化、启动源码解析

在这里插入图片描述

🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任后端开发,CSDN 优质创作者
📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代
🌲文章所在专栏:RocketMQ
🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识
💬 向我询问任何您想要的东西,ID:vnjohn
🔥觉得博主文章写的还 OK,能够帮助到您的,感谢三连支持博客🙏
😄 代词: vnjohn
⚡ 有趣的事实:音乐、跑步、电影、游戏

目录

  • 前言
  • new
    • BrokerOuterAPI
    • MQClientInstance
    • NettyRemotingClient
  • initialize
    • NettyRemotingServer
  • start
    • NettyRemotingServer
    • NettyRemotingClient
  • 总结

前言

RocketMQ 专栏篇:

从零开始:手把手搭建 RocketMQ 单节点、集群节点实例

保护数据完整性:探索 RocketMQ 分布式事务消息的力量

RocketMQ 分布式事务消息实战指南:确保数据一致性的关键设计

RocketMQ 生产者源码分析:DefaultMQProducer、DefaultMQProducerImpl

RocketMQ MQClientInstance、生产者实例启动源码分析

RocketMQ 投递消息方式以及消息体结构分析:Message、MessageQueueSelector

RocketMQ DefaultMQProducer#send 方法源码解析:生产者投递消息(一)
RocketMQ DefaultMQProducer#send 方法源码解析:生产者投递消息(二)
RocketMQ 通信机制底层数据结构及源码解析

上篇文章【RocketMQ 通信机制底层数据结构及源码解析 】主要介绍了 RocketMQ 中底层的网络通信机制涉及到的数据结构以及线程模型通信,未做过多源码的介绍,这篇文章主要围绕着一块的源码解读.

new

在 Broker 服务端创建 BrokerController 时,会实例化 BrokerController,在里面会传递 NettyServerConfig、NettyClientConfig,如下:

final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
nettyServerConfig.setListenPort(10911);
nettyServerConfig.setUseEpollNativeSelector(true);
// .....
final BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);

Netty Server Boss 默认绑定的端口:10911

BrokerOuterAPI

在实例化 BrokerController 时,会先将 NettyRemotingClient 先创建好,它主要用来与其他 Broker 之间进行相互通信的,比如:当通过命令在某台 Broker 创建一个 Topic,会通过当前 Broker 组装好信息,发送给其他 Broker 进行 Topic 路由信息进行传递,以便于其他 Broker 都得知该 Topic 信息,进行消息的接收.

public BrokerController(final BrokerConfig brokerConfig,final NettyServerConfig nettyServerConfig,final NettyClientConfig nettyClientConfig,final MessageStoreConfig messageStoreConfig) {this.brokerConfig = brokerConfig;this.nettyServerConfig = nettyServerConfig;// ....this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {this.remotingClient = new NettyRemotingClient(nettyClientConfig);this.remotingClient.registerRPCHook(rpcHook);
}

MQClientInstance

在生产者、消费者启动时,通过 MQClientManager#getOrCreateMQClientInstance会创建 MQClientInstance 实例,会将 NettyClientConfig 绑定好

public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {this.nettyClientConfig = new NettyClientConfig();this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());// 客户端远程调用的处理器,接受来自 Broker 请求并做出响应this.clientRemotingProcessor = new ClientRemotingProcessor(this);// MQ 客户端 API 发起请求的类this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
}
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,final ClientRemotingProcessor clientRemotingProcessor,RPCHook rpcHook, final ClientConfig clientConfig) {this.clientConfig = clientConfig;// RocketMQ 网络模型的核心类this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);this.clientRemotingProcessor = clientRemotingProcessor;this.remotingClient.registerRPCHook(rpcHook); 		this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);// 消费组数量发生变化,触发重平衡this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);// 消费者客户端重置偏移量this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);// 获取消费者状态this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);// 获取消费者运行的信息this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);// 消费消息this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);// 回复消息this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}

NettyRemotingClient

NettyRemotingClient 充当 RocketMQ 网络通信模型下的客户端,生产者、消费者、Broker 都持有对它的引用进行使用,它整体的实例化过程源码如下:

public NettyRemotingClient(final NettyClientConfig nettyClientConfig,final ChannelEventListener channelEventListener) {// 单向发送信号量数、异步发送信号量数 = 65535super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());this.nettyClientConfig = nettyClientConfig;this.channelEventListener = channelEventListener;int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// 使用公共线程池处理来自客户端的各种 Processor,最低线程数为 4、最大线程数为 CPU 核数this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());}});this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));}});if (nettyClientConfig.isUseTLS()) {try {sslContext = TlsHelper.buildSslContext(true);log.info("SSL enabled for client");} catch (IOException e) {log.error("Failed to create SSLContext", e);} catch (CertificateException e) {log.error("Failed to create SSLContext", e);throw new RuntimeException("Failed to create SSLContext", e);}}
}

在其实例化时,提供了一个内部局部变量为 Bootstrap

 private final Bootstrap bootstrap = new Bootstrap();

initialize

在实例化 BrokerController 期间,只是会将 Netty 服务端,给设置好,不做任何处理

NettyRemotingServer

调用 BrokerController#initialize 初始化方法时,会实例化 NettyRemotingServer

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// Processor 公共处理的线程池,当未指定 Executor 时this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 默认都是创建 EpollEventLoopGroupif (useEpoll()) {this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}loadSslContext();
}
  1. 创建 Semaphore Oneway 信号量:256,Semaphore Async 信号量:64
  2. 创建 Processor 公共处理的线程池,当 Processor 未指定 Executor 时,分配给这个 Executor 进行处理,公共的业务线程池
  3. 创建 1 个线程数的 EpollEventLoopGroup,Reactor 主线程
  4. 创建 3 个线程数的 EpollEventLoopGroup,Reactor 线程池

通过 useEpoll 方法来判别 EpollEventLoopGroup 还是 NioEventLoopGroup

private boolean useEpoll() {// OS 类型:Windows、Linuxreturn RemotingUtil.isLinuxPlatform()// 通过 NettyServerConfig.setUseEpollNativeSelector 方法设置是否开启 Epoll Selector 模型&& nettyServerConfig.isUseEpollNativeSelector()&& Epoll.isAvailable();
}

在实例化 BrokerController 时已经设置 useEpollNativeSelector 变量为 true.

start

NettyRemotingServer

通过 BrokerController#start 方法再调用 NettyRemotingServer#start 方法启动 Netty Server 服务端

public void start() {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});prepareSharableHandlers();/*1. SslHandler:SSL安全套接字协议2. ⬇3. FileRegionEncoder:文件区域采用 Zero-Copy SendFile 编码传输4. ⬇5. NettyEncoder:编码器6. ⬇7. NettyDecoder:解码器8. ⬇9. IdleStateHandler:空闲检查10. ⬇11. NettyConnectManageHandler:网络连接管理12. ⬇13. NettyServerHandler:服务端请求处理器*/ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 在 TCP 协议中,当服务器端接收到客户端的连接请求时,会创建一个连接队列来存储这些请求,然后依次处理// ChannelOption.SO_BACKLOG 参数就是用来设置这个连接队列的大小.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())// 默认情况下,TCP 连接在 TIME_WAIT 状态时,不能立即被重用,必须等待一段时间才能重用// 通过给套接字配置可重用属性,告诉操作系统内核,这样的 TCP 连接可以复用 TIME_WAIT 状态的连接.option(ChannelOption.SO_REUSEADDR, true)// 用于开启或者关闭保活探测,默认情况下是关闭的// 当 SO_KEEPALIVE 开启时,可以保持连接检测对方主机是否崩溃,避免(服务器)永远阻塞于 TCP 连接的输入.option(ChannelOption.SO_KEEPALIVE, false)// TCP_NODELAY 是禁用Nagle算法,即数据包立即发送出去// 如果要求高实时性,有数据发送时就马上发送,就将该选项设置为 true 关闭 Nagle 算法// 如果要减少发送次数减少网络交互,就设置为 false 等累积一定大小后再发送。默认为 false.childOption(ChannelOption.TCP_NODELAY, true)// 绑定本地端口 Broker:10911、NameSrv:9876.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});// 设置发送缓冲区大小if (nettyServerConfig.getServerSocketSndBufSize() > 0) {log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());}// 设置接收缓冲区大小if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());}// 设置写缓冲区大小if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));}// 设置是否开启池化 ByteBufAllocator,采用默认的 PooledByteBufAllocatorif (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}if (this.channelEventListener != null) {this.nettyEventExecutor.start();}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);
}

启动 NettyRemotingServer 流程如下:

  1. 创建 DefaultEventExecutorGroup Worker 线程池,默认线程数量:8,线程名 prefix:NettyServerCodecThread_
  2. 通过 ServerBootstrap 指定好分组:Reactor 主线程、Reactor 线程池
  3. 创建 EpollServerSocketChannel,ServerSocketChannel 实现类
  4. 设置服务端参数,如下表
  5. 调用 io.netty.bootstrap.AbstractBootstrap#bind 方法创建一个 EpollServerSocketChannel,并且绑定好地址、端口
  6. 启动 NettyEventExecutor,它是一个单独的线程,用来接收来自 Netty 客户端空闲、关闭、连接、异常事件并进行监听回调处理.
  7. 创建一个 Timer 定时器,每隔 3 秒扫描哪些超时等待的客户端请求,并对它们进行处理,响应超时等待回调请求返回给客户端
参数名参数值参数描述
ChannelOption.SO_BACKLOG1024当服务器端接收到客户端的连接请求时,会创建一个连接队列来存储这些请求
ChannelOption.SO_REUSEADDRtrue通过给套接字配置可重用属性,告诉操作系统内核,这样的 TCP 连接可以复用 TIME_WAIT 状态的连接
ChannelOption.SO_KEEPALIVEfalse用于开启或者关闭保活探测,默认情况下是关闭的
ChannelOption.TCP_NODELAYtrue如果要求高实时性,有数据发送时就马上发送,就将该选项设置为 true 关闭 Nagle 算法
如果要减少发送次数减少网络交互,就设置为 false 等累积一定大小后再发送,默认为 false
ChannelOption.SO_SNDBUF0设置发送缓冲区大小
ChannelOption.SO_RCVBUF0设置接收缓冲区大小
ChannelOption.WRITE_BUFFER_WATER_MARK0设置写缓冲区大小
ChannelOption.ALLOCATORPooledByteBufAllocator.DEFAULT优先分配直接内存

Broker 服务端会在初始化阶段,通过调用 BrokerController#registerProcessor 方法注册,请求 -> Processor 处理器之间的映射关系,将其写入到 NettyRemotingAbstract#processorTable 集合中,当接收来自客户端请求时,代表输入由 Netty 最后一个处理器:NettyRemotingServer.NettyServerHandler 接收处理,执行其内部的 channelRead0 方法处理消息收到的请求,根据请求体 RequestCommand 携带的 code,从 processorTable 集合中找到 Pair 组合「Processor,Executor」等待 Broker 处理完成之后,再执行客户端的回调方法,返回给客户端具体的请求结果.

NettyRemotingClient

在执行 BrokerController#start 时,同时会将 BrokerOuterAPI 启动,也就是启动 NettyRemotingClient

在执行 DefaultMQProducer#start、DefaultMQPushConsumerImpl#start 方法时,同时会将 MQClientAPIImpl 也启动,也就是启动 NettyRemotingClient

所以,从 Broker、生产者、消费者角度作为客户端,它们使用的都是同一个类 NettyRemotingClient 逻辑作为 Netty 客户端使用,以下是其启动时具体的源码:

public void start() {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(// 默认线程数为 4nettyClientConfig.getClientWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());}});// 后续会发起请求时会通过 eventLoopGroupWorker 去建立 Socket 连接与服务端之间进行读、写交互,NioSocketChannel 代表的就是非阻塞的 SocketChannelBootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// 数据包组装为更大的帧然后进行发送.option(ChannelOption.TCP_NODELAY, true)// 定时发送探测包来探测连接的对端是否存活.option(ChannelOption.SO_KEEPALIVE, false).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));log.info("Prepend SSL handler");} else {log.warn("Connections are insecure as SSLContext is null!");}}// DefaultEventExecutorGroup 用来执行以下五个 ChannelHandlerpipeline.addLast(defaultEventExecutorGroup,// 编码 -> 处理请求new NettyEncoder(),// 解码 -> 处理响应new NettyDecoder(),new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),new NettyConnectManageHandler(),// 远程调用->请求、响应处理器new NettyClientHandler());}});// 操作系统客户端发送缓冲区的大小if (nettyClientConfig.getClientSocketSndBufSize() > 0) {log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());}// 操作系统客户端接收缓冲区的大小if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());}if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));}// Timer 定时执行哪些请求过期的事件,每隔 3 秒this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingClient.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);// 生产者、消费者客户端一般为空,在 nameserver 与 Broker 交互时会使用到,做一些连接、关闭、异常、死亡状态的回调处理if (this.channelEventListener != null) {this.nettyEventExecutor.start();}
}

启动 NettyRemotingClient 流程如下:

  1. 创建 DefaultEventExecutorGroup Worker 线程池,用于向客户端发起写事件、接收读事件的处理
  2. 通过 group 绑定 Worker 主线程,创建 NioSocketChannel 非阻塞 SocketChannel
  3. 设置相关的客户端参数,如下表
  4. 设置客户端请求、响应时要执行的处理器逻辑,主要是:编码-NettyEncoder、解码-NettyDecoder、请求_响应处理器-NettyClientHandler
  5. 创建一个 Timer 定时器,每隔 3 秒扫描哪些超时等待的请求,并对它们进行处理,响应超时等待回调请求返回给业务调用方
参数名参数值参数描述
ChannelOption.TCP_NODELAYtrue如果要求高实时性,有数据发送时就马上发送,就将该选项设置为 true 关闭 Nagle 算法
如果要减少发送次数减少网络交互,就设置为 false 等累积一定大小后再发送,默认为 false
ChannelOption.SO_KEEPALIVEfalse用于开启或者关闭保活探测,默认情况下是关闭的
CONNECT_TIMEOUT_MILLIS3000连接超时时长 3 秒,在规定时间内未处理完成返回 Timeout 异常
ChannelOption.SO_SNDBUF0客户端发送缓冲区的大小
ChannelOption.SO_RCVBUF0客户端接收缓冲区的大小
ChannelOption.WRITE_BUFFER_WATER_MARK0设置写缓冲区大小

在作为客户端角度,只有当每次发起投递消息、消费消息请求时,才会创建与服务端之间的 Channel 通道,核心方法 NettyRemotingClient#createChannel 内部调用 Bootstrap#connect(java.net.SocketAddress) 建立与服务端之间的连接,然后再发起请求,请求的内容以及协议已经在本节专栏的上一篇博文讲到过了.

总结

该篇文章主要介绍在 RocketMQ remoting 底层通信模块中的 NettyRemotingServer、NettyRemotingClient 实例化、初始化、启动时源码的分析,在 BrokerController 实例化会优先构建好 Netty 客户端实例,在其初始化阶段会构建好 Netty 服务端实例,而在生产者、消费者侧,是在实例化 MQClientInstance 实例时会将 Netty 客户端实例也构建好,同时在 Broker、生产者、消费者启动时,会将对应的 Netty 服务端、客户端都一并启动,比编写文章不易,希望对您有帮助,能够喜欢~

博文放在 RocketMQ 专栏里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

推荐专栏:Spring、MySQL,订阅一波不再迷路

大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1474876.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

【5G VoNR】VoNR流程简述

博主未授权任何人或组织机构转载博主任何原创文章&#xff0c;感谢各位对原创的支持&#xff01; 博主链接 本人就职于国际知名终端厂商&#xff0c;负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作&#xff0c;目前牵头6G技术研究。 博客内容主要围绕…

go-redis源码解析:如何实现sentinel高可用

go-redis里&#xff0c;sentinel只用来获取master和从节点的ip地址&#xff0c;在获取master和replica节点ip时&#xff0c;如果sentinel不可用&#xff0c;那么会换其他的sentinel重试&#xff0c;并将可用的sentinel换到第一个 1. 用于获取master节点 先通过读锁获取c.senti…

模板进阶:非类型模板参数,类模板特化,模板的编译分离

1. 非类型模板参数 模板参数分类类型形参与非类型形参。 类型形参即&#xff1a;出现在模板参数列表中&#xff0c;跟在class或者typename之类的参数类型名称。 非类型形参&#xff0c;就是用一个常量作为类(函数)模板的一个参数&#xff0c;在类(函数)模板中可将该参数当成常…

【Unity数据交互】如何Unity中读取Ecxel中的数据

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 专栏交流&#x1f9e7;&…

2024/7/7周报

文章目录 摘要Abstract文献阅读题目问题本文贡献问题描述图神经网络Framework实验数据集实验结果 深度学习MAGNN模型相关代码GNN为什么要用GNN&#xff1f;GNN面临挑战 总结 摘要 本周阅读了一篇用于多变量时间序列预测的多尺度自适应图神经网络的文章&#xff0c;多变量时间序…

Vulkan 学习(1)---- Vulkan 基本概念和发展历史

目录 Vulkan及其演化史Vulkan 基本概念基本术语 Vulkan 的原理Vulkan应用程序Vulkan的编程模型硬件初始化窗口展示表面资源设置流水线设置描述符和描述符缓冲池基于SPIR-V的着色器流水线管理指令的记录队列的提交 Vulkan及其演化史 目前主流的图形渲染API有OpenGL、OpenGL ES、…

ROS——多个海龟追踪一个海龟实验

目标 通过键盘控制一个海龟&#xff08;领航龟&#xff09;的移动&#xff0c;其余生成的海龟通过监听实现追踪定期获取领航龟和其余龟的坐标信息&#xff0c;通过广播告知其余龟&#xff0c;进行相应移动其余龟负责监听 疑惑点&#xff08;已解决&#xff09; int main(int…

【感谢告知】本账号内容调整,聚焦于Google账号和产品的使用经验和问题案例分析

亲爱的各位朋友&#xff1a; 感谢您对本账号的关注和支持&#xff01; 基于对朋友们需求的分析和个人兴趣的转变&#xff0c;该账号从今天将对内容做一些调整&#xff0c;有原来的内容改为Google&#xff08;谷歌&#xff09;账号和产品的使用经验&#xff0c;以及相关问题的…

判断是否为完全二叉树

目录 分析 分析 1.完全二叉树的概念&#xff1a;对于深度为K的&#xff0c;有n个结点的二叉树&#xff0c;当且仅当其每一个结点都与深度为K的满二叉树中编号从1至n的结点一一对应时称之为完全二叉树。 要注意的是满二叉树是一种特殊的完全二叉树。 2.思路&#xff1a;可以采…

Redis源码整体结构

一 前言 Redis源码研究为什么先介绍整体结构呢?其实也很简单,作为程序员的,要想对一个项目有快速的认知,对项目整体目录结构有一个清晰认识,有助于我们更好的了解这个系统。 二 目录结构 Redis源码download到本地之后,对应结构如下: 从上面的截图可以看出,Redis源码一…

pdf怎么转换成图片格式文件,pdf文档怎么转换成图片格式

在数字化时代&#xff0c;pdf文件转换成图片格式是一种常见的操作&#xff0c;无论是在工作还是日常生活中&#xff0c;我们总会遇到需要将pdf文件转换为图片的需求。这可能是因为图片格式更易于分享、展示或编辑。那么&#xff0c;如何高效地将pdf转换成图片呢&#xff1f;本文…

C++初学者指南-4.诊断---未定义行为检测器

C初学者指南-4.诊断—未定义行为检测器 未定义行为检测器(UBSAN) 适用编译器&#xff1a;clang,g在运行时检测许多类型的未定义行为 解引用空指针从未对齐的指针读取整数溢出被0除 … 在代码中加入额外的指令:在调试构建中增加运行时约25% 示例&#xff1a;有符号整形溢出 …

RabbitMq - Java客户端基础【简单案例 +Work模型】

目录 1、前置知识 1.1、AMQP怎么理解 1.2、Spring AMQP是什么 1.3、为什么要了解Spring-AMQP&#xff1f; 2、使用Spring-AMQP实现一个发消息案例 3、Work模型 问题&#xff1a; 优化&#xff1a; 小结&#xff1a;Work模型的使用&#xff1a; 1、前置知识 1.1、AMQP怎…

【论文阅读】-- Visual Traffic Jam Analysis Based on Trajectory Data

基于轨迹数据的可视化交通拥堵分析 摘要1 引言2 相关工作2.1 交通事件检测2.2 交通可视化2.3 传播图可视化 3 概述3.1 设计要求3.2 输入数据说明3.3 交通拥堵数据模型3.4 工作流程 4 预处理4.1 路网处理4.2 GPS数据清理4.3 地图匹配4.4 道路速度计算4.5 交通拥堵检测4.6 传播图…

2024世界人工智能大会,神仙打架

B站&#xff1a;啥都会一点的研究生公众号&#xff1a;啥都会一点的研究生 AI圈最近又发生了啥新鲜事&#xff1f; 该栏目以周更频率总结国内外前沿AI动态&#xff0c;感兴趣的可以点击订阅合集以及时收到最新推送 B站首秀世界人工智能大会&#xff0c;展示自研AI技术与AIGC…

生成式人工智能如何改变软件开发:助手还是取代者?

生成式人工智能如何改变软件开发&#xff1a;助手还是取代者&#xff1f; 生成式人工智能&#xff08;AIGC&#xff09;正在引领软件开发领域的技术变革。从代码生成、错误检测到自动化测试&#xff0c;AI工具在提高开发效率的同时&#xff0c;也引发了对开发者职业前景的讨论…

【shell编程小项目】

目录 一、项目拓扑二、要求三、shell编程 一、项目拓扑 二、要求 环境准备&#xff1a; 准备两个虚拟机&#xff0c;按照环境配置好对应的 IP 地址和对应的主机名和 SSH 密钥登录在 workstation.exam.com 节点实现如下需求&#xff1a; 1、编写 Shell 脚本&#xff0c;要求代码…

【web APIs】快速上手Day04(Dom节点)

目录 Web APIs - 第4天日期对象实例化方法案例-页面显示时间时间的另外一个写法 时间戳三种方式获取时间戳案例-毕业倒计时效果 节点操作DOM节点查找节点父节点查找案例-关闭广告子节点查找兄弟关系查找 增加节点创建节点追加节点案例-学成在线案例渲染克隆节点 删除节点 M端事…

ESP32 步进电机精准控制:打造高精度 DIY 写字机器人,实现流畅书写体验

摘要: 想让你的 ESP32 不再仅仅是控制灯光的工具吗&#xff1f; 本文将带你使用 ESP32 开发板、步进电机和简单的机械结构打造一个能够自动写字的机器人。我们将深入浅出地讲解硬件连接、软件代码以及控制逻辑&#xff0c;并提供完整的项目代码和电路图&#xff0c;即使是 Ardu…

在mac下 Vue2和Vue3并存 全局Vue2环境创建Vue3新项目(Vue cli2和Vue cli4)

全局安装vue2 npm install vue-cli -g自行在任意位置创建一个文件夹vue3&#xff0c;局部安装vue3,注意不要带-g npm install vue/cli安装完成后&#xff0c;进入目录&#xff0c;修改vue为vue3 找到vue3/node-moudles/.bin/vue&#xff0c;把vue改成vue3。 对环境变量进行配置…