III 23 rabbitmq

  1. 云栖社区>
  2. 博客>
  3. 正文

III 23 rabbitmq

技术小阿哥 2017-11-28 14:36:00 浏览790
展开阅读全文

MOMmessage oriented middleware

消息中间件(是在消息的传递过程中保存消息的容器,消息中间件再将消息从它的源中继到它的目标时,充当中间人的作用,队列的主要目的是提供路由并保证消息的传递,如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功的传递它为止,消息队列保存消息也是有期限的);

 

消息中间件特点:

采用异步处理模式(消息发送者可以发送一个消息而无须等待响应,消息发送者将消息发送到一条虚拟的通道上(主题topic或队列queue),消息接收者订阅或监听该通道,一条消息可能最终转发给一个或多个消息接收者,这些接收者都无需对消息发送者作出同步回应,整个过程是异步的;例如用户信息注册,注册完毕后,过段时间才收到邮件或短信(pub/sub模型));

应用程序和应用程序调用关系为松耦合关系(发送者和接收者不必了解对方,只需要确认消息;发送者和接收者不必同时在线;例如在线交易系统,为保证数据的最终一致,在支付系统处理完成后会把支付结果放到消息中间件里,通知订单系统修改订单支付状态,两个系统通过消息中间件)

 

消息传递服务模型:

wKioL1fgtrnzpWqvAAA1u8p3WkE026.jpg

 

 

消息中间件的传递模型(PTP点对点模型;pub/sub发布/订阅模型):

PTP模型用于消息producer和消息consumer之间点到点的通信,消息producer将消息发送到由某个名字标识的特定consumer,这个名字实际是对应于消息服务中的一个queue,在消息传递给消费者之前它被存储在这个queue中,队列消息可放在内存中,也可是持久的,以保证在消息服务出故障时仍能传递消息;

PTP模型特性(每个消息对应一个consumer;发送者和接收者没有时间依赖;接收者确认消息接收和处理成功);

wKiom1fgtsKQFDFIAAAQg6_SVf8318.jpg

 

pub/sub模型支持向一个特定的消息topic生产消息,0或多个subscriber可能对接收来自特定消息topic的消息感兴趣,这种模型下,pubsub彼此不知道对方,类似匿名公告板,可理解为broadcast广播;多个consumer可获取消息,在pubsub之间存在时间依赖性(publishersubscriber只有建立订阅关系,subscriber才能收到消息),发布者需要建立一个订阅subscription,以便能让consumer订阅;subscriber必须保持一定的活动状态以接收消息,除非subscriber建立了持久的订阅,这种情况下,在subscriber未连接时发布的消息将在subscriber重新连接时重新发布;

pub/sub模型特性(每个消息可以有多个subscriberconsumer只有订阅后才能接收到消息;持久订阅和非持久订阅(时间长短的区别));

注:持久订阅(长连接,订阅关系建立后,消息就不会消失(就算consumer不在线或宕机消息不会消失),会暂存在MOM上);非持久订阅(长连接,订阅者为了接收消息,必须一直在线,consumer不在线或宕机将收不到消息;当只有一个subscriber时相当于是PTP模式)

wKioL1fgtszhN2A_AAAgCflz4gg062.jpg

 

互联网消息中间件应用场景:

1、用户信息注册,注册完后过段时间才收到邮件、短信(pub/sub模型,pushpull):

wKiom1fgttfCEw9fAAAcBYkrD5o828.jpg

 

2、日志分析使用,pv分析计算、用户行为分析(pushpull,时效性要求高时push):

wKioL1fgtuDg7eQ-AAAgnvKOIb4258.jpg

 

3、数据复制(将数据从源头复制到多个目的地,一般要求顺序或保证因果顺序,用于跨机房数据传输、搜索、离线数据计算等;pushpull,对时间要求高时用push):

wKiom1fgtuzy_3fKAAAlGj6bm3Y564.jpg

 

4、延迟消息发送和暂存,把消息中间件当成可靠的消息暂存地,定时进行消息投递(例如模拟用户秒杀访问,进行系统性能压测,pull):

wKioL1fgtvXQnYirAAAc1xobTL0120.jpg

 

5、消息广播(缓存数据同步更新,往应用推送数据,例如更新本地缓存、变更商品价格;push):

wKiom1fgtwGxg9alAAAZ9VGDzoc125.jpg

 

 

消息中间件分类(push推;pull拉):

push推消息模型(消息producer将消息发送给消息传递服务mom,消息传递服务又将消息推给消息consumer);

pull拉消息模型(消息producer将消息发送给消息传递服务,消息consumer从消息传递服务上请求拉取消息);

 


Push

Pull

MOM

消息存储;处理请求;保存推送轨迹;保存订阅关系;消费者LB;集中式

消息存储;处理请求;分布式

Consumer

处理响应和请求

处理响应和请求;保存pull状态,如拉取位置的offset;异常情况下的消息暂存和recover

实时性

较好,收到数据后可立即发送给consumer

取决于pull的间隔时间

Consumer故障

