Data Parallel (DP)

사용법

 ...
 model = resnet18().to(device)
 model = torch.nn.DataParallel(model)
 ...Copy Icon

장점

  • 위 예시처럼 매우 간단하게 사용이 가능하다

단점

  • 메모리 사용량 증가 : 각 GPU에서 모델의 복사본을 만들어 메모리를 사용하기 떄문에 GPU의 수가 증가할 수록 메모리 사용량이 증가
  • 통신 병목 현상 : 각 GPU에서 연산을 수행하고 연산 결과를 하나의 GPU로 모은 후에 모델을 업데이트 하기 때문에 GPU간 데이터를 복사하고 통신하는 데 시간이 소요된다. 또한, 하나의 GPU로 연산 결과를 모으기 때문에 GPU 수가 증가할 수록 하나의 GPU의 메모리 사용량이 증가해 효율적인 사용이 불가능하다.

참고

  1. train 할 때 DP를 사용했다면, inference할 때, 전이 학습 할 때에도 DP를 사용하여 모델을 불러와야 한다.
    • 또는 train 후 모델을 저장할 때 model의 module을 저장한다.
      • torch.save(model.module.state_dict(), save_path)
  2. 2개 이상의 GPU를 사용하고 있지만 DP를 사용했을 때 하나의 GPU에서만 동작한다면 다음을 시도해본다.
    1. device_ids를 정확히 명시해보기
      • os.environ["CUDA_VISIBLE_DEVICES"] = "0, 1"
    2. input가 하나의 tensor로 구성되어 있는지 확인하기 [참고]
  3. GPU 연산 결과를 기본적으로 0번 device에 모으는데, 이를 변경할 수 있다.
    • model = torch.nn.DataParallel(model, output_device=1)

Distributed Data Parallel (DDP)

분산 학습으로, 여러 컴퓨터(서버, 머신)의 여러 GPU를 사용하여 모델을 학습합니다.

하나의 컴퓨터의 여러 GPU를 분산 학습 할 수도 있으며, DP의 단점을 해결한 대신 코드 세팅이 복잡하다는 단점이 있습니다.

사용 전 용어 정리

1. torch.distributed.init_process_group

# init_process_group
torch.distributed.init_process_group(backend='nccl',
                                     init_method='tcp://127.0.0.1:12355',
                                     world_size=args.ngpus_per_node,
                                     rank=args.rank)Copy Icon

분산 학습을 위한 초기화 함수로, 각 프로세스마다 호출되어야 하고, 분산 학습을 위해 필요한 설정이 완료된 후에만 다음 단계로 진행할 수 있습니다.

  • backend : 사용할 분산 처리 backend
    • GPU traning : 'nccl'
    • CPU training : 'gloo'
  • init_method : 초기화 방법으로 'nccl' backend에 단일 머신 다중 GPU 학습 시 'tcp://localhost:[사용할 port]'로 지정
  • world_size : 전체 프로세스 개수 (단일 머신의 경우 GPU 개수)
  • rank : 현재 프로세스 id, 0부터 world_size - 1 까지의 값을 가진다.

2. DistributedSampler

train_sampler = DistributedSampler(dataset=train_dataset, shuffle=True)
batch_sampler_train = torch.utils.data.BatchSampler(train_sampler, opts.batch_size, drop_last=True)
train_loader = DataLoader(train_dataset, batch_sampler=batch_sampler_train, num_workers=opts.num_workers)Copy Icon
  • DistributedSampler는 분산 데이터 병렬학습(distributed data parallel training)시 각 프로세스가 미니배치를 나누어 학습할 데이터 샘플을 결정하는 역할
  • 일반적으로 각 프로세스는 전체 데이터셋을 고루 나누어 학습하지만, 이렇게 나누어 학습하는 경우 다른 프로세스에서 학습하는 데이터 샘플과 중복되는 경우가 발생할 수 있음
  • DistributedSampler는 데이터셋의 각 샘플에 대한 인덱스를 분산처리에 맞게 새로운 순서로 만들어주고, 해당 인덱스를 이용하여 프로세스들 간의 중복 없는 데이터 분배 가능

3. DistributedDataParallel

