RocketMQ实践

  1. 云栖社区>
  2. 博客>
  3. 正文

RocketMQ实践

小飞哥1112 2017-09-24 14:41:00 浏览1427
展开阅读全文

本文主要介绍RocketMQ的使用,主要内容如下:

集群部署:集群部署结构以及Name Server、Broker、Producer、Consumer如何配合保证高可用的
事务:MQ与DB写操作一致性原理,案例分析,系统间数据一致性解决方案
Producer最佳实践
Consumer最佳实践

一、  RocketMQ集群

1.  RocketMQ集群部署结构

383f9f47ba66d33f2cb7e16b0f899fe84f77367d

1)   Name Server

Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。


2)   Broker

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有NameServer。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。

 

3)   Producer

Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。

Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。

 

4)   Consumer

Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知。

Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。

当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦master恢复,未同步过去的消息会被最终消费掉。

 

 

二、  事务

1.  MQ与DB一致性原理

1)   流程图

cad691dfc1e9da6c1f7ae8b197004cd177b39c8e

上图是RocketMQ提供的保证MQ消息、DB事务一致性的方案。

MQ消息、DB操作一致性方案:

² 发送消息到MQ服务器,此时消息状态为SEND_OK。此消息为consumer不可见。

² 执行DB操作;DB执行成功Commit DB操作,DB执行失败Rollback DB操作。

² 如果DB执行成功,回复MQ服务器,将状态为COMMIT_MESSAGE;如果DB执行失败,回复MQ服务器,将状态改为ROLLBACK_MESSAGE。注意此过程有可能失败。

² MQ内部提供一个名为“事务状态服务”的服务,此服务会检查事务消息的状态,如果发现消息未COMMIT,则通过Producer启动时注册的TransactionCheckListener来回调业务系统,业务系统在checkLocalTransactionState方法中检查DB事务状态,如果成功,则回复COMMIT_MESSAGE,否则回复ROLLBACK_MESSAGE。

 

说明:

² 上面依DB为例,其实此处可以是任何业务或者数据源。

² 以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE均是clint jar提供的状态,在MQ服务器内部是一个数字。

 

2)   示例代码

a)  创建Producer

创建TransactionMQProducer,并注册TransactionCheckListener。


TransactionCheckListener transactionCheckListener = new TransactionCheckListener() {
	@Override
	public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
		try {
			Object msg = HessianUtils.decode(messageExt.getBody());
			return getCommitStatus(messageExt.getTopic(), messageExt.getTags(), msg);
		} catch (IOException e) {
			logger.error(e.getMessage(), e);
			return LocalTransactionState.COMMIT_MESSAGE;
		}
	}
};
// 设置事务会查监听器
producer.setTransactionCheckListener(transactionCheckListener);



b)  发送事务消息

Message message = new Message(topic, tag, HessianUtils.encode(msg));
// 发送事务性消息
TransactionSendResult sendResult = producer.sendMessageInTransaction(message, new LocalTransactionExecuter() {

	@Override
	public LocalTransactionState executeLocalTransactionBranch(Message arg0, Object arg1) {
		Boolean result = transactionTemplate.execute(new TransactionCallback<Boolean>() {
			@Override
			public Boolean doInTransaction(TransactionStatus status) {
				try {
					// insert or update db
					return true;
				} catch (Exception e) {
					logger.error("insert / update failed!", e);
					status.setRollbackOnly();
					return false;
				}
			}
		});
		if (result == null || !result) {
			return LocalTransactionState.ROLLBACK_MESSAGE;
		}
				return LocalTransactionState.COMMIT_MESSAGE;
	}
}, null);
if (sendResult.getLocalTransactionState() != LocalTransactionState.COMMIT_MESSAGE) {
	logger.error("send transaction msg failed! topic={}, tags={}, message={}, sendResult={}", topic, tag, msg, JSON.toJSONString(sendResult));
	return sendResult;
}

return sendResult;

2.  案例分析

1)   单机环境下的事务示意图

如下为A给B转账的例子。

   1   

锁定A的账户

2

锁定B的账户

3

检查A账户是否有1元

4

A的账户扣减1元

5

给B的账户加1元

6

解锁B的账户

7

解锁A的账户

以上过程在代码层面甚至可以简化到在一个事物中执行两条sql语句。

2)   集群环境下事务

和单机事务不同,A、B账户可能不在同一个DB中,此时无法像在单机情况下使用事物来实现。此时可以通过一下方式实现,将转账操作分成两个操作。

a)   A账户

   1   

锁定A的账户

2

检查A账户是否有1元

3

A的账户扣减1元

4

解锁A的账户

b)   MQ消息

A账户数据发生变化时,发送MQ消息,MQ服务器将消息推送给转账系统,转账系统来给B账号加钱。

c)   B账户

  1  

锁定B的账户

2

给B的账户加1元

3

解锁B的账户

d)   原理

大事物 = 小事物 + 异步

 

3.  系统之间数据一致性方案

eebd1c50af7b0f638a1ffedc4230a2c36e991b58

以上是交易系统和其他系统之间保持最终一致性的解决方案。

 

三、  顺序消息

1.  顺序消息缺陷

发送顺序消息无法利用集群Fail Over特性
消费顺序消息的并行度依赖于队列数量队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题遇到消息失败的消息,无法跳过,当前队列消费暂停。

2.  原理

produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息。

注意:把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue

3.  Producer

import java.io.UnsupportedEncodingException;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.start();

            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                Message msg =
                    new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}


4.  Consumer

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((this.consumeTimes.get() % 4) == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

}

 

四、  最佳实践

1.  Producer

1)   Topic

一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags 在broker做消息过滤。

2)   key

每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过 topic,key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key 尽可能唯一,这样可以避免潜在的哈希冲突。

//订单Id


String orderId="20034568923546";

message.setKeys(orderId);


3)   日志

消息发送成功或者失败,要打印消息日志,务必要打印 send result 和key 字段。

4)   send

send消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在sendResult里定义。

SEND_OK:消息发送成功

FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

SLAVE_NOT_AVAILABLE:消息发送成功,但是此时slave不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

 

2.  Consumer

1)   幂等

RocketMQ使用的消息原语是At Least Once,所以consumer可能多次收到同一个消息,此时务必做好幂等。

 

2)   日志

消费时记录日志,以便后续定位问题。

3)   批量消费

尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量。

 

 

 

 

五、  参考资料

1.  文档

见附件

 

2.  博客

分布式开放消息系统(RocketMQ)的原理与实践

http://www.jianshu.com/p/453c6e7ff81c

 

RocketMQ事务消费和顺序消费详解

http://www.cnblogs.com/520playboy/p/6750023.html

 

ZeroCopy

http://www.linuxjournal.com/article/6345

 

IO方式的性能数据

http://stblog.baidu-tech.com/?p=851

 

 

 

附件下载: RocketMQ...[小飞哥1112].1514102800.zip

网友评论

登录后评论
0/500
评论
小飞哥1112
+ 关注