Mirror of https://github.com/saymrwulf/pytorch.git (synced 2026-05-15 21:00:47 +00:00)
Summary: This PR stops common_utils.py from setting the default tensor type when it's imported. See issue https://github.com/pytorch/pytorch/issues/27355. This is a frequent source of confusion for test writers. Many tests relied on this setting (whether they knew it or not), and this PR also updates the test suite to pass without common_utils.py setting the default tensor type. Some larger test files now set the default floating dtype themselves, however. These test files are:

- test_autograd.py
- test_distributions.py
- test_jit.py
- test_nn.py

This is still a significant improvement over today, however. First, these files set the default floating dtype much more clearly than importing it from common_utils. Second, the rest of the test suite no longer sets this globally. Third, this PR is a springboard to updating those tests, too. In particular, as tests are made generic they can be moved away from relying on this global setting.

Notable technical changes in this PR are:

- Significant updates to test_torch.py to make it pass without setting the default floating dtype globally.
- The default_floating_dtype decorator is now defined in common_utils; a couple of versions of this decorator were previously defined in test files.
- test_torch-specific parts of common_utils were refactored into test_torch.
- Tensor creation methods in common_utils were updated to accept an optional dtype and device.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/27444
Differential Revision: D17795235
Pulled By: mruberry
fbshipit-source-id: 7f77271c0c836e69f183ad9057a2c4b29f09d2e1
1497 lines · 67 KiB · Python

import warnings
import math
import unittest
import functools
from copy import deepcopy
import torch
from torch._six import inf
import torch.optim as optim
import torch.nn.functional as F
from torch.optim import SGD
from torch.autograd import Variable
from torch import sparse
from torch.optim.lr_scheduler import LambdaLR, StepLR, \
    MultiStepLR, ExponentialLR, CosineAnnealingLR, ReduceLROnPlateau, \
    _LRScheduler, CyclicLR, CosineAnnealingWarmRestarts, OneCycleLR
from common_utils import TestCase, run_tests, TEST_WITH_UBSAN, load_tests, \
    skipIfRocm

# load_tests from common_utils is used to automatically filter tests for
# sharding on sandcastle. This line silences flake warnings
load_tests = load_tests


def rosenbrock(tensor):
    x, y = tensor
    return (1 - x) ** 2 + 100 * (y - x ** 2) ** 2


def drosenbrock(tensor):
    x, y = tensor
    return torch.Tensor((-400 * x * (y - x ** 2) - 2 * (1 - x), 200 * (y - x ** 2)))
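
# A quick consistency check for the pair above (an illustrative sketch, not
# part of the test suite): drosenbrock's analytic gradient should agree with
# a central finite-difference estimate of rosenbrock's gradient, e.g.:
#
#     t = torch.Tensor([1.5, 1.5])
#     eps = 1e-4
#     for k in range(2):
#         d = torch.zeros(2)
#         d[k] = eps
#         num = (rosenbrock(t + d) - rosenbrock(t - d)) / (2 * eps)
#         assert abs(num.item() - drosenbrock(t)[k].item()) < 1e-1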


class TestOptim(TestCase):
    def _test_rosenbrock_sparse(self, constructor, scheduler_constructors=None,
                                sparse_only=False):
        if scheduler_constructors is None:
            scheduler_constructors = []
        params_t = torch.Tensor([1.5, 1.5])

        params = Variable(params_t, requires_grad=True)
        optimizer = constructor([params])
        schedulers = []
        for scheduler_constructor in scheduler_constructors:
            schedulers.append(scheduler_constructor(optimizer))

        if not sparse_only:
            params_c = Variable(params_t.clone(), requires_grad=True)
            optimizer_c = constructor([params_c])

        solution = torch.Tensor([1, 1])
        initial_dist = params.data.dist(solution)

        def eval(params, sparse_grad, w):
            # Depending on w, provide only the x or y gradient
            optimizer.zero_grad()
            loss = rosenbrock(params)
            loss.backward()
            grad = drosenbrock(params.data)
            # NB: We torture test the optimizer by returning an
            # uncoalesced sparse tensor
            if w:
                i = torch.LongTensor([[0, 0]])
                x = grad[0]
                v = torch.Tensor([x / 4., x - x / 4.])
            else:
                i = torch.LongTensor([[1, 1]])
                y = grad[1]
                v = torch.Tensor([y - y / 4., y / 4.])
            x = sparse.DoubleTensor(i, v, torch.Size([2])).to(dtype=v.dtype)
            with torch.no_grad():
                if sparse_grad:
                    params.grad = x
                else:
                    params.grad = x.to_dense()
            return loss

        for i in range(2000):
            # Do cyclic coordinate descent
            w = i % 2
            optimizer.step(functools.partial(eval, params, True, w))
            for scheduler in schedulers:
                if isinstance(scheduler, ReduceLROnPlateau):
                    scheduler.step(rosenbrock(params))
                else:
                    scheduler.step()
            if not sparse_only:
                optimizer_c.step(functools.partial(eval, params_c, False, w))
                self.assertEqual(params.data, params_c.data)

        self.assertLessEqual(params.data.dist(solution), initial_dist)
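
    # For reference (an illustrative sketch, not exercised by the tests): the
    # eval closure above deliberately builds its gradient as an *uncoalesced*
    # sparse tensor, i.e. one that repeats an index so values must be summed
    # on coalesce():
    #
    #     i = torch.LongTensor([[0, 0]])
    #     v = torch.Tensor([1., 3.])
    #     s = torch.sparse_coo_tensor(i, v, torch.Size([2]))
    #     s.coalesce()  # a single entry at index 0 with value 4.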

    def _test_basic_cases_template(self, weight, bias, input, constructor, scheduler_constructors):
        weight = Variable(weight, requires_grad=True)
        bias = Variable(bias, requires_grad=True)
        input = Variable(input)
        optimizer = constructor(weight, bias)
        schedulers = []
        for scheduler_constructor in scheduler_constructors:
            schedulers.append(scheduler_constructor(optimizer))

        # to check if the optimizer can be printed as a string
        optimizer.__repr__()

        def fn():
            optimizer.zero_grad()
            y = weight.mv(input)
            if y.is_cuda and bias.is_cuda and y.get_device() != bias.get_device():
                y = y.cuda(bias.get_device())
            loss = (y + bias).pow(2).sum()
            loss.backward()
            return loss

        initial_value = fn().item()
        for _i in range(200):
            for scheduler in schedulers:
                if isinstance(scheduler, ReduceLROnPlateau):
                    val_loss = fn()
                    scheduler.step(val_loss)
                else:
                    scheduler.step()
            optimizer.step(fn)
        self.assertLess(fn().item(), initial_value)

    def _test_state_dict(self, weight, bias, input, constructor):
        weight = Variable(weight, requires_grad=True)
        bias = Variable(bias, requires_grad=True)
        input = Variable(input)

        def fn_base(optimizer, weight, bias):
            optimizer.zero_grad()
            i = input_cuda if weight.is_cuda else input
            loss = (weight.mv(i) + bias).pow(2).sum()
            loss.backward()
            return loss

        optimizer = constructor(weight, bias)
        fn = functools.partial(fn_base, optimizer, weight, bias)

        # Prime the optimizer
        for _i in range(20):
            optimizer.step(fn)
        # Clone the weights and construct a new optimizer for them
        weight_c = Variable(weight.data.clone(), requires_grad=True)
        bias_c = Variable(bias.data.clone(), requires_grad=True)
        optimizer_c = constructor(weight_c, bias_c)
        fn_c = functools.partial(fn_base, optimizer_c, weight_c, bias_c)
        # Load state dict
        state_dict = deepcopy(optimizer.state_dict())
        state_dict_c = deepcopy(optimizer.state_dict())
        optimizer_c.load_state_dict(state_dict_c)
        # Run both optimizations in parallel
        for _i in range(20):
            optimizer.step(fn)
            optimizer_c.step(fn_c)
            self.assertEqual(weight, weight_c)
            self.assertEqual(bias, bias_c)
        # Make sure state dict wasn't modified
        self.assertEqual(state_dict, state_dict_c)

        # Check that state dict can be loaded even when we cast parameters
        # to a different type and move to a different device.
        if not torch.cuda.is_available():
            return

        input_cuda = Variable(input.data.float().cuda())
        weight_cuda = Variable(weight.data.float().cuda(), requires_grad=True)
        bias_cuda = Variable(bias.data.float().cuda(), requires_grad=True)
        optimizer_cuda = constructor(weight_cuda, bias_cuda)
        fn_cuda = functools.partial(fn_base, optimizer_cuda, weight_cuda, bias_cuda)

        state_dict = deepcopy(optimizer.state_dict())
        state_dict_c = deepcopy(optimizer.state_dict())
        optimizer_cuda.load_state_dict(state_dict_c)

        # Make sure state dict wasn't modified
        self.assertEqual(state_dict, state_dict_c)

        for _i in range(20):
            optimizer.step(fn)
            optimizer_cuda.step(fn_cuda)
            self.assertEqual(weight, weight_cuda)
            self.assertEqual(bias, bias_cuda)

        # validate deepcopy() copies all public attributes
        def getPublicAttr(obj):
            return set(k for k in obj.__dict__ if not k.startswith('_'))
        self.assertEqual(getPublicAttr(optimizer), getPublicAttr(deepcopy(optimizer)))

    def _test_basic_cases(self, constructor, scheduler_constructors=None,
                          ignore_multidevice=False):
        if scheduler_constructors is None:
            scheduler_constructors = []
        self._test_state_dict(
            torch.randn(10, 5),
            torch.randn(10),
            torch.randn(5),
            constructor
        )
        self._test_basic_cases_template(
            torch.randn(10, 5),
            torch.randn(10),
            torch.randn(5),
            constructor,
            scheduler_constructors
        )
        # non-contiguous parameters
        self._test_basic_cases_template(
            torch.randn(10, 5, 2)[..., 0],
            torch.randn(10, 2)[..., 0],
            torch.randn(5),
            constructor,
            scheduler_constructors
        )
        # CUDA
        if not torch.cuda.is_available():
            return
        self._test_basic_cases_template(
            torch.randn(10, 5).cuda(),
            torch.randn(10).cuda(),
            torch.randn(5).cuda(),
            constructor,
            scheduler_constructors
        )
        # Multi-GPU
        if not torch.cuda.device_count() > 1 or ignore_multidevice:
            return
        self._test_basic_cases_template(
            torch.randn(10, 5).cuda(0),
            torch.randn(10).cuda(1),
            torch.randn(5).cuda(0),
            constructor,
            scheduler_constructors
        )

    def _build_params_dict(self, weight, bias, **kwargs):
        return [{'params': [weight]}, dict(params=[bias], **kwargs)]

    def _build_params_dict_single(self, weight, bias, **kwargs):
        return [dict(params=bias, **kwargs)]
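
    # The helpers above build the standard per-parameter-options format that
    # optimizers accept: a list of dicts, where options in a dict override
    # the keyword defaults for just those parameters. A minimal sketch
    # (values are illustrative only):
    #
    #     optim.SGD([{'params': [weight]},             # uses the default lr=1e-3
    #                {'params': [bias], 'lr': 1e-2}],  # overrides lr for bias
    #               lr=1e-3)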

    def test_sgd(self):
        self._test_basic_cases(
            lambda weight, bias: optim.SGD([weight, bias], lr=1e-3)
        )
        self._test_basic_cases(
            lambda weight, bias: optim.SGD(
                self._build_params_dict(weight, bias, lr=1e-2),
                lr=1e-3)
        )
        self._test_basic_cases(
            lambda weight, bias: optim.SGD(
                self._build_params_dict_single(weight, bias, lr=1e-2),
                lr=1e-3)
        )
        self._test_basic_cases(
            lambda weight, bias: optim.SGD(
                self._build_params_dict_single(weight, bias, lr=1e-2))
        )
        self._test_basic_cases(
            lambda weight, bias: optim.SGD([weight, bias], lr=1e-3),
            [lambda opt: StepLR(opt, gamma=0.9, step_size=10)]
        )
        self._test_basic_cases(
            lambda weight, bias: optim.SGD([weight, bias], lr=1e-3),
            [lambda opt: StepLR(opt, gamma=0.9, step_size=10),
             lambda opt: ReduceLROnPlateau(opt)]
        )
        self._test_basic_cases(
            lambda weight, bias: optim.SGD([weight, bias], lr=1e-3),
            [lambda opt: StepLR(opt, gamma=0.99, step_size=10),
             lambda opt: ExponentialLR(opt, gamma=0.99),
             lambda opt: ReduceLROnPlateau(opt)]
        )
        with self.assertRaisesRegex(ValueError, "Invalid momentum value: -0.5"):
            optim.SGD(None, lr=1e-2, momentum=-0.5)

    def test_sgd_sparse(self):
        self._test_rosenbrock_sparse(
            lambda params: optim.SGD(params, lr=5e-3)
        )
        self._test_rosenbrock_sparse(
            lambda params: optim.SGD(params, lr=0.005),
            [lambda opt: StepLR(opt, gamma=0.99999, step_size=300)]
        )

    def test_adam(self):
        self._test_basic_cases(
            lambda weight, bias: optim.Adam([weight, bias], lr=1e-3)
        )
        self._test_basic_cases(
            lambda weight, bias: optim.Adam(
                self._build_params_dict(weight, bias, lr=1e-2),
                lr=1e-3)
        )
        self._test_basic_cases(
            lambda weight, bias: optim.Adam([weight, bias], lr=1e-3,
                                            amsgrad=True)
        )
        self._test_basic_cases(
            lambda weight, bias: optim.Adam(
                self._build_params_dict(weight, bias, lr=1e-2),
                lr=1e-3, amsgrad=True)
        )
        self._test_basic_cases(
            lambda weight, bias: optim.Adam(
                self._build_params_dict(weight, bias, lr=1e-2),
                lr=1e-3),
            [lambda opt: ExponentialLR(opt, gamma=0.9)]
        )
        self._test_basic_cases(
            lambda weight, bias: optim.Adam([weight, bias], lr=1e-3,
                                            amsgrad=True),
            [lambda opt: ExponentialLR(opt, gamma=0.9),
             lambda opt: ReduceLROnPlateau(opt)]
        )
        self._test_basic_cases(
            lambda weight, bias: optim.Adam(
                self._build_params_dict(weight, bias, lr=1e-2),
                lr=1e-3, amsgrad=True),
            [lambda opt: StepLR(opt, gamma=0.9, step_size=10),
             lambda opt: ReduceLROnPlateau(opt)]
        )
        with self.assertRaisesRegex(ValueError, "Invalid beta parameter at index 0: 1.0"):
            optim.Adam(None, lr=1e-2, betas=(1.0, 0.0))

    def test_adamw(self):
        self._test_basic_cases(
            lambda weight, bias: optim.AdamW([weight, bias], lr=1e-3)
        )
        self._test_basic_cases(
            lambda weight, bias: optim.AdamW(
                self._build_params_dict(weight, bias, lr=1e-2),
                lr=1e-3)
        )

    def test_sparse_adam(self):
        self._test_rosenbrock_sparse(
            lambda params: optim.SparseAdam(params, lr=4e-2),
            [],
            True
        )
        with self.assertRaisesRegex(ValueError, "Invalid beta parameter at index 0: 1.0"):
            optim.SparseAdam(None, lr=1e-2, betas=(1.0, 0.0))

    # ROCm precision is too low to pass this test
    @skipIfRocm
    def test_adadelta(self):
        self._test_basic_cases(
            lambda weight, bias: optim.Adadelta([weight, bias])
        )
        self._test_basic_cases(
            lambda weight, bias: optim.Adadelta(
                self._build_params_dict(weight, bias, rho=0.95))
        )
        self._test_basic_cases(
            lambda weight, bias: optim.Adadelta(
                self._build_params_dict(weight, bias, rho=0.95)),
            [lambda opt: StepLR(opt, gamma=0.9, step_size=10),
             lambda opt: ReduceLROnPlateau(opt)]
        )
        with self.assertRaisesRegex(ValueError, "Invalid rho value: 1.1"):
            optim.Adadelta(None, lr=1e-2, rho=1.1)

    def test_adagrad(self):
        self._test_basic_cases(
            lambda weight, bias: optim.Adagrad([weight, bias], lr=1e-1)
        )
        self._test_basic_cases(
            lambda weight, bias: optim.Adagrad([weight, bias], lr=1e-1,
                                               initial_accumulator_value=0.1)
        )
        self._test_basic_cases(
            lambda weight, bias: optim.Adagrad(
                self._build_params_dict(weight, bias, lr=1e-2),
                lr=1e-1)
        )
        self._test_basic_cases(
            lambda weight, bias: optim.Adagrad(
                self._build_params_dict(weight, bias, lr=1e-2),
                lr=1e-1),
            [lambda opt: ReduceLROnPlateau(opt)]
        )
        self._test_basic_cases(
            lambda weight, bias: optim.Adagrad(
                self._build_params_dict(weight, bias, lr=1e-2),
                lr=1e-1),
            [lambda opt: ReduceLROnPlateau(opt),
             lambda opt: ExponentialLR(opt, gamma=0.99)]
        )
        with self.assertRaisesRegex(ValueError, "Invalid lr_decay value: -0.5"):
            optim.Adagrad(None, lr=1e-2, lr_decay=-0.5)

    def test_adagrad_sparse(self):
        self._test_rosenbrock_sparse(
            lambda params: optim.Adagrad(params, lr=1e-1)
        )
        self._test_rosenbrock_sparse(
            lambda params: optim.Adagrad(params, lr=0.1),
            [lambda opt: StepLR(opt, gamma=1 - 1e-5, step_size=500),
             lambda opt: ReduceLROnPlateau(opt, threshold=1e-4)]
        )

    def test_adamax(self):
        self._test_basic_cases(
            lambda weight, bias: optim.Adamax([weight, bias], lr=1e-1)
        )
        self._test_basic_cases(
            lambda weight, bias: optim.Adamax(
                self._build_params_dict(weight, bias, lr=1e-2),
                lr=1e-1)
        )
        with self.assertRaisesRegex(ValueError, "Invalid beta parameter at index 1: 1.0"):
            optim.Adamax(None, lr=1e-2, betas=(0.0, 1.0))

    def test_rmsprop(self):
        self._test_basic_cases(
            lambda weight, bias: optim.RMSprop([weight, bias], lr=1e-2)
        )
        self._test_basic_cases(
            lambda weight, bias: optim.RMSprop(
                self._build_params_dict(weight, bias, lr=1e-3),
                lr=1e-2)
        )
        with self.assertRaisesRegex(ValueError, "Invalid momentum value: -1.0"):
            optim.RMSprop(None, lr=1e-2, momentum=-1.0)

    def test_asgd(self):
        self._test_basic_cases(
            lambda weight, bias: optim.ASGD([weight, bias], lr=1e-3, t0=100)
        )
        self._test_basic_cases(
            lambda weight, bias: optim.ASGD(
                self._build_params_dict(weight, bias, lr=1e-2),
                lr=1e-3, t0=100)
        )
        with self.assertRaisesRegex(ValueError, "Invalid weight_decay value: -0.5"):
            optim.ASGD(None, lr=1e-2, weight_decay=-0.5)

    def test_rprop(self):
        self._test_basic_cases(
            lambda weight, bias: optim.Rprop([weight, bias], lr=1e-3)
        )
        self._test_basic_cases(
            lambda weight, bias: optim.Rprop(
                self._build_params_dict(weight, bias, lr=1e-2),
                lr=1e-3)
        )
        with self.assertRaisesRegex(ValueError, "Invalid eta values: 1.0, 0.5"):
            optim.Rprop(None, lr=1e-2, etas=(1.0, 0.5))

    def test_lbfgs(self):
        self._test_basic_cases(
            lambda weight, bias: optim.LBFGS([weight, bias]),
            ignore_multidevice=True
        )
        self._test_basic_cases(
            lambda weight, bias: optim.LBFGS([weight, bias], line_search_fn="strong_wolfe"),
            ignore_multidevice=True
        )

    @unittest.skipIf(TEST_WITH_UBSAN, "division-by-zero error with UBSAN")
    def test_lbfgs_return_type(self):
        params = [torch.randn(10, 5), torch.randn(10)]
        opt1 = optim.LBFGS(params, 0.01, tolerance_grad=inf)
        opt2 = optim.LBFGS(params, 0.01, tolerance_grad=-inf)

        def closure():
            return torch.Tensor([10])

        res1 = opt1.step(closure)
        res2 = opt2.step(closure)
        self.assertEqual(type(res1), type(res2))

    def test_invalid_param_type(self):
        with self.assertRaises(TypeError):
            optim.SGD(Variable(torch.randn(5, 5)), lr=3)


class SchedulerTestNet(torch.nn.Module):
    def __init__(self):
        super(SchedulerTestNet, self).__init__()
        self.conv1 = torch.nn.Conv2d(1, 1, 1)
        self.conv2 = torch.nn.Conv2d(1, 1, 1)

    def forward(self, x):
        return self.conv2(F.relu(self.conv1(x)))


class LambdaLRTestObject:
    def __init__(self, value):
        self.value = value

    def __call__(self, epoch):
        return self.value * epoch

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return self.__dict__ == other.__dict__
        else:
            return False
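

# LambdaLRTestObject exists because LambdaLR.state_dict() only saves lr
# lambdas that are proper callable objects; plain functions and lambdas are
# stored as None. A sketch of the difference (illustrative only, exercised
# by test_lambda_lr_state_dict_fn/_obj below):
#
#     LambdaLR(opt, lr_lambda=lambda e: 0.9 ** e).state_dict()
#     # -> state['lr_lambdas'] == [None]
#     LambdaLR(opt, lr_lambda=LambdaLRTestObject(0.9)).state_dict()
#     # -> state['lr_lambdas'][0] holds the object's __dict__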


class TestLRScheduler(TestCase):
    def setUp(self):
        super(TestLRScheduler, self).setUp()
        self.net = SchedulerTestNet()
        self.opt = SGD(
            [{'params': self.net.conv1.parameters()}, {'params': self.net.conv2.parameters(), 'lr': 0.5}],
            lr=0.05)

    def test_no_cyclic_references(self):
        import gc
        param = Variable(torch.Tensor(10), requires_grad=True)
        optim = SGD([param], lr=0.5)
        scheduler = LambdaLR(optim, lambda epoch: 1.0)
        del scheduler

        # Prior to Python 3.7, local variables in a function are referenced by the current frame.
        import sys
        if sys.version_info < (3, 7):
            import inspect
            referrers = gc.get_referrers(optim)
            self.assertTrue(
                len(referrers) == 1 and referrers[0] is inspect.currentframe(),
                "Optimizer should contain no cyclic references (except current frame)")
            del referrers
        else:
            self.assertTrue(
                len(gc.get_referrers(optim)) == 0,
                "Optimizer should contain no cyclic references")

        gc.collect()
        del optim
        self.assertEqual(
            gc.collect(), 0, "Optimizer should be garbage-collected on __del__")

    def test_old_pattern_warning(self):
        epochs = 35
        with warnings.catch_warnings(record=True) as ws:
            warnings.simplefilter("always")  # allow any warning to be raised
            scheduler = StepLR(self.opt, gamma=0.1, step_size=3)
            self.assertTrue(len(ws) == 0, "No warning should be raised")

        def old_pattern():
            for _ in range(epochs):
                scheduler.step()
                self.opt.step()

        self.assertWarnsRegex(old_pattern, r'how-to-adjust-learning-rate')

    def test_old_pattern_warning_with_arg(self):
        epochs = 35
        with warnings.catch_warnings(record=True) as ws:
            warnings.simplefilter("always")  # allow any warning to be raised
            scheduler = StepLR(self.opt, gamma=0.1, step_size=3)
            self.assertTrue(len(ws) == 0, "No warning should be raised")

        def old_pattern2():
            for _ in range(epochs):
                scheduler.step()
                self.opt.step()

        self.assertWarnsRegex(old_pattern2, r'how-to-adjust-learning-rate')

    def test_old_pattern_warning_resuming(self):
        epochs = 35
        for i, group in enumerate(self.opt.param_groups):
            group['initial_lr'] = 0.01

        with warnings.catch_warnings(record=True) as ws:
            warnings.simplefilter("always")  # allow any warning to be raised
            scheduler = StepLR(self.opt, gamma=0.1, step_size=3, last_epoch=10)
            self.assertTrue(len(ws) == 0, "No warning should be raised")

        def old_pattern():
            for _ in range(epochs):
                scheduler.step()
                self.opt.step()

        self.assertWarnsRegex(old_pattern, r'how-to-adjust-learning-rate')

    def test_old_pattern_warning_resuming_with_arg(self):
        epochs = 35
        for i, group in enumerate(self.opt.param_groups):
            group['initial_lr'] = 0.01

        with warnings.catch_warnings(record=True) as ws:
            warnings.simplefilter("always")  # allow any warning to be raised
            scheduler = StepLR(self.opt, gamma=0.1, step_size=3, last_epoch=10)
            self.assertTrue(len(ws) == 0, "No warning should be raised")

        def old_pattern2():
            for _ in range(epochs):
                scheduler.step()
                self.opt.step()

        self.assertWarnsRegex(old_pattern2, r'how-to-adjust-learning-rate')

    def test_old_pattern_warning_with_overridden_optim_step(self):
        epochs = 35
        for i, group in enumerate(self.opt.param_groups):
            group['initial_lr'] = 0.01

        with warnings.catch_warnings(record=True) as ws:
            warnings.simplefilter("always")  # allow any warning to be raised
            scheduler = StepLR(self.opt, gamma=0.1, step_size=3, last_epoch=10)
            self.assertTrue(len(ws) == 0, "No warning should be raised")

        # emulate use-case with optimizer.step overridden
        import types

        old_step = self.opt.step

        def new_step(o, *args, **kwargs):
            retval = old_step(*args, **kwargs)
            return retval

        self.opt.step = types.MethodType(new_step, self.opt)

        def old_pattern2():
            for _ in range(epochs):
                scheduler.step()
                self.opt.step()

        self.assertWarnsRegex(old_pattern2, r'how-to-adjust-learning-rate')

    def test_new_pattern_no_warning(self):
        epochs = 35
        with warnings.catch_warnings(record=True) as ws:
            warnings.simplefilter("always")  # allow any warning to be raised
            scheduler = StepLR(self.opt, gamma=0.1, step_size=3)
            self.assertTrue(len(ws) == 0, "No warning should be raised")

        with warnings.catch_warnings(record=True) as ws:
            warnings.simplefilter("always")  # allow any warning to be raised
            for _ in range(epochs):
                self.opt.step()
                scheduler.step()
            self.assertTrue(len(ws) == 0, "No warning should be raised")
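
    # Since PyTorch 1.1 the supported ordering is the one exercised above:
    # optimizer.step() first, then scheduler.step() once per epoch. A minimal
    # training-loop sketch (illustrative only; `loader` is hypothetical):
    #
    #     for epoch in range(epochs):
    #         for batch in loader:
    #             ...
    #             optimizer.step()
    #         scheduler.step()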

    def test_new_pattern_no_warning_with_arg(self):
        epochs = 35
        with warnings.catch_warnings(record=True) as ws:
            warnings.simplefilter("always")  # allow any warning to be raised
            scheduler = StepLR(self.opt, gamma=0.1, step_size=3)
            self.assertTrue(len(ws) == 0, "No warning should be raised")

        with warnings.catch_warnings(record=True) as ws:
            warnings.simplefilter("always")  # allow any warning to be raised
            for _ in range(epochs):
                self.opt.step()
                scheduler.step()
            self.assertTrue(len(ws) == 0, "No warning should be raised")

    def test_new_pattern_no_warning_with_overridden_optim_step(self):
        epochs = 35
        with warnings.catch_warnings(record=True) as ws:
            warnings.simplefilter("always")  # allow any warning to be raised
            scheduler = StepLR(self.opt, gamma=0.1, step_size=3)
            self.assertTrue(len(ws) == 0, "No warning should be raised")

        # emulate use-case with optimizer.step overridden
        import types

        old_step = self.opt.step

        def new_step(o, *args, **kwargs):
            retval = old_step(*args, **kwargs)
            return retval

        self.opt.step = types.MethodType(new_step, self.opt)

        def new_pattern():
            for e in range(epochs):
                self.opt.step()
                scheduler.step()

        self.assertWarnsRegex(new_pattern, r'`optimizer.step\(\)` has been overridden')

    def _test_lr_is_constant_for_constant_epoch(self, scheduler):
        l = []

        # warnings.filterwarnings("ignore", category=DeprecationWarning)
        for _ in range(10):
            scheduler.step(2)
            l.append(self.opt.param_groups[0]['lr'])
        self.assertAlmostEqual(min(l), max(l))

    def test_step_lr_is_constant_for_constant_epoch(self):
        scheduler = StepLR(self.opt, 2)
        self._test_lr_is_constant_for_constant_epoch(scheduler)

    def test_exponential_lr_is_constant_for_constant_epoch(self):
        scheduler = ExponentialLR(self.opt, gamma=0.9)
        self._test_lr_is_constant_for_constant_epoch(scheduler)

    def test_step_lr(self):
        # lr = 0.05     if epoch < 3
        # lr = 0.005    if 3 <= epoch < 6
        # lr = 0.0005   if 6 <= epoch < 9
        # lr = 0.00005  if epoch >= 9
        epochs = 10
        single_targets = [0.05] * 3 + [0.005] * 3 + [0.0005] * 3 + [0.00005] * 3
        targets = [single_targets, list(map(lambda x: x * epochs, single_targets))]
        scheduler = StepLR(self.opt, gamma=0.1, step_size=3)
        self._test(scheduler, targets, epochs)
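
    # The targets above follow StepLR's closed form
    # lr(epoch) = initial_lr * gamma ** (epoch // step_size); e.g. with
    # initial_lr=0.05, gamma=0.1, step_size=3: 0.05 * 0.1 ** (7 // 3) == 0.0005.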

    def test_get_last_lr_step_lr(self):
        from torch.nn import Parameter
        epochs = 10
        optimizer = torch.optim.SGD([Parameter(torch.randn(2, 2, requires_grad=True))], 0.1)
        targets = [[0.1] * 3 + [0.01] * 3 + [0.001] * 3 + [0.0001]]
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 3, gamma=0.1)
        self._test_get_last_lr(scheduler, targets, epochs)

    def test_get_last_lr_multi_step_lr(self):
        # lr = 0.05     if epoch < 2
        # lr = 0.005    if 2 <= epoch < 5
        # lr = 0.0005   if 5 <= epoch < 9
        # lr = 0.00005  if 9 <= epoch
        epochs = 10
        single_targets = [0.05] * 2 + [0.005] * 3 + [0.0005] * 4 + [0.00005] * 1
        targets = [single_targets, list(map(lambda x: x * epochs, single_targets))]
        scheduler = MultiStepLR(self.opt, gamma=0.1, milestones=[2, 5, 9])
        self._test_get_last_lr(scheduler, targets, epochs)

    def test_multi_step_lr(self):
        # lr = 0.05     if epoch < 2
        # lr = 0.005    if 2 <= epoch < 5
        # lr = 0.0005   if 5 <= epoch < 9
        # lr = 0.00005  if epoch >= 9
        epochs = 10
        single_targets = [0.05] * 2 + [0.005] * 3 + [0.0005] * 4 + [0.00005] * 3
        targets = [single_targets, list(map(lambda x: x * epochs, single_targets))]
        scheduler = MultiStepLR(self.opt, gamma=0.1, milestones=[2, 5, 9])
        self._test(scheduler, targets, epochs)

    def test_exp_lr(self):
        epochs = 10
        single_targets = [0.05 * (0.9 ** x) for x in range(epochs)]
        targets = [single_targets, list(map(lambda x: x * epochs, single_targets))]
        scheduler = ExponentialLR(self.opt, gamma=0.9)
        self._test(scheduler, targets, epochs)

    def test_cos_anneal_lr(self):
        epochs = 10
        eta_min = 1e-10
        single_targets = [eta_min + (0.05 - eta_min) *
                          (1 + math.cos(math.pi * x / epochs)) / 2
                          for x in range(epochs)]
        targets = [single_targets, list(map(lambda x: x * epochs, single_targets))]
        scheduler = CosineAnnealingLR(self.opt, T_max=epochs, eta_min=eta_min)
        self._test(scheduler, targets, epochs)

    def test_closed_form_step_lr(self):
        scheduler = StepLR(self.opt, gamma=0.1, step_size=3)
        closed_form_scheduler = StepLR(self.opt, gamma=0.1, step_size=3)
        self._test_against_closed_form(scheduler, closed_form_scheduler, 20)

    def test_closed_form_multi_step_lr(self):
        scheduler = MultiStepLR(self.opt, gamma=0.1, milestones=[2, 5, 9])
        closed_form_scheduler = MultiStepLR(self.opt, gamma=0.1, milestones=[2, 5, 9])
        self._test_against_closed_form(scheduler, closed_form_scheduler, 20)

    def test_closed_form_exp_lr(self):
        scheduler = ExponentialLR(self.opt, gamma=0.9)
        closed_form_scheduler = ExponentialLR(self.opt, gamma=0.9)
        self._test_against_closed_form(scheduler, closed_form_scheduler, 20)

    def test_closed_form_cos_anneal_lr(self):
        eta_min = 1e-10
        epochs = 20
        T_max = 5
        scheduler = CosineAnnealingLR(self.opt, T_max=T_max, eta_min=eta_min)
        closed_form_scheduler = CosineAnnealingLR(self.opt, T_max=T_max, eta_min=eta_min)
        self._test_against_closed_form(scheduler, closed_form_scheduler, epochs)

    def test_reduce_lr_on_plateau1(self):
        epochs = 10
        for param_group in self.opt.param_groups:
            param_group['lr'] = 0.5
        targets = [[0.5] * 20]
        metrics = [10 - i * 0.0167 for i in range(20)]
        scheduler = ReduceLROnPlateau(self.opt, threshold_mode='abs', mode='min',
                                      threshold=0.01, patience=5, cooldown=5)
        self._test_reduce_lr_on_plateau(scheduler, targets, metrics, epochs)

    def test_reduce_lr_on_plateau2(self):
        epochs = 22
        for param_group in self.opt.param_groups:
            param_group['lr'] = 0.5
        targets = [[0.5] * 6 + [0.05] * 7 + [0.005] * 7 + [0.0005] * 2]
        metrics = [10 - i * 0.0165 for i in range(22)]
        scheduler = ReduceLROnPlateau(self.opt, patience=5, cooldown=0, threshold_mode='abs',
                                      mode='min', threshold=0.1)
        self._test_reduce_lr_on_plateau(scheduler, targets, metrics, epochs)

    def test_reduce_lr_on_plateau3(self):
        epochs = 22
        for param_group in self.opt.param_groups:
            param_group['lr'] = 0.5
        targets = [[0.5] * (2 + 6) + [0.05] * (5 + 6) + [0.005] * 4]
        metrics = [-0.8] * 2 + [-0.234] * 20
        scheduler = ReduceLROnPlateau(self.opt, mode='max', patience=5, cooldown=5,
                                      threshold_mode='abs')
        self._test_reduce_lr_on_plateau(scheduler, targets, metrics, epochs)

    def test_reduce_lr_on_plateau4(self):
        epochs = 20
        for param_group in self.opt.param_groups:
            param_group['lr'] = 0.5
        targets = [[0.5] * 20]
        metrics = [1.5 * (1.025 ** i) for i in range(20)]  # 1.025 > 1.1**0.25
        scheduler = ReduceLROnPlateau(self.opt, mode='max', patience=3,
                                      threshold_mode='rel', threshold=0.1)
        self._test_reduce_lr_on_plateau(scheduler, targets, metrics, epochs)

    def test_reduce_lr_on_plateau5(self):
        epochs = 20
        for param_group in self.opt.param_groups:
            param_group['lr'] = 0.5
        targets = [[0.5] * 6 + [0.05] * (5 + 6) + [0.005] * 4]
        metrics = [1.5 * (1.005 ** i) for i in range(20)]
        scheduler = ReduceLROnPlateau(self.opt, mode='max', threshold_mode='rel',
                                      threshold=0.1, patience=5, cooldown=5)
        self._test_reduce_lr_on_plateau(scheduler, targets, metrics, epochs)

    def test_reduce_lr_on_plateau6(self):
        epochs = 20
        for param_group in self.opt.param_groups:
            param_group['lr'] = 0.5
        targets = [[0.5] * 20]
        metrics = [1.5 * (0.85 ** i) for i in range(20)]
        scheduler = ReduceLROnPlateau(self.opt, mode='min', threshold_mode='rel',
                                      threshold=0.1)
        self._test_reduce_lr_on_plateau(scheduler, targets, metrics, epochs)

    def test_reduce_lr_on_plateau7(self):
        epochs = 20
        for param_group in self.opt.param_groups:
            param_group['lr'] = 0.5
        targets = [[0.5] * 6 + [0.05] * (5 + 6) + [0.005] * 4]
        metrics = [1] * 7 + [0.6] + [0.5] * 12
        scheduler = ReduceLROnPlateau(self.opt, mode='min', threshold_mode='rel',
                                      threshold=0.1, patience=5, cooldown=5)
        self._test_reduce_lr_on_plateau(scheduler, targets, metrics, epochs)

    def test_reduce_lr_on_plateau8(self):
        epochs = 20
        for param_group in self.opt.param_groups:
            param_group['lr'] = 0.5
        targets = [[0.5] * 6 + [0.4] * 14, [0.5] * 6 + [0.3] * 14]
        metrics = [1.5 * (1.005 ** i) for i in range(20)]
        scheduler = ReduceLROnPlateau(self.opt, mode='max', threshold_mode='rel', min_lr=[0.4, 0.3],
                                      threshold=0.1, patience=5, cooldown=5)
        self._test_reduce_lr_on_plateau(scheduler, targets, metrics, epochs)
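
    # For reference (summarizing the semantics these tests rely on): with
    # threshold_mode='rel' a metric counts as an improvement only if it beats
    # best * (1 + threshold) in 'max' mode (best * (1 - threshold) in 'min'
    # mode), while threshold_mode='abs' compares against best + threshold
    # (or best - threshold). E.g. in test_reduce_lr_on_plateau4 the metric
    # grows 2.5% per epoch, and since 1.025 ** 4 > 1.1 the 10% relative
    # threshold is always beaten within the patience window, so the lr never
    # drops from 0.5.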

    def test_compound_step_and_multistep_lr(self):
        epochs = 10
        schedulers = [None] * 2
        schedulers[0] = StepLR(self.opt, gamma=0.1, step_size=3)
        schedulers[1] = MultiStepLR(self.opt, gamma=0.1, milestones=[2, 5, 9])
        targets = [[0.05] * 2 + [0.005] * 1 + [5e-4] * 2 + [5e-5] + [5e-6] * 3 + [5e-8]]
        self._test(schedulers, targets, epochs)

    def test_compound_step_and_exp_lr(self):
        epochs = 10
        schedulers = [None] * 2
        single_targets = [0.05 * (0.9 ** x) for x in range(3)]
        single_targets += [0.005 * (0.9 ** x) for x in range(3, 6)]
        single_targets += [0.0005 * (0.9 ** x) for x in range(6, 9)]
        single_targets += [0.00005 * (0.9 ** x) for x in range(9, 12)]
        targets = [single_targets, list(map(lambda x: x * epochs, single_targets))]
        schedulers[0] = StepLR(self.opt, gamma=0.1, step_size=3)
        schedulers[1] = ExponentialLR(self.opt, gamma=0.9)
        self._test(schedulers, targets, epochs)

    def test_compound_exp_and_multistep_lr(self):
        epochs = 10
        schedulers = [None] * 2
        single_targets = [0.05 * (0.9 ** x) for x in range(2)]
        single_targets += [0.005 * (0.9 ** x) for x in range(2, 5)]
        single_targets += [0.0005 * (0.9 ** x) for x in range(5, 9)]
        single_targets += [0.00005 * (0.9 ** x) for x in range(9, 11)]
        targets = [single_targets, list(map(lambda x: x * epochs, single_targets))]
        schedulers[0] = MultiStepLR(self.opt, gamma=0.1, milestones=[2, 5, 9])
        schedulers[1] = ExponentialLR(self.opt, gamma=0.9)
        self._test(schedulers, targets, epochs)

    def test_compound_cosanneal_and_step_lr(self):
        epochs = 10
        eta_min = 1e-10
        single_targets = [eta_min + (0.05 - eta_min) *
                          (1 + math.cos(math.pi * x / epochs)) / 2
                          for x in range(epochs)]
        single_targets = [x * 0.1 ** (i // 3) for i, x in enumerate(single_targets)]
        targets = [single_targets, list(map(lambda x: x * epochs, single_targets))]
        schedulers = [None] * 2
        schedulers[0] = CosineAnnealingLR(self.opt, T_max=epochs, eta_min=eta_min)
        schedulers[1] = StepLR(self.opt, gamma=0.1, step_size=3)
        self._test(schedulers, targets, epochs)

    def test_compound_cosanneal_and_multistep_lr(self):
        epochs = 10
        eta_min = 1e-10
        single_targets = [eta_min + (0.05 - eta_min) *
                          (1 + math.cos(math.pi * x / epochs)) / 2
                          for x in range(epochs)]
        multipliers = [1] * 2 + [0.1] * 3 + [0.01] * 4 + [0.001]
        single_targets = [x * y for x, y in zip(single_targets, multipliers)]
        targets = [single_targets, list(map(lambda x: x * epochs, single_targets))]
        schedulers = [None] * 2
        schedulers[0] = CosineAnnealingLR(self.opt, T_max=epochs, eta_min=eta_min)
        schedulers[1] = MultiStepLR(self.opt, gamma=0.1, milestones=[2, 5, 9])
        self._test(schedulers, targets, epochs)

    def test_compound_cosanneal_and_exp_lr(self):
        epochs = 10
        eta_min = 1e-10
        single_targets = [eta_min + (0.05 - eta_min) *
                          (1 + math.cos(math.pi * x / epochs)) / 2
                          for x in range(epochs)]
        multipliers = [0.1 ** i for i in range(epochs)]
        single_targets = [x * y for x, y in zip(single_targets, multipliers)]
        targets = [single_targets, list(map(lambda x: x * epochs, single_targets))]
        schedulers = [None] * 2
        schedulers[0] = CosineAnnealingLR(self.opt, T_max=epochs, eta_min=eta_min)
        schedulers[1] = ExponentialLR(self.opt, gamma=0.1)
        self._test(schedulers, targets, epochs)

    def test_compound_reduce_lr_on_plateau1(self):
        epochs = 10
        for param_group in self.opt.param_groups:
            param_group['lr'] = 0.5
        single_targets = [0.5] * 20
        multipliers = [0.1 ** (i // 3) for i in range(20)]
        single_targets = [x * y for x, y in zip(multipliers, single_targets)]
        targets = [single_targets]
        targets = targets[1:]  # test runs step before checking lr
        metrics = [10 - i * 0.0167 for i in range(20)]
        schedulers = [None, None]
        schedulers[0] = ReduceLROnPlateau(self.opt, threshold_mode='abs', mode='min',
                                          threshold=0.01, patience=5, cooldown=5)
        schedulers[1] = StepLR(self.opt, gamma=0.1, step_size=3)
        self._test_reduce_lr_on_plateau(schedulers, targets, metrics, epochs)

    def test_compound_reduce_lr_on_plateau2(self):
        epochs = 22
        for param_group in self.opt.param_groups:
            param_group['lr'] = 0.5
        single_targets = [0.5] * 6 + [0.05] * 7 + [0.005] * 7 + [0.0005] * 2
        multipliers = [1] * 3 + [0.1] * 5 + [0.01] * 4 + [0.001] * 10
        single_targets = [x * y for x, y in zip(single_targets, multipliers)]
        targets = [single_targets]
        targets = targets[1:]  # test runs step before checking lr
        metrics = [10 - i * 0.0165 for i in range(22)]
        schedulers = [None] * 2
        schedulers[0] = ReduceLROnPlateau(self.opt, patience=5, cooldown=0, threshold_mode='abs',
                                          mode='min', threshold=0.1)
        schedulers[1] = MultiStepLR(self.opt, gamma=0.1, milestones=[3, 8, 12])
        self._test_reduce_lr_on_plateau(schedulers, targets, metrics, epochs)

    def test_compound_reduce_lr_on_plateau3(self):
        epochs = 22
        for param_group in self.opt.param_groups:
            param_group['lr'] = 0.5
        single_targets = [0.5] * (2 + 6) + [0.05] * (5 + 6) + [0.005] * 4
        multipliers = [0.1 ** i for i in range(epochs)]
        single_targets = [x * y for x, y in zip(multipliers, single_targets)]
        targets = [single_targets]
        targets = targets[1:]  # test runs step before checking lr
        metrics = [-0.8] * 2 + [-0.234] * 20
        schedulers = [None, None]
        schedulers[0] = ReduceLROnPlateau(self.opt, mode='max', patience=5, cooldown=5,
                                          threshold_mode='abs')
        schedulers[1] = ExponentialLR(self.opt, gamma=0.1)
        self._test_reduce_lr_on_plateau(schedulers, targets, metrics, epochs)

    def test_compound_reduce_lr_on_plateau4(self):
        epochs = 20
        for param_group in self.opt.param_groups:
            param_group['lr'] = 0.05
        epochs = 10
        eta_min = 1e-10
        single_targets = [eta_min + (0.05 - eta_min) *
                          (1 + math.cos(math.pi * x / epochs)) / 2
                          for x in range(epochs)]
        targets = [single_targets]
        targets = targets[1:]  # test runs step before checking lr
        metrics = [1.5 * (1.025 ** i) for i in range(20)]  # 1.025 > 1.1**0.25
        schedulers = [None, None]
        schedulers[0] = ReduceLROnPlateau(self.opt, mode='max', patience=3,
                                          threshold_mode='rel', threshold=0.1)
        schedulers[1] = CosineAnnealingLR(self.opt, epochs, eta_min)
        self._test_reduce_lr_on_plateau(schedulers, targets, metrics, epochs)

    def test_cycle_lr_invalid_mode(self):
        with self.assertRaises(ValueError):
            scheduler = CyclicLR(self.opt, base_lr=0, max_lr=0, mode="CATS")

    def test_cycle_lr_triangular_mode_one_lr(self):
        lr_target = [1, 2, 3, 4, 5, 4, 3, 2, 1, 2, 3]
        momentum_target = [5, 4, 3, 2, 1, 2, 3, 4, 5, 4, 3]
        lr_targets = [lr_target, lr_target]
        momentum_targets = [momentum_target, momentum_target]
        scheduler = CyclicLR(self.opt, base_lr=1, max_lr=5, step_size_up=4,
                             cycle_momentum=True, base_momentum=1, max_momentum=5,
                             mode='triangular')
        self._test_cycle_lr(scheduler, lr_targets, momentum_targets, len(lr_target))

    def test_cycle_lr_triangular_mode_one_lr_no_momentum(self):
        lr_target = [1, 2, 3, 4, 5, 4, 3, 2, 1, 2, 3]
        lr_targets = [lr_target, lr_target]
        momentum_target = [self.opt.defaults['momentum']] * len(lr_target)
        momentum_targets = [momentum_target, momentum_target]
        scheduler = CyclicLR(self.opt, base_lr=1, max_lr=5, step_size_up=4,
                             cycle_momentum=False, mode='triangular')
        self._test_cycle_lr(scheduler, lr_targets, momentum_targets, len(lr_target))

    def test_cycle_lr_triangular2_mode_one_lr(self):
        lr_target = [1, 2, 3, 4, 5, 4, 3, 2, 1, 1.5, 2.0, 2.5, 3.0, 2.5, 2.0, 1.5,
                     1, 1.25, 1.50, 1.75, 2.00, 1.75]
        momentum_target = [5.0, 4.0, 3.0, 2.0, 1.0, 2.0, 3.0, 4.0, 5.0, 4.5, 4.0,
                           3.5, 3.0, 3.5, 4.0, 4.5, 5.0, 4.75, 4.5, 4.25, 4.0, 4.25]
        lr_targets = [lr_target, lr_target]
        momentum_targets = [momentum_target, momentum_target]
        scheduler = CyclicLR(self.opt, base_lr=1, max_lr=5, step_size_up=4,
                             cycle_momentum=True, base_momentum=1, max_momentum=5,
                             mode='triangular2')
        self._test_cycle_lr(scheduler, lr_targets, momentum_targets, len(lr_target))

    def test_cycle_lr_exp_range_mode_one_lr(self):
        base_lr, max_lr = 1, 5
        diff_lr = max_lr - base_lr
        gamma = 0.9
        xs = [0, 0.25, 0.5, 0.75, 1, 0.75, 0.50, 0.25, 0, 0.25, 0.5, 0.75, 1]
        lr_target = list(map(lambda x: base_lr + x[1] * diff_lr * gamma**x[0], enumerate(xs)))
        momentum_target = list(map(lambda x: max_lr - x[1] * diff_lr * gamma**x[0], enumerate(xs)))
        lr_targets = [lr_target, lr_target]
        momentum_targets = [momentum_target, momentum_target]
        scheduler = CyclicLR(self.opt, base_lr=base_lr,
                             max_lr=max_lr, step_size_up=4,
                             cycle_momentum=True, base_momentum=base_lr, max_momentum=max_lr,
                             mode='exp_range', gamma=gamma)
        self._test_cycle_lr(scheduler, lr_targets, momentum_targets, len(lr_target))
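
    # The exp_range targets above follow CyclicLR's scaling rule: the
    # triangular position x in [0, 1] is damped per iteration by
    # gamma ** iteration, i.e. lr = base_lr + (max_lr - base_lr) * x * gamma ** i,
    # with momentum moving opposite to lr when cycle_momentum=True.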

    def test_cycle_lr_triangular_mode(self):
        lr_target_1 = [1, 2, 3, 4, 5, 4, 3, 2, 1, 2, 3]
        lr_target_2 = list(map(lambda x: x + 1, lr_target_1))
        lr_targets = [lr_target_1, lr_target_2]
        momentum_target_1 = [5, 4, 3, 2, 1, 2, 3, 4, 5, 4, 3]
        momentum_target_2 = list(map(lambda x: x + 1, momentum_target_1))
        momentum_targets = [momentum_target_1, momentum_target_2]
        scheduler = CyclicLR(self.opt, base_lr=[1, 2], max_lr=[5, 6], step_size_up=4,
                             cycle_momentum=True, base_momentum=[1, 2], max_momentum=[5, 6],
                             mode='triangular')
        self._test_cycle_lr(scheduler, lr_targets, momentum_targets, len(lr_target_1))

    def test_cycle_lr_triangular2_mode(self):
        lr_target_1 = [1, 2, 3, 4, 5, 4, 3, 2, 1, 1.5, 2.0, 2.5, 3.0, 2.5, 2.0, 1.5, 1,
                       1.25, 1.50, 1.75, 2.00, 1.75]
        lr_target_2 = list(map(lambda x: x + 2, lr_target_1))
        lr_targets = [lr_target_1, lr_target_2]
        momentum_target_1 = [5.0, 4.0, 3.0, 2.0, 1.0, 2.0, 3.0, 4.0, 5.0, 4.5, 4.0, 3.5,
                             3.0, 3.5, 4.0, 4.5, 5.0, 4.75, 4.5, 4.25, 4.0, 4.25]
        momentum_target_2 = list(map(lambda x: x + 2, momentum_target_1))
        momentum_targets = [momentum_target_1, momentum_target_2]
        scheduler = CyclicLR(self.opt, base_lr=[1, 3], max_lr=[5, 7], step_size_up=4,
                             cycle_momentum=True, base_momentum=[1, 3], max_momentum=[5, 7],
                             mode='triangular2')
        self._test_cycle_lr(scheduler, lr_targets, momentum_targets, len(lr_target_1))

    def test_cycle_lr_exp_range_mode(self):
        base_lr_1, max_lr_1 = 1, 5
        base_lr_2, max_lr_2 = 5, 12

        diff_lr_1 = max_lr_1 - base_lr_1
        diff_lr_2 = max_lr_2 - base_lr_2

        gamma = 0.9
        xs = [0, 0.25, 0.5, 0.75, 1, 0.75, 0.50, 0.25, 0, 0.25, 0.5, 0.75, 1]
        lr_target_1 = list(map(lambda x: base_lr_1 + x[1] * diff_lr_1 * gamma**x[0], enumerate(xs)))
        lr_target_2 = list(map(lambda x: base_lr_2 + x[1] * diff_lr_2 * gamma**x[0], enumerate(xs)))
        lr_targets = [lr_target_1, lr_target_2]
        momentum_target_1 = list(map(lambda x: max_lr_1 - x[1] * diff_lr_1 * gamma**x[0], enumerate(xs)))
        momentum_target_2 = list(map(lambda x: max_lr_2 - x[1] * diff_lr_2 * gamma**x[0], enumerate(xs)))
        momentum_targets = [momentum_target_1, momentum_target_2]
        scheduler = CyclicLR(self.opt, base_lr=[base_lr_1, base_lr_2],
                             max_lr=[max_lr_1, max_lr_2], step_size_up=4,
                             cycle_momentum=True, base_momentum=[base_lr_1, base_lr_2],
                             max_momentum=[max_lr_1, max_lr_2],
                             mode='exp_range', gamma=gamma)
        self._test_cycle_lr(scheduler, lr_targets, momentum_targets, len(lr_target_1))

    def test_cycle_lr_triangular_mode_step_size_up_down(self):
        lr_target = [1.0, 2.0, 3.0, 4.0, 5.0, 13.0 / 3, 11.0 / 3, 9.0 / 3, 7.0 / 3, 5.0 / 3, 1.0]
        lr_targets = [lr_target, lr_target]
        momentum_target = [5.0, 4.0, 3.0, 2.0, 1.0, 5.0 / 3, 7.0 / 3, 3.0, 11.0 / 3, 13.0 / 3, 5.0]
        momentum_targets = [momentum_target, momentum_target]

        scheduler = CyclicLR(self.opt, base_lr=1, max_lr=5,
                             step_size_up=4,
                             step_size_down=6,
                             cycle_momentum=True,
                             base_momentum=1, max_momentum=5,
                             mode='triangular')
        self._test_cycle_lr(scheduler, lr_targets, momentum_targets, len(lr_target))

    def test_cycle_lr_triangular2_mode_step_size_up_down(self):
        lr_base_target = ([
            1.0, 3.0, 5.0, 13.0 / 3, 11.0 / 3, 9.0 / 3, 7.0 / 3, 5.0 / 3, 1.0, 2.0, 3.0, 8.0 / 3,
            7.0 / 3, 6.0 / 3, 5.0 / 3, 4.0 / 3, 1.0, 3.0 / 2, 2.0, 11.0 / 6, 10.0 / 6, 9.0 / 6,
            8.0 / 6, 7.0 / 6
        ])
        momentum_base_target = ([
            5.0, 3.0, 1.0, 5.0 / 3, 7.0 / 3, 3.0, 11.0 / 3, 13.0 / 3, 5.0, 4.0, 3.0, 10.0 / 3,
            11.0 / 3, 4.0, 13.0 / 3, 14.0 / 3, 5.0, 4.5, 4.0, 25.0 / 6, 13.0 / 3, 4.5, 14.0 / 3,
            29.0 / 6
        ])
        deltas = [2 * i for i in range(0, 2)]
        base_lrs = [1 + delta for delta in deltas]
        max_lrs = [5 + delta for delta in deltas]
        lr_targets = [[x + delta for x in lr_base_target] for delta in deltas]
        momentum_targets = [[x + delta for x in momentum_base_target] for delta in deltas]
        scheduler = CyclicLR(
            self.opt,
            base_lr=base_lrs,
            max_lr=max_lrs,
            step_size_up=2,
            step_size_down=6,
            cycle_momentum=True,
            base_momentum=base_lrs,
            max_momentum=max_lrs,
            mode='triangular2')
        self._test_cycle_lr(scheduler, lr_targets, momentum_targets, len(lr_base_target))

    def test_cycle_lr_exp_range_mode_step_size_up_down(self):
        base_lr, max_lr = 1, 5
        diff_lr = max_lr - base_lr
        gamma = 0.9
        xs = ([
            0.0, 0.5, 1.0, 5.0 / 6, 4.0 / 6, 3.0 / 6, 2.0 / 6, 1.0 / 6, 0.0, 0.5, 1.0, 5.0 / 6,
            4.0 / 6
        ])
        lr_target = [base_lr + x * diff_lr * gamma**i for i, x in enumerate(xs)]
        lr_targets = [lr_target, lr_target]
        momentum_target = [max_lr - x * diff_lr * gamma**i for i, x in enumerate(xs)]
        momentum_targets = [momentum_target, momentum_target]
        scheduler = CyclicLR(self.opt, base_lr=base_lr, max_lr=max_lr,
                             step_size_up=2, step_size_down=6,
                             cycle_momentum=True, base_momentum=base_lr,
                             max_momentum=max_lr,
                             mode='exp_range', gamma=gamma)
        self._test_cycle_lr(scheduler, lr_targets, momentum_targets, len(lr_target))

    def test_cycle_lr_with_momentumless_optimizer(self):
        # Note [Temporarily set optimizer to Adam]
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # The TestLRScheduler object carries around an SGD optimizer to avoid having to
        # instantiate one for every test. This gets in the way for our very specific case
        # in which we need to use Adam (or really any optimizer that doesn't use momentum)
        # in order to test that the momentum bug in CyclicLR is fixed (the bug is described
        # in more detail in https://github.com/pytorch/pytorch/issues/19003 ).
        old_opt = self.opt
        self.opt = optim.Adam(
            [{'params': self.net.conv1.parameters()}, {'params': self.net.conv2.parameters(), 'lr': 0.5}],
            lr=0.05)

        lr_target = [1, 2, 3, 4, 5, 4, 3, 2, 1, 2, 3]
        lr_targets = [lr_target, lr_target]
        momentum_target = [None] * len(lr_target)
        momentum_targets = [momentum_target, momentum_target]
        scheduler = CyclicLR(self.opt, base_lr=1, max_lr=5, step_size_up=4,
                             cycle_momentum=False, mode='triangular')
        self._test_cycle_lr(scheduler, lr_targets, momentum_targets, len(lr_target))

        self.opt = old_opt  # set optimizer back to SGD

    def test_cycle_lr_cycle_momentum_fail_with_momentumless_optimizer(self):
        with self.assertRaises(ValueError):
            adam_opt = optim.Adam(self.net.parameters())
            scheduler = CyclicLR(adam_opt, base_lr=1, max_lr=5, cycle_momentum=True)

    def test_onecycle_lr_invalid_anneal_strategy(self):
        with self.assertRaises(ValueError):
            scheduler = OneCycleLR(self.opt, max_lr=1e-3, total_steps=10, anneal_strategy="CATS")

    def test_onecycle_lr_invalid_pct_start(self):
        with self.assertRaises(ValueError):
            scheduler = OneCycleLR(self.opt, max_lr=1e-3, total_steps=10, pct_start=1.1)

    def test_onecycle_lr_cannot_calculate_total_steps(self):
        with self.assertRaises(ValueError):
            scheduler = OneCycleLR(self.opt, max_lr=1e-3)

    def test_onecycle_lr_linear_annealing(self):
        lr_target = [1, 13, 25, 21.5, 18, 14.5, 11, 7.5, 4, 0.5]
        momentum_target = [22, 11.5, 1, 4, 7, 10, 13, 16, 19, 22]
        lr_targets = [lr_target, lr_target]
        momentum_targets = [momentum_target, momentum_target]
        scheduler = OneCycleLR(self.opt, max_lr=25, final_div_factor=2, base_momentum=1, max_momentum=22,
                               total_steps=10, anneal_strategy='linear')
        self._test_cycle_lr(scheduler, lr_targets, momentum_targets, 10)
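
    # The boundary values in these OneCycleLR targets follow from its divisor
    # arguments: initial_lr = max_lr / div_factor (the default div_factor is
    # 25, so 25 / 25 == 1 here) and min_lr = initial_lr / final_div_factor
    # (1 / 2 == 0.5), matching the first and last entries of lr_target.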

    def test_onecycle_lr_cosine_annealing(self):
        def annealing_cos(start, end, pct):
            cos_out = math.cos(math.pi * pct) + 1
            return end + (start - end) / 2.0 * cos_out
        lr_target = [1, 13, 25, annealing_cos(25, 0.5, 1 / 7.0), annealing_cos(25, 0.5, 2 / 7.0),
                     annealing_cos(25, 0.5, 3 / 7.0), annealing_cos(25, 0.5, 4 / 7.0), annealing_cos(25, 0.5, 5 / 7.0),
                     annealing_cos(25, 0.5, 6 / 7.0), 0.5]
        momentum_target = [22, 11.5, 1, annealing_cos(1, 22, 1 / 7.0), annealing_cos(1, 22, 2 / 7.0),
                           annealing_cos(1, 22, 3 / 7.0), annealing_cos(1, 22, 4 / 7.0), annealing_cos(1, 22, 5 / 7.0),
                           annealing_cos(1, 22, 6 / 7.0), 22]
        lr_targets = [lr_target, lr_target]
        momentum_targets = [momentum_target, momentum_target]
        scheduler = OneCycleLR(self.opt, max_lr=25, final_div_factor=2, base_momentum=1, max_momentum=22,
                               total_steps=10)
        self._test_cycle_lr(scheduler, lr_targets, momentum_targets, 10)

    def test_cycle_lr_with_adam(self):
        old_opt = self.opt
        self.opt = optim.Adam(
            [{'params': self.net.conv1.parameters()}, {'params': self.net.conv2.parameters(), 'lr': 0.5}],
            lr=0.05)

        lr_target = [1, 13, 25, 21.5, 18, 14.5, 11, 7.5, 4, 0.5]
        momentum_target = [22, 11.5, 1, 4, 7, 10, 13, 16, 19, 22]
        lr_targets = [lr_target, lr_target]
        momentum_targets = [momentum_target, momentum_target]
        scheduler = OneCycleLR(self.opt, max_lr=25, final_div_factor=2, base_momentum=1, max_momentum=22,
                               total_steps=10, anneal_strategy='linear')
        self._test_cycle_lr(scheduler, lr_targets, momentum_targets, 10, use_beta1=True)
        self.opt = old_opt  # set optimizer back to SGD

    def test_lambda_lr(self):
        epochs = 10
        self.opt.param_groups[0]['lr'] = 0.05
        self.opt.param_groups[1]['lr'] = 0.4
        targets = [[0.05 * (0.9 ** x) for x in range(epochs)], [0.4 * (0.8 ** x) for x in range(epochs)]]
        scheduler = LambdaLR(self.opt,
                             lr_lambda=[lambda x1: 0.9 ** x1, lambda x2: 0.8 ** x2])
        self._test(scheduler, targets, epochs)

    def test_CosineAnnealingWarmRestarts_lr1(self):
        iters = 100
        eta_min = 1e-10
        T_mults = [1, 2, 4]
        for T_mult in T_mults:
            T_i = 10
            T_cur = 0
            targets = [[0.05], [0.5]]
            scheduler = CosineAnnealingWarmRestarts(self.opt, T_0=T_i, T_mult=T_mult, eta_min=eta_min)
            for _ in range(1, iters, 1):
                T_cur += 1
                if T_cur >= T_i:
                    T_cur = T_cur - T_i
                    T_i = int(T_mult) * T_i
                targets[0] += [eta_min + (0.05 - eta_min) * (1 + math.cos(math.pi * T_cur / T_i)) / 2]
                targets[1] += [eta_min + (0.5 - eta_min) * (1 + math.cos(math.pi * T_cur / T_i)) / 2]
            self._test(scheduler, targets, iters)

    def test_CosineAnnealingWarmRestarts_lr2(self):
        iters = 30
        eta_min = 1e-10
        T_mults = [1, 2, 4]
        for T_mult in T_mults:
            T_i = 10
            T_cur = 0
            targets = [[0.05], [0.5]]
            scheduler = CosineAnnealingWarmRestarts(self.opt, T_0=T_i, T_mult=T_mult, eta_min=eta_min)
            for _ in torch.arange(0.1, iters, 0.1):
                T_cur = round(T_cur + 0.1, 1)
                if T_cur >= T_i:
                    T_cur = T_cur - T_i
                    T_i = int(T_mult) * T_i
                targets[0] += [eta_min + (0.05 - eta_min) * (1 + math.cos(math.pi * T_cur / T_i)) / 2]
                targets[1] += [eta_min + (0.5 - eta_min) * (1 + math.cos(math.pi * T_cur / T_i)) / 2]
            self._test_CosineAnnealingWarmRestarts(scheduler, targets, iters)

    def test_CosineAnnealingWarmRestarts_lr3(self):
        epochs_for_T_mults = [[0, 1, 2, 3, 4, 5, 12, 27, 3, 4, 5, 6, 13],
                              [0, 1, 2, 3, 4, 5, 25, 32, 33, 34, 80, 81, 3],
                              [0, 0.1, 0.2, 0.3, 1.3, 2.3, 17.5, 18.5, 19.5, 29.5, 30.5, 31.5, 50]]
        T_curs_for_T_mults = [[1, 2, 3, 4, 5, 2, 7, 3, 4, 5, 6, 3],
                              [1, 2, 3, 4, 5, 15, 2, 3, 4, 10, 11, 3],
                              [0.1, 0.2, 0.3, 1.3, 2.3, 7.5, 8.5, 9.5, 19.5, 20.5, 21.5, 10]]
        T_is_for_T_mults = [[10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10],
                            [10, 10, 10, 10, 10, 20, 40, 40, 40, 80, 80, 10],
                            [10, 10, 10, 10, 10, 30, 30, 30, 30, 30, 30, 90]]
        eta_min = 1e-10
        T_mults = [1, 2, 3]
        for epochs, T_mult, T_curs, T_is in zip(epochs_for_T_mults, T_mults, T_curs_for_T_mults, T_is_for_T_mults):
            targets = [[0.05], [0.5]]
            scheduler = CosineAnnealingWarmRestarts(self.opt, T_0=10, T_mult=T_mult, eta_min=eta_min)
            for T_cur, T_i in zip(T_curs, T_is):
                targets[0] += [eta_min + (0.05 - eta_min) * (1 + math.cos(math.pi * T_cur / T_i)) / 2]
                targets[1] += [eta_min + (0.5 - eta_min) * (1 + math.cos(math.pi * T_cur / T_i)) / 2]
            self._test_interleaved_CosineAnnealingWarmRestarts(scheduler, targets, epochs)

    def test_step_lr_state_dict(self):
        self._check_scheduler_state_dict(
            lambda: StepLR(self.opt, gamma=0.1, step_size=3),
            lambda: StepLR(self.opt, gamma=0.01 / 2, step_size=1))

    def test_multi_step_lr_state_dict(self):
        self._check_scheduler_state_dict(
            lambda: MultiStepLR(self.opt, gamma=0.1, milestones=[2, 5, 9]),
            lambda: MultiStepLR(self.opt, gamma=0.01, milestones=[1, 4, 6]))

    def test_exp_step_lr_state_dict(self):
        self._check_scheduler_state_dict(
            lambda: ExponentialLR(self.opt, gamma=0.1),
            lambda: ExponentialLR(self.opt, gamma=0.01))

    def test_cosine_lr_state_dict(self):
        epochs = 10
        eta_min = 1e-10
        self._check_scheduler_state_dict(
            lambda: CosineAnnealingLR(self.opt, T_max=epochs, eta_min=eta_min),
            lambda: CosineAnnealingLR(self.opt, T_max=epochs // 2, eta_min=eta_min / 2),
            epochs=epochs)

    def test_reduce_lr_on_plateau_state_dict(self):
        scheduler = ReduceLROnPlateau(self.opt, mode='min', factor=0.1, patience=2)
        for score in [1.0, 2.0, 3.0, 4.0, 3.0, 4.0, 5.0, 3.0, 2.0, 1.0]:
            scheduler.step(score)
        scheduler_copy = ReduceLROnPlateau(self.opt, mode='max', factor=0.5, patience=10)
        scheduler_copy.load_state_dict(scheduler.state_dict())
        for key in scheduler.__dict__.keys():
            if key not in {'optimizer', 'is_better'}:
                self.assertEqual(scheduler.__dict__[key], scheduler_copy.__dict__[key], allow_inf=True)

    def test_lambda_lr_state_dict_fn(self):
        scheduler = LambdaLR(self.opt, lr_lambda=lambda x: x)
        state = scheduler.state_dict()
        self.assertIsNone(state['lr_lambdas'][0])

        scheduler_copy = LambdaLR(self.opt, lr_lambda=lambda x: x)
        scheduler_copy.load_state_dict(state)
        for key in scheduler.__dict__.keys():
            if key not in {'optimizer', 'lr_lambdas'}:
                self.assertEqual(scheduler.__dict__[key], scheduler_copy.__dict__[key], allow_inf=True)

    def test_lambda_lr_state_dict_obj(self):
        scheduler = LambdaLR(self.opt, lr_lambda=LambdaLRTestObject(10))
        state = scheduler.state_dict()
        self.assertIsNotNone(state['lr_lambdas'][0])

        scheduler_copy = LambdaLR(self.opt, lr_lambda=LambdaLRTestObject(-1))
        scheduler_copy.load_state_dict(state)
        for key in scheduler.__dict__.keys():
            if key not in {'optimizer'}:
                self.assertEqual(scheduler.__dict__[key], scheduler_copy.__dict__[key], allow_inf=True)

    def test_CosineAnnealingWarmRestarts_lr_state_dict(self):
        self._check_scheduler_state_dict(
            lambda: CosineAnnealingWarmRestarts(self.opt, T_0=10, T_mult=2),
            lambda: CosineAnnealingWarmRestarts(self.opt, T_0=100))

    def _check_scheduler_state_dict(self, constr, constr2, epochs=10):
        scheduler = constr()
        for _ in range(epochs):
            scheduler.step()
        scheduler_copy = constr2()
        scheduler_copy.load_state_dict(scheduler.state_dict())
        for key in scheduler.__dict__.keys():
            if key != 'optimizer':
                self.assertAlmostEqual(scheduler.__dict__[key], scheduler_copy.__dict__[key])
        self.assertAlmostEqual(scheduler.get_last_lr(), scheduler_copy.get_last_lr())

    def _test_get_last_lr(self, schedulers, targets, epochs=10):
        if isinstance(schedulers, _LRScheduler):
            schedulers = [schedulers]
        for epoch in range(epochs):
            result = [scheduler.get_last_lr() for scheduler in schedulers]
            [scheduler.step() for scheduler in schedulers]
            target = [[t[epoch] for t in targets]] * len(schedulers)
            for t, r in zip(target, result):
                self.assertAlmostEqual(t, r,
                                       msg='LR is wrong in epoch {}: expected {}, got {}'.format(
                                           epoch, t, r), delta=1e-5)

    def _test(self, schedulers, targets, epochs=10):
        if isinstance(schedulers, _LRScheduler):
            schedulers = [schedulers]
        for epoch in range(epochs):
            for param_group, target in zip(self.opt.param_groups, targets):
                self.assertAlmostEqual(target[epoch], param_group['lr'],
                                       msg='LR is wrong in epoch {}: expected {}, got {}'.format(
                                           epoch, target[epoch], param_group['lr']), delta=1e-5)
            [scheduler.step() for scheduler in schedulers]

    def _test_CosineAnnealingWarmRestarts(self, scheduler, targets, epochs=10):
        for index, epoch in enumerate(torch.arange(0, epochs, 0.1)):
            epoch = round(epoch.item(), 1)
            scheduler.step(epoch)
            for param_group, target in zip(self.opt.param_groups, targets):
                self.assertAlmostEqual(target[index], param_group['lr'],
                                       msg='LR is wrong in epoch {}: expected {}, got {}'.format(
                                           epoch, target[index], param_group['lr']), delta=1e-5)

    def _test_interleaved_CosineAnnealingWarmRestarts(self, scheduler, targets, epochs):
        for index, epoch in enumerate(epochs):
            scheduler.step(epoch)
            for param_group, target in zip(self.opt.param_groups, targets):
                self.assertAlmostEqual(target[index], param_group['lr'],
                                       msg='LR is wrong in epoch {}: expected {}, got {}'.format(
                                           epoch, target[index], param_group['lr']), delta=1e-5)

    def _test_against_closed_form(self, scheduler, closed_form_scheduler, epochs=10):
        self.setUp()
        targets = []
        # warnings.filterwarnings("ignore", category=DeprecationWarning)
        for epoch in range(epochs):
            closed_form_scheduler.step(epoch)
            targets.append([group['lr'] for group in self.opt.param_groups])
        self.setUp()
        for epoch in range(epochs):
            scheduler.step()
            for i, param_group in enumerate(self.opt.param_groups):
                self.assertAlmostEqual(targets[epoch][i], param_group['lr'],
                                       msg='LR is wrong in epoch {}: expected {}, got {}'.format(
                                           epoch, targets[epoch][i], param_group['lr']), delta=1e-5)

    def _test_reduce_lr_on_plateau(self, schedulers, targets, metrics, epochs=10, verbose=False):
        if isinstance(schedulers, _LRScheduler) or isinstance(schedulers, ReduceLROnPlateau):
            schedulers = [schedulers]
        for epoch in range(epochs):
            for scheduler in schedulers:
                if isinstance(scheduler, ReduceLROnPlateau):
                    scheduler.step(metrics[epoch])
                else:
                    scheduler.step()
            if verbose:
                print('epoch{}:\tlr={}'.format(epoch, self.opt.param_groups[0]['lr']))
            for param_group, target in zip(self.opt.param_groups, targets):
                self.assertAlmostEqual(target[epoch], param_group['lr'],
                                       msg='LR is wrong in epoch {}: expected {}, got {}'.format(
                                           epoch, target[epoch], param_group['lr']), delta=1e-5)

    def _test_cycle_lr(self, scheduler, lr_targets, momentum_targets, batch_iterations, verbose=False, use_beta1=False):
        for batch_num in range(batch_iterations):
            if verbose:
                if 'momentum' in self.opt.param_groups[0].keys():
                    print('batch{}:\tlr={},momentum={}'.format(batch_num, self.opt.param_groups[0]['lr'],
                                                               self.opt.param_groups[0]['momentum']))
                elif use_beta1 and 'betas' in self.opt.param_groups[0].keys():
                    print('batch{}:\tlr={},beta1={}'.format(batch_num, self.opt.param_groups[0]['lr'],
                                                            self.opt.param_groups[0]['betas'][0]))
                else:
                    print('batch{}:\tlr={}'.format(batch_num, self.opt.param_groups[0]['lr']))

            for param_group, lr_target, momentum_target in zip(self.opt.param_groups, lr_targets, momentum_targets):
                self.assertAlmostEqual(
                    lr_target[batch_num], param_group['lr'],
                    msg='LR is wrong in batch_num {}: expected {}, got {}'.format(
                        batch_num, lr_target[batch_num], param_group['lr']), delta=1e-5)

                if use_beta1 and 'betas' in param_group.keys():
                    self.assertAlmostEqual(
                        momentum_target[batch_num], param_group['betas'][0],
                        msg='Beta1 is wrong in batch_num {}: expected {}, got {}'.format(
                            batch_num, momentum_target[batch_num], param_group['betas'][0]), delta=1e-5)
                elif 'momentum' in param_group.keys():
                    self.assertAlmostEqual(
                        momentum_target[batch_num], param_group['momentum'],
                        msg='Momentum is wrong in batch_num {}: expected {}, got {}'.format(
                            batch_num, momentum_target[batch_num], param_group['momentum']), delta=1e-5)
            scheduler.step()

    def test_cosine_then_cyclic(self):
        # https://github.com/pytorch/pytorch/issues/21965

        max_lr = 0.3
        base_lr = 0.1
        optim_lr = 0.5

        model = torch.nn.Linear(2, 1)
        optimizer = torch.optim.SGD(model.parameters(), lr=optim_lr)
        lr_scheduler_1 = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=20, eta_min=0.1)
        lr_scheduler_2 = torch.optim.lr_scheduler.CyclicLR(
            optimizer, base_lr=base_lr, max_lr=max_lr, step_size_up=1, step_size_down=3
        )

        for i in range(40):
            if i <= lr_scheduler_1.T_max:
                lr_scheduler_1.step()
            else:
                lr_scheduler_2.step()
            last_lr = optimizer.param_groups[0]["lr"]

        self.assertLessEqual(last_lr, max_lr)


if __name__ == '__main__':
    run_tests()