服务端堆积消息;重复推送耗费资源;保存推送轨迹压力大

对中间件无影响

其它

对消息推送有更多控制;能实现多样化的推送机制;当消费者数量增多时,推送压力大,性能天花板;consumer处理能力差异,导致堆消息

需要在consumer端实现消息过滤,浪费资源;需要在不同consumer之间协调作LB

 

 

rabbitmq是在AMQPadvanced message queuing protocol)协议标准基础上实现的,遵循mozilla public license开源协议,采用erlang实现的工业级mq server

http://www.rabbitmq.com/

AMQP是一个异步消息传递所使用的应用层协议规范,作为线路层协议而不是API(如jmsjava message service),amqp能无视消息的来源任意发送和接收消息;amqp的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构,提供通用构建工具;因此面向消息的消息中间件系统,如pub/sub队列没有作为基本元素实现,反而通过发送简化的amq实体,用户被赋予了构建这些实体的能力,这些实体也是规范的一部分,形成了在线路层协议顶端的一个层级;amqp模型统一了消息模式,如pub/sub、列队、事务及流数据,并且添加了额外的特性,如更易于扩展基于内容的路由;

百科上的AMQPAMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计;基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制;Erlang中的实现有 RabbitMQ等;

RabbitMQ是由RabbitMQ Technologies Ltd开发并且提供商业支持的,该公司在20104月被SpringSourceVMWare的一个部门)收购,在20135月被并入Pivotal;其实VMWarePivotalEMC本质上是一家的,不同的是VMWare是独立上市子公司,而Pivotal是整合了EMC的某些资源,现在并没有上市;

 

amqp modelvirtual host=exchange+MQ;一个server可有多个virtual host):

wKioL1fgtybDmJCOAABhEbsiM8A288.jpg

 

rabbitmq的整体架构(两大核心组件exchangequeueserverbroker):

wKiom1fgtzHAgTmNAACptXpXVOk961.jpg

注:

RabbitMQ Server:也叫broker server,它不是运送食物的卡车,而是一种传输服务。原话是RabbitMQ isnt a food truck, its a delivery service. 他的角色就是维护一条从ProducerConsumer的路线,保证数据能够按照指定的方式进行传输。但是这个保证也不是100%的保证,但是对于普通的应用来说这已经足够了。当然对于商业系统来说,可以再做一层数据一致性的guard,就可以彻底保证系统的一致性了。

Client A & B:也叫Producer,数据的发送方。create messages and publish (send) them to a broker server (RabbitMQ).一个Message有两个部分:payload(有效载荷)和label(标签)。payload顾名思义就是传输的数据。labelexchange的名字或者说是一个tag,它描述了payload,而且RabbitMQ也是通过这个label来决定把这个Message发给哪个ConsumerAMQP仅仅描述了label,而RabbitMQ决定了如何使用这个label的规则。

Client 123:也叫Consumer,数据的接收方。Consumers attach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一个有名字的邮箱。当有Message到达某个邮箱后,RabbitMQ把它发送给它的某个订阅者即Consumer。当然可能会把同一个Message发送给很多的Consumer。在这个Message中,只有payloadlabel已经被删掉了。对于Consumer来说,它是不知道谁发送的这个信息的。是协议本身不支持。但是当然了如果Producer发送的payload包含了Producer的信息就另当别论了。

 

Exchanges are where producers publish theirmessages.

Queues are where the messages end up and arereceived by consumers

Bindings are how the messages get routedfrom the exchange to particular queues.

Connection:就是一个TCP的连接,ProducerConsumer都是通过TCP连接到RabbitMQ Server的,程序的起始处就是建立这个TCP连接

Channels:虚拟连接,它建立在上述的TCP连接中,数据流动都是在Channel中进行的,也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel

为什么使用Channel,而不是直接使用TCP连接?对于OS来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力;但是,在TCP连接中建立Channel是没有上述代价的,对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。有实验表明,1s的数据可以Publish 10K的数据包,当然对于不同的硬件环境,不同的数据包大小这个数据肯定不一样,但是我只想说明,对于普通的Consumer或者Producer来说,这已经足够了,如果不够用,你考虑的应该是如何细化split你的设计

 

rabbitmq重要术语:

serverbroker,接受clientproducerconsumer)连接,实现amqp消息队列和路由功能的进程);

virtual host(是虚拟概念,类似于权限控制组,一个virtualhost里面可有若干个exchangequeue,但是权限控制的最小粒度是virtual host);

exchange(接收producer发送的消息,并根据binding规则将消息路由给server中的queueexchangetype决定了exchange路由消息的行为,exchange typedirect exchangefanout exchangetopic exchangeheaders exchange四种,不同类型的exchange路由的行为是不一样的);

message queue(用于存储还未被consumer消费的消息);

message(由headerbody组成,header是由producer添加的各种属性的集合,包括message是否被持久化,由哪个messagequeue接收,优先级是多少等,而body是真正需要传输的app数据);

BindingKey(所谓绑定就是将一个特定的exchange和一个特定的queue绑定起来,关键字为BindingKey);

 

exchange type

