Spring Cloud Stream Binder 实现

简介: Spring Cloud Stream Binder 实现JMS 实现 ActiveMQ1.增加Maven依赖<!-- 整合 Sprig Boot Starter ActiveMQ --> <!-- 间接依赖: spring jms ...

Spring Cloud Stream Binder 实现

JMS 实现 ActiveMQ

1.增加Maven依赖

<!-- 整合 Sprig Boot Starter ActiveMQ -->
        <!-- 间接依赖:
            spring jms
            jms api
            activemq
            spring boot jms
        -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

2.启动ActiveMQ broker


activemq console

3.原生API:生产消息
请注意启动后的控制台输出

private static void sendMessage() throws Exception {
        // 创建 ActiveMQ 链接,设置 Broker URL
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 创造 JMS 链接
        Connection connection = connectionFactory.createConnection();
        // 创建会话 Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建消息目的 - Queue 名称为 "TEST"
        Destination destination = session.createQueue("TEST");
        // 创建消息生产者
        MessageProducer producer = session.createProducer(destination);
        // 创建消息 - 文本消息
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setText("Hello,World");
        // 发送文本消息
        producer.send(message);

        // 关闭消息生产者
        producer.close();
        // 关闭会话
        session.close();
        // 关闭连接
        connection.close();
    }

4.原生API:消费消息

 private static void receiveMessage() throws Exception {

        // 创建 ActiveMQ 链接,设置 Broker URL
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 创造 JMS 链接
        Connection connection = connectionFactory.createConnection();
        // 启动连接
        connection.start();
        // 创建会话 Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建消息目的 - Queue 名称为 "TEST"
        Destination destination = session.createQueue("TEST");
        // 创建消息消费者
        MessageConsumer messageConsumer = session.createConsumer(destination);
        // 获取消息
        Message message = messageConsumer.receive(100);

        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("消息消费内容:" + textMessage.getText());
        }

        // 关闭消息消费者
        messageConsumer.close();
        // 关闭会话
        session.close();
        // 关闭连接
        connection.stop();
        connection.close();
    }

Spring Boot JMS+ActiveMQ

1.Maven依赖

<!-- 整合 Sprig Boot Starter ActiveMQ -->
        <!-- 间接依赖:
            spring jms
            jms api
            activemq
            spring boot jms
        -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

2.配置ActiveMQ属性
application.properties

## ActiveMQ 配置
spring.activemq.brokerUrl = tcp://localhost:61616

3.配置JMS属性
application.properties

## JMS 配置
spring.jms.template.defaultDestination = sf-users-activemq

4.改造user-service-client:实现ActiveMQ User对象消息生产
UserServiceClientController.java

@Autowired
    private JmsTemplate jmsTemplate;

    @PostMapping("/user/save/message/activemq")
    public boolean saveUserByActiveMQMessage(@RequestBody User user) throws Exception {
        jmsTemplate.convertAndSend(user);
        return true;
    }

5.启动user-service-client
预先启动“eureka-server”以及“config-server”
6.改造user-service-provider:实现ActiveMQ User对象信息消费

  @Autowired
    private JmsTemplate jmsTemplate;

    @GetMapping("/user/poll")
    public Object pollUser() {
        // 获取消息队列中,默认 destination = sf-users-activemq
        return jmsTemplate.receiveAndConvert();
    }

ActiveMQ Spring Cloud Stream Binder实现

创建spring-cloud-stream-binder-activemq 工程
引入Maven
实现Binder接口-仅实现消息发送

package com.segumentfault.spring.cloud.stream.binder.activemq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;

/**
 * Active MQ MessageChannel Binder 实现
 *
 * @author <a href="mailto:mercyblitz@gmail.com">Mercy</a>
 * @since 0.0.1
 */
