首页 > Java > java教程 > 正文

Flink中JSON字符串到JSONObject转换的陷阱与最佳实践

霞舞
发布: 2025-09-25 12:02:19
原创
473人浏览过

Flink中JSON字符串到JSONObject转换的陷阱与最佳实践

在Apache Flink中处理JSON字符串时,开发者常遇到将String类型数据转换为JSONObject的需求。然而,直接使用JSONObject.parseObject()可能导致NullPointerException,即使字符串已正确解析。本文将揭示这一常见问题的原因,提供使用org.json库中new JSONObject(value)的正确解决方案,并强调出于性能和类型安全考虑,在生产环境中优先使用POJO进行JSON反序列化的最佳实践。

Flink中JSON字符串转换的常见问题

apache flink流处理应用中,从数据源(如kafka、文件等)获取的原始数据通常是json格式的字符串。为了进一步解析和处理这些结构化数据,我们通常需要将其转换为jsonobject对象。然而,许多开发者在尝试将string类型的数据通过processfunction或其他算子转换为jsonobject并收集输出时,可能会遇到如下异常:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: Assigned key must not be null!
Caused by: java.lang.NullPointerException: Assigned key must not be null!
登录后复制

尽管在调试过程中发现String已经成功解析成了JSONObject实例,但当尝试通过Collector收集这些对象时,作业却失败了。这表明问题并非出在JSON解析本身,而可能与JSONObject对象的特定实现、其与Flink内部序列化机制的兼容性,或者其内部状态有关。

原始的错误代码片段如下:

import com.alibaba.fastjson.JSONObject; // 假设使用了Fastjson
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkJsonProcessingIssue {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator<JSONObject> jsonObjDS = inputDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
                // 尝试使用JSONObject.parseObject()
                JSONObject jsonObject = JSONObject.parseObject(value);
                out.collect(jsonObject); // 在这里抛出异常
            }
        });
        jsonObjDS.print();

        env.execute();
    }
}
登录后复制

解决方案:使用org.json库的JSONObject构造函数

解决上述NullPointerException的关键在于选择正确的JSONObject实现及其初始化方式。经过验证,使用org.json库提供的JSONObject,并通过其构造函数直接传入JSON字符串可以避免此问题。

首先,确保您的项目中引入了org.json库的依赖:

<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20180130</version> <!-- 可以根据需要选择最新稳定版本 -->
</dependency>
登录后复制

然后,在ProcessFunction中将JSONObject.parseObject(value)替换为new JSONObject(value):

import org.json.JSONObject; // 注意这里引入的是org.json.JSONObject
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkJsonProcessingSolution {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator<JSONObject> jsonObjDS = inputDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
                // 使用org.json.JSONObject的构造函数
                JSONObject jsonObject = new JSONObject(value);
                out.collect(jsonObject); // 现在可以正常收集
            }
        });
        jsonObjDS.print();

        env.execute();
    }
}
登录后复制

此更改后,Flink作业将能够正常运行并打印出转换后的JSONObject。这表明org.json库的JSONObject实现与Flink的内部机制(尤其是其类型序列化器)兼容性更好,或者其内部状态在被序列化和反序列化时能够保持完整性,从而避免了NullPointerException。

最佳实践:优先使用POJO进行JSON反序列化

尽管上述方法能够解决String到JSONObject的转换问题,但在实际生产环境中,特别是在处理大量数据或对性能有较高要求的Flink应用中,直接传递和处理JSONObject通常不是最佳实践。

原因如下:

Find JSON Path Online
Find JSON Path Online

Easily find JSON paths within JSON objects using our intuitive Json Path Finder

Find JSON Path Online 30
查看详情 Find JSON Path Online
  1. 序列化与反序列化开销大: JSONObject是一个通用的Map结构,其内部字段类型不固定,这使得Flink在序列化和反序列化JSONObject时,需要进行更多的元数据处理和类型推断,导致额外的CPU和内存开销。相比之下,POJO(Plain Old Java Object)具有固定的结构和明确的字段类型,Flink可以利用Kyro等高效序列化器进行快速、紧凑的序列化。
  2. 缺乏类型安全: JSONObject的操作通常基于字符串键值对,容易出现拼写错误或类型转换错误,且这些错误通常在运行时才能发现。而POJO提供了编译时类型检查,能够有效减少运行时错误。
  3. 可读性和可维护性差: 使用JSONObject意味着需要通过getString("key")、getInt("key")等方法手动提取字段,代码冗长且不易阅读。POJO则允许直接通过属性访问数据,代码更简洁、更具可读性。
  4. Schema演进: 随着业务发展,JSON数据的Schema可能会发生变化。POJO可以更优雅地处理Schema演进,例如通过添加新字段或使用@JsonIgnoreProperties(ignoreUnknown = true)注解忽略未知字段。

推荐做法:将JSON字符串反序列化为POJO

在Flink中,最佳实践是将JSON字符串反序列化为定义好的POJO类。这通常通过自定义DeserializationSchema或使用Flink提供的JSON格式(如JsonRowSerializationSchema)来实现。