direct exchange(直接交互式处理路由键(bindingkey,可理解为是一个队列名字),需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配,这是一个完整的匹配;如果一个队列绑定到该交换机上要求路由键dog,则只有被标记为dog的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog);

wKioL1fgt1riwaQEAABGgJEiZfU883.jpg

fanout exchange(广播式路由键,只需将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上;类似子网广播,子网内的每台主机都获得了一份复制的消息;fanout交换机转发消息是最快的);

wKiom1fgt2SDIP9DAABSVJq5HQM927.jpg

topic exchange(主题式交换器,通过消息的路由关键字和绑定关键字的模式匹配,将消息路由到被绑定的队列中,这种路由器类型可被用来支持经典的pub/sub消息传输类型(使用主题名字空间作为消息寻址模式,将消息传递给那些部分或全部匹配主题模式的多个consumer);工作方式:绑定关键字用0个或多个标记构成,每一个标记之间用点分隔,绑定关键字必须用这种形式明确说明,支持通配符(*匹配一个词组,#匹配0个或多个词组),例如绑定关键字*.stock.#,匹配路由关键字usd.stockeur.stock.db,不匹配stock.nasdaq);

wKioL1fgt2_hyPRKAACEtCBS3So963.jpg

headers exchange

A headers exchange is designed to forrouting on multiple attributes that are more easily expressed as messageheaders than a routing key. Headers exchanges ignore the routing key attribute.Instead, the attributes used for routing are taken from the headers attribute.A message is considered matching if the value of the header equals the valuespecified upon binding.

It is possible to bind a queue to a headersexchange using more than one header for matching. In this case, the brokerneeds one more piece of information from the application developer, namely,should it consider messages with any of the headers matching, or all of them?This is what the "x-match" binding argument is for. When the"x-match" argument is set to "any", just one matchingheader value is sufficient. Alternatively, setting "x-match" to"all" mandates that all the values must match.

Headers exchanges can be looked upon as"direct exchanges on steroids". Because they route based on headervalues, they can be used as direct exchanges where the routing key does nothave to be a string; it could be an integer or a hash (dictionary) for example.

 

 

rabbitmq配置(一般情况下,默认配置足够,若有特殊配置通过以下两个文件):

/etc/rabbitmq/rabbitmq-env.conf   #define ports, file locations and names (taken from the shell, or set in the environment configuration file,rabbitmq-env.conf/rabbitmq-env-conf.bat),常用的变量见:http://www.rabbitmq.com/configure.html#define-environment-variables

/etc/rabbitmq/rabbitmq.config   #defines server component settings for permissions, limits and clusters, and also plugin settings.

 

rabbitmq-env.conf常用的参数:

RABBITMQ_NODE_IP_ADDRESS=

RABBITMQ_NODE_PORT=

RABBITMQ_DIST_PORT=   #Port to use for clustering. Ignored if your config file sets inet_dist_listen_min orinet_dist_listen_max

RABBITMQ_NODENAME=

RABBITMQ_CONF_ENV_FILE=   #Location of the file that contains environment variable definitions (without the RABBITMQ_ prefix).Note that the file name on Windows is different from other operating systems.

 

rabbitmq.config这是个标准的erlang配置文件,必须符合erlang配置文件标准,erlangtuple,结构为{key,value}keyatom类型,value为一个term,关键字参数有:

tcp_listeners   #List of ports on which to listen for AMQP connections (without SSL). Can contain integers(meaning "listen on all interfaces") or tuples such as {"127.0.0.1",5672} to listen on one or more interfaces.Default: [5672];设置rabbitmq监听的port,要与rabbitmq-env.confRABBITMQ_NODE_PORT一致);

disk_free_limit   #Disk free space limit of the partition on which RabbitMQ is storing data. When available diskspace falls below this limit, flow control is triggered. The value may be setrelative to the total amount of RAM (e.g. {mem_relative, 1.0}). The value mayalso be set to an integer number of bytes. Or, alternatively, in informationunits (e.g "50MB"). By default free disk space must exceed 50MB. See the Disk Alarms documentation.Default: 50000000disk低水位线,若disk容量低于指定值则停止接收数据);

vm_memory_high_watermark   #Memory threshold at which the flow control is triggered. See the memory-based flow control documentation.Default: 0.4即内存问题的40%;内存低水位线,若低于该水位线,则开启流控机制)

[

   {rabbit, [{tcp_listeners, [5673]}]}

].

 

rabbitmq命令:

#service rabbitmq-server start|stop|restart|reload

#rabbitmqctl add_vhost VHOSTNAME   #(创建虚拟主机)

#rabbitmqctl delete_vhost VHOSTNAME   #(删除虚拟主机)

#rabbitmqctl list_vhosts   #(遍历所有虚拟主机信息)

#rabbitmqctl add_user USERNAMEPASSWORD   #(添加用户)

#rabbitmqctl change_password USERNAMENEW_PASSWORD   #(修改用户密码)

#rabbitmqctl set_permissions -p VHOSTNAMEUSER ".*" ".*" ".*"   #vhost+user,绑定权限,并赋予rw权限)

#rabbitmqctl list_queues   (显示所有队列)

 

 

操作:

[root@server1 ~]# uname -rm

2.6.32-431.el6.x86_64 x86_64

[root@server1 ~]# cat /etc/redhat-release

Red Hat Enterprise Linux Server release 6.5(Santiago)

 

[root@server1 ~]# wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo   #aliyumyum源用来安装依赖的包,如tkerlang-*等,epel这个yum源用来安装rabbitmq-server

[root@server1 ~]# rpm -Uvh --force epel-release-6-8.noarch.rpm   #(下载地址:http://mirrors.ustc.edu.cn/fedora/epel/6/x86_64/

Preparing...                ###########################################[100%]

  1:epel-release          ########################################### [100%]

[root@server1 ~]# yum makecache

……

[root@server1 ~]# yum -y install rabbitmq-server

……

Installed:

 rabbitmq-server.noarch 0:3.1.5-1.el6                                                                                              

Dependency Installed:

 SDL.x86_64 0:1.2.14-7.el6_7.1                                    erlang.x86_64 0:R14B-04.3.el6                                    

 erlang-appmon.x86_64 0:R14B-04.3.el6                             erlang-asn1.x86_64 0:R14B-04.3.el6                              

 erlang-common_test.x86_64 0:R14B-04.3.el6                         erlang-compiler.x86_640:R14B-04.3.el6                           

 erlang-cosEvent.x86_64 0:R14B-04.3.el6                           erlang-cosEventDomain.x86_64 0:R14B-04.3.el6                    

 erlang-cosFileTransfer.x86_64 0:R14B-04.3.el6                    erlang-cosNotification.x86_64 0:R14B-04.3.el6                   

 erlang-cosProperty.x86_64 0:R14B-04.3.el6                         erlang-cosTime.x86_640:R14B-04.3.el6                           

 erlang-cosTransactions.x86_64 0:R14B-04.3.el6                     erlang-crypto.x86_64 0:R14B-04.3.el6                            

 erlang-debugger.x86_64 0:R14B-04.3.el6                           erlang-dialyzer.x86_64 0:R14B-04.3.el6                          

 erlang-diameter.x86_64 0:R14B-04.3.el6                            erlang-docbuilder.x86_640:R14B-04.3.el6                        

 erlang-edoc.x86_64 0:R14B-04.3.el6                               erlang-erl_docgen.x86_64 0:R14B-04.3.el6                        

 erlang-erl_interface.x86_64 0:R14B-04.3.el6                       erlang-erts.x86_640:R14B-04.3.el6                              

 erlang-et.x86_64 0:R14B-04.3.el6                                 erlang-eunit.x86_64 0:R14B-04.3.el6                             

 erlang-examples.x86_64 0:R14B-04.3.el6                            erlang-gs.x86_640:R14B-04.3.el6                                

 erlang-hipe.x86_64 0:R14B-04.3.el6                               erlang-ic.x86_64 0:R14B-04.3.el6                                

 erlang-inets.x86_64 0:R14B-04.3.el6                              erlang-inviso.x86_640:R14B-04.3.el6                            

 erlang-jinterface.x86_64 0:R14B-04.3.el6                          erlang-kernel.x86_640:R14B-04.3.el6                            

 erlang-megaco.x86_64 0:R14B-04.3.el6                             erlang-mnesia.x86_64 0:R14B-04.3.el6                            

 erlang-observer.x86_64 0:R14B-04.3.el6                            erlang-odbc.x86_640:R14B-04.3.el6                              

 erlang-orber.x86_64 0:R14B-04.3.el6                              erlang-os_mon.x86_64 0:R14B-04.3.el6                            

 erlang-otp_mibs.x86_64 0:R14B-04.3.el6                           erlang-parsetools.x86_64 0:R14B-04.3.el6                        

 erlang-percept.x86_64 0:R14B-04.3.el6                             erlang-pman.x86_640:R14B-04.3.el6                              

 erlang-public_key.x86_64 0:R14B-04.3.el6                          erlang-reltool.x86_640:R14B-04.3.el6                            

 erlang-runtime_tools.x86_64 0:R14B-04.3.el6                       erlang-sasl.x86_640:R14B-04.3.el6                              

 erlang-snmp.x86_64 0:R14B-04.3.el6                               erlang-ssh.x86_64 0:R14B-04.3.el6                               

 erlang-ssl.x86_64 0:R14B-04.3.el6                                erlang-stdlib.x86_64 0:R14B-04.3.el6                            

 erlang-syntax_tools.x86_64 0:R14B-04.3.el6                       erlang-test_server.x86_64 0:R14B-04.3.el6                       

 erlang-toolbar.x86_64 0:R14B-04.3.el6                            erlang-tools.x86_64 0:R14B-04.3.el6                             

 erlang-tv.x86_64 0:R14B-04.3.el6                                 erlang-typer.x86_64 0:R14B-04.3.el6                             

 erlang-webtool.x86_64 0:R14B-04.3.el6                             erlang-wx.x86_640:R14B-04.3.el6                                

 erlang-xmerl.x86_64 0:R14B-04.3.el6                               unixODBC.x86_640:2.2.14-14.el6                                 

 wxBase.x86_64 0:2.8.12-1.el6.centos                               wxGTK.x86_640:2.8.12-1.el6.centos                              

 wxGTK-gl.x86_64 0:2.8.12-1.el6.centos                            

Complete!

[root@server1 ~]# rpm -ql rabbitmq-server

/etc/logrotate.d/rabbitmq-server

/etc/rabbitmq

/etc/rc.d/init.d/rabbitmq-server

/usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server

/usr/lib/rabbitmq/bin

/usr/lib/rabbitmq/bin/rabbitmq-defaults

/usr/lib/rabbitmq/bin/rabbitmq-env

/usr/lib/rabbitmq/bin/rabbitmq-plugins

/usr/lib/rabbitmq/bin/rabbitmq-server

/usr/lib/rabbitmq/bin/rabbitmqctl

/usr/lib/rabbitmq/lib/rabbitmq_server-3.1.5

……

/usr/sbin/rabbitmq-server

/usr/sbin/rabbitmqctl

……

[root@server1 ~]# service rabbitmq-server start

Starting rabbitmq-server: SUCCESS

rabbitmq-server.

[root@server1 ~]# netstat -tnulp | grep:5672

tcp       0      0 :::5672                     :::*                        LISTEN      11303/beam 

[root@server1 ~]# /usr/lib/rabbitmq/bin/rabbitmq-plugins list  #rabbitmq_web_dispathrabbitmq_web_stomprabbitmq_web_stomp_example用于监控)

[ ] amqp_client                       3.1.5

[ ] cowboy                           0.5.0-rmq3.1.5-git4b93c2d

[ ] eldap                             3.1.5-gite309de4

[ ] mochiweb                         2.7.0-rmq3.1.5-git680dba8

[ ] rabbitmq_amqp1_0                  3.1.5

[ ] rabbitmq_auth_backend_ldap        3.1.5

[ ] rabbitmq_auth_mechanism_ssl       3.1.5

[ ] rabbitmq_consistent_hash_exchange 3.1.5

[ ] rabbitmq_federation               3.1.5

[ ] rabbitmq_federation_management    3.1.5

[ ] rabbitmq_jsonrpc                  3.1.5

[ ] rabbitmq_jsonrpc_channel          3.1.5

[ ] rabbitmq_jsonrpc_channel_examples 3.1.5

[ ] rabbitmq_management               3.1.5

[ ] rabbitmq_management_agent         3.1.5

[ ] rabbitmq_management_visualiser    3.1.5

[ ] rabbitmq_mqtt                     3.1.5

[ ] rabbitmq_shovel                   3.1.5

[ ] rabbitmq_shovel_management        3.1.5

[ ] rabbitmq_stomp                    3.1.5

[ ] rabbitmq_tracing                  3.1.5

[ ] rabbitmq_web_dispatch             3.1.5

[ ] rabbitmq_web_stomp                3.1.5

[ ] rabbitmq_web_stomp_examples       3.1.5

[ ] rfc4627_jsonrpc                   3.1.5-git5e67120

[ ] sockjs                           0.3.4-rmq3.1.5-git3132eb9

[ ] webmachine                       1.10.3-rmq3.1.5-gite9359c7

[root@server1 ~]# /usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management

The following plugins have been enabled:

 mochiweb

 webmachine

 rabbitmq_web_dispatch

 amqp_client

 rabbitmq_management_agent

 rabbitmq_management

Plugin configuration has changed. RestartRabbitMQ for changes to take effect.

[root@server1 ~]# rabbitmqctl list_queues  

Listing queues ...

...done.

[root@server1 ~]# rabbitmqctl list_vhosts

Listing vhosts ...

/

...done.

[root@server1 ~]# rabbitmqctl add_vhost test   (新添加vhost/为默认就有)

Creating vhost "test" ...

...done.

[root@server1 ~]# rabbitmqctl list_vhosts

Listing vhosts ...

/

test

...done.

[root@server1 ~]# rabbitmqctl add_user jowin 123456

Creating user "jowin" ...

...done.

[root@server1 ~]# rabbitmqctl list_users

Listing users ...

guest        [administrator]

jowin         []

...done.

[root@server1 ~]# rabbitmqctl set_permissions -p test jowin ".*" ".*"".*"

Setting permissions for user"jowin" in vhost "test" ...

...done.

 

 

 

 

MetaQ(全称Metamorphosis)是一个高性能、高可用、可扩展的分布式消息中间件,思路起源于LinkedInKafka,但并不是Kafka的一个CopyMetaQ具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,目前在淘宝和支付宝有着广泛的应用;

metaq是一款完全队列模型的消息中间件,用java编写,可在多种平台上部署,典型的pull机制;consumer-side支持javaC++编程语言;单台server可支持1w+消息队列,通过扩容,队列数几乎可任意横向扩展;每个队列可持久化、长度无限(取决于disk空间);可从任意位置开始消费;

http://metaq.taobao.org/

 

总体架构:

wKioL1fguA2THWVAAAA-TObaTzg374.jpg

 

metaq特点:

producerMOMbroker)、consumer都可分布式;

消息存储顺序写;支持消息顺序;

性能极高,吞吐量大;

consumer拉取,随机读,批量拉取数据;

数据迁移、扩容对用户透明;

消费状态保存在consumer-side

 

metaq术语:

brokermetaq的服务端,在消息中间件中通常称为broker);

