Spring Cloud消息总线

简介: Spring Cloud Bus使用场景用于广播应用状态变更到分布式系统中的各个关联的1节点。应用节点间不直接相互通讯,而通过消息总线来实现通知。默认实现AMQP(Rabbit MQ)kafka回顾Spring事件/监听package com.

Spring Cloud Bus

  • 使用场景
    用于广播应用状态变更到分布式系统中的各个关联的1节点。应用节点间不直接相互通讯,而通过消息总线来实现通知。
  • 默认实现
    AMQP(Rabbit MQ)

kafka

回顾Spring事件/监听

package com.segumentfault.spring.event;

import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;

/**
 * Spring 事件 Demo
 *
 */
public class SpringEventDemo {

    public static void main(String[] args) {
        // 创建 Annotation 驱动的 Spring 应用上下文
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        // 注册 EventConfiguration 到 Spring 应用上下文
        context.register(EventConfiguration.class);
        // 启动 Spring 应用上下文
        context.refresh();
        // AnnotationConfigApplicationContext 也是 ApplicationEventPublisher
        ApplicationEventPublisher publisher = context;
        // 发布一个 MyApplicationEvent
        publisher.publishEvent(new MyApplicationEvent("Hello,World"));
    }

    private static class MyApplicationEvent extends ApplicationEvent {

        public MyApplicationEvent(String message) {
            super(message);
        }
    }

    @Configuration
    public static class EventConfiguration {

        /**
         * 监听 {@link MyApplicationEvent}
         *
         * @param event {@link MyApplicationEvent}
         */
        @EventListener
        public void onEvent(MyApplicationEvent event) {
            System.out.println("监听事件 : " + event);
        }

    }

}

Spring Cloud Bus

改造user-service-client:使用 AMQP 整合 Spring Cloud Bus
1.增加 Maven 依赖

 <!-- 整合 Spring Cloud Bus : AMQP -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>

2.启动依赖服务
user-service-client依赖:

  • Eureka Server(1000)
  • Config Server(7070)
  • Rabbit MQ(5672)

事件传播

如何定位 Application Context ID
通过访问http://localhost:8080/beans确认当前Application Context ID

{
    "context": "user-service-client:8080",
    "parent": "user-service-client",
    "beans": []
}

单点传播

POST http://localhost:8080/bus/refresh?destination=user-service-client:8080
执行curl:

curl -X POST http://localhost:8080/bus/refresh?destination=user-service-client:8080

日志输出:

INFO 28041 --- [nio-8080-exec-3] o.s.cloud.bus.event.RefreshListener      : Received remote refresh request. Keys refreshed []

集群传播
POST http://localhost:8080/bus/refresh?destination=user-service-client:**
执行 curl:

curl -X POST http://localhost:8080/bus/refresh?destination=user-service-client:**

日志输出:

INFO 28041 --- [nio-8080-exec-5] o.s.cloud.bus.event.RefreshListener      : Received remote refresh request. Keys refreshed []

通过日志可知事件监听器均为:org.springframework.cloud.bus.event.RefreshListener:

public class RefreshListener
        implements ApplicationListener<RefreshRemoteApplicationEvent> {

    private static Log log = LogFactory.getLog(RefreshListener.class);

    private ContextRefresher contextRefresher;

    public RefreshListener(ContextRefresher contextRefresher) {
        this.contextRefresher = contextRefresher;
    }

    @Override
    public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
        Set<String> keys = contextRefresher.refresh();
        log.info("Received remote refresh request. Keys refreshed " + keys);
    }
}

RefreshListener监听事件RefreshRemoteApplicationEvent
自定义RefreshRemoteApplicationEven监听器

@Configuration
public class BusConfiguration {

    @EventListener
    public void onRefreshRemoteApplicationEvent(RefreshRemoteApplicationEvent event) {

        System.out.printf(" Source : %s , originService : %s , destinationService : %s \n",
                event.getSource(),
                event.getOriginService(),
                event.getDestinationService());

    }
}

事件跟踪
默认事件跟踪功能是失效,需要通过配置项激活:spring.cloud.bus.trace.enabled=ture
端点:/trace
事件跟踪详情

 {
    "timestamp": 1513517631139,
    "info": {
      "signal": "spring.cloud.bus.sent",
      "type": "RefreshRemoteApplicationEvent",
      "id": "938c1305-02b8-4697-9ac4-5996908eb58d",
      "origin": "user-service-client:8080",
      "destination": "user-service-client:**"
    }
  },
  {
    "timestamp": 1513517631138,
    "info": {
      "signal": "spring.cloud.bus.ack",
      "event": "RefreshRemoteApplicationEvent",
      "id": "938c1305-02b8-4697-9ac4-5996908eb58d",
      "origin": "user-service-client:8080",
      "destination": "user-service-client:**"
    }
  }

