在python中实现基于对比学习的异常表示学习,核心步骤包括数据增强、模型构建、对比损失定义、训练以及异常评分。1. 数据增强:通过生成每个样本的多个增强版本,保留语义信息并引入扰动,例如图像数据使用随机裁剪、颜色抖动等方法。2. 模型构建:模型由编码器和投影头组成,编码器提取高维特征,投影头将特征映射到低维嵌入空间。3. 对比损失定义:使用infonce loss(nt-xent loss),最大化正样本对之间一致性,最小化正样本对与负样本对之间一致性。4. 训练:使用无标签的正常数据进行训练,优化模型参数,使正常数据在嵌入空间中聚集。5. 异常评分:通过计算新样本与正常样本集群的距离、k-近邻距离或使用单类分类器进行异常检测,距离越大或密度越低则越可能是异常。

在Python中实现基于对比学习的异常表示学习,核心在于构建一个深度学习模型,通过对比损失函数(如InfoNCE Loss)来学习数据的低维、高信息量表示。这个过程通常是自监督的,模型会学习如何区分一个样本的不同增强版本(正样本对)与其他随机样本(负样本对),从而使得正常数据点在嵌入空间中紧密聚集,而异常点则因为其独特性而被推离这个“正常”集群,变得容易识别。

要实现基于对比学习的异常表示学习,我们通常会遵循以下步骤:数据增强、模型构建、对比损失定义、训练以及异常评分。
1. 数据增强 (Augmentation) 这是对比学习的基石。对于每个原始数据点,我们生成至少两个不同的“视图”或增强版本。这些增强应能保留原始样本的语义信息,同时引入一定的扰动。

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image
import numpy as np
# 假设是图像数据,定义一个简单的增强策略
class TwoCropsTransform:
def __init__(self, base_transform):
self.base_transform = base_transform
def __call__(self, x):
q = self.base_transform(x)
k = self.base_transform(x)
return q, k
# 基础图像增强,可以根据实际任务调整
base_transform = transforms.Compose([
transforms.RandomResizedCrop(224, scale=(0.2, 1.)),
transforms.RandomApply([
transforms.ColorJitter(0.4, 0.4, 0.4, 0.1) # not strengthened
], p=0.8),
transforms.RandomGrayscale(p=0.2),
transforms.RandomApply([transforms.GaussianBlur((23, 23), sigma=(0.1, 2.0))], p=0.5),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# 对于非图像数据,需要自定义相应的增强函数
def text_augment(text):
# 示例:简单地复制两次,实际应有更复杂的NLP增强
return text + "_aug1", text + "_aug2"2. 模型构建 (Model Architecture) 通常包含一个编码器(Encoder)和一个投影头(Projection Head)。
class Encoder(nn.Module):
def __init__(self, in_channels=3, feature_dim=128):
super(Encoder, self).__init__()
# 示例:一个简单的CNN编码器,可以替换为ResNet, BERT等
self.conv1 = nn.Conv2d(in_channels, 64, kernel_size=3, stride=2, padding=1)
self.relu = nn.ReLU()
self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1)
self.flatten = nn.Flatten()
# 假设输入是224x224,经过两层conv+pool后,特征维度需要计算
# (224/2/2) * (224/2/2) * 128 = 56*56*128 = 401408
self.fc = nn.Linear(128 * (224 // 4 // 2) * (224 // 4 // 2), feature_dim) # 调整以匹配实际输出维度
def forward(self, x):
x = self.pool(self.relu(self.conv1(x)))
x = self.pool(self.relu(self.conv2(x)))
x = self.flatten(x)
x = self.fc(x)
return x
class ProjectionHead(nn.Module):
def __init__(self, in_dim, out_dim):
super(ProjectionHead, self).__init__()
self.fc1 = nn.Linear(in_dim, in_dim)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(in_dim, out_dim)
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
return x
class ContrastiveModel(nn.Module):
def __init__(self, encoder, projection_head_dim=128):
super(ContrastiveModel, self).__init__()
self.encoder = encoder
# 假设encoder的输出维度是feature_dim
self.projection_head = ProjectionHead(encoder.fc.out_features, projection_head_dim)
def forward(self, x):
features = self.encoder(x)
projections = self.projection_head(features)
# L2归一化,这对对比学习非常重要
projections = F.normalize(projections, dim=1)
return features, projections3. 对比损失定义 (Contrastive Loss) 最常用的是InfoNCE Loss(也称为NT-Xent Loss)。它旨在最大化正样本对之间的一致性,同时最小化正样本对与负样本对之间的一致性。

class NTXentLoss(nn.Module):
def __init__(self, temperature=0.07):
super(NTXentLoss, self).__init__()
self.temperature = temperature
def forward(self, z_i, z_j):
"""
z_i, z_j: 两个增强视图的投影特征,形状为 (batch_size, projection_dim)
"""
batch_size = z_i.size(0)
# 合并所有样本的投影,用于计算所有对之间的相似度
z = torch.cat([z_i, z_j], dim=0) # shape (2*batch_size, projection_dim)
# 计算余弦相似度矩阵
sim_matrix = F.cosine_similarity(z.unsqueeze(1), z.unsqueeze(0), dim=2) # shape (2*batch_size, 2*batch_size)
# 移除对角线上的自相似度(因为一个样本和它自己不是正样本对)
logits = sim_matrix / self.temperature
# 构建标签:对角线上的元素是正样本对,其余是负样本对
# 例如,对于 batch_size=2, z_i=[z_i1, z_i2], z_j=[z_j1, z_j2]
# z = [z_i1, z_i2, z_j1, z_j2]
# 那么正样本对是 (z_i1, z_j1) 和 (z_i2, z_j2)
# 它们在 sim_matrix 中的索引是 (0, 2) 和 (1, 3)
# 还有 (z_j1, z_i1) 和 (z_j2, z_i2)
# 它们在 sim_matrix 中的索引是 (2, 0) 和 (3, 1)
labels = torch.arange(batch_size, device=z_i.device)
labels = torch.cat([labels, labels + batch_size], dim=0) # [0,1,2,3] for batch_size=2
# 排除自相似项
mask = torch.eye(labels.shape[0], dtype=torch.bool, device=z_i.device)
logits = logits[~mask].view(labels.shape[0], -1) # 移除对角线元素
# 调整标签以匹配新的logits形状
# 对于每个 z_i 的正样本是 z_j,对于每个 z_j 的正样本是 z_i
# 在新的 logits 矩阵中,每个样本的“正样本”位于特定的位置
# 例如,z_i[0] 的正样本是 z_j[0],它在 sim_matrix 中是 sim_matrix[0, batch_size]
# 在 logits_mask_self 中,它会是第 batch_size - 1 个元素
pos_mask = torch.zeros_like(sim_matrix, dtype=torch.bool, device=z_i.device)
pos_mask[torch.arange(batch_size), torch.arange(batch_size) + batch_size] = True
pos_mask[torch.arange(batch_size) + batch_size, torch.arange(batch_size)] = True
pos_mask = pos_mask[~mask].view(labels.shape[0], -1) # 移除自相似后,正样本的位置
# 收集正样本的logits
positive_logits = logits[pos_mask].view(labels.shape[0], 1)
# 计算LogSumExp,用于分母
log_prob = positive_logits - torch.logsumexp(logits, dim=1, keepdim=True)
loss = -log_prob.mean()
return loss4. 训练 (Training Loop) 使用大量无标签的正常数据进行训练。目标是让模型学习到正常数据的紧凑表示。
from torch.utils.data import DataLoader, Dataset
import random
# 假设你有一个包含正常数据的Dataset
class NormalDataset(Dataset):
def __init__(self, data_list, transform=None):
self.data_list = data_list # list of image paths or actual data
self.transform = transform
def __len__(self):
return len(self.data_list)
def __getitem__(self, idx):
# 假设data_list是PIL Image对象列表
img = self.data_list[idx]
if self.transform:
return self.transform(img)
return img
# 模拟一些正常数据
dummy_normal_data = [Image.new('RGB', (224, 224), color = (i, i, i)) for i in range(100)]
normal_dataset = NormalDataset(dummy_normal_data, transform=TwoCropsTransform(base_transform))
normal_dataloader = DataLoader(normal_dataset, batch_size=32, shuffle=True)
# 初始化模型和优化器
encoder = Encoder()
model = ContrastiveModel(encoder)
criterion = NTXentLoss(temperature=0.5)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# 训练过程
num_epochs = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print("开始训练对比学习模型...")
for epoch in range(num_epochs):
model.train()
total_loss = 0
for (img1, img2) in normal_dataloader:
img1 = img1.to(device)
img2 = img2.to(device)
_, proj1 = model(img1)
_, proj2 = model(img2)
loss = criterion(proj1, proj2)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss / len(normal_dataloader):.4f}")
print("训练完成。")5. 异常评分 (Anomaly Scoring) 训练完成后,模型学会了将正常样本映射到嵌入空间的一个特定区域。对于新的、未见过的样本,我们可以计算其嵌入与正常样本集群的距离,或者利用其在嵌入空间中的密度。
from sklearn.metrics import roc_auc_score
from sklearn.ensemble import IsolationForest
# 假设模型已经训练好
model.eval()
# 1. 提取所有正常训练样本的嵌入
normal_embeddings = []
with torch.no_grad():
for (img1, img2) in normal_dataloader: # 只需要一个视图来提取特征
img1 = img1.to(device)
features, _ = model(img1)
normal_embeddings.append(features.cpu().numpy())
normal_embeddings = np.concatenate(normal_embeddings, axis=0)
# 2. 训练一个异常检测模型在这些嵌入上
# 这里我们使用Isolation Forest作为示例,它对高维数据效果不错
iso_forest = IsolationForest(contamination='auto', random_state=42)
iso_forest.fit(normal_embeddings)
# 3. 评估或预测新样本
# 假设你有新的测试数据,其中包含正常和异常样本
# dummy_test_normal_data = [Image.new('RGB', (224, 224), color = (i, i, i)) for i in range(10)]
# dummy_test_anomaly_data = [Image.new('RGB', (224, 224), color = (255-i, 0, 0)) for i in range(5)] # 模拟异常
# test_data = dummy_test_normal_data + dummy_test_anomaly_data
# test_labels = [0]*len(dummy_test_normal_data) + [1]*len(dummy_test_anomaly_data) # 0 for normal, 1 for anomaly
# test_dataset = NormalDataset(test_data, transform=base_transform) # 只用一个视图
# test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)
# test_embeddings = []
# with torch.no_grad():
# for img in test_dataloader:
# img = img.to(device)
# features, _ = model(img)
# test_embeddings.append(features.cpu().numpy())
# test_embeddings = np.concatenate(test_embeddings, axis=0)
# anomaly_scores = -iso_forest.decision_function(test_embeddings) # Isolation Forest的decision_function越小越异常,所以取负号
# # ROC AUC 是一个常用的评估指标
# # roc_auc = roc_auc_score(test_labels, anomaly_scores)
# # print(f"测试集 ROC AUC: {roc_auc:.4f}")
print("\n异常检测模型已基于学习到的嵌入训练完成。")
print("新的样本以上就是如何用Python实现基于对比学习的异常表示学习?的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号