topic(消息的主题,可理解为队列名,由用户定义并在中间件配置,producer发送消息到某个topic下,consumer从某个topic下消费消息);

offset(消息在中间件broker上的每个分区都组织成一个文件列表,consumer拉取数据需要知道数据在文件中的offset,是绝对偏移量,中间件会将offset转化为具体文件的相对偏移量);

partition(分区,同一个topic下有多个partition,例如meta-test这个topic可分10partition,分别有两台broker提供,那可能每台broker提供5partition,若broker id分别为01,所有分区为0-00-10-20-30-41-01-11-21-31-4);

 

metaq主要配置项:

[root@server1 ~]# vim /usr/local/taobao/metamorphosis-server-wrapper/conf/server.ini

[system]

brokerId=0  #(必须是集群内唯一,0-1024

numPartitions=1

serverPort=8123   #(服务器port

;; hostName=   #(默认取本机IP,多网卡需指明)

;; dataPath=   #(默认数据存储路径)

;; dataLogPath=   #(日志数据文件路径,默认与dataPath一样)

dashboardHttpPort=8120

unflushThreshold=0   #(每隔多少条消息做一次disk sync,强制将更改的数据刷入disk,默认0表示强制每次写入都sync,当为0时服务器会自动启用groupcommit技术,将多个消息合并成一次sync来提升io性能,经测试group commit情况下消息发送者的tps没有受到太大影响,但server负载会上升很多;若为1000表示在掉电时最多允许丢失1000条消息,

