消息总线扩展之面向消息的数据集成

简介: 最近一段时间,我在琢磨消息总线除了能进行受管控的消息通信之外,还有哪些可以扩展的方向。这篇文章我们来探讨一下面向消息的数据集成是否可以作为一种尝试方向。 相关技术简介 XML 谈到XML我们的第一映像就是用它来做各种配置,当然如果你是Javaer,那么可能你印象最深的就是Spring的bena配置了。

最近一段时间,我在琢磨消息总线除了能进行受管控的消息通信之外,还有哪些可以扩展的方向。这篇文章我们来探讨一下面向消息的数据集成是否可以作为一种尝试方向。

相关技术简介

XML

谈到XML我们的第一映像就是用它来做各种配置,当然如果你是Javaer,那么可能你印象最深的就是Spring的bena配置了。其实,XML的用途远不止充当配置文件这一方面。它还被广泛应用于异构系统集成、数据集成、语义/协议转换等等方面,甚至成为构建平台非常重要的基石。虽然XML一直以来被人诟病其解析效率低下以及数据量太冗余而不够精简。但目前除了它没有一种集平台无关性与良好的可读性以及严谨的约束规范于一体的标准,所以,如果你曾经阅读过一些框架的源码的话,你就会深刻了解XML的重要性(如果你了解Apache Ofbiz的话,会看到他在其框架体系中大量使用了XML)。

XSLT

XSLT(XSL转换)即扩展样式表语言转换。它是XSL(扩展样式表语言)的子语言,也是XSL中最重要的部分,它使用XPath在XML进行导航。主要用于将XML转换为XHTML以及其他形式的XML。一个通俗的比方就是:CSS基于HTML就如同XSL基于XML。W3C在制定XML之初就已经为其格式转换制定了标准,XSLT包括了编程常用的实现方式:条件分支、循环遍历,除此之外还包括模板封装,函数封装,甚至可以执行特定语言实现的函数,可谓非常强大。

通过 XSLT,您可以向或者从输出文件添加或移除元素和属性。您也可重新排列元素,执行测试并决定隐藏或显示哪个元素,等等。描述转化过程的一种通常的说法是,XSLT 把 XML 源树转换为 XML 结果树。

JAXP

Java API for XML Processing(Java 处理XML的API),提供了一些访问以及处理XML相关的技术实现,包括但不仅限于SAX、DOM以及XSLT转换等。通过JAXP提供的API可以将某个源XML格式基于XSLT转换成目标格式。

JAXB

JAXB (Java Architecture for XML Binding) 是一个业界的标准,即是一项可以根据XML Schema产生Java类的技术。该过程中,JAXB也提供了将XML实例文档反向生成Java对象树的方法,并能将Java对象树的内容重新写到XML实例文档。有多种实现。

通过介绍上面两个技术,可以知道XML到XML的数据转换是可行的,但XML通常都不是每个业务系统最终存储或表示数据的方式(现在最常用的数据存储方式是数据库)。我们当然也可以在XML跟数据库中的数据之间进行转换,但如果能够用各个平台的业务对象来作为中间的衔接,就可以在XML跟各异的数据库之间进行解耦。比如就拿Java而言,常用的做法是:database<->JavaBean(POJO)<->XML。Database跟Java bean之间交给JDBC,而Java bean跟XML之间可以交给JAXB或某种基于Java对象跟XML之间进行序列化或反序列化的库。因此就Java平台而言,我们只需要通过JAXB实现XML与Java Bean之间的转换就可以了。当然对于其他平台(比如dot net),其思路也是一样的,并且XML在各个平台上都存在非常成熟的序列化与反序列化库。

基于消息总线实现数据集成

《企业集成模式:设计、构建及部署消息传递解决方案》一书的消息转换章节(3.6节)对以上基于XSLT的消息转换做了介绍,然后在Oracle的服务总线上看到它有提供这一功能,并且所用技术也是XSLT,另外Mule ESB也支持XSLT来对XML格式的数据进行格式转换。

服务总线可以进行数据转换消息总线是否可行?其实,这主要取决于你从什么角度来看待它们。服务总线基于SOAP以及HTTP协议提供服务的访问。而消息总线提供面向消息的通信机制,从技术层面上来讲其实都是提供通信机制。因此,从技术角度上来讲是没有问题的。下面我们来探讨一下通过消息总线如何进行数据格式转换。

技术原理

首先,我们认为每个队列只处理一种消息格式是一种比较好的实践,并且我们假设通信双方的数据表示格式都是XML。然后我们针对每个队列为其处理的消息(XML数据)定义一个Schema,这样用于集成的两个队列就会存在两个Schema(消息发送方跟消息接收方)。在跟目标队列通信之前,消息发送方根据目标队列的schema,编写一个适配它的XSLT文件,这个XSLT你可以简单地将其看做是针对消息接收方的适配器,它会跟当前通信链路建立关联。下面我们来看一下这种需求场景下消息通信的大致流程:

