
In an Apache Flink streaming application, raw data obtained from a source (such as Kafka or a file) is usually a JSON-formatted string. To parse and process this structured data further, we typically need to convert it into a JSONObject. However, many developers who try to convert String-typed data into a JSONObject inside a ProcessFunction (or another operator) and emit it through the Collector run into the following exception:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: Assigned key must not be null!
Caused by: java.lang.NullPointerException: Assigned key must not be null!
Although debugging shows that the String has been successfully parsed into a JSONObject instance, the job fails as soon as these objects are emitted through the Collector. This indicates that the problem is not in the JSON parsing itself, but rather in the particular JSONObject implementation, its compatibility with Flink's internal serialization mechanism, or its internal state.
The original, failing code snippet:
import com.alibaba.fastjson.JSONObject; // assuming Fastjson is used
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkJsonProcessingIssue {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );
        SingleOutputStreamOperator<JSONObject> jsonObjDS = inputDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
                // attempt to parse with JSONObject.parseObject()
                JSONObject jsonObject = JSONObject.parseObject(value);
                out.collect(jsonObject); // the exception is thrown here
            }
        });
        jsonObjDS.print();
        env.execute();
    }
}

The key to resolving this NullPointerException lies in choosing the right JSONObject implementation and how it is initialized. In practice, using the JSONObject provided by the org.json library, constructed directly from the JSON string, avoids the problem.
First, make sure your project includes the org.json dependency:
<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20180130</version> <!-- pick the latest stable version as needed -->
</dependency>

Then, inside the ProcessFunction, replace JSONObject.parseObject(value) with new JSONObject(value):
import org.json.JSONObject; // note: this imports org.json.JSONObject
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkJsonProcessingSolution {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );
        SingleOutputStreamOperator<JSONObject> jsonObjDS = inputDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
                // use the org.json.JSONObject constructor
                JSONObject jsonObject = new JSONObject(value);
                out.collect(jsonObject); // now collects normally
            }
        });
        jsonObjDS.print();
        env.execute();
    }
}

After this change, the Flink job runs normally and prints the converted JSONObject. This suggests that the org.json JSONObject implementation is more compatible with Flink's internals (particularly its type serializers), or that its internal state survives serialization and deserialization intact, thereby avoiding the NullPointerException.
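Once the stream carries org.json JSONObject values, nested fields can be read with that library's standard accessors. The sketch below is plain Java with no Flink involved, using a trimmed version of the sample payload from above; it assumes only the org.json dependency declared earlier.

```java
import org.json.JSONObject;

public class OrgJsonAccessDemo {
    public static void main(String[] args) {
        String payload = "{\"bill_info\":{\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NAME\":\"sss\"}}";
        JSONObject root = new JSONObject(payload);
        // getJSONObject() descends into the nested object; getString() reads a leaf field
        JSONObject billInfo = root.getJSONObject("bill_info");
        System.out.println(billInfo.getString("ORDER_ID")); // f3e60c5f-78f6-4ec6-bab6-98b177f7cb67
        // getString() throws JSONException on a missing key; optString() returns a default instead
        System.out.println(billInfo.optString("MISSING_KEY", "n/a"));
    }
}
```

Note that getString() fails fast on absent keys, so optString() (or has()) is the safer choice when a field may be missing from the payload.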
Although the approach above solves the String-to-JSONObject conversion problem, directly passing JSONObject instances between operators is usually not best practice in production Flink applications, especially when processing large data volumes or when performance matters.
The reasons include:
1. Serialization efficiency: Flink cannot analyze a generic JSONObject as a POJO, so it typically falls back to its generic (Kryo-based) serializer, which is slower and produces larger serialized records.
2. Type safety: every field access is string-keyed and untyped, so schema mistakes only surface at runtime.
3. Readability and maintainability: downstream operators have no explicit schema to program against.
Recommended approach: deserialize the JSON string into a POJO
In Flink, the best practice is to deserialize JSON strings into well-defined POJO classes. This is typically done with a custom DeserializationSchema or with Flink's built-in JSON formats (such as JsonRowDeserializationSchema).
For the JSON data above, we can define a POJO class:
import java.io.Serializable;

public class BillInfo implements Serializable {
    private String ADD_TIME;
    private String ORDER_ID;
    private String ADDER_NO;
    private String UPDATER_NO;
    private String S_USER_ID;
    private String B_USER_ID;
    private String BILL_ID;
    private String ADDER_NAME;
    private String UPDATE_TIME;
    private String UPDATER_NAME;

    // a public no-arg constructor is required
    public BillInfo() {}

    // getters and setters
    public String getADD_TIME() { return ADD_TIME; }
    public void setADD_TIME(String ADD_TIME) { this.ADD_TIME = ADD_TIME; }
    public String getORDER_ID() { return ORDER_ID; }
    public void setORDER_ID(String ORDER_ID) { this.ORDER_ID = ORDER_ID; }
    public String getADDER_NO() { return ADDER_NO; }
    public void setADDER_NO(String ADDER_NO) { this.ADDER_NO = ADDER_NO; }
    public String getUPDATER_NO() { return UPDATER_NO; }
    public void setUPDATER_NO(String UPDATER_NO) { this.UPDATER_NO = UPDATER_NO; }
    public String getS_USER_ID() { return S_USER_ID; }
    public void setS_USER_ID(String S_USER_ID) { this.S_USER_ID = S_USER_ID; }
    public String getB_USER_ID() { return B_USER_ID; }
    public void setB_USER_ID(String B_USER_ID) { this.B_USER_ID = B_USER_ID; }
    public String getBILL_ID() { return BILL_ID; }
    public void setBILL_ID(String BILL_ID) { this.BILL_ID = BILL_ID; }
    public String getADDER_NAME() { return ADDER_NAME; }
    public void setADDER_NAME(String ADDER_NAME) { this.ADDER_NAME = ADDER_NAME; }
    public String getUPDATE_TIME() { return UPDATE_TIME; }
    public void setUPDATE_TIME(String UPDATE_TIME) { this.UPDATE_TIME = UPDATE_TIME; }
    public String getUPDATER_NAME() { return UPDATER_NAME; }
    public void setUPDATER_NAME(String UPDATER_NAME) { this.UPDATER_NAME = UPDATER_NAME; }

    @Override
    public String toString() {
        return "BillInfo{" +
            "ADD_TIME='" + ADD_TIME + '\'' +
            ", ORDER_ID='" + ORDER_ID + '\'' +
            ", ADDER_NO='" + ADDER_NO + '\'' +
            ", UPDATER_NO='" + UPDATER_NO + '\'' +
            ", S_USER_ID='" + S_USER_ID + '\'' +
            ", B_USER_ID='" + B_USER_ID + '\'' +
            ", BILL_ID='" + BILL_ID + '\'' +
            ", ADDER_NAME='" + ADDER_NAME + '\'' +
            ", UPDATE_TIME='" + UPDATE_TIME + '\'' +
            ", UPDATER_NAME='" + UPDATER_NAME + '\'' +
            '}';
    }
}

// If the JSON structure is nested, as here, define a matching wrapper POJO
// (in its own file)
public class RootData implements Serializable {
    private BillInfo bill_info;
    public RootData() {}
    public BillInfo getBill_info() { return bill_info; }
    public void setBill_info(BillInfo bill_info) { this.bill_info = bill_info; }
    @Override
    public String toString() {
        return "RootData{" +
            "bill_info=" + bill_info +
            '}';
    }
}

Then a library such as Jackson or Gson can deserialize the string into the POJO inside the ProcessFunction:
import com.fasterxml.jackson.databind.ObjectMapper; // Jackson
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkPojoProcessing {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );
        SingleOutputStreamOperator<RootData> pojoDS = inputDS.process(new ProcessFunction<String, RootData>() {
            // ObjectMapper is thread-safe; declare it transient and create it in
            // open() so the function itself stays serializable
            private transient ObjectMapper objectMapper;

            @Override
            public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
                super.open(parameters);
                objectMapper = new ObjectMapper();
            }

            @Override
            public void processElement(String value, ProcessFunction<String, RootData>.Context ctx, Collector<RootData> out) throws Exception {
                RootData rootData = objectMapper.readValue(value, RootData.class);
                out.collect(rootData);
            }
        });
        pojoDS.print();
        env.execute();
    }
}

To use Jackson, add the following dependency:
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.13.4</version> <!-- pick the latest stable version as needed -->
</dependency>

In summary: if you hit a NullPointerException when converting JSON strings to JSONObject in Flink, switching to the org.json library's new JSONObject(value) constructor usually resolves it. For long-term maintainability and performance, however, deserializing JSON strings into POJOs is strongly recommended. POJOs provide better type safety and code readability, and they significantly improve serialization and deserialization efficiency in Flink, making them the foundation of robust, high-performance streaming applications.
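One way to confirm that Flink will actually use its efficient POJO serializer for a class like RootData, rather than falling back to generic (Kryo) serialization, is to inspect the TypeInformation that Flink's type extractor produces for it. The sketch below is a hedged illustration using a minimal stand-in class (a hypothetical Sample, shaped like the article's POJOs) so that it is self-contained; the same check can be applied to RootData.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;

public class PojoTypeCheck {
    // minimal stand-in POJO: public static class, public no-arg constructor,
    // public field -- the rules Flink's type extractor checks
    public static class Sample implements java.io.Serializable {
        public String orderId;
        public Sample() {}
    }

    public static void main(String[] args) {
        // TypeInformation.of() runs the same analysis Flink applies to operator outputs
        TypeInformation<Sample> typeInfo = TypeInformation.of(Sample.class);
        // when all POJO rules are satisfied, the extracted type is a PojoTypeInfo;
        // otherwise Flink reports a GenericTypeInfo and serializes with Kryo
        System.out.println(typeInfo instanceof PojoTypeInfo);
    }
}
```

If the check prints false for one of your classes, a missing no-arg constructor or a non-public field without getter/setter is the usual culprit.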
This concludes the detailed discussion of the pitfalls and best practices of converting JSON strings to JSONObject in Flink.