unflushInterval=10000   #(间隔多少ms做一次disk sync,默认10s,在掉电时最多丢失10s内发送过来的消息,不可设为0<0

maxSegmentSize=1073741824

maxTransferSize=1048576

deletePolicy=delete,168   #(数据删除策略,单位小时,默认超过7天就删除,此处为168小时,若为其它单位要注明,例如10s10m

deleteWhen=0 0 6,18 * * ?   #(何时执行删除策略的cron表达式,默认是此处设置,每天6:0018:00执行)

flushTxLogAtCommit=1   #(事务日志的同步设置,0表示os决定,1表示每次commit都同步,2表示每隔1s同步一次;此参数严重影响性能,可根据需要的性能和可靠性权衡作出合理选择,建议设为2,有问题最多丢失1s内运行的事务,这个级别对大多数服务是可靠的;最安全的是设为1这将严重影响事务性能;而0的安全级别最低,在安全级别上1>=2>0,而在性能上0>=2>1

stat=true

updateConsumerOffsets=true

[zookeeper]

;; zk.zkEnable=True   #(是否注册到zk,默认True

zk.zkConnect=10.96.20.113:2181,10.96.20.114:2181   #zk的服务器列表)

zk.zkSessionTimeoutMs=30000   #zk心跳超时,单位ms,默认30s

