通过REST API更新 NiFi 数据流程
通过 NiFi 的REST API连接到服务并更新数据流程。
1、软件环境与预备知识
- NiFi 安装并在localhost下运行,参考:
- Groovy - 因为其 JSON builders 和 REST DSLs 非常好。如果使用Mac, 最简单的安装办法是: brew install groovy. 如果还没有安装Homebrew (Mac下的软件包管理器), 参考
- 完整的脚本在GitHub:
- NiFi REST API 文档:
2、创建测试流程
下面是一个测试流程:
准备这个测试流程:
- 添加 PutFile 组件到绘图面板。
- 重命名processor 为 Save File (right-click -> Configure -> Settings -> Name field)。后面将使用这个名称通过API查找 processor。
- 添加 GetHTTP processor,创建一个从 GetHTTP 到 ‘Save File’的连接。GetHTTP设置现在可以暂不设置,Save File processor 需要一个输入连接。
- 设置 Save File 属性如下 (这个设置后面可以通过编程进行修改)。
- 启动 Save File processor (这里不需要启动 GetHTTP)。
注意: 对于更复杂的流程,可以使用模版来创建,参考:
3、调用REST API
下一步, 我们更新 Save File processor 使用不同的目录 (/tmp/staging) 并且设置 “Create Missing Directories”为 “true”。
High-level script flow:
- 查找 data flow中需要操作的组件。输入查找'Save File',这与Web UI中的查找框使用的是同一个 API。
- 确认这是唯一的 processor - 我们希望修改的事希望的那一个处理节点。
- 与framework的状态同步 - 取得最新版本的字段值, 将会用于下面的更新语句中。这是经典的乐观锁模式的实现。
- 构建一个小的 JSON 文档,只包含状态改变。
- 执行部分更新,通过 PUT 操作进行。
- 重复 4-5,停止, 更新配置 (改变目录和目录属性),启动 processor。
下面我们直接执行这个脚本 (如果希望保存、修改这个脚本,从github中clone/checkout到本地):
groovy https://raw.githubusercontent.com/aperepel/nifi-rest-api-tutorial/master/reconfigure.groovy
可以看到输出信息,如下:
Looking up a component to update...Found the component, id/group: c35f1bb7-5add-427f-864a-bdd23bb4ac7f/f1a2c4e8-b106-4877-97d9-9dbca868fc16Preparing to update the flow state... Stopping the processor to apply changes... Updating processor... { "revision": { "clientId": "my awesome script", "version": 309 }, "processor": { "id": "c35f1bb7-5add-427f-864a-bdd23bb4ac7f", "config": { "properties": { "Directory": "/tmp/staging", "Create Missing Directories": "true" } } } } Updated ok. Bringing the updated processor back online... Ok
检查NiFi processor, 可以看到更新的目录和属性,并且创建了缺失的目录。除此之外, 每一步都被捕获下来并记录在 flow history中,如下:
当在UI中看见警告信息, 简单地点击Refresh链接刷新。在本文的后面将会介绍并发控制。
4、代码详解
首先, 拉取依赖项 。这个在 maven 仓库,构建时自动获取。
@Grab(group='org.codehaus.groovy.modules.http-builder',module='http-builder',version='0.7.1')
这个库让我们使用REST DSL,如下所示:
nifi.get(path: 'controller/search-results',query: [q: processorName])nifi.put(path: "controller/process-groups/$processGroup/processors/$processorId", body: builder.toPrettyString(),requestContentType: JSON)
下一步, 使用Groovy的 JSON builder去构建一个JSON 文档,实现部分PUT更新。只需要指定希望改变的属性,如下:
builder {revision { clientId 'my awesome script' version resp.data.revision.version }processor { id "$processorId" config { properties { 'Directory' '/tmp/staging' 'Create Missing Directories' 'true' } } }}
这些dot-notation变量遍历JSON文档树。为了理解如何结构化这个返回结果,启动一个 GET 请求,将获取一个完整的 state 文档。
提示: UI 通过REST API来执行所有操作, 这是一个非常好的交互学习工具。注意,UI 对于PUT 和 POST (form) requests的操作是互换的, 所以选择在于那种用起来方便。这里我们通过 PUT 和 JSON执行操作。
最后, clientId 和 version 在下一节中进行介绍。
5、在 NiFi 中的乐观锁
下图展示了几本概念。
对于update操作,提供 clientId 是必须的,以避免一致性问题(API 将返回 409 Conflict status 代码,如果开发者不知道这一点的话,将会引起困惑)。
controller/revision 返回用户的 最后修改流程的clientId,这不会总是你的 id。 最佳实践就是提供一个你的唯一ID以区分客户端。这实际上可以是任何格式的值, UUID 是框架在缺失时自动创建出来的缺省值。
英文:
更多NiFi资源参考: