ActiveMQ的几种集群配置。
Queue consumer clusters
此集群让多个消费者同时消费一个队列,若某个消费者出问题无法消费信息,则未消费掉的消息将被发给其他正常的消费者,结构图如下:
Broker clusters
此种配置是一个消费者连接到多个broker集群的中的一个broker,当该broker出问题时,消费者自动连接到其他一个正常的broker。消费者使用 failover:// 协议来连接broker。
failover:(tcp://localhost:61616,tcp://localhost:61617)
failover官网介绍 http://activemq.apache.org/failover-transport-reference.html
broker之间的通过静态发现(static discovery)和动态发现(dynamic discovery)来维持彼此发现,下面来介绍静态发现和动态发现的机制:
静态发现:
静态发现通过配置固定的broker uri来发现彼此,配置语法如下:
static:(uri1,uri2,uri3,...)?options
例如:
1
|
static
:(tcp:
//localhost:61616,tcp://remotehost:61617?trace=false,vm://localbroker)?initialReconnectDelay=100
|
更多静态发现介绍,见ActiveMQ官网 http://activemq.apache.org/static-transport-reference.html
动态发现:
动态发现机制是在各个broker启动时通过Fanout transport来发现彼此,配置举例如下:
1
2
3
4
5
6
|
<broker name=
"foo"
>
<transportConnectors>
<transportConnector uri=
"tcp://localhost:0"
discoveryUri=
"multicast://default"
/>
</transportConnectors>
...
</broker>
|
更多动态发现机制介绍,见官网 http://activemq.apache.org/discovery-transport-reference.html
Networks of brokers
多个broker组成集群,当其中一个broker的消费者出问题导致消息堆积无法消费掉时,通过ActiveMQ支持的Network of Broker方案可将该broker堆积的消息转发到其他有消费者的broker。
该方案主要有以下两种配置方式:
1、为broker配置文件配置networkConnector元素
2、使用发现机制互相探测broker
Here is an example of using the fixed list of URIs:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
<?
xml
version="1.0" encoding="UTF-8"?>
<
beans
xmlns="http://activemq.org/config/1.0">
<
broker
brokerName="receiver" persistent="false" useJmx="false">
<
networkConnectors
>
<!-- Static discovery -->
<
networkConnector
uri="static:(tcp://localhost:62001)"/>
<!-- MasterSlave Discovery -->
<!--<networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> -->
</
networkConnectors
>
<
persistenceAdapter
>
<
memoryPersistenceAdapter
/>
</
persistenceAdapter
>
<
transportConnectors
>
<
transportConnector
uri="tcp://localhost:62002"/>
</
transportConnectors
>
</
broker
>
</
beans
>
|
This example uses multicast discovery:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
<?
xml
version="1.0" encoding="UTF-8"?>
<
beans
xmlns="http://activemq.org/config/1.0">
<
broker
name="sender" persistent="false" useJmx="false">
<
networkConnectors
>
<
networkConnector
uri="multicast://default"/>
</
networkConnectors
>
<
persistenceAdapter
>
<
memoryPersistenceAdapter
/>
</
persistenceAdapter
>
<
transportConnectors
>
<
transportConnector
uri="tcp://localhost:0" discoveryUri="multicast://default"/>
</
transportConnectors
>
</
broker
>
</
beans
>
|
Master Slave
通过部署多个broker实例,一个master和多个slave关系的broker来达到高可用性,有三种方案:
1、Master-Slave
2、SharedFile System Master Slave
3、JDBCMaster Slave
第一种方案由于只可以由两个AMQ实例组件,实际应用场景并不广泛;
第三种方案支持N个AMQ实例组网,但他的性能会受限于数据库;
第二种方案同样支持N个AMQ实例组网,基于kahadb存储策略,亦可以部署在分布式文件系统上,应用灵活、高效且安全。
Master Slave方案当其中一个broker启动并拿到独占锁时自动成为master,其他后续的broker则一直等待锁,当master宕机释放锁时其他slave拿到独占锁则自动成为master,部署结构如下:
第二种方案的配置只需修改config文件夹下activemq.xml文件,修改消息持久化使用的方案:
1
2
3
4
5
6
7
|
<
broker
xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="D:/Platform/mq_share_file">
...
<
persistenceAdapter
>
<
kahaDB
directory="D:/Platform/mq_share_file/kahadb" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/>
</
persistenceAdapter
>
...
</
broker
>
|
消息生产者代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
public
class
P2PSender {
private
static
final
String QUEUE =
"client1-to-client2"
;
public
static
void
main(String[] args) {
// ConnectionFactory :连接工厂,JMS用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS客户端到JMS Provider的连接
Connection connection =
null
;
// Session:一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// MessageProducer:消息发送者
MessageProducer producer;
// TextMessage message;
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现
connectionFactory =
new
ActiveMQConnectionFactory(
"failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)"
);
try
{
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(
false
, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(QUEUE);
// 获取session,FirstQueue是一个服务器的queue destination = session.createQueue("FirstQueue");
// 得到消息生成者【发送者】
producer = session.createProducer(destination);
// 设置不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 构造消息
sendMessage(session, producer);
// session.commit();
connection.close();
}
catch
(Exception e) {
e.printStackTrace();
}
finally
{
if
(
null
!= connection) {
try
{
connection.close();
}
catch
(JMSException e) {
e.printStackTrace();
}
}
}
}
public
static
void
sendMessage(Session session, MessageProducer producer)
throws
Exception {
for
(
int
i =
1
; i <=
1
; i++) {
Date d =
new
Date();
TextMessage message = session.createTextMessage(
"ActiveMQ发送消息"
+ i +
" "
+
new
Date());
System.out.println(
"发送消息:ActiveMQ发送的消息"
+ i +
" "
+
new
Date());
producer.send(message);
}
}
}
|
消息消费者代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
public
class
P2PReceiver {
private
static
final
String QUEUE =
"client1-to-client2"
;
public
static
void
main(String[] args) {
// ConnectionFactory :连接工厂,JMS用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS客户端到JMS Provider的连接
Connection connection =
null
;
// Session:一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// 消费者,消息接收者
MessageConsumer consumer;
connectionFactory =
new
ActiveMQConnectionFactory(
"failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)"
);
try
{
// 得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(
false
, Session.AUTO_ACKNOWLEDGE);
// 创建Queue
destination = session.createQueue(QUEUE);
consumer = session.createConsumer(destination);
while
(
true
) {
TextMessage message = (TextMessage) consumer.receive();
if
(
null
!= message) {
System.out.println(
"收到消息"
+ message.getText());
}
}
}
catch
(Exception e) {
e.printStackTrace();
}
finally
{
try
{
if
(
null
!= connection)
connection.close();
}
catch
(Throwable ignore) {
}
}
}
}
|
本文转自邴越博客园博客,原文链接:http://www.cnblogs.com/binyue/p/5325945.html,如需转载请自行联系原作者