zk.zkConnectionTimeoutMs=30000   #zk连接超时时间,单位ms,默认30s

zk.zkSyncTimeMs=5000

[topic=test]

[topic=meta-test]

 

metaq的集群实现:

所有的broker注册到zookeeperproducer连接zookeeper并返回可用的broker列表,选择一个broker发送消息

 

metaq主要命令:

#./metaServer.sh start &

#./metaServer.sh stop

#./metaServer.sh restart &

#./metaServer.sh reload &

#./metaServer.sh stats

 

http://apache.fayea.com/zookeeper/zookeeper-3.4.6/

https://github.com/killme2008/Metamorphosis/releases

 

 

metaq操作:

准备:

server1eth0:10.96.20.113eth1:192.168.10.113

server2eth0:10.96.20.114eth1:192.168.10.114

jdk-8u51-linux-x64.rpm

zookeeper-3.4.6.tar.gz

metaq-server-1.4.6.2.tar.gz

 

[root@server1 ~]# uname -rm

2.6.32-431.el6.x86_64 x86_64

[root@server1 ~]# cat /etc/redhat-release

Red Hat Enterprise Linux Server release 6.5(Santiago)

 

server1server2执行相同操作(安装java运行环境;安装配置zookeeper;安装配置metaq):

[root@server1 ~]# rpm -ivh jdk-8u51-linux-x64.rpm

……

[root@server1 ~]# vim /etc/profile.d/java.sh

JAVA_HOME=/usr/java/jdk1.8.0_51

CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib

PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin

export JAVA_HOMECLASSPATH PATH RESIN_HOME

[root@server1 ~]# . !$

. /etc/profile.d/java.sh

[root@server1 ~]# java -version

java version "1.8.0_51"