public class ActiveMQMessageChannelBinder implements
        Binder<MessageChannel, ConsumerProperties, ProducerProperties> {

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * 接受 ActiveMQ 消息
     *
     * @param name
     * @param group
     * @param inboundBindTarget
     * @param consumerProperties
     * @return
     */
    @Override
    public Binding<MessageChannel> bindConsumer(String name, String group, MessageChannel inboundBindTarget, ConsumerProperties consumerProperties) {
        // TODO: 实现消息消费
        return () -> {
        };
    }

    /**
     * 负责发送消息到 ActiveMQ
     *
     * @param name
     * @param outputChannel
     * @param producerProperties
     * @return
     */
    @Override
    public Binding<MessageChannel> bindProducer(String name, MessageChannel outputChannel, ProducerProperties producerProperties) {
        Assert.isInstanceOf(SubscribableChannel.class, outputChannel,
                "Binding is supported only for SubscribableChannel instances");

        SubscribableChannel subscribableChannel = (SubscribableChannel) outputChannel;

        subscribableChannel.subscribe(message -> {
            // 接受内部管道消息,来自于 MessageChannel#send(Message)
            // 实际并没有发送消息,而是此消息将要发送到 ActiveMQ Broker
            Object messageBody = message.getPayload();
            jmsTemplate.convertAndSend(name, messageBody);

        });

        return () -> {
            System.out.println("Unbinding");
        };
    }
}

实现 Spring Cloud Stream Binder 自动装配
package com.segumentfault.spring.cloud.stream.binder.activemq;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * ActiveMQ Stream Binder 自动装配
 *
 */
@Configuration
@ConditionalOnMissingBean(Binder.class)
public class ActiveMQStreamBinderAutoConfiguration {

    @Bean
    public ActiveMQMessageChannelBinder activeMQMessageChannelBinder() {
        return new ActiveMQMessageChannelBinder();
    }
}

2.配置META-INF/spring.binders

activemq :\
com.segumentfault.spring.cloud.stream.binder.activemq.ActiveMQStreamBinderAutoConfiguration

3.整合消息生产者user-service-client
引入 ActiveMQ Spring Cloud Stream Binder Maven 依赖

 <!-- 引入 Active MQ Spring Cloud Stream Binder 实现 -->
        <dependency>
            <groupId>com.segumentfault</groupId>
            <artifactId>spring-cloud-stream-binder-activemq</artifactId>
            <version>${project.version}</version>
        </dependency>

4.配置ActiveMQ Spring Cloud Stream Binder 属性

## Spring Cloud Stream 默认 Binder
spring.cloud.stream.defaultBinder=rabbit

### 消息管道 activemq-out 配置
spring.cloud.stream.bindings.activemq-out.binder = activemq
spring.cloud.stream.bindings.activemq-out.destination = sf-users-activemq

