本教程将指导您如何利用 MuleSoft 平台订阅 Salesforce 的 Change Data Capture (CDC) 事件,并将数据实时同步到外部 PostgreSQL 数据库。通过这个实践示例,您将学习到完整的数据集成流程。
概览
我们将完成以下核心任务:
- 在 Salesforce 中配置联系人对象的 CDC 事件发布
- 使用 MuleSoft Salesforce Connector 配置 Replay Channel Listener 以订阅这些变更事件
- 在 Anypoint Studio 中处理 CDC 事件数据
- 将数据同步写入 PostgreSQL 数据库

环境准备
在开始之前,请确保您已准备好以下环境:
- Anypoint Studio: MuleSoft 的官方开发环境 安装链接
- Salesforce Developer Edition: 免费的 Salesforce 开发环境 注册链接
- PostgreSQL 数据库: 一个可访问的 PostgreSQL 实例
详细步骤
Salesforce CDC 配置
- 使用开发者账号登录您的 Salesforce 开发者组织。
- 导航至设置页面:
- 进入
Setup->Home - 在搜索框中输入 Change Data Capture
- 进入详情页面选择
联系人对象,此处可以看到所有可发布 CDC 事件的对象列表
- 进入

选择并保存后,您就完成了联系人对象 CDC 事件的配置。当联系人记录发生变更时,系统将自动向事件总线发布相应的事件。
MuleSoft 项目配置
启动 Anypoint Studio, 点击 file, 然后选择 New > Mule Project. 项目名字可以叫 salesforce-cdc 然后点击 Finish.

创建一个 global.xml 文件来存储你的 connector 配置。右键点击 src/main/resources 文件夹。选择 New > File.

文件名为 global.yaml 然后 点击 Finish.

在 global.yaml 文件中,通过复制和粘贴以下内容,创建以下属性。
1
2
3
4
5
6
7
8
9
10
salesforce:
username: "salesforce@example.com"
password: "xxx"
token: "xxx"
postgres:
username: "peter.dong"
password: "xxx"
database: "xxx"
url: "jdbc:postgresql://xxx.compute-1.amazonaws.com:5432/database"
在 Anypoint Studio 右边的 Mule Palette, 添加 Salesforce 和 Database 组件。

然后点击 Salesforce, 将 Replay channel listener 拖拽到左边的 Flow 里。

点击 Global Elements 配置相关全局的变量。

点击 Create .

输入框搜索 Prop, 点击 OK.

在弹出的界面里,输入 global.yaml, 这么做的目的是让 Mule Application 知道配置的 properties 在 global.yaml 文件里。

接下来配置 Salesforce Configuration, 重复上面步骤,点击 Create, 搜索 salesforce 关键字,点击 OK.

根据下图所示内容填写对应的参数,Authorization URL 可以留空,因为我们是 Salesforce 开发者账号,配置默认为 login.salesforce.com.

点击 Test Connection, 测试账号是否可以正常连接 Salesforce, 连接正常,点击 OK.

接下来配置 Database Configuration, 重复上面步骤,点击 Create, 搜索 database 关键字,点击 OK.

Connection 选择 Generic Connection.

JDBC Driver 选择 Add Maven dependency, 搜索框输入 postgresql

然后在 Connection 里,填入对应的 Database 参数,点击 Test Connection, 验证数据库是否可以连接成功。

配置 message flow, 选中 Replay channel listener, 按照如图所示选择相应参数,本次示例只在联系人创建的时候触发,所以 Replay Option 选择的是 ONLY_NEW

将 Logger 组件拖出,目的用于记录 Inboud payload.

设置变量,类似 Logger 组件,将 Set Variable 拖出放在 Logger组件的后面

重复以上步骤,设置三个变量: 第一个变量可以设置如下,目的是在 Inbound change 事件中 sfdc_id 的值与 Salesforce 的 record id 对应。
1
2
3
Display Name: Set Salesforce Record Id
Name: sfdc_id
Value: payload.data.payload.ChangeEventHeader.recordIds[0]

第二个变量设置如图所示:
1
2
3
Display Name: Set Last Name
Name: last_name
Value: payload.data.payload.Name.LastName

第三个变量设置如图所示:
1
2
3
Display Name: Set Change Type
Name: change_type
Value: payload.data.payload.ChangeEventHeader.changeType

将 Choice 拖出,目的是判断当前的 CDC 事件是否为 CREATE .

设置判断条件:vars.change_type == 'CREATE'

将 Database 的 Insert 操作拖出放入第一个选项框中。

设置 sql 语句,和匹配的参数,插入数据库。

启动 Mule Application:

在 Salesforce 中创建一条 Contact 记录,Lasrt Name 为:MuleSoft CDC

我们在 MuleSoft Application 里可以看到 CDC 事件已经触发,并且 payload 日志已经打印出来 :

再来看下数据库更新情况,有一条新的记录产生,sfdc_id 为刚才在 Salesforce 创建的联系人记录。

总结
通过本教程,您已经成功实现了以下功能:
- Salesforce 联系人数据变更的实时捕获
- 使用 MuleSoft 进行数据处理和转换
- 将数据实时同步到 PostgreSQL 数据库
这个集成方案为实现系统间的实时数据同步提供了可靠的基础。您可以基于此框架扩展更多的集成场景,如添加其他对象的 CDC 监听、增加更复杂的数据转换逻辑等。