[三]RabbitMQ-客户端源码之ChannelManager

简介: 关于ChannelManager,官方注解:Manages a set of channels, indexed by channel number (1.. _channelMax)。

关于ChannelManager,官方注解:Manages a set of channels, indexed by channel number (1.. _channelMax)。

ChannelManager类的代码量不是很多,主要用来管理Channel的,channelNumber=0的除外,应为channelNumber=0是留给Connection的特殊的channelNumber。

下面是ChannelManager的成员变量:

/** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */
private final Object monitor = new Object();
    /** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */
    private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>();
    private final IntAllocator channelNumberAllocator;

private final ConsumerWorkService workService;

private final Set<CountDownLatch> shutdownSet = new HashSet<CountDownLatch>();

/** Maximum channel number available on this connection. */
private final int _channelMax;
private final ThreadFactory threadFactory;

这上面的成员变量下面会有涉及。


对于ChannelManager的使用,是AMQConnection中的成员变量:

/** Object that manages a set of channels */
private volatile ChannelManager _channelManager;

AMQConnection中start()的_channelManager中对其初始化:

protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
    return new ChannelManager(this._workService, channelMax, threadFactory);
}

再调用其构造函数:

public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {
    if (channelMax == 0) {
        // The framing encoding only allows for unsigned 16-bit integers
        // for the channel number
        channelMax = (1 << 16) - 1;
    }
    _channelMax = channelMax;
    channelNumberAllocator = new IntAllocator(1, channelMax);

    this.workService = workService;
    this.threadFactory = threadFactory;
}

这里的ConsumerWorkService也在AMQConnection的start()方法中初始化——initializeConsumerWorkService():

private void initializeConsumerWorkService() {
    this._workService  = new ConsumerWorkService(executor, threadFactory, shutdownTimeout);
}

再回到构造函数。

channelMax参数是在client接收到broker的Connection.Tune帧中的“Channel-Max”参数之后设置的,如果为0则表示没有限制,这里就会设置为默认的最大值:2的16次方-1。
threadFactory参数是指:Executors.defaultThreadFactory();

关于ConsumerWorkService请参考文章末尾处。


使用过RabbitMQ的同学知道要生产或者消费消息之前必须要初始化Channel,如下:

Channel channel = connection.createChannel();

这个createChannel()是AMQConnection中的方法:

public Channel createChannel(int channelNumber) throws IOException {
    ensureIsOpen();
    ChannelManager cm = _channelManager;
    if (cm == null) return null;
    return cm.createChannel(this, channelNumber);
}
public Channel createChannel() throws IOException {
    ensureIsOpen();
    ChannelManager cm = _channelManager;
    if (cm == null) return null;
    return cm.createChannel(this);
}

这里就是调用了ChannelManager的createChannel方法。

下面是ChannelManager中关于创建Channel的代码:

public ChannelN createChannel(AMQConnection connection) throws IOException {
    ChannelN ch;
    synchronized (this.monitor) {
        int channelNumber = channelNumberAllocator.allocate();
        if (channelNumber == -1) {
            return null;
        } else {
            ch = addNewChannel(connection, channelNumber);
        }
    }
    ch.open(); // now that it's been safely added
    return ch;
}

public ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException {
    ChannelN ch;
    synchronized (this.monitor) {
        if (channelNumberAllocator.reserve(channelNumber)) {
            ch = addNewChannel(connection, channelNumber);
        } else {
            return null;
        }
    }
    ch.open(); // now that it's been safely added
    return ch;
}

private ChannelN addNewChannel(AMQConnection connection, int channelNumber) throws IOException {
    if (_channelMap.containsKey(channelNumber)) {
        // That number's already allocated! Can't do it
        // This should never happen unless something has gone
        // badly wrong with our implementation.
        throw new IllegalStateException("We have attempted to "
                + "create a channel with a number that is already in "
                + "use. This should never happen. "
                + "Please report this as a bug.");
    }
    ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
    _channelMap.put(ch.getChannelNumber(), ch);
    return ch;
}

protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
    return new ChannelN(connection, channelNumber, workService);
}

上面有两个createChannel方法,一个是带了channelNumber的,一个是自动分片channelNumber的,分别对应AMQConnection中的两个方法。最后都调用addNewChannel方法。

注意两个createChannel方法中都有这样一句代码:

ch.open();

这个是什么呢?其实是调用ChannelN的open方法:

/**
 * Package method: open the channel.
 * This is only called from {@link ChannelManager}.
 * @throws IOException if any problem is encountered
 */
public void open() throws IOException {
    // wait for the Channel.OpenOk response, and ignore it
    exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND));
}

这样就调用了AMQChannel的rpc方法,向broker发送了一个Channel.Open帧。

addNewChannel方法实际上是创建了一个ChannelN对象,然后置其于ChannelManager中的_channelMap中,方便管理。

channelNumberAllocator是channelNumber的分配器,其原理是采用BitSet来实现channelNumber的分配,有兴趣的同学可以深究进去看看。

关于ChannelN类会有专门一篇博文来讲述,其实整个RabbitMQ-client的代码最关键的就是ChannelN这个类,需要着重讲述。

细心的朋友可能会发现关于ConsumerWorkService这个,我并没有做什么阐述。这个主要牵涉到Channel层面的处理,涉及到的类有AMQConnection, ChannelN, ConsumerDispatcher等。ConsumerWorkService是在AMQConnection中初始化,在ChannelManager中引用。至于这里怎么理解,在ChannelN中这么解释:
service for managing this channel’s consumer callbacks。意思是管理消费回调的服务。
综述,ChannelManager主要用来管理Channel, 包括channelNumber与Channel之间的映射关系。


附:本系列全集

  1. [Conclusion]RabbitMQ-客户端源码之总结
  2. [一]RabbitMQ-客户端源码之ConnectionFactory
  3. [二]RabbitMQ-客户端源码之AMQConnection
  4. [三]RabbitMQ-客户端源码之ChannelManager
  5. [四]RabbitMQ-客户端源码之Frame
  6. [五]RabbitMQ-客户端源码之AMQChannel
  7. [六]RabbitMQ-客户端源码之AMQCommand
  8. [七]RabbitMQ-客户端源码之AMQPImpl+Method
  9. [八]RabbitMQ-客户端源码之ChannelN
  10. [九]RabbitMQ-客户端源码之Consumer
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件 Java Spring
RocketMQ-JAVA客户端不同版本接入方式
RocketMQ4.0 RocketMQ5.0 JAVA接入 spring springboot
RocketMQ-JAVA客户端不同版本接入方式
|
9月前
UE DTMqtt 虚幻引擎 Mqtt 客户端插件说明
UE DTMqtt 虚幻引擎 Mqtt 客户端插件说明
326 0
|
8月前
|
安全 网络协议 物联网
不看后悔系列之一篇搞懂LinuxCentOS搭建MQTT服务器及客户端操作使用
linux CentOS上搭建MQTT服务器并不难,主要就是用到了mosquitto这款消息代理服务软件。其采用发布/订阅模式传输机制,轻量、简单、开放并易于实现,被广泛应用于物联网之中。
1370 0
|
消息中间件 Apache RocketMQ
rocketmq客户端发送消息报错和超时问题
org.apache.rocketmq.remoting.exception.RemotingTimeoutException: wait response on the channel <10.0.21.69:10911> timeout, 1000(ms)、 closeChannel: close the connection to remote address
1851 1
rocketmq客户端发送消息报错和超时问题
|
监控 物联网 API
【.NET+MQTT】.NET6 环境下实现MQTT通信,以及服务端、客户端的双边消息订阅与发布的代码演示
MQTT广泛应用于工业物联网、智能家居、各类智能制造或各类自动化场景等。MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,在很多受限的环境下,比如说机器与机器通信、机器与物联网通信等。好了,科普的废话不多说,下面直接通过.NET环境来实现一套MQTT通信demo,实现服务端与客户端的双边消息发布与订阅的功能和演示。
939 0
【.NET+MQTT】.NET6 环境下实现MQTT通信,以及服务端、客户端的双边消息订阅与发布的代码演示
|
1月前
|
Java Maven
【开源视频联动物联网平台】vertx写一个mqtt客户端
【开源视频联动物联网平台】vertx写一个mqtt客户端
32 1
|
4月前
|
消息中间件 运维 负载均衡
负载均衡中后端连了三个rabbitmq,如果挂了一个,客户端连接mq会变慢吗
在负载均衡中使用三个 RabbitMQ 实例,如果其中一个实例发生故障,可能会影响客户端连接到 RabbitMQ 的性能。具体影响取决于负载均衡的配置和客户端的实现方式。 如果负载均衡器能够及时检测到故障的 RabbitMQ 实例并将流量路由到正常的实例,那么客户端连接的性能影响可能较小。但如果负载均衡器不能迅速切换流量或者客户端实现不支持及时的连接故障转移,那么可能会导致客户端连接的延迟或失败。 在设计这样的架构时,有一些考虑因素: 1. **健康检查和故障切换:** 确保负载均衡器能够定期检查 RabbitMQ 实例的健康状态,并在出现故障时快速将流量切换到其他正常的实例。 2.
|
5月前
|
消息中间件
RabbitMQ客户端清空所有消息
RabbitMQ客户端清空所有消息
158 0
|
5月前
|
传感器 JavaScript 物联网
如何在Node.js中使用MQTT客户端库?
如何在Node.js中使用MQTT客户端库?
66 0
|
5月前
|
物联网 Python
如何通过示例在Python中使用Paho MQTT客户端?
如何通过示例在Python中使用Paho MQTT客户端?
76 2
如何通过示例在Python中使用Paho MQTT客户端?