企業集成模式:Spring Integration 和 Apache Camel
前段時間寫了一個功能,MQTT 消息轉發給 gRPC 服務端,
換成 Apache Camel 實現,試通了感覺 Camel 挺不錯。想著以前寫的 Spring 集成 MQTT 用的 Spring Integration,正好對比一下。
閱讀 Spring Integration 文檔,說靈感來源于《企業集成模式》這本書。
搜索發現 Spring Integration 和 Apache Camel 都屬于 “集成框架”,還有一個 Mule。
嘗試搜集資料,總結對比一下。
《企業集成模式》
Enterprise Integration Patterns(EIP)
作者網站
https://www.enterpriseintegrationpatterns.com/patterns/messaging/
有內容介紹,不過還是看圖比較直觀

PDF【金山文檔】 企業集成模式
https://kdocs.cn/l/cfbM5BO6gyYj
書第10頁
書上的圖不清晰,重畫了一下
Spring Integration
官網
https://docs.spring.io/spring-integration/reference/index.html
Spring Integration 的簡單介紹,感覺了解有一些名詞即可。真正體會還是需要看圖和看代碼。
Spring Integration 是一個基于 Spring 框架的企業集成解決方案,旨在簡化不同應用程序之間的消息傳遞、通信和系統集成。
它提供了一種聲明性的方式來定義和組織消息驅動的應用程序,通過一系列的消息通道、消息處理器、轉換器、過濾器等組件來構建集成流程。其核心思想是將應用程序拆分成可重用的消息處理模塊,這些模塊通過消息通道進行通信。
Spring Integration 支持多種消息傳遞方式,包括基于消息隊列、文件、HTTP、WebSocket 等,同時也提供了與外部系統集成的能力。
通過 Spring Integration,開發者可以輕松地構建復雜的集成流程,處理不同來源和形式的消息,并能夠在這些消息之間進行路由、轉換、過濾、聚合等操作,以滿足不同場景下的集成需求。
整合了 Spring 生態的優勢,讓開發者能夠更加方便地在 Spring 應用中實現消息驅動的集成解決方案。
Spring 集成框架概述
這段話來自 Spring 集成框架概述
作為 Spring 編程模型的延伸,Spring Integration 提供了各種配置選項,包括注解、帶有命名空間支持的 XML、帶有通用“bean”元素的 XML 以及直接使用底層 API。
該 API 基于明確定義的策略接口和非侵入式的委托適配器。
Spring Integration 的設計靈感來自于 Spring 中常見模式與 Gregor Hohpe 和 Bobby Woolf 在《企業集成模式》(Addison Wesley,2004)一書中描述的知名模式之間的強烈關聯。
已經閱讀過該書的開發人員應該能夠立即理解 Spring Integration 的概念和術語。
消息

消息通道

消息路由器

服務激活器

一個入站通道適配器端點,連接一個源系統到一個消息通道

一個出站通道適配器端點,連接一個消息通道到一個目標系統

