
ExecuteScript处理器概述
nifi的executescript处理器是一个高度灵活的组件,允许用户在数据流中执行自定义脚本,以实现复杂的数据转换、路由逻辑或与nifi api的交互。它支持多种脚本语言,为nifi流程带来了极大的扩展性。
执行环境:JVM内部运行
与某些通过操作系统fork子进程来执行外部命令的处理器(如ExecuteStreamCommand)不同,ExecuteScript处理器是在Nifi的Java虚拟机(JVM)内部直接执行其脚本的。 这意味着:
- 共享JVM资源: 脚本与Nifi本身运行在同一个JVM进程中,共享Nifi的内存空间和CPU资源。
- 直接API访问: 脚本可以直接访问Nifi的Java API,例如通过session对象操作FlowFile,通过log对象记录日志,以及访问处理器上下文中的其他Nifi服务。
- 无进程开销: 由于不涉及操作系统级别的进程创建和销毁,ExecuteScript的执行开销相对较低,通常性能更优。
与ExecuteStreamCommand的对比: ExecuteStreamCommand处理器会通过操作系统fork一个全新的子进程来执行外部可执行文件或脚本(例如,Python解释器、Bash脚本等)。它通过标准输入/输出流与外部进程进行通信。这种方式的优点是可以运行任何操作系统支持的程序,但缺点是每次执行都会有额外的进程创建和销销毁开销,并且无法直接访问Nifi的内部API。
支持的脚本语言
ExecuteScript处理器支持的脚本语言必须是JVM兼容的。这意味着它们能够直接在Java虚拟机上运行,或者有相应的JVM实现。常见的支持语言包括:
- Groovy: 一种强大的、可选静态类型和动态类型的编程语言,针对Java平台,与Java语法高度兼容。
- Jython: Python语言在Java平台上的实现。它允许Python代码直接访问Java类库,并在JVM上运行。
- JRuby: Ruby语言在Java平台上的实现,同样允许Ruby代码与Java代码无缝交互。
- Nashorn / GraalJS: 用于在JVM上执行JavaScript代码。
注意事项: 当您选择Python作为脚本语言时,实际上使用的是Jython。这意味着您编写的Python代码必须符合Jython的规范,并且可以直接调用Java类。一些原生Python库(尤其是那些依赖C扩展的库)可能无法在Jython环境中正常工作。
示例代码:Groovy脚本操作FlowFile
以下是一个简单的Groovy脚本示例,演示了如何在ExecuteScript处理器中获取FlowFile内容并添加一个属性:
本次升级更新内容:优化分类置顶功能处理机制;修复域名变化带来的cookie域问题;文件上传js的兼容ie9,ie10问题;更新内容编辑器版本;会员服务权限新增求购信息的发布总量限制,求购信息的每日发布量限制;新增供应信息的每日发布量限制;新增分类信息的审核机制控制;新增分类信息的每日发布量限制;新增分类信息的重发刷新功能;优化会员中心的服务类型内容;优化模板运行处理机制;优化会员商铺模板运行机制;
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
// 获取当前FlowFile
def flowFile = session.get()
if (flowFile != null) {
// 读取FlowFile内容并转换为字符串
def content = new StringBuilder()
session.read(flowFile, { inputStream ->
content.append(new String(inputStream.bytes, StandardCharsets.UTF_8))
} as StreamCallback)
log.info("Original FlowFile content: ${content.toString()}")
// 添加一个属性
flowFile = session.putAttribute(flowFile, "my.custom.attribute", "processed_by_groovy")
// 更新FlowFile内容(可选)
// def newContent = "Modified: " + content.toString()
// flowFile = session.write(flowFile, { outputStream ->
// outputStream.write(newContent.getBytes(StandardCharsets.UTF_8))
// } as StreamCallback)
// 转移FlowFile到成功关系
session.transfer(flowFile, REL_SUCCESS)
} else {
// 如果没有FlowFile,则停止处理
log.warn("No FlowFile available for processing.")
}代码说明:
- session.get():获取当前传入的FlowFile。
- session.read(flowFile, ...):读取FlowFile的内容。StreamCallback用于处理输入流。
- session.putAttribute(flowFile, key, value):为FlowFile添加或更新属性。
- session.transfer(flowFile, REL_SUCCESS):将处理后的FlowFile路由到“成功”关系。REL_FAILURE或REL_ORIGINAL等也是常见选项。
- log.info(...):使用Nifi的日志系统记录信息。
总结
Nifi的ExecuteScript处理器是一个强大且高效的工具,用于在Nifi数据流中嵌入自定义逻辑。其核心优势在于在Nifi JVM内部执行,从而实现低开销、高效率以及对Nifi API的直接访问。理解这一执行机制,特别是与ExecuteStreamCommand的差异,对于选择合适的处理器、优化Nifi流程性能以及有效利用其支持的JVM兼容脚本语言至关重要。在编写脚本时,务必考虑所选语言的JVM实现特性(例如Jython对原生Python库的兼容性),以确保脚本的稳定和高效运行。









