温馨提示×

pytorch服务器怎样处理数据并行

小樊
81
2024-12-26 23:33:03
栏目: 云计算

PyTorch服务器处理数据并行的关键在于使用torch.nn.DataParallel或torch.nn.parallel.DistributedDataParallel。以下是两种方法的简要说明和示例:

  1. 使用torch.nn.DataParallel

DataParallel可以将模型和数据并行化,以便在多个GPU上训练。首先,确保你有多个GPU设备,然后按照以下步骤操作:

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms

# 定义一个简单的模型
class SimpleModel(nn.Module):
    """Single linear layer mapping 784 input features to 10 logits.

    Inputs whose last dimension is not 784 (for example image batches of
    shape ``(N, 1, 28, 28)``) are flattened per sample before the linear
    layer, so the model can be fed directly from an image ``DataLoader``.
    """

    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(784, 10)

    def forward(self, x):
        """Return the logits for ``x``.

        Args:
            x: A tensor whose last dimension already equals the layer's
                ``in_features`` (used as-is), or a batch whose samples
                each contain 784 elements in any layout.

        Returns:
            The output of the linear layer; ``(N, 10)`` for a batch that
            had to be flattened.
        """
        if x.shape[-1] != self.fc.in_features:
            # nn.Linear only acts on the last dimension, so collapse every
            # dimension after the batch axis into one feature vector.
            x = x.flatten(start_dim=1)
        return self.fc(x)

# Create the model instance.
model = SimpleModel()

# Wrap the model in DataParallel only when more than one GPU is visible.
if torch.cuda.device_count() > 1:
    print("Using", torch.cuda.device_count(), "GPUs")
    model = nn.DataParallel(model)

# Move the model (and the wrapped module, if any) onto the GPU.
model.cuda()

# Loss function and optimizer.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

# Preprocessing: convert images to tensors, then normalize with mean 0.5 / std 0.5.
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])

# Load the MNIST training split.
train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)

# Train the model.
for epoch in range(10):
    for data, target in train_loader:
        # The loader yields image batches, but the model's linear layer
        # expects one 784-element feature vector per sample; flatten first.
        data = data.view(data.size(0), -1).cuda()
        target = target.cuda()

        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

    # Reports the loss of the last batch of the epoch.
    print("Epoch", epoch, "Loss:", loss.item())
  2. 使用torch.nn.parallel.DistributedDataParallel

DistributedDataParallel是DataParallel的扩展,支持多节点分布式训练。首先,确保你的系统配置正确,然后按照以下步骤操作:

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
import torch.distributed as dist
import torch.multiprocessing as mp

def setup(rank, world_size):
    """Join the default process group using the NCCL backend.

    Args:
        rank: Index of the calling process within the group.
        world_size: Total number of processes in the group.
    """
    import os

    # The default env:// rendezvous reads MASTER_ADDR and MASTER_PORT from the
    # environment. Provide single-machine defaults but keep any values the
    # launcher has already exported.
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "12355")
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    """Tear down the distributed process group via ``dist.destroy_process_group``."""
    dist.destroy_process_group()

class SimpleModel(nn.Module):
    """Single linear layer mapping 784 input features to 10 logits.

    Inputs whose last dimension is not 784 (for example image batches of
    shape ``(N, 1, 28, 28)``) are flattened per sample before the linear
    layer, so the model can be fed directly from an image ``DataLoader``.
    """

    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(784, 10)

    def forward(self, x):
        """Return the logits for ``x``.

        Args:
            x: A tensor whose last dimension already equals the layer's
                ``in_features`` (used as-is), or a batch whose samples
                each contain 784 elements in any layout.

        Returns:
            The output of the linear layer; ``(N, 10)`` for a batch that
            had to be flattened.
        """
        if x.shape[-1] != self.fc.in_features:
            # nn.Linear only acts on the last dimension, so collapse every
            # dimension after the batch axis into one feature vector.
            x = x.flatten(start_dim=1)
        return self.fc(x)

def train(rank, world_size):
    """Run the training loop for one process of a distributed job.

    Args:
        rank: Index of this process; also used as its CUDA device index.
        world_size: Total number of participating processes.
    """
    setup(rank, world_size)
    try:
        # Bind this process to its own GPU so that CUDA and NCCL calls do
        # not all default to device 0.
        torch.cuda.set_device(rank)

        # The model must live on this process's device before it is wrapped.
        # Use DistributedDataParallel (one process per GPU) rather than
        # DataParallel, which would fan each process out over every GPU.
        model = SimpleModel().cuda(rank)
        model = nn.parallel.DistributedDataParallel(model, device_ids=[rank])

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.SGD(model.parameters(), lr=0.01)

        transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
        train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
        # The sampler gives every rank its own shard of the dataset.
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
        train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, sampler=train_sampler)

        for epoch in range(10):
            # Re-seed the sampler so each epoch uses a different shuffle.
            train_sampler.set_epoch(epoch)
            for data, target in train_loader:
                # The loader yields image batches, but the model's linear
                # layer expects one 784-element vector per sample.
                data = data.view(data.size(0), -1).cuda(rank)
                target = target.cuda(rank)

                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()

            # Reports the loss of the last batch this rank processed.
            print("Rank", rank, "Epoch", epoch, "Loss:", loss.item())
    finally:
        # Always release the process group, even when training raises.
        cleanup()

def main():
    """Spawn one training process per visible CUDA device.

    Raises:
        RuntimeError: If no CUDA device is available.
    """
    # Each rank is used as a CUDA device index inside ``train``, so the
    # number of processes must match the GPUs actually present rather than
    # a hard-coded count.
    world_size = torch.cuda.device_count()
    if world_size < 1:
        raise RuntimeError("This example requires at least one CUDA device")
    # mp.spawn passes the process index as the first argument of ``train``.
    mp.spawn(train, args=(world_size,), nprocs=world_size, join=True)


if __name__ == "__main__":
    main()

这个示例使用了nccl后端,但你也可以根据你的系统选择其他后端。注意,DistributedDataParallel需要更多的设置和配置,但它提供了更好的性能和扩展性。

0