5.实现Binder接口 - 实现消息消费

 @Override
    public Binding<MessageChannel> bindConsumer(String name, String group, MessageChannel inputChannel, ConsumerProperties consumerProperties) {

        ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();
        try {
            // 创造 JMS 链接
            Connection connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建会话 Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建消息目的
            Destination destination = session.createQueue(name);
            // 创建消息消费者
            MessageConsumer messageConsumer = session.createConsumer(destination);

            messageConsumer.setMessageListener(message -> {
                // message 来自于 ActiveMQ
                if (message instanceof ObjectMessage) {
                    ObjectMessage objectMessage = (ObjectMessage) message;
                    try {
                        Object object = objectMessage.getObject();
                        inputChannel.send(new GenericMessage<Object>(object));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        }

        return () -> {
        };
    }

6.整合消息消费者- user-service-provider
引入 ActiveMQ Spring Cloud Stream Binder Maven 依赖

<!-- 引入 Active MQ Spring Cloud Stream Binder 实现 -->
        <dependency>
            <groupId>com.segumentfault</groupId>
            <artifactId>spring-cloud-stream-binder-activemq</artifactId>
            <version>${project.version}</version>
        </dependency>

7.配置ActiveMQ Spring Cloud Stream Binder 属性

## Spring Cloud Stream 默认 Binder
spring.cloud.stream.defaultBinder=rabbit

### 消息管道 activemq-out 配置
spring.cloud.stream.bindings.activemq-in.binder = activemq
spring.cloud.stream.bindings.activemq-in.destination = sf-users-activemq

8.实现User消息监听

 @StreamListener("activemq-in")
    public void onUserMessage(User user) throws IOException {
        System.out.println("Subscribe by @StreamListener");
        userService.saveUser(user);
    }

    // 监听 ActiveMQ Stream
    userMessage.activeMQIn().subscribe(message -> {

        if (message instanceof GenericMessage) {
            GenericMessage genericMessage = (GenericMessage) message;
            User user = (User) genericMessage.getPayload();
            userService.saveUser(user);
        }
    });
目录
相关文章
|
1月前
|
SpringCloudAlibaba Java 网络架构
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(二)Rest微服务工程搭建
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(二)Rest微服务工程搭建
46 0
|
30天前
|
负载均衡 Java API
Spring Cloud 面试题及答案整理,最新面试题
Spring Cloud 面试题及答案整理,最新面试题
132 1
|
30天前
|
Java Nacos Sentinel
Spring Cloud Alibaba 面试题及答案整理,最新面试题
Spring Cloud Alibaba 面试题及答案整理,最新面试题
138 0
|
1月前
|
SpringCloudAlibaba Java 持续交付
【构建一套Spring Cloud项目的大概步骤】&【Springcloud Alibaba微服务分布式架构学习资料】
【构建一套Spring Cloud项目的大概步骤】&【Springcloud Alibaba微服务分布式架构学习资料】
131 0
|
1月前
|
SpringCloudAlibaba Java 网络架构
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
81 0
|
1月前
|
消息中间件 JSON Java
Spring Boot、Spring Cloud与Spring Cloud Alibaba版本对应关系
Spring Boot、Spring Cloud与Spring Cloud Alibaba版本对应关系
404 0
|
2天前
|
负载均衡 Java 开发者
细解微服务架构实践:如何使用Spring Cloud进行Java微服务治理
【4月更文挑战第17天】Spring Cloud是Java微服务治理的首选框架,整合了Eureka(服务发现)、Ribbon(客户端负载均衡)、Hystrix(熔断器)、Zuul(API网关)和Config Server(配置中心)。通过Eureka实现服务注册与发现,Ribbon提供负载均衡,Hystrix实现熔断保护,Zuul作为API网关,Config Server集中管理配置。理解并运用Spring Cloud进行微服务治理是现代Java开发者的关键技能。
|
3天前
|
Java API 对象存储
对象存储OSS产品常见问题之使用Spring Cloud Alibaba情况下文档添加水印如何解决
对象存储OSS是基于互联网的数据存储服务模式,让用户可以安全、可靠地存储大量非结构化数据,如图片、音频、视频、文档等任意类型文件,并通过简单的基于HTTP/HTTPS协议的RESTful API接口进行访问和管理。本帖梳理了用户在实际使用中可能遇到的各种常见问题,涵盖了基础操作、性能优化、安全设置、费用管理、数据备份与恢复、跨区域同步、API接口调用等多个方面。
22 2
|
17天前
|
负载均衡 网络协议 Java
构建高效可扩展的微服务架构:利用Spring Cloud实现服务发现与负载均衡
本文将探讨如何利用Spring Cloud技术实现微服务架构中的服务发现与负载均衡,通过注册中心来管理服务的注册与发现,并通过负载均衡策略实现请求的分发,从而构建高效可扩展的微服务系统。
|
18天前
|
开发框架 负载均衡 Java
Spring boot与Spring cloud之间的关系
总之,Spring Boot和Spring Cloud之间的关系是一种构建和扩展的关系,Spring Boot提供了基础,而Spring Cloud在此基础上提供了分布式系统和微服务架构所需的扩展和工具。
16 4
Spring boot与Spring cloud之间的关系