内部事件类型

  • RefreshRemoteApplicationEvent
  • EnvironmentChangeRemoteApplicationEvent
  • AckRemoteApplicationEvent:ack激活
    自定义EnvironmentChangeRemoteAppliicationEvent监听
@EventListener
    public void onEnvironmentChangeRemoteApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {

        System.out.printf("EnvironmentChangeRemoteApplicationEvent - " +
                        " Source : %s , originService : %s , destinationService : %s \n",
                event.getSource(),
                event.getOriginService(),
                event.getDestinationService());

    }

POST请求 /bus/env

curl -X POST http://localhost:8080/bus/env

控制台输出:

EnvironmentChangeRemoteApplicationEvent -  Source : org.springframework.cloud.bus.endpoint.EnvironmentBusEndpoint@656c356c , originService : user-service-client:8080 , destinationService : ** 
2017-12-17 21:40:42.440  INFO 33364 --- [nio-8080-exec-3] o.s.c.b.event.EnvironmentChangeListener  : Received remote environment change request. Keys/values to update {}

EnvironmentChangeListener是默认的EnvironmentChangeRemoteApplicationEvent监听器实现
/trace的变化:

{
    "timestamp": 1513518042463,
    "info": {
      "signal": "spring.cloud.bus.sent",
      "type": "EnvironmentChangeRemoteApplicationEvent",
      "id": "1af8f5a0-6d1f-440a-82cd-e09876977d33",
      "origin": "user-service-client:8080",
      "destination": "**:**"
    }
  },
  {
    "timestamp": 1513518042462,
    "info": {
      "signal": "spring.cloud.bus.ack",
      "event": "EnvironmentChangeRemoteApplicationEvent",
      "id": "1af8f5a0-6d1f-440a-82cd-e09876977d33",
      "origin": "user-service-client:8080",
      "destination": "**"
    }
  },
目录
相关文章
|
30天前
|
SpringCloudAlibaba Java 网络架构
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(二)Rest微服务工程搭建
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(二)Rest微服务工程搭建
46 0
|
1月前
|
消息中间件 Cloud Native Java
【Spring云原生系列】SpringBoot+Spring Cloud Stream:消息驱动架构(MDA)解析,实现异步处理与解耦合
【Spring云原生系列】SpringBoot+Spring Cloud Stream:消息驱动架构(MDA)解析,实现异步处理与解耦合
|
29天前
|
负载均衡 Java API
Spring Cloud 面试题及答案整理,最新面试题
Spring Cloud 面试题及答案整理,最新面试题
130 1
|
29天前
|
Java Nacos Sentinel
Spring Cloud Alibaba 面试题及答案整理,最新面试题
Spring Cloud Alibaba 面试题及答案整理,最新面试题
135 0
|
30天前
|
SpringCloudAlibaba Java 持续交付
【构建一套Spring Cloud项目的大概步骤】&【Springcloud Alibaba微服务分布式架构学习资料】
【构建一套Spring Cloud项目的大概步骤】&【Springcloud Alibaba微服务分布式架构学习资料】
130 0
|
30天前
|
SpringCloudAlibaba Java 网络架构
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
80 0
|
1月前
|
消息中间件 JSON Java
Spring Boot、Spring Cloud与Spring Cloud Alibaba版本对应关系
Spring Boot、Spring Cloud与Spring Cloud Alibaba版本对应关系
394 0
|
1天前
|
负载均衡 Java 开发者
细解微服务架构实践:如何使用Spring Cloud进行Java微服务治理
【4月更文挑战第17天】Spring Cloud是Java微服务治理的首选框架,整合了Eureka(服务发现)、Ribbon(客户端负载均衡)、Hystrix(熔断器)、Zuul(API网关)和Config Server(配置中心)。通过Eureka实现服务注册与发现,Ribbon提供负载均衡,Hystrix实现熔断保护,Zuul作为API网关,Config Server集中管理配置。理解并运用Spring Cloud进行微服务治理是现代Java开发者的关键技能。
|
2天前
|
Java API 对象存储
对象存储OSS产品常见问题之使用Spring Cloud Alibaba情况下文档添加水印如何解决
对象存储OSS是基于互联网的数据存储服务模式,让用户可以安全、可靠地存储大量非结构化数据,如图片、音频、视频、文档等任意类型文件,并通过简单的基于HTTP/HTTPS协议的RESTful API接口进行访问和管理。本帖梳理了用户在实际使用中可能遇到的各种常见问题,涵盖了基础操作、性能优化、安全设置、费用管理、数据备份与恢复、跨区域同步、API接口调用等多个方面。
19 2
|
16天前
|
负载均衡 网络协议 Java
构建高效可扩展的微服务架构:利用Spring Cloud实现服务发现与负载均衡
本文将探讨如何利用Spring Cloud技术实现微服务架构中的服务发现与负载均衡,通过注册中心来管理服务的注册与发现,并通过负载均衡策略实现请求的分发,从而构建高效可扩展的微服务系统。