使用 MuleSoft Anypoint Studio 同步 Salesforce 数据至 Postgres

Posted by Peter Dong on January 6, 2023

在本教程中, 我们将学习如何从在 Salesforce 中订阅 Change Data Capture(CDC)事件然后利用 MuleSoft 平台将数据同步至外部数据库(本示例为 Postgres). 本教程主要包括:

  • 在 Salesforce 的联系人对象上发布一个 change data capture 事件
  • 使用 Salesforce Connector 设置 Replay Channel Listener 来订阅联系人变更事件
  • 在 Anypoint Studio 中查看 change data capture 事件的结构
  • 在 Postgres 数据库中查看同步的数据

img

在开始前需要准备的环境:

  • Anypoint Studio: 为 MuleSoft 开发者提供的开发环境, 点击安装
  • Salesforce Developer Edition Org: 一个免费的 Salesforce 开发者环境. 注册请点击
  • Salesforce Security Token: 一个区分大小写的字母数字密钥, 与密码结合使用, 以通过 API 访问 Salesforce 资源
  • Postgres: 一个关系型数据库, 因为需要连接数据库, 所以需要提供一个可访问的 PostgreSQL 数据库.

Setup CDC in Salesforce

  1. 使用您在注册时设置的用户名和密码登陆到您的 Salesforce 开发者版本 Org.

  2. 打开 SetUp -> Home, 在搜索框中输入 Change Data Capture, 点击进入详情页, 这个页面显示了可以在 Salesforce 中发布 CDC 事件通知的可用对象, 在本教程中,我们将发布联系人对象的 CDC 事件.找到联系人对象,选择它.

img

点击保存, 你现在已经指定,当一个联系人记录的数据发生变化时,一个事件将被发布到一个共享的事件 event bus.现在我们将设置一个 Mule project 来订阅这些事件.

Create Mule project

启动 Anypoint Studio, 点击 file, 然后选择 New > Mule Project. 项目名字可以叫 salesforce-cdc 然后点击 Finish.

img

创建一个 global.xml 文件来存储你的 connector 配置。 右键点击 src/main/resources 文件夹。选择 New > File.

img

文件名为 global.xml 然后 点击 Finish.

img

在 global.yaml 文件中, 通过复制和粘贴以下内容, 创建以下属性.

1
2
3
4
5
6
7
8
9
10
salesforce:
  username: "salesforce@example.com"
  password: "xxx"
  token: "xxx"
  
postgres: 
  username: "peter.dong"
  password: "xxx"
  database: "xxx"
  url:  "jdbc:postgresql://xxx.compute-1.amazonaws.com:5432/database"

在 Anypoint Studio 右边的 Mule Palette, 添加 SalesforceDatabase 组件.

img

然后点击 Salesforce, 将 Replay channel listener 拖拽到左边的 Flow 里.

img img

点击 Global Elements 配置相关全局的变量.

img

点击 Create .

img

输入框搜索 Prop, 点击 OK.

img

在弹出的界面里, 输入 global.yaml, 这么做的目的是让 Mule Application 知道配置的 properties 在 global.yaml 文件里.

img

接下来配置 Salesforce Configuration, 重复上面步骤, 点击 Create, 搜索 salesforce 关键字, 点击 OK.

img

根据下图所示内容填写对应的参数, Authorization URL 可以留空, 因为我们是 Salesforce 开发者账号, 配置默认为 login.salesforce.com.

img

点击 Test Connection, 测试账号是否可以正常连接 Salesforce, 连接正常, 点击 OK.

img

接下来配置 Database Configuration, 重复上面步骤, 点击 Create, 搜索 database 关键字, 点击 OK.

img

Connection 选择 Generic Connection.

img

JDBC Driver 选择 Add Maven dependency, 搜索框输入 postgresql

img img

然后在 Connection 里, 填入对应的 Database 参数, 点击 Test Connection, 验证数据库是否可以连接成功.

img img

配置 message flow, 选中 Replay channel listener, 按照如图所示选择相应参数, 本次示例只在联系人创建的时候触发, 所以 Replay Option 选择的是 ONLY_NEW

img

Logger 组件拖出, 目的用于记录 Inboud payload.

img

设置变量, 类似 Logger 组件, 将 Set Variable 拖出放在 Logger组件的后面

img

重复以上步骤, 设置三个变量: 第一个变量可以设置如下, 目的是在 Inbound change 事件中 sfdc_id 的值与Salesforce的 record id 对应.

1
2
3
Display Name: Set Salesforce Record Id
Name: sfdc_id
Value: payload.data.payload.ChangeEventHeader.recordIds[0]

img

第二个变量设置如图所示:

1
2
3
Display Name: Set Last Name
Name: last_name
Value: payload.data.payload.Name.LastName

img

第三个变量设置如图所示:

1
2
3
Display Name: Set Change Type
Name: change_type
Value: payload.data.payload.ChangeEventHeader.changeType

img

Choice 拖出, 目的是判断当前的CDC事件是否为 CREATE .

img img

设置判断条件: vars.change_type == 'CREATE'

img

DatabaseInsert 操作拖出放入第一个选项框中.

img

设置 sql 语句, 和匹配的参数, 插入数据库.

img

启动 Mule Application:

img img

在 Salesforce 中创建一条 Contact 记录, Lasrt Name 为: MuleSoft CDC

img

我们在 MuleSoft Application 里可以看到 CDC 事件已经触发, 并且 payload 日志已经打印出来 :

img

再来看下数据库更新情况, 有一条新的记录产生, sfdc_id 为刚才在 Salesforce 创建的联系人记录.

img


Buy Me a Coffee