博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
通过REST API更新 NiFi 数据流程
阅读量:6320 次
发布时间:2019-06-22

本文共 3067 字,大约阅读时间需要 10 分钟。

hot3.png

通过REST API更新 NiFi 数据流程

通过 NiFi 的REST API连接到服务并更新数据流程。

1、软件环境与预备知识

  • NiFi 安装并在localhost下运行,参考:
  • Groovy - 因为其 JSON builders 和 REST DSLs 非常好。如果使用Mac, 最简单的安装办法是: brew install groovy. 如果还没有安装Homebrew (Mac下的软件包管理器), 参考
  • 完整的脚本在GitHub:
  • NiFi REST API 文档:

2、创建测试流程

下面是一个测试流程:

29111314_SJoY.png

准备这个测试流程:

  • 添加 PutFile 组件到绘图面板。
  • 重命名processor 为 Save File (right-click -> Configure -> Settings -> Name field)。后面将使用这个名称通过API查找 processor。
  • 添加 GetHTTP processor,创建一个从 GetHTTP 到 ‘Save File’的连接。GetHTTP设置现在可以暂不设置,Save File processor 需要一个输入连接。
  • 设置 Save File 属性如下 (这个设置后面可以通过编程进行修改)。
  • 启动 Save File processor (这里不需要启动 GetHTTP)。

29111316_xaJU.png

注意: 对于更复杂的流程,可以使用模版来创建,参考:

3、调用REST API

下一步, 我们更新 Save File processor 使用不同的目录 (/tmp/staging) 并且设置 “Create Missing Directories”为true”。

High-level script flow:

  1. 查找 data flow中需要操作的组件。输入查找'Save File',这与Web UI中的查找框使用的是同一个 API。

    29111320_veHL.png

  2.  确认这是唯一的 processor - 我们希望修改的事希望的那一个处理节点。
  3. 与framework的状态同步 - 取得最新版本的字段值, 将会用于下面的更新语句中。这是经典的乐观锁模式的实现。
  4. 构建一个小的 JSON 文档,只包含状态改变。
  5. 执行部分更新,通过 PUT 操作进行。
  6. 重复 4-5,停止, 更新配置 (改变目录和目录属性),启动 processor。

下面我们直接执行这个脚本 (如果希望保存、修改这个脚本,从github中clone/checkout到本地):

groovy https://raw.githubusercontent.com/aperepel/nifi-rest-api-tutorial/master/reconfigure.groovy

可以看到输出信息,如下:

Looking up a component to update...Found the component, id/group: c35f1bb7-5add-427f-864a-bdd23bb4ac7f/f1a2c4e8-b106-4877-97d9-9dbca868fc16Preparing to update the flow state...	Stopping the processor to apply changes...	Updating processor...	{	"revision": {	"clientId": "my awesome script",	"version": 309	},	"processor": {	"id": "c35f1bb7-5add-427f-864a-bdd23bb4ac7f",	"config": {	"properties": {	"Directory": "/tmp/staging",	"Create Missing Directories": "true"	}	}	}	}	Updated ok.	Bringing the updated processor back online...	Ok

检查NiFi processor, 可以看到更新的目录和属性,并且创建了缺失的目录。除此之外, 每一步都被捕获下来并记录在 flow history中,如下:

29111321_hxW5.png

当在UI中看见警告信息, 简单地点击Refresh链接刷新。在本文的后面将会介绍并发控制。

4、代码详解

首先, 拉取依赖项 。这个在 maven 仓库,构建时自动获取。

 @Grab(group='org.codehaus.groovy.modules.http-builder',module='http-builder',version='0.7.1')

这个库让我们使用REST DSL,如下所示:

nifi.get(path: 'controller/search-results',query: [q: processorName])nifi.put(path: "controller/process-groups/$processGroup/processors/$processorId",    body: builder.toPrettyString(),requestContentType: JSON)

下一步, 使用Groovy的 JSON builder去构建一个JSON 文档,实现部分PUT更新。只需要指定希望改变的属性,如下:

builder {revision {    clientId 'my awesome script'    version resp.data.revision.version    }processor {    id "$processorId"    config {        properties {                'Directory' '/tmp/staging'                'Create Missing Directories' 'true'                }            }          }}

这些dot-notation变量遍历JSON文档树。为了理解如何结构化这个返回结果,启动一个 GET 请求,将获取一个完整的 state 文档。

提示: UI 通过REST API来执行所有操作, 这是一个非常好的交互学习工具。注意,UI 对于PUT 和 POST (form) requests的操作是互换的, 所以选择在于那种用起来方便。这里我们通过 PUT 和 JSON执行操作。

最后,  clientIdversion 在下一节中进行介绍。

5、在 NiFi 中的乐观锁

下图展示了几本概念。

对于update操作,提供 clientId 是必须的,以避免一致性问题(API 将返回 409 Conflict status 代码,如果开发者不知道这一点的话,将会引起困惑)。

controller/revision 返回用户的 最后修改流程的clientId,这不会总是你的 id。 最佳实践就是提供一个你的唯一ID以区分客户端。这实际上可以是任何格式的值, UUID 是框架在缺失时自动创建出来的缺省值。

29111322_QKr1.png

英文:

更多NiFi资源参考:

转载于:https://my.oschina.net/u/2306127/blog/869775

你可能感兴趣的文章
BinaryTree I
查看>>
IE6-IE9兼容性问题列表及解决办法_补充之四:HTC (Html Components) 功能逐渐被IE抛弃...
查看>>
Verilog与C/C++的一些区别
查看>>
DIV焦点事件详解 --【focus和tabIndex】
查看>>
vim php代码规范
查看>>
最最基本的Git入门 -- 本地仓库操作
查看>>
机器学习平台跃迁,AI中台才是大势所趋
查看>>
Imperva开源域目录控制器,简化活动目录集成
查看>>
微软发布预览版SQL Server跨平台开发工具
查看>>
Uber推出数据湖集成神器DBEvents,支持MySQL、Cassandra等
查看>>
Entity Framework Core 2.0的新特性
查看>>
[deviceone开发]-do_Http组件示例
查看>>
实现linux作为server时与windows间的数据同步
查看>>
Spring JDBC详解
查看>>
JAVA 线程Join
查看>>
【翻译】SQL Server索引进阶:第七级,过滤的索引
查看>>
连连看路径求解的算法
查看>>
JavaScript 的面向对象编程
查看>>
kafka的topic和分区策略——log entry和消息id索引文件
查看>>
splunk的bucket组织目录——时间序列,按照时间来组织目录
查看>>