Java(TM) SE Runtime Environment (build1.8.0_51-b16)

Java HotSpot(TM) 64-Bit Server VM (build25.51-b03, mixed mode)

 

[root@server1 ~]# tar xf zookeeper-3.4.6.tar.gz -C /usr/local/

[root@server1 ~]# cd /usr/local

[root@server1 local]# ln -sv zookeeper-3.4.6/ zookeeper

`zookeeper' -> `zookeeper-3.4.6/'

[root@server1 local]# cd zookeeper

[root@server1 zookeeper]# mkdir dataDir dataLogDir

[root@server1 zookeeper]# echo 1 > dataDir/myid   #server2上为#echo 2> dataDir/myid

[root@server1 zookeeper]# cat dataDir/myid

1

[root@server1 zookeeper]# cp conf/zoo_sample.cfg conf/zoo.cfg

[root@server1 zookeeper]# vim conf/zoo.cfg  #8880为选举port7770是心跳传递port

dataDir=/usr/local/zookeeper/dataDir

dataLogDir=/usr/local/zookeeper/dataLogDir

clientPort=2181

server.1=10.96.20.113:8880:7770

server.2=10.96.20.114:8880:7770

[root@server1 zookeeper]# cd bin

[root@server1 bin]#./zkServer.sh start

JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[root@server1 bin]# netstat -tnulp | grepjava

tcp       0      0::ffff:10.96.20.113:7770    :::*                        LISTEN      9542/java          

tcp       0      0 :::2181                     :::*                        LISTEN      9542/java          

tcp       0      0 :::39498                    :::*                        LISTEN      9542/java  

[root@server1 bin]# tail zookeeper.out

[root@server1 bin]# ./zkCli.sh -server 10.96.20.113:2181

Connecting to 10.96.20.113:2181

2016-09-19 02:01:31,347 [myid:] - INFO  [main:Environment@100] - Clientenvironment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT

2016-09-19 02:01:31,352 [myid:] - INFO  [main:Environment@100] - Clientenvironment:host.name=server1

……

[main-SendThread(10.96.20.113:2181):ClientCnxn$SendThread@1235]- Session establishment complete on server 10.96.20.113/10.96.20.113:2181,sessionid = 0x15741ac19b00000, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:Nonepath:null

[zk: 10.96.20.113:2181(CONNECTED) 0] ls /

[zookeeper]

[zk: 10.96.20.113:2181(CONNECTED) 1] ls /zookeeper

[quota]

[zk: 10.96.20.113:2181(CONNECTED) 2] ls /zookeeper/quota

[]

[zk: 10.96.20.113:2181(CONNECTED) 3] quit

Quitting...

2016-09-19 02:03:16,018 [myid:] - INFO  [main:ZooKeeper@684] - Session:0x15741ac19b00000 closed

2016-09-19 02:03:16,019 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@512]- EventThread shut down

[root@server1 bin]# cd

 

[root@server1 ~]# tar xf metaq-server-1.4.6.2.tar.gz -C /usr/local

[root@server1 ~]# cd /usr/local/taobao/metamorphosis-server-wrapper/conf

[root@server1 conf]# vim server.ini   #(注意以下信息,brokerId为全局唯一,server2上的brokerId1dashboardHttpPort用于监控;zk.zkConnect=多个server间用逗号分隔;[topic-jowin]是新添加的一行)

[system]

brokerId=0

serverPort=8123

dashboardHttpPort=8120

[zookeeper]

zk.zkConnect=10.96.20.113:2181,10.96.20.114:2181

[topic=jowin]

[root@server1 conf]# cd ../bin

[root@server1 bin]# ./metaServer.sh start &

……

[INFO] [main] 09-19 02:20:26,837[MetaMorphosisBroker] - Starting metamorphosis server...

[INFO] [main] 09-19 02:20:26,838[MetaMorphosisBroker] - Start metamorphosis server successfully

[INFO] [main] 09-19 02:20:32,021 [Server] -Starting dashboard http server at port 8120

[INFO] [main] 09-19 02:20:32,062 [Server] -jetty-7.6.1.v20120215

[INFO] [main] 09-19 02:20:32,146[AbstractConnector] - Started SelectChannelConnector@0.0.0.0:8120

[INFO] [main] 09-19 02:20:32,147 [Server] -Started dashboard http server successfully.

[root@server1 bin]# netstat -tnulp | grepjava

tcp       0      0 :::24402                    :::*                        LISTEN      9830/java          

tcp       0      0 :::8120                     :::*                        LISTEN      9830/java          

tcp       0      0::ffff:10.96.20.113:7770    :::*                        LISTEN      9542/java          

tcp       0      0 :::8123                     :::*                        LISTEN      9830/java          

tcp       0      0 :::19552                    :::*                        LISTEN      9830/java          

tcp       0      0 :::9123                     :::*                        LISTEN      9830/java          

tcp       0      0 :::2181                     :::*                        LISTEN      9542/java          

tcp       0      0 :::39498                    :::*                        LISTEN      9542/java     

[root@server1 bin]# ./metaServer.sh stats

[2016-09-19 02:21:20,263] INFO 即将启动RemotingController...

配置为:

BaseConfig [callBackExecutorPoolSize=1,callBackExecutorQueueSize=20000, dispatchMessageThreadCount=0, idleTime=10,keepAlive=true, linger=0, maxCallBackCountPerConnection=100000,maxCallBackExecutorPoolSize=30, maxReadBufferSize=0,maxScheduleWrittenBytes=43253760, rcvBufferSize=65536, readBufferSize=131072,readThreadCount=0, reuseAddr=true, scanInvalidCallBackInterval=300,selectorPoolSize=1, sndBufferSize=65536, soLinger=true, tcpNoDelay=true,wireFormatType=metamorphosis, writeThreadCount=0](com.taobao.gecko.service.impl.BaseRemotingController)

[2016-09-19 02:21:20,332] INFO Creating 1rectors... (com.taobao.gecko.core.nio.impl.SelectorManager)

[2016-09-19 02:21:20,501] WARN TheController started at null ... (com.taobao.gecko.core.core.impl.AbstractController)

STATS

pid 9693

broker_id 0

port 8123

uptime 56

version 1.4.6.2

slave false

curr_connections 1

threads 34

cmd_put 0

cmd_get 0

cmd_offset 0

tx_begin 0

tx_xa_begin 0

tx_commit 0

tx_rollback 0

get_miss 0

put_failed 0

total_messages 0

topics 3

config_checksum1414045505

END

 [2016-09-19 02:21:21,940] INFO Controller hasbeen stopped. (com.taobao.gecko.core.core.impl.AbstractController)

 

[root@server1 bin]# cd/usr/local/zookeeper/bin

[root@server1 bin]# ./zkCli.sh -server 10.96.20.113:2181

……

[zk: 10.96.20.113:2181(CONNECTED) 0] ls /

[zookeeper, meta]

[zk: 10.96.20.113:2181(CONNECTED) 1] ls/meta

[brokers]

[zk: 10.96.20.113:2181(CONNECTED) 2] ls/meta/brokers

[topics-sub, ids, topics-pub, topics]

[zk: 10.96.20.113:2181(CONNECTED) 3] ls /meta/brokers/topics

[meta-test, test,jowin]

[zk: 10.96.20.113:2181(CONNECTED) 4] ls /meta/brokers/ids  

[0, 1]

[zk: 10.96.20.113:2181(CONNECTED) 5] quit

Quitting...

2016-09-19 02:34:31,031 [myid:] - INFO  [main:ZooKeeper@684] - Session:0x15741c7b5e40002 closed

2016-09-19 02:34:31,032 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@512]- EventThread shut down

 

[root@server1 bin]# ./zkCli.sh -server 10.96.20.114:2181

……

[zk: 10.96.20.114:2181(CONNECTED) 0] get /meta/brokers/ids/0  #server1server2各两块网卡)

meta://192.168.10.113:8123

cZxid = 0x10000007f

ctime = Mon Sep 19 02:27:07 PDT 2016

mZxid = 0x200000004

mtime = Mon Sep 19 02:29:51 PDT 2016

pZxid = 0x20000000f

cversion = 6

dataVersion = 2

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 26

numChildren = 2

[zk: 10.96.20.114:2181(CONNECTED) 1] get /meta/brokers/ids/1

meta://192.168.10.114:8123

cZxid = 0x200000032

ctime = Mon Sep 19 02:41:10 PDT 2016

mZxid = 0x20000004b

mtime = Mon Sep 19 02:42:03 PDT 2016

pZxid = 0x200000055

cversion = 6

dataVersion = 2

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 26

numChildren = 2

[zk: 10.96.20.114:2181(CONNECTED) 2] quit

Quitting...

2016-09-19 02:44:26,186 [myid:] - INFO  [main:ZooKeeper@684] - Session:0x25741c7b6f30003 closed

2016-09-19 02:44:26,188 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@512]- EventThread shut down

 

 

 

 

注:若在一个server上部署两个zookeeperbrokermetaq),要改如下信息:

zookeeper程序解压至不同的目录(/usr/local/zookeeper{1,2}/);

zookeepermyid一定要不同(/usr/local/zookeeper1/dataDir/myid1/usr/local/zookeeper2/dataDir/myid2);

zoo.cfg配置文件中:

clientPort要不同(例如zookeeper12181zookeeper22182);

server.1=127.0.0.1:8880:7770

server.2=127.0.0.1:8881:7771

 

vim /usr/local/taobao{1,2}/metamorphosis-server-wrapper/bin/metaServer.sh

PID_FILE="$PID_DIR/.run.pid"   #pid文件不能相同)

vim /usr/local/taobao{1,2}/metamorphosis-server-wrapper/bin/env.sh

export JMX_PORT=9123   #JMX_PORT不能相同)

 

 


本文转自 chaijowin 51CTO博客,原文链接:http://blog.51cto.com/jowin/1854431,如需转载请自行联系原作者

网友评论

登录后评论
0/500
评论
技术小阿哥
+ 关注