【原创】rabbitmq-c源码分析之amqp_socket.c

简介:

     走读 amqp_socket.c 源码,发现若干值得注意的细节:

amqp_open_socket 

在 amqp_open_socket 函数中通过 getaddrinfo 函数进行域名解析可能会产生阻塞。其中参数指定了

?
1
2
3
4
5
6
struct addrinfo hint;
...
hint.ai_family = PF_UNSPEC; /* PF_INET or PF_INET6 */
hint.ai_socktype = SOCK_STREAM;
hint.ai_protocol = IPPROTO_TCP;
...

故其会对IPv4和IPv6地址均进行查询。

创建 socket 时的代码表明,其根据实际情况(IPv4或IPv6)创建对应的socket。

?
1
sockfd = ( int )socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);

socket 创建后仅设置了 SO_NOSIGPIPE(该标识仅在 IOS 系统下有效) 和 TCP_NODELAY 选项,之后就进行了 connect 动作,说明该 socket 为阻塞式用法,并且未绑定本地任何地址。这样的结果是 connect 会根据连接到的对端地址自动绑定本地对应地址。

另外,由于处理的是阻塞式 socket ,故 connect 仅对出错返回进行了简单处理。唯一可能需要注意的,实现中没有 connect 重连机制,直接结果是对每一个地址仅尝试 connect 一次。


【amqp_send_header】

该函数仅需看一下 send 函数的 flags 即可。

?
1
send(state->sockfd, ( void *)header, 8, MSG_NOSIGNAL);

设置 send 函数第三个参数为 MSG_NOSIGNAL ,即要求进程不把信号 SIGPIPE 发送给系统,而输出到终端。


【sasl_method_name】

该函数说明 rabbitmq-c 仅支持 PLAIN SASL ,其余扩展需要自己实现。


【sasl_response】

没啥好说的,只有 response_buf 的内容拼接格式看下就可以。


【wait_frame_inner】

该函数是当前 C 文件最核心的一个。其中包含了用于帧数据解析和生成的 amqp_handle_input 函数,以及用于接收数据 recv 函数。其中 recv 是阻塞点。在 sockfd 上收到的数据先缓存在 sock_inbound_buffer 中,随后会在 while 循环中按协议,逐个按帧解析出来,返回给上层。此代码写的挺精巧,可以学习借鉴。


【amqp_simple_wait_frame】

此函数是对 wait_frame_inner 的封装,并增加了对帧链表的检测和处理。意思是只要帧链表中还有数据待处理就先把其中的数据取出来处理,否则通过 wait_frame_inner 接收帧后再处理。


【amqp_simple_wait_method

此函数是对 amqp_simple_wait_frame 的封装,并增加了对帧的内容的检测和处理。


【amqp_send_method】

构造待发送帧的内容,内部使用 amqp_send_frame 来发送实际的帧数据。


【amqp_simple_rpc】

该函数是另一个需要留意的点。其内部通过调用 amqp_send_method 和 wait_frame_inner 实现了简单的 RPC 功能。

通过 wait_frame_inner 得到的帧不一定是 amqp_send_method 方法希望得到的应答帧,故需要对前者的返回内容进行相应的处理,包括对 channel 关闭、connection 关闭,以及非 method 帧的处理(放进帧链表)。只有在获得正确的应答帧后才返回。


【amqp_simple_rpc_decoded】

此函数是对 amqp_simple_rpc 的封装,仅增加了对数据的组织处理。


【amqp_login_inner】

该函数就是使用上述的函数实现了登陆流程中的各个动作:发送协议版本信息、进行 connection 处理、进行 SASL 处理、进行 TUNE 处理。详细请参考协议文档。


【amqp_login】

除了包含 amqp_login_inner 外,额外还增加了对 channel 信令的处理。


以上就是对 amqp_socket.c 内容的全部要点总结,相信有助于你对 rabbitmq-c 的进一步研究。


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 网络协议 JavaScript
MQTT常见问题之微消息队列mqtt支持ipv6失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
消息中间件 物联网 Java
MQTT常见问题之微消息队列配置失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列
|
5月前
|
消息中间件 NoSQL 数据库
一文讲透消息队列RocketMQ实现消费幂等
这篇文章,我们聊聊消息队列中非常重要的最佳实践之一:消费幂等。
一文讲透消息队列RocketMQ实现消费幂等
|
2月前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
76 0
|
4月前
|
消息中间件 JSON Java
RabbitMQ消息队列
RabbitMQ消息队列
46 0
|
4月前
|
消息中间件
RabbitMQ 实现消息队列延迟
RabbitMQ 实现消息队列延迟
124 0
|
10天前
|
消息中间件 大数据 Java
消息队列 MQ
消息队列 MQ
18 3
|
14天前
|
消息中间件 数据安全/隐私保护
MQTT微消息队列服务器连接报错:Error: Connection refused: Not authorized
使用MQTTX工具进行测试时,通过AccessKey创建了Client ID的用户名和密码。配置了公网接入点及端口1883,但尝试连接时出现错误。已附上工具截图:![](https://ucc.alicdn.com/pic/developer-ecology/3byii5uar64gg_36327474e991439da422f38c450ef153.png)。确认过用户名、密码和Client ID无误,问题仍未解决,期待回复!
|
27天前
|
消息中间件 存储 监控
解析RocketMQ:高性能分布式消息队列的原理与应用
RocketMQ是阿里开源的高性能分布式消息队列,具备低延迟、高吞吐和高可靠性,广泛应用于电商、金融等领域。其核心概念包括Topic、Producer、Consumer、Message和Name Server/Broker。RocketMQ支持异步通信、系统解耦、异步处理和流量削峰。关键特性有分布式架构、顺序消息、高可用性设计和消息事务。提供发布/订阅和点对点模型,以及消息过滤功能。通过集群模式、存储方式、发送和消费方式的选择进行性能优化。RocketMQ易于部署,可与Spring集成,并与Kafka等系统对比各有优势,拥有丰富的生态系统。
115 4