集成 MQTT
簡單的示例,介紹一下用法
Java DSL(Domain Specific Language,領域特定語言)是 Spring Integration 提供的一種編程方式,用于以流暢的方式配置消息通道、消息處理器、消息端點等集成組件。
它提供了一種直觀、流暢的方法來定義整個集成流程,使得代碼更易讀、更易維護。使用 Java DSL 配置 Spring Integration 可以讓你通過編寫代碼來定義整個消息通道、消息處理器和消息端點之間的流程。
這種方式相比于 XML 配置更加靈活,并且可以利用 Java 的編程能力來完成集成流程的定義。
pom.xml 里的依賴
<!-- MQTT -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- Spring Integration Stream -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
使用 IntegrationFlow
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
@Configuration
public class MqttIntegrationConfig {
@Bean
public IntegrationFlow mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
"tcp://localhost:1883",
"My-ClientID-ioufev",
"topic1", "topic2");
adapter.setBeanName("myIntegrationFlowMqttPahoAdapter");
return IntegrationFlows.from(adapter)
.handle(message -> {
// 處理收到的 MQTT 消息
String payload = message.getPayload().toString();
System.out.println("Received message: " + payload);
// 在這里添加你的處理邏輯
})
.get();
}
}
說明
1、MQTT 底層實現肯定是 Eclipse Paho,Spring Integration 只是封裝了一層方便調用。
2、使用 Java DSL 配置的寫法確實比較直觀
Apache Camel
官網
https://camel.apache.org/
https://github.com/apache/camel/
https://zh.wikipedia.org/zh-cn/Apache_Camel
Camel is an Open Source integration framework that empowers you to quickly and easily integrate various systems consuming or producing data.
Camel 是一個開源集成框架,使您能夠快速輕松地集成各種消費或生產數據的系統。Apache Camel 是一個基于規則路由和中介引擎,提供企業集成模式的 Java 對象(POJO)的實現,通過應用程序接口(或稱為陳述式的 Java 領域特定語言(DSL))來配置路由和中介的規則。
領域特定語言意味著 Apache Camel 支持你在的集成開發工具中使用平常的,類型安全的,可自動補全的 Java 代碼來編寫路由規則,而不需要大量的 XML 配置文件。
同時,也支持在 Spring 中使用 XML 配置定義路由和中介規則。
文章
復制文件的例子,很好的切入點,路由的功能很好的描述了
這個入門介紹簡潔直接有條理。
概念
以下是 Apache Camel 的一些概念:
- 路由(Route)- Camel 中的核心概念,它描述了如何將消息從一個端點(endpoint)傳輸到另一個端點,并應用一系列的轉換和處理。
- 組件(Component)- Camel 中的組件是一組用于連接到不同數據源或目標的預定義端點。例如,JMS 組件用于連接到 JMS 隊列或主題,HTTP 組件用于連接到 HTTP 服務器等。
- 處理器(Processor)- Camel 中的處理器用于對消息進行轉換和處理。例如,過濾器、轉換器、聚合器等等。
- 謂詞(Predicate)- Camel 中的謂詞是一個用于測試消息是否滿足某些條件的函數。例如,判斷消息是否為空,或者是否滿足某些標準。
- 消息(Message)- Camel 中的消息是路由中的基本單元。消息通常包含一些元數據和有效負載數據。
- Exchange - Exchange 是在 Camel 路由中傳遞的消息容器。Exchange 包含消息、消息頭和其他元數據。它還提供了一個機制,用于在處理器之間傳遞附加信息。
- 聚合器(Aggregator)- Camel 中的聚合器用于合并一組相關的消息。例如,將一組相關的訂單合并為單個訂單。
- 路由策略(Routing Policy)- Camel 中的路由策略用于控制路由中消息的流動。例如,從一組路由中選擇一個,或者使用負載均衡策略。
- 觸發器(Trigger)- Camel 中的觸發器用于觸發路由中的操作。例如,基于時間的觸發器可用于定期執行某些任務。
- 轉換器(Type Converter)- Camel 中的轉換器用于將一種類型的消息轉換為另一種類型。例如,將字符串轉換為數字。
Camel 架構圖

