
在高性能java应用中,尤其是在与c++等原生代码进行数据交互时,directbytebuffer(直接缓冲区)是管理非堆内存的常用机制。通过jni(java native interface),java程序可以获取并操作直接映射到共享内存的directbytebuffer,从而实现java与c++进程间的高效数据共享,避免数据在jvm堆与非堆之间的来回复制。
例如,一个典型的JNI函数可能如下所示,它将一个C++共享内存地址映射为一个Java DirectByteBuffer:
JNIEXPORT jobject JNICALL Java_service_SharedMemoryJNIService_getDirectByteBuffer
(JNIEnv *env, jclass jobject, jlong buf_addr, jint buf_len){
return env->NewDirectByteBuffer((void *)buf_addr, buf_len);
}当我们需要将这个DirectByteBuffer中的数据上传到Amazon S3等云存储服务时,传统的做法往往会引入不必要的内存复制。
许多云存储客户端库(例如jclouds的早期版本或某些简化API)在处理数据上传时,倾向于接收一个字节数组(byte[])作为数据源。这意味着,如果我们的数据存储在DirectByteBuffer中,就必须先将其内容复制到JVM堆上的一个byte[]数组中,然后再进行上传。
以下是一个典型的传统上传代码示例,它展示了这种内存复制:
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.io.payloads.ByteArrayPayload;
import java.nio.ByteBuffer;
public class S3Uploader {
// 假设 getBlobStoreContext() 方法已存在并返回 BlobStoreContext 实例
private BlobStoreContext getBlobStoreContext() {
// ... 返回 BlobStoreContext 实例
return null;
}
public String uploadByteBuffer(String container, String objectKey, ByteBuffer bb) {
BlobStoreContext context = getBlobStoreContext();
BlobStore blobStore = context.getBlobStore();
// 问题所在:将 DirectByteBuffer 的内容复制到 JVM 堆内存中的 byte[]
byte[] buf = new byte[bb.capacity()];
bb.get(buf); // 这一步产生了不必要的内存复制
ByteArrayPayload payload = new ByteArrayPayload(buf);
Blob blob = blobStore.blobBuilder(objectKey)
.payload(payload)
.contentLength(bb.capacity())
.build();
blobStore.putBlob(container, blob);
return objectKey;
}
}对于小文件而言,这种复制的开销可能不明显。然而,当处理大容量数据(如50MB甚至更大)时,将DirectByteBuffer中的数据完整复制到JVM堆内存中,会带来显著的性能下降和内存压力:
为了避免上述问题,我们可以利用jclouds等库提供的更灵活的数据源抽象——com.google.common.io.ByteSource。ByteSource是一个Guava库提供的接口,它代表一个可以提供字节流的数据源。通过实现自定义的ByteSource,我们可以直接从DirectByteBuffer中读取数据,并将其封装成InputStream,从而实现数据的流式传输,避免一次性加载到堆内存。
核心思想是创建一个ByteSource的实现类,其中openStream()方法返回一个能够直接从DirectByteBuffer读取数据的InputStream。
我们需要创建两个类:一个继承自ByteSource的ByteBufferByteSource,以及一个作为其内部类的InputStream实现ByteBufferInputStream。
import com.google.common.base.Preconditions; // 用于参数检查,如果Guava未引入,可替换为普通null检查
import com.google.common.io.ByteSource;
import java.io.IOException;
import java.io.InputStream;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
/**
* 一个 ByteSource 实现,用于直接从 ByteBuffer 读取数据。
* 这避免了在上传 DirectByteBuffer 数据时将其复制到堆内存字节数组。
*/
public class ByteBufferByteSource extends ByteSource {
private final ByteBuffer buffer; // 原始缓冲区,通常是一个 DirectByteBuffer
/**
* 构造一个 ByteBufferByteSource 实例。
* 提供的 ByteBuffer 应该处于可读状态(例如,数据写入后可能已执行 flip() 操作)。
*
* @param buffer 要读取的 ByteBuffer。不能为 null。
*/
public ByteBufferByteSource(ByteBuffer buffer) {
this.buffer = Preconditions.checkNotNull(buffer, "ByteBuffer 不能为空");
}
@Override
public InputStream openStream() {
// 创建缓冲区的副本,以确保对原始缓冲区的后续操作或对 openStream() 的多次调用
// 不会干扰此流的当前位置。副本与原始缓冲区共享底层数据,但拥有独立的 position、limit 和 mark。
return new ByteBufferInputStream(buffer.duplicate());
}
/**
* 一个 InputStream 实现,用于直接从 ByteBuffer 读取数据。
*/
private static final class ByteBufferInputStream extends InputStream {
private final ByteBuffer buffer;
private boolean closed = false;
ByteBufferInputStream(ByteBuffer buffer) {
this.buffer = buffer;
}
@Override
public synchronized int read() throws IOException {
if (closed) {
throw new IOException("流已关闭");
}
if (!buffer.hasRemaining()) {
return -1; // 流结束
}
try {
// 读取单个字节并将其作为整数返回 (0-255)。
// & 0xFF 确保字节被视为无符号整数。
return buffer.get() & 0xFF;
} catch (BufferUnderflowException bue) {
// 理论上在检查 hasRemaining() 后不应发生,但作为安全措施。
return -1;
}
}
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
if (closed) {
throw new IOException("流已关闭");
}
if (b == null) {
throw new NullPointerException("缓冲区 'b' 不能为 null");
}
if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException(
String.format("无效的读取参数: b.length=%d, off=%d, len=%d", b.length, off, len));
}
if (len == 0) {
return 0; // 没有字节需要读取
}
int bytesToRead = Math.min(len, buffer.remaining());
if (bytesToRead == 0) {
return -1; // 流结束
}
// 直接将数据读取到提供的字节数组中。
buffer.get(b, off, bytesToRead);
return bytesToRead;
}
@Override
public void close() throws IOException {
super.close();
closed = true;
// 底层 ByteBuffer 不在此处关闭,因为其生命周期由外部管理。
// 它是共享内存的一个视图,因此关闭它是不合适的。
}
}
}代码说明:
有了 ByteBufferByteSource 类,我们就可以修改上传逻辑,直接使用它来创建 Blob,从而避免中间的堆内存复制。
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import java.nio.ByteBuffer;
public class OptimizedS3Uploader {
// 假设 getBlobStoreContext() 方法已存在并返回 BlobStoreContext 实例
private BlobStoreContext getBlobStoreContext() {
// ... 返回 BlobStoreContext 实例
return null;
}
/**
* 将 DirectByteBuffer 中的数据直接上传到 S3,避免中间的堆内存复制。
*
* @param container S3 存储桶名称
* @param objectKey S3 对象键
* @param directByteBuffer 包含待上传数据的 DirectByteBuffer
* @return 上传的 S3 对象键
*/
public String uploadDirectByteBufferToS3(String container, String objectKey, ByteBuffer directByteBuffer) {
BlobStoreContext context = getBlobStoreContext();
BlobStore blobStore = context.getBlobStore();
// 从 DirectByteBuffer 创建自定义的 ByteSource
ByteBufferByteSource byteSource = new ByteBufferByteSource(directByteBuffer);
// 使用 ByteSource 作为 payload,避免数据复制
Blob blob以上就是JNI DirectByteBuffer 数据免复制上传 S3 的高效策略的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号