本教程将指导您如何利用 MuleSoft 平台订阅 Salesforce 的 Change Data Capture (CDC) 事件,并将数据实时同步到外部 PostgreSQL 数据库。通过这个实践示例,您将学习到完整的数据集成流程。
概览
我们将完成以下核心任务:
- 在 Salesforce 中配置联系人对象的 CDC 事件发布
- 使用 MuleSoft Salesforce Connector 配置 Replay Channel Listener 以订阅这些变更事件
- 在 Anypoint Studio 中处理 CDC 事件数据
- 将数据同步写入 PostgreSQL 数据库
环境准备
在开始之前,请确保您已准备好以下环境:
- Anypoint Studio: MuleSoft 的官方开发环境 安装链接
- Salesforce Developer Edition: 免费的 Salesforce 开发环境 注册链接
- PostgreSQL 数据库: 一个可访问的 PostgreSQL 实例
详细步骤
Salesforce CDC 配置
- 使用开发者账号登录您的 Salesforce 开发者组织。
- 导航至设置页面:
- 进入
Setup
->Home
- 在搜索框中输入 Change Data Capture
- 进入详情页面选择
联系人
对象,此处可以看到所有可发布 CDC 事件的对象列表
- 进入
选择并保存后,您就完成了联系人对象 CDC 事件的配置。当联系人记录发生变更时,系统将自动向事件总线发布相应的事件。
MuleSoft 项目配置
启动 Anypoint Studio
, 点击 file
, 然后选择 New
> Mule Project
. 项目名字可以叫 salesforce-cdc
然后点击 Finish
.
创建一个 global.xml
文件来存储你的 connector 配置。右键点击 src/main/resources 文件夹。选择 New
> File
.
文件名为 global.yaml
然后 点击 Finish
.
在 global.yaml 文件中,通过复制和粘贴以下内容,创建以下属性。
1
2
3
4
5
6
7
8
9
10
salesforce:
username: "salesforce@example.com"
password: "xxx"
token: "xxx"
postgres:
username: "peter.dong"
password: "xxx"
database: "xxx"
url: "jdbc:postgresql://xxx.compute-1.amazonaws.com:5432/database"
在 Anypoint Studio 右边的 Mule Palette
, 添加 Salesforce
和 Database
组件。
然后点击 Salesforce
, 将 Replay channel listener
拖拽到左边的 Flow 里。
点击 Global Elements
配置相关全局的变量。
点击 Create
.
输入框搜索 Prop
, 点击 OK
.
在弹出的界面里,输入 global.yaml
, 这么做的目的是让 Mule Application 知道配置的 properties 在 global.yaml 文件里。
接下来配置 Salesforce Configuration
, 重复上面步骤,点击 Create
, 搜索 salesforce 关键字,点击 OK
.
根据下图所示内容填写对应的参数,Authorization URL 可以留空,因为我们是 Salesforce 开发者账号,配置默认为 login.salesforce.com.
点击 Test Connection
, 测试账号是否可以正常连接 Salesforce, 连接正常,点击 OK
.
接下来配置 Database Configuration
, 重复上面步骤,点击 Create
, 搜索 database 关键字,点击 OK
.
Connection
选择 Generic Connection
.
JDBC Driver
选择 Add Maven dependency
, 搜索框输入 postgresql
然后在 Connection 里,填入对应的 Database 参数,点击 Test Connection
, 验证数据库是否可以连接成功。
配置 message flow
, 选中 Replay channel listener
, 按照如图所示选择相应参数,本次示例只在联系人创建的时候触发,所以 Replay Option 选择的是 ONLY_NEW
将 Logger
组件拖出,目的用于记录 Inboud payload.
设置变量,类似 Logger
组件,将 Set Variable
拖出放在 Logger
组件的后面
重复以上步骤,设置三个变量: 第一个变量可以设置如下,目的是在 Inbound change 事件中 sfdc_id 的值与 Salesforce 的 record id 对应。
1
2
3
Display Name: Set Salesforce Record Id
Name: sfdc_id
Value: payload.data.payload.ChangeEventHeader.recordIds[0]
第二个变量设置如图所示:
1
2
3
Display Name: Set Last Name
Name: last_name
Value: payload.data.payload.Name.LastName
第三个变量设置如图所示:
1
2
3
Display Name: Set Change Type
Name: change_type
Value: payload.data.payload.ChangeEventHeader.changeType
将 Choice
拖出,目的是判断当前的 CDC 事件是否为 CREATE .
设置判断条件:vars.change_type == 'CREATE'
将 Database
的 Insert
操作拖出放入第一个选项框中。
设置 sql 语句,和匹配的参数,插入数据库。
启动 Mule Application:
在 Salesforce 中创建一条 Contact 记录,Lasrt Name 为:MuleSoft CDC
我们在 MuleSoft Application 里可以看到 CDC 事件已经触发,并且 payload 日志已经打印出来 :
再来看下数据库更新情况,有一条新的记录产生,sfdc_id 为刚才在 Salesforce 创建的联系人记录。
总结
通过本教程,您已经成功实现了以下功能:
- Salesforce 联系人数据变更的实时捕获
- 使用 MuleSoft 进行数据处理和转换
- 将数据实时同步到 PostgreSQL 数据库
这个集成方案为实现系统间的实时数据同步提供了可靠的基础。您可以基于此框架扩展更多的集成场景,如添加其他对象的 CDC 监听、增加更复杂的数据转换逻辑等。