在本教程中,我们将学习如何从在 Salesforce 中订阅 Change Data Capture(CDC) 事件然后利用 MuleSoft 平台将数据同步至外部数据库 (本示例为 Postgres). 本教程主要包括:
- 在 Salesforce 的联系人对象上发布一个 change data capture 事件
- 使用 Salesforce Connector 设置 Replay Channel Listener 来订阅联系人变更事件
- 在 Anypoint Studio 中查看 change data capture 事件的结构
- 在 Postgres 数据库中查看同步的数据
在开始前需要准备的环境:
- Anypoint Studio: 为 MuleSoft 开发者提供的开发环境,点击安装
- Salesforce Developer Edition Org: 一个免费的 Salesforce 开发者环境。注册请点击
- Salesforce Security Token: 一个区分大小写的字母数字密钥,与密码结合使用,以通过 API 访问 Salesforce 资源
- Postgres: 一个关系型数据库,因为需要连接数据库,所以需要提供一个可访问的 PostgreSQL 数据库。
Setup CDC in Salesforce
-
使用您在注册时设置的用户名和密码登陆到您的 Salesforce 开发者版本 Org.
-
打开
SetUp
->Home
, 在搜索框中输入 Change Data Capture, 点击进入详情页,这个页面显示了可以在 Salesforce 中发布 CDC 事件通知的可用对象,在本教程中,我们将发布联系人对象的 CDC 事件。找到联系人对象,选择它。
点击保存,你现在已经指定,当一个联系人记录的数据发生变化时,一个事件将被发布到一个共享的事件 event bus.现在我们将设置一个 Mule project 来订阅这些事件。
Create Mule project
启动 Anypoint Studio
, 点击 file
, 然后选择 New
> Mule Project
. 项目名字可以叫 salesforce-cdc
然后点击 Finish
.
创建一个 global.xml 文件来存储你的 connector 配置。右键点击 src/main/resources 文件夹。选择 New
> File
.
文件名为 global.xml
然后 点击 Finish
.
在 global.yaml 文件中,通过复制和粘贴以下内容,创建以下属性。
1
2
3
4
5
6
7
8
9
10
salesforce:
username: "salesforce@example.com"
password: "xxx"
token: "xxx"
postgres:
username: "peter.dong"
password: "xxx"
database: "xxx"
url: "jdbc:postgresql://xxx.compute-1.amazonaws.com:5432/database"
在 Anypoint Studio 右边的 Mule Palette
, 添加 Salesforce
和 Database
组件。
然后点击 Salesforce
, 将 Replay channel listener
拖拽到左边的 Flow 里。
点击 Global Elements
配置相关全局的变量。
点击 Create
.
输入框搜索 Prop
, 点击 OK
.
在弹出的界面里,输入 global.yaml
, 这么做的目的是让 Mule Application 知道配置的 properties 在 global.yaml 文件里。
接下来配置 Salesforce Configuration
, 重复上面步骤,点击 Create
, 搜索 salesforce 关键字,点击 OK
.
根据下图所示内容填写对应的参数,Authorization URL 可以留空,因为我们是 Salesforce 开发者账号,配置默认为 login.salesforce.com.
点击 Test Connection
, 测试账号是否可以正常连接 Salesforce, 连接正常,点击 OK
.
接下来配置 Database Configuration
, 重复上面步骤,点击 Create
, 搜索 database 关键字,点击 OK
.
Connection
选择 Generic Connection
.
JDBC Driver
选择 Add Maven dependency
, 搜索框输入 postgresql
然后在 Connection 里,填入对应的 Database 参数,点击 Test Connection
, 验证数据库是否可以连接成功。
配置 message flow
, 选中 Replay channel listener
, 按照如图所示选择相应参数,本次示例只在联系人创建的时候触发,所以 Replay Option 选择的是 ONLY_NEW
将 Logger
组件拖出,目的用于记录 Inboud payload.
设置变量,类似 Logger
组件,将 Set Variable
拖出放在 Logger
组件的后面
重复以上步骤,设置三个变量: 第一个变量可以设置如下,目的是在 Inbound change 事件中 sfdc_id 的值与 Salesforce 的 record id 对应。
1
2
3
Display Name: Set Salesforce Record Id
Name: sfdc_id
Value: payload.data.payload.ChangeEventHeader.recordIds[0]
第二个变量设置如图所示:
1
2
3
Display Name: Set Last Name
Name: last_name
Value: payload.data.payload.Name.LastName
第三个变量设置如图所示:
1
2
3
Display Name: Set Change Type
Name: change_type
Value: payload.data.payload.ChangeEventHeader.changeType
将 Choice
拖出,目的是判断当前的 CDC 事件是否为 CREATE .
设置判断条件:vars.change_type == 'CREATE'
将 Database
的 Insert
操作拖出放入第一个选项框中。
设置 sql 语句,和匹配的参数,插入数据库。
启动 Mule Application:
在 Salesforce 中创建一条 Contact 记录,Lasrt Name 为:MuleSoft CDC
我们在 MuleSoft Application 里可以看到 CDC 事件已经触发,并且 payload 日志已经打印出来 :
再来看下数据库更新情况,有一条新的记录产生,sfdc_id 为刚才在 Salesforce 创建的联系人记录。