例如,对于上述JSON数据,我们可以定义一个POJO类:

import java.io.Serializable;

public class BillInfo implements Serializable {
    private String ADD_TIME;
    private String ORDER_ID;
    private String ADDER_NO;
    private String UPDATER_NO;
    private String S_USER_ID;
    private String B_USER_ID;
    private String BILL_ID;
    private String ADDER_NAME;
    private String UPDATE_TIME;
    private String UPDATER_NAME;

    // 必须有无参构造函数
    public BillInfo() {}

    // Getter和Setter方法
    public String getADD_TIME() { return ADD_TIME; }
    public void setADD_TIME(String ADD_TIME) { this.ADD_TIME = ADD_TIME; }
    public String getORDER_ID() { return ORDER_ID; }
    public void setORDER_ID(String ORDER_ID) { this.ORDER_ID = ORDER_ID; }
    public String getADDER_NO() { return ADDER_NO; }
    public void setADDER_NO(String ADDER_NO) { this.ADDER_NO = ADDER_NO; }
    public String getUPDATER_NO() { return UPDATER_NO; }
    public void setUPDATER_NO(String UPDATER_NO) { this.UPDATER_NO = UPDATER_NO; }
    public String getS_USER_ID() { return S_USER_ID; }
    public void setS_USER_ID(String S_USER_ID) { this.S_USER_ID = S_USER_ID; }
    public String getB_USER_ID() { return B_USER_ID; }
    public void setB_USER_ID(String B_USER_ID) { this.B_USER_ID = B_USER_ID; }
    public String getBILL_ID() { return BILL_ID; }
    public void setBILL_ID(String BILL_ID) { this.BILL_ID = BILL_ID; }
    public String getADDER_NAME() { return ADDER_NAME; }
    public void setADDER_NAME(String ADDER_NAME) { this.ADDER_NAME = ADDER_NAME; }
    public String getUPDATE_TIME() { return UPDATE_TIME; }
    public void setUPDATE_TIME(String UPDATE_TIME) { this.UPDATE_TIME = UPDATE_TIME; }
    public String getUPDATER_NAME() { return UPDATER_NAME; }
    public void setUPDATER_NAME(String UPDATER_NAME) { this.UPDATER_NAME = UPDATER_NAME; }

    @Override
    public String toString() {
        return "BillInfo{" +
               "ADD_TIME='" + ADD_TIME + '\'' +
               ", ORDER_ID='" + ORDER_ID + '\'' +
               ", ADDER_NO='" + ADDER_NO + '\'' +
               ", UPDATER_NO='" + UPDATER_NO + '\'' +
               ", S_USER_ID='" + S_USER_ID + '\'' +
               ", B_USER_ID='" + B_USER_ID + '\'' +
               ", BILL_ID='" + BILL_ID + '\'' +
               ", ADDER_NAME='" + ADDER_NAME + '\'' +
               ", UPDATE_TIME='" + UPDATE_TIME + '\'' +
               ", UPDATER_NAME='" + UPDATER_NAME + '\'' +
               '}';
    }
}

// 如果JSON结构更复杂,包含嵌套对象,则需要定义相应的嵌套POJO
public class RootData implements Serializable {
    private BillInfo bill_info;

    public RootData() {}

    public BillInfo getBill_info() { return bill_info; }
    public void setBill_info(BillInfo bill_info) { this.bill_info = bill_info; }

    @Override
    public String toString() {
        return "RootData{" +
               "bill_info=" + bill_info +
               '}';
    }
}
登录后复制

然后,可以使用Jackson或Gson等库在ProcessFunction中将字符串反序列化为POJO:

import com.fasterxml.jackson.databind.ObjectMapper; // 引入Jackson库
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkPojoProcessing {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator<RootData> pojoDS = inputDS.process(new ProcessFunction<String, RootData>() {
            // ObjectMapper 是线程安全的,可以作为类成员或静态成员
            private transient ObjectMapper objectMapper; 

            @Override
            public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
                super.open(parameters);
                objectMapper = new ObjectMapper();
            }

            @Override
            public void processElement(String value, ProcessFunction<String, RootData>.Context ctx, Collector<RootData> out) throws Exception {
                RootData rootData = objectMapper.readValue(value, RootData.class);
                out.collect(rootData);
            }
        });
        pojoDS.print();

        env.execute();
    }
}
登录后复制

为了使用Jackson,需要添加以下依赖:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.13.4</version> <!-- 根据需要选择最新稳定版本 -->
</dependency>
登录后复制

总结

在Flink中将JSON字符串转换为JSONObject时,如果遇到NullPointerException,尝试使用org.json库的new JSONObject(value)构造函数通常可以解决问题。然而,从长期维护和性能优化的角度来看,强烈建议将JSON字符串反序列化为POJO。POJO不仅提供了更好的类型安全和代码可读性,还能显著提高Flink应用的序列化和反序列化效率,是构建健壮、高性能流处理应用的基石。

以上就是Flink中JSON字符串到JSONObject转换的陷阱与最佳实践的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号