model = DistributedDataParallel(module=model, device_ids=[local_gpu_id])Copy Icon
  • 각각의 GPU에 데이터와 모델을 분배
  • 각각의 GPU에서 계산된 Gradient들이 전체적으로 동기화되어 합쳐짐
  • 이를 통해 각 GPU에서 계산된 Gradient가 Master GPU에서 처리되어 업데이트

4. torch.multiprocessing.spawn

def train(rank, world_size):
    # 분산 학습 코드 작성
    pass

if __name__ == '__main__':
    args.ngpus_per_node = torch.cuda.device_count()
    args.gpu_ids = list(range(args.ngpus_per_node))
    args.num_workers = 16
    
    torch.multiprocessing.spawn(train,
            args=(args,),
            nprocs=args.ngpus_per_node,
            join=True)Copy Icon

분산 학습을 위한 프로세스 그룹을 생성하는 함수로, 분산 학습을 위해 여러 프로세스를 실행하는 데 사용

  • fn (train) : 실행 할 함수. 함수의 첫 번째 파라미터는 rank
  • args : 함수에 전달할 인자
  • nprocs : 실행할 프로세스 개수

전체 코드 요약

import os
import time
import math
from argparse import ArgumentParser

import torch
from torch import cuda
from torch.utils.data import DataLoader
from torch.optim import lr_scheduler
from tqdm import tqdm

import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data.distributed import DistributedSampler


def parse_args():
    parser = ArgumentParser()

    # Conventional args
	...
    
    args = parser.parse_args()
    return args
    
def train(rank, args):
    init_distributed(rank, args)
    local_gpu_id = args.gpu

    dataset = CustomtDataset(...)
    # sampler
    sampler = DistributedSampler(dataset, shuffle=True)
    batch_sampler = torch.utils.data.BatchSampler(sampler, args.batch_size, drop_last=True)
    train_loader = DataLoader(
        dataset,
        batch_size=args.batch_size,
        shuffle=False,
        num_workers=args.num_workers,
        pin_memory=True,
        batch_sampler=batch_sampler # sampler
    )

    model = MyModel()
    model.cuda(local_gpu_id)    
    model = DistributedDataParallel(model, device_ids=[local_gpu_id])

    optimizer = torch.optim.Adam(model.parameters(), lr=args.learning_rate)
    scheduler = lr_scheduler.MultiStepLR(optimizer, milestones=[args.max_epoch // 2], gamma=0.1)
	
    ...
    
    model.train()
    for epoch in range(args.max_epoch):
        # ddp sampler set epoch
        sampler.set_epoch(epoch)
        
        ...
        
def init_distributed(rank, args):
    # 1. setting for distributed training
    args.rank = rank
    args.gpu = args.rank % torch.cuda.device_count()
    local_gpu_id = int(args.gpu_ids[args.rank])
    torch.cuda.set_device(local_gpu_id)
    
    if args.rank is not None:
        print("Use GPU: {} for training".format(local_gpu_id))

    # 2. init_process_group
    torch.distributed.init_process_group(backend='nccl',
                                         init_method='tcp://127.0.0.1:12355',
                                         world_size=args.ngpus_per_node,
                                         rank=args.rank)

    # if put this function, the all processes block at all.
    # ? : https://stackoverflow.com/questions/59760328/how-does-torch-distributed-barrier-work
    torch.distributed.barrier()

if __name__ == '__main__':
    args = parse_args()

    args.ngpus_per_node = torch.cuda.device_count()
    args.gpu_ids = list(range(args.ngpus_per_node))
    args.num_workers = 16
    torch.multiprocessing.spawn(train,
                args=(args,),
                nprocs=args.ngpus_per_node,
                join=True)Copy Icon

 

참고

'ML & DL > PyTorch' 카테고리의 다른 글

[PyTorch] torchvision & transform  (0) 2023.03.15
[PyTorch] Dataset & DataLoader  (0) 2023.03.15
[PyTorch] nn.Module  (0) 2023.03.15
[PyTorch] torch.nn  (0) 2023.03.15
[PyTorch] Optimization, 최적화  (0) 2023.03.13
욱근욱