消息发送方无需针对目标队列识别的消息格式进行任何处理,它只需发送自己格式的消息。然后pubsuber服务器会获取到当前队列定义的schema,对消息进行验证,如果验证通过,根据发送时指定的链路token获取到当前队列针对目标队列的XSLT定义,消息总线客户端在内部对发送消息基于XSLT进行转换,即可将待发送的消息在发送端转换成目标队列可识别的消息,然后发送出去。在接收端,目标队列消费者消费的消息就都是适配过的可识别消息,当然消息总线内部会对接受到的消息用其预先定义的schema进行验证。

流程图大致如下:


消息总线在这里所起到的作用主要是底层通信支持。这里对于队列的schema以及XSLT的获取得益于消息总线的一个配置&推送中心——pubsuber。消息总线的客户端在发送端拦截了消息,然后在发送之前根据XSLT对消息进行转换。

代码实现

由于篇幅限制,这里我们暂且假设某个要集成的数据已经由原始数据被转换成了XML格式表示的数据(这不是技术上的问题,每个语言平台上都存在各种在对象与XML之间进行序列化与反序列化的工具)。现在我们有一个发送端学生数据的Schema:

<?xml version="1.0" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="student">
        <xs:complexType>
            <xs:sequence>
                <xs:element name="studentId">
                    <xs:simpleType>
                        <xs:restriction base="xs:string">
                            <xs:maxLength value="15"/>
                        </xs:restriction>
                    </xs:simpleType>
                </xs:element>
                <xs:element name="name">
                    <xs:simpleType>
                        <xs:restriction base="xs:string">
                            <xs:maxLength value="10"/>
                        </xs:restriction>
                    </xs:simpleType>
                </xs:element>
                <xs:element name="sex">
                    <xs:simpleType>
                        <xs:restriction base="xs:unsignedShort">
                            <xs:maxLength value="1" />
                        </xs:restriction>
                    </xs:simpleType>
                </xs:element>
                <xs:element name="birthday">
                    <xs:simpleType>
                        <xs:restriction base="xs:string">
                            <xs:maxLength value="20" />
                        </xs:restriction>
                    </xs:simpleType>
                </xs:element>
                <xs:element name="major" type="xs:string">
                    <xs:simpleType>
                        <xs:restriction base="xs:string">
                            <xs:maxLength value="100"/>
                        </xs:restriction>
                    </xs:simpleType>
                </xs:element>
            </xs:sequence>
        </xs:complexType>
    </xs:element>
</xs:schema>

我们需要与另一个系统进行数据集成,另一个系统的接受队列定义的处理数据的schema如下:

<?xml version="1.0" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="XS">
        <xs:complexType>
            <xs:sequence>
                <xs:element name="XSBH">
                    <xs:simpleType>
                        <xs:restriction base="xs:string">
                            <xs:maxLength value="15"/>
                        </xs:restriction>
                    </xs:simpleType>
                </xs:element>
                <xs:element name="XSXM">
                    <xs:simpleType>
                        <xs:restriction base="xs:string">
                            <xs:maxLength value="10"/>
                        </xs:restriction>
                    </xs:simpleType>
                </xs:element>
                <xs:element name="XSXB">
                    <xs:simpleType>
                        <xs:restriction base="xs:string">
                            <xs:maxLength value="10" />
                        </xs:restriction>
                    </xs:simpleType>
                </xs:element>
                <xs:element name="CSNF">
                    <xs:simpleType>
                        <xs:restriction base="xs:string">
                            <xs:maxLength value="10" />
                        </xs:restriction>
                    </xs:simpleType>
                </xs:element>
                <xs:element name="CSYF">
                    <xs:simpleType>
                        <xs:restriction base="xs:string">
                            <xs:maxLength value="10" />
                        </xs:restriction>
                    </xs:simpleType>
                </xs:element>
                <xs:element name="CSRQ">
                    <xs:simpleType>
                        <xs:restriction base="xs:string">
                            <xs:maxLength value="10" />
                        </xs:restriction>
                    </xs:simpleType>
                </xs:element>
                <xs:element name="SFTZ" type="xs:unsignedShort">
                </xs:element>
            </xs:sequence>
        </xs:complexType>
    </xs:element>
</xs:schema>

很明显,它们之间如果不进行一些必要的数据转换是无法进行对接的。首先源数据的元素名称是基于英文定义的,而目标数据的元素名称基于所谓的国家标准(汉语拼音的首字母缩写)。另外源数据的出生日期用一个元素来表达,而目标数据将其年月日拆分成三个元素来表达。源数据中提供了“专业”元素,但目标数据中不需要该元素;而目标数据中需要“是否统招”元素,而源数据中没有该元素。因此我们可以针对另一个系统定义的Schema来编写一个XSLT对源数据进行适应性转换。XSLT如下:

<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
    <xsl:output method="xml" indent="yes"/>
    <xsl:template match="/">
        <XS>
            <XSBH>
                <xsl:value-of select="student/studentId"/>
            </XSBH>
            <XSXM>
                <xsl:value-of select="student/name"/>
            </XSXM>
            <XSXB>
                <xsl:choose>
                    <xsl:when test="sex < 1">女</xsl:when>
                    <xsl:otherwise>男</xsl:otherwise>
                </xsl:choose>
            </XSXB>
            <CSNF>
                <xsl:value-of select="substring(student/birthday, 1, 4)"/>
            </CSNF>
            <CSYF>
                <xsl:value-of select="substring(student/birthday, 6, 7)"/>
            </CSYF>
            <CSRQ>
                <xsl:value-of select="substring(student/birthday, 9, 10)"/>
            </CSRQ>
            <SFTZ>
                1
            </SFTZ>
        </XS>
    </xsl:template>
</xsl:stylesheet>

而从源数据通过XSLT转换为目标数据的格式的代码非常简单(真实的实现中,这应该在消息总线客户端内实现,此处只是展示实现代码):

public static void main(String[] args) {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

        Source xmlSource = new StreamSource(classLoader.getResourceAsStream("producerXml.xml"));
        Source xsltSource = new StreamSource(classLoader.getResourceAsStream("pToc.xsl"));

        TransformerFactory transFact = TransformerFactory.newInstance();
        try {
            Transformer trans = transFact.newTransformer(xsltSource);
            trans.transform(xmlSource, new StreamResult(System.out));
        } catch (TransformerException e) {
            e.printStackTrace();
        }
    }

得益于XSLT充当适配器的角色,这种方式对通信双方代码的侵入性极低,几乎可以说事透明转换的。其实无论是否是基于消息总线,面向消息的数据集成都是可行的方案。而消息总线给这种集成带来了更多的便利:比如适合集成方与被集成方的pub/sub的通信模型、pubsuber作为Schema以及XSLT的元数据管理中心、受控的通信行为等。



原文发布时间为:2015-04-05


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

目录
相关文章
|
2月前
|
消息中间件 网络协议 Ubuntu
实现高效消息传递:使用RabbitMQ构建可复用的企业级消息系统
RabbitMQ是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
|
4月前
|
消息中间件 存储 缓存
分布式实时消息队列Kafka(三)生产分区规则
分布式实时消息队列Kafka(三)生产分区规则
37 0
分布式实时消息队列Kafka(三)生产分区规则
|
7月前
|
消息中间件 Serverless Kafka
基于 EventBridge 轻松搭建消息集成应用
基于 EventBridge 轻松搭建消息集成应用
22071 3
|
7月前
|
消息中间件 存储 Kafka
消息队列和应用工具产品体系-消息队列 Kafka 版的特征及基本使用
消息队列和应用工具产品体系-消息队列 Kafka 版的特征及基本使用
417 0
消息队列和应用工具产品体系-消息队列 Kafka 版的特征及基本使用
|
7月前
|
消息中间件 存储 中间件
消息组件选型分析
消息组件选型分析
|
消息中间件 存储 运维
RocketMQ 消息集成:多类型业务消息-普通消息
本篇将从业务集成场景的诉求开始,介绍 RocketMQ 作为业务消息集成方案的核心能力和优势,通过功能场景、应用案例以及最佳实践等角度介绍 RocketMQ 普通消息类型的使用。
220 0
RocketMQ  消息集成:多类型业务消息-普通消息
|
消息中间件 存储 算法
RocketMQ 消息集成:多类型业务消息——定时消息
本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。
427 0
RocketMQ  消息集成:多类型业务消息——定时消息
|
消息中间件 存储 Java
|
消息中间件 运维 自然语言处理
EventBridge消息路由|高效构建消息路由能力
企业数字化转型过程中,天然会遇到消息路由,异地多活,协议适配,消息备份等场景。本篇主要通过 EventBridge 消息路由的应用场景和应用实验介绍,帮助大家了解如何通过 EventBridge 的消息路由高效构建消息路由能力。
176 0
EventBridge消息路由|高效构建消息路由能力
|
消息中间件 存储 Prometheus
消息队列 RocketMQ 遇上可观测:业务核心链路可视化
本篇文章主要介绍 RocketMQ 的可观测性工具在线上生产环境的最佳实践。RocketMQ的可观测性能力领先业界同类产品,RocketMQ 的 Dashboard 和消息轨迹等功能为业务核心链路保驾护航,有效应对线上大规模生产使用过程中遇到的容量规划、消息收发问题排查以及自定义监控等场景。
294 0
消息队列 RocketMQ  遇上可观测:业务核心链路可视化