圖片來源
在高層次上看 Camel 的架構非常簡單。
CamelContext 表示 Camel 運行時系統,它連接不同的概念,如路由、組件或端點。
在此之下,處理器處理端點之間的路由和交換,而端點則集成不同的系統。
- CamelContext:它是 Camel 運行時的核心部分,負責管理和運行路由。在圖中,它連接了不同的路由(如 Route 1、Route 2 等),并通過 DSL(領域特定語言)定義路由的行為。
- 路由 (Routes):路由定義了消息從一個端點流向另一個端點的路徑,并且可以包含不同的處理器來操作消息。在圖中,路由通過 DSL 描述,例如
.from()、.filter()、.xpath()和.to(),用于定義從哪里接收消息、如何處理消息以及將消息發送到哪里。 - 處理器 (Processors):處理器用于在端點之間對消息進行處理。圖中顯示了兩個處理器示例:Message filter processor(消息過濾處理器)和 Content-based router processor(基于內容的路由處理器)。它們分別用于根據條件過濾消息或根據消息內容進行路由。
- 端點 (Endpoints):端點代表消息的源或目的地,可能是文件系統、JMS(Java 消息服務)、HTTP 等。它們通過組件來抽象實現,組件為各種技術提供了統一的接口,便于與不同系統集成。
- 組件 (Components):組件負責創建端點,用于處理不同的協議或技術。在圖中,File、JMS、HTTP 都是組件的例子。
集成 MQTT
簡單的示例,介紹一下用法
Apache Camel 是一個開源的集成框架,它提供了多種 DSL(領域特定語言)來簡化集成開發。
Camel 提供了多種 DSL,其中包括 Java DSL、Spring DSL、Blueprint DSL 等,用于配置和定義路由規則、消息轉換、數據路由等。Camel 的 DSL 是為了簡化和表達集成模式而設計的,讓開發者能夠以一種更加直觀和易于理解的方式來定義路由和處理數據。
它支持不同級別的抽象,允許使用不同的語法風格來表達路由規則。
pom.xml 里的依賴
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
<version>3.20.6</version>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-paho-starter</artifactId>
<version>3.20.6</version>
</dependency>
配置文件 application.yml
camel:
springboot:
main-run-controller: true
component:
direct:
enabled: true
paho:
broker-url: tcp://127.0.0.1
qos: 0
user-name:
password:
示例代碼
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class MqttToFileRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
// 創建一個從 MQTT 主題接收消息的路由
// paho 指的是 Paho MQTT 組件,要在配置文件中配置相關參數
from("paho:topic1")
.log("Received message: ${body}")
.process(exchange -> {
// 處理消息
});
// 轉發主題到另一個主題
from("paho:topic2")
.to("paho:topic3");
// 將收到的消息寫入到文本文件
from("paho:topic4")
.convertBodyTo(String.class)
.to("file:E:\\Data\\Camel?fileName=mqtt_messages.txt");
}
}
說明
1、MQTT 底層實現肯定是 Eclipse Paho,Apache Camel 只是封裝了一層方便調用。
2、路由配置簡單直接,from 和 to 方法簡單直接到你覺得本來就應該是這樣子。
Camel 組件
Camel 組件,真是非常多,我看到的感興趣的有
- 實現 MQTT 的 Paho
- 實現 OPC UA 的 OPC UA CLIENT,當然底層驅動是 Eclipse milo
- KAFKA
- gRPC
- PLC4X,PLC4X 是 Apache 的開源下項目,號稱“通用協議適配器”,感覺和 OPC 的面向對象映射的思路是一致的,想通過增加一層統一的接口或者模型,來調用不同的工業協議驅動來連接 PLC
- COAP
組織方式差異
Spring Integration:以消息通道為核心
- Spring Integration 的核心抽象是消息通道(Message Channels)。開發者需要顯式地定義這些通道,用于在系統中發送和接收消息。
- 消息通道連接了各種端點(比如過濾器、路由器等),通過這些通道,消息得以在系統中流動和處理。
- 這種設計的好處在于,它提供了對消息流的細粒度控制,但同時也要求開發者對通道的概念和實現有較深入的理解。
Apache Camel:以路由為核心
- Apache Camel 的核心概念是路由(Routes)。它使用內部的交換(Exchanges)來處理消息的流動和邏輯,開發者通過定義路由來指定消息的路徑。
- 在這種方式下,消息通道的復雜性被隱藏了起來,開發者無需直接操作通道,而是專注于路由的定義和業務邏輯的實現。
- 這使得 Camel 在快速開發和處理復雜集成場景時更加簡便。
Spring 官方文檔的說明
根據 Spring Integration Reference: Camel 的內容:
- 關于 Spring Integration:
- 文檔中寫道:“Spring Integration fully relies on a dependency injection container from Spring Core... It also uses the MessageChannel abstraction as a first class citizen of which developers need to be aware of, when composing their integration flows.”
- 翻譯過來就是:“Spring Integration 完全依賴 Spring Core 的依賴注入容器……它將消息通道作為首要抽象,開發者在構建集成流程時需要明確了解這一點。”
- 關于 Apache Camel:
- 文檔中寫道:“Apache Camel, on the other hand, does not provide a first class citizen abstraction of a message channel and proposes to compose its routes via internal exchanges, hidden from the API.”
- 翻譯過來就是:“另一方面,Apache Camel 并未將消息通道作為首要抽象,而是通過內部的交換機制來組成其路由,這些細節對 API 用戶是隱藏的。”
這些描述直接表明了觀點:Spring Integration 強調消息通道,而 Apache Camel 強調路由。
組織方式驅動的實現思路
Spring Integration 的組織方式與實現思路
Spring Integration 的核心組織方式是消息通道(Message Channels),它的實現思路是通過通道解耦消息的生產者和消費者,實現異步、靈活的消息傳遞和處理。
為什么要有消息通道(Message Channels)?
- 作用:消息通道是消息傳輸的媒介,負責將消息從生產者傳遞到消費者。
- 原因:通過通道,生產者和消費者無需直接依賴對方,實現了邏輯上的解耦。這種設計支持異步處理,生產者可以將消息發送到通道后立即返回,而消費者可以在合適的時間從通道獲取消息。
- 類型:支持多種通道類型,例如點對點(
DirectChannel)和發布-訂閱(PublishSubscribeChannel),滿足不同場景需求。
為什么要有入站適配器(Inbound Adapters)?
- 作用:入站適配器從外部系統(如 JMS 隊列、文件、HTTP 請求)接收數據,轉換為 Spring Integration 的消息格式,并發送到消息通道。
- 原因:外部系統的數據格式和協議各異,入站適配器作為橋梁,將這些數據適配到框架內部,保持與外部系統的解耦。
為什么要有出站適配器(Outbound Adapters)?
- 作用:出站適配器將 Spring Integration 的消息發送到外部系統(如數據庫、文件、郵件服務器)。
- 原因:與入站適配器類似,出站適配器負責將內部消息適配到外部系統,確保消息能夠正確輸出,同時保持解耦。
為什么要有服務激活器(@ServiceActivator 注解)?
- 作用:
@ServiceActivator標記一個方法,使其成為消息處理的服務激活器,當消息到達指定通道時自動調用該方法。 - 原因:通過注解驅動的方式,開發者無需手動配置消息處理邏輯,簡化開發流程,提高效率,同時保持代碼的可讀性。
消息流動特性與流程
- 流程:
- 消息通過入站適配器從外部系統進入某個消息通道。
- 消息在通道中流動,可能經過轉換器、過濾器等組件處理。
- 消息到達指定通道后,觸發服務激活器處理,或通過出站適配器發送到外部系統。
- 特性:
- 解耦:生產者和消費者通過通道隔離。
- 異步:消息傳遞無需同步等待。
- 可控:支持多種通道和組件,開發者可以精確控制消息流。
Apache Camel 的組織方式與實現思路
Apache Camel 的核心組織方式是路由(Routes),它的實現思路是通過簡潔的路由定義來管理消息的流動路徑和處理邏輯。
為什么要有路由(Routes)?
- 作用:路由定義了消息的流動路徑,從來源(
from)到目的地(to),并指定中間的處理步驟。 - 原因:路由提供了一種直觀的方式來描述消息流,開發者通過 DSL(領域特定語言)就能快速定義復雜的集成邏輯,簡化配置。
為什么要有端點(Endpoints)?
- 作用:端點是消息的來源或目的地,支持多種技術(如文件、JMS、HTTP),通過 URI 配置。
- 原因:端點抽象了外部系統的接入方式,使得 Camel 能夠無縫集成不同技術,同時保持與外部系統的解耦。
為什么要有處理器(Processors)?
- 作用:處理器在路由中對消息進行轉換、過濾或自定義處理。
- 原因:消息在流動中往往需要加工,處理器提供了靈活的擴展點,開發者可以插入自定義邏輯,滿足復雜需求。
為什么要有過濾器(Filters)?
- 作用:過濾器根據條件決定消息是否繼續在路由中流動。
- 原因:通過條件判斷,過濾器支持動態路由,增強了消息處理的靈活性,例如只處理符合條件的消息。
消息流動特性與流程
- 流程:
- 消息從端點(
from)進入路由。 - 消息在路由中經過一系列處理器和過濾器處理。
- 最終,消息通過端點(
to)發送到外部系統,或繼續流向其他路由。
- 消息從端點(
- 特性:
- 簡潔:路由 DSL 易于理解和維護。
- 靈活:支持多種組件和自定義處理器。
- 動態:過濾器和條件路由增強了適應性。
對比分析
組織方式的差異
- Spring Integration:
- 以消息通道為核心,組件(如適配器)和注解(如
@ServiceActivator)圍繞通道設計,強調消息的傳遞和解耦。
- 以消息通道為核心,組件(如適配器)和注解(如
- Apache Camel:
- 以路由為核心,組件(如端點、處理器)圍繞路由設計,強調消息流動路徑的定義和簡潔性。
實現思路的差異
- Spring Integration:
- 通過通道解耦生產者和消費者,借助適配器和注解實現與外部系統的集成和消息處理,適合需要精確控制的場景。
- Apache Camel:
- 通過路由定義消息流,借助端點和處理器支持多種技術和靈活邏輯,適合快速開發和復雜集成。
消息流動特性的差異
- Spring Integration:
- 消息在通道中流動,流程為:入站適配器 → 通道 → 服務激活器/出站適配器,強調解耦和異步。
- Apache Camel:
- 消息在路由中流動,流程為:端點(from) → 處理器/過濾器 → 端點(to),強調簡潔和動態性。
總結
- Spring Integration 通過消息通道實現了解耦和異步,組件和注解支持精確的消息流控制,適合需要細粒度管理的場景。
- Apache Camel 通過路由實現簡潔和靈活,端點和處理器支持快速集成和動態處理,適合復雜而快速開發的場景。
通過以上分析,我們可以清晰理解兩者的組織方式如何驅動其實現思路,以及它們在消息流動中的不同流程和特性。
| 方面 | Spring Integration | Apache Camel |
|---|---|---|
| 組織方式 | 圍繞通道(channels),組件通過通道連接 | 圍繞路由(routes),線性定義消息流 |
| 配置方式 | XML、Java DSL、Groovy DSL、Kotlin DSL | Java DSL、XML DSL、YAML DSL |
| EIP 實現 | 強調通道、適配器、橋接 | 強調路由、端點、處理器 |
| 消息流清晰度 | XML 配置可能分散,Java DSL 更直觀 | 路由定義直觀,易于追蹤復雜流程 |
| 易用性 | 適合 Spring 項目,配置可能較繁瑣 | 靈活,DSL 易用,尤其在復雜場景 |
| 典型用例 | 基本集成(如 File、JMS),Spring 生態擴展 | 復雜路由、多種協議支持,跨框架使用 |

浙公網安備 33010602011771號