framework for committed serialized tests (#10594)

Summary:
Generate serialized test inputs/outputs/backward graphs of tests inside `caffe2/python/operator_test` that call assertSerializedOperatorCheck(). Tests should be decorated with serialized_test.collect_tests.given_and_seeded to run both the usual randomized hypothesis tests and a single fixed-seed hypothesis test.

To use:
1. Refactor your test to be a SerializedTestCase
1a. Decorate it with given_and_seeded
1b. Call testWithArgs in main
2. Run your test with -g to generate the output. Check it in.
3. Subsequent runs of the test without generating the output will check against the checked in test case.

Details:
Run your test with `python caffe2/python/operator_test/[your_test].py -g`
Outputs are in `caffe2/python/serialized_test/data`. The operator tests outputs are in a further subdirectory `operator_test`, to allow for other tests in the future (model zoo tests?)

Currently, we've only refactored weighted_sum_test to use this, but in the next diff, we'll refactor as many as possible. The directory structure may also change as usually there are multiple tests in a single file, so we may create more structure to account for that.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/10594

Reviewed By: ezyang

Differential Revision: D9370359

Pulled By: ajyu

fbshipit-source-id: 2ce77389cd8bcc0255d3bccd61569833e545ede8
This commit is contained in:
Ansha Yu 2018-08-30 22:38:42 -07:00 committed by Facebook Github Bot
parent 00df09b65d
commit 9fae8fcdff
12 changed files with 274 additions and 9 deletions

View file

@ -49,7 +49,7 @@ fi
mkdir -p $TEST_DIR/{cpp,python}
cd ${INSTALL_PREFIX}
cd "${WORKSPACE}"
# C++ tests
echo "Running C++ tests.."
@ -137,6 +137,8 @@ echo "Running Python tests.."
"$CAFFE2_PYPATH/python" \
"${EXTRA_TESTS[@]}"
cd ${INSTALL_PREFIX}
if [[ -n "$INTEGRATED" ]]; then
pip install --user torchvision
"$ROOT_DIR/scripts/onnx/test.sh"

View file

@ -11,6 +11,11 @@ from caffe2.python import core, workspace, net_drawer
from caffe2.proto import caffe2_pb2
def getGradientForOp(op):
    """Look up the registered gradient ops for `op`.

    Each gradient blob is named after the corresponding output with a
    '_grad' suffix, matching the caffe2 naming convention.
    """
    grad_output_names = [output_name + '_grad' for output_name in op.output]
    return core.GradientRegistry.GetGradientForOp(op, grad_output_names)
def _get_grad_blob(grad_map, input_to_check):
grad_blob = grad_map[input_to_check]
@ -257,8 +262,7 @@ class GradientChecker:
if grad_ops is None:
# TODO(jiayq): use the gradient registration instead of the old
# hack.
grad_ops, g_input = core.GradientRegistry.GetGradientForOp(
op, [s + '_grad' for s in op.output])
grad_ops, g_input = getGradientForOp(op)
dims_to_check = inputs[input_to_check].size
_input_device_options = input_device_options or \

View file

@ -318,6 +318,38 @@ def runOpBenchmark(
return ret
def runOpOnInput(
    device_option,
    op,
    inputs,
    input_device_options=None,
):
    """Run `op` once on `inputs` under `device_option` in a temporary
    workspace and return the fetched output blobs as a list, one entry
    per operator output, in output order.

    `input_device_options` optionally maps blob name -> DeviceOption;
    blobs not listed there are fed with `device_option`.
    """
    # Work on a copy so the caller's operator proto is never mutated.
    op = copy.deepcopy(op)
    op.device_option.CopyFrom(device_option)

    with temp_workspace():
        if (len(op.input) > len(inputs)):
            raise ValueError(
                'must supply an input for each input on the op: %s vs %s' %
                (op.input, inputs))

        blob_device_options = input_device_options or \
            core.InferOpBlobDevicesAsDict(op)[0]
        for blob_name, blob_value in zip(op.input, inputs):
            workspace.FeedBlob(
                blob_name,
                blob_value,
                device_option=blob_device_options.get(blob_name, device_option)
            )

        workspace.RunOperatorOnce(op)
        return [workspace.FetchBlob(name) for name in op.output]
class HypothesisTestCase(test_util.TestCase):
"""
A unittest.TestCase subclass with some helper functions for
@ -594,6 +626,7 @@ class HypothesisTestCase(test_util.TestCase):
op, inputs, reference_outputs,
output_to_grad, grad_reference,
threshold=threshold)
return outs
def assertValidationChecks(

View file

View file

@ -4,20 +4,22 @@ from __future__ import print_function
from __future__ import unicode_literals
from caffe2.python import core
from hypothesis import given
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import hypothesis.strategies as st
import numpy as np
class TestWeightedSumOp(hu.HypothesisTestCase):
class TestWeightedSumOp(serial.SerializedTestCase):
@given(n=st.integers(5, 8), m=st.integers(1, 1),
d=st.integers(2, 4), grad_on_w=st.booleans(),
**hu.gcs_cpu_only)
def test_weighted_sum(self, n, m, d, grad_on_w, gc, dc):
@serial.given_and_seeded(
n=st.integers(5, 8), m=st.integers(1, 1), d=st.integers(2, 4),
grad_on_w=st.booleans(), seed=st.integers(min_value=0, max_value=65535),
**hu.gcs_cpu_only)
def test_weighted_sum(self, n, m, d, grad_on_w, seed, gc, dc):
input_names = []
input_vars = []
np.random.seed(seed)
for i in range(m):
X_name = 'X' + str(i)
w_name = 'w' + str(i)
@ -59,3 +61,7 @@ class TestWeightedSumOp(hu.HypothesisTestCase):
outputs_to_check=i,
outputs_with_grads=[0],
)
if __name__ == "__main__":
serial.testWithArgs()

View file

@ -0,0 +1,12 @@
# Serialized operator test framework
Major functionality lives in `serialized_test_util.py`
## How to use
1. Extend the test case class from `SerializedTestCase`
2. Change the `@given` decorator to `@given_and_seeded`. This runs a seeded hypothesis test instance which will generate outputs if desired in addition to the unseeded hypothesis tests normally run.
3. Change a call to `unittest.main()` in `__main__` to `testWithArgs`.
4. Run your test `python caffe2/python/operator_test/my_test.py -g` to generate serialized outputs. They will live in `caffe2/python/serialized_test/data/operator_test`, one folder per test function
5. Thereafter, runs of the test without the flag will load serialized outputs and gradient operators for comparison against the seeded run. If for any reason the seeded run's inputs are different (this can happen with different hypothesis versions or different setups), then we'll run the serialized inputs through the serialized operator to get a runtime output for comparison.
If we'd like to extend the test framework beyond that for operator tests, we can create a new subfolder for them inside `caffe2/python/serialized_test/data`.

View file

@ -0,0 +1,208 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import argparse
from caffe2.proto import caffe2_pb2
from caffe2.python import gradient_checker
import caffe2.python.hypothesis_test_util as hu
from hypothesis import given, seed, settings
import inspect
import numpy
import os
import re
import shutil
import sys
import threading
operator_test_type = 'operator_test'
TOP_DIR = os.path.dirname(os.path.realpath(__file__))
DATA_SUFFIX = 'data'
DATA_DIR = os.path.join(TOP_DIR, DATA_SUFFIX)
_output_context = threading.local()
def given_and_seeded(*given_args, **given_kwargs):
    """Decorator factory combining a normal randomized hypothesis run with
    a single fixed-seed run.

    The fixed-seed pass (seed 0, exactly one example) executes first with
    `self.should_serialize` set to True, so serialization/comparison hooks
    fire only for the deterministic example; the randomized pass then runs
    as usual.
    """
    def wrapper(f):
        randomized = given(*given_args, **given_kwargs)(f)
        deterministic = seed(0)(settings(max_examples=1)(given(
            *given_args, **given_kwargs)(f)))

        def func(self, *args, **kwargs):
            self.should_serialize = True
            deterministic(self, *args, **kwargs)
            self.should_serialize = False
            randomized(self, *args, **kwargs)
        return func
    return wrapper
class SerializedTestCase(hu.HypothesisTestCase):
    """HypothesisTestCase that can serialize a test's inputs, outputs,
    operator, and gradient operators to disk (when run with -g) and, on
    later runs, compare fresh results against the checked-in files.
    """

    # Toggled by the given_and_seeded decorator: True only during the
    # fixed-seed pass, which is the only pass that serializes/compares.
    should_serialize = False

    def get_output_dir(self):
        """Return the directory holding this test's serialized data.

        Prefers <output_dir>/operator_test/<test_file>.<test_function>.
        If that directory does not exist yet, fall back to the same
        relative path under the current working directory (useful when the
        data files are deployed next to the test rather than the module).
        """
        class_path = inspect.getfile(self.__class__)
        test_file = os.path.basename(class_path).split('.')[0]
        test_function = self.id().split('.')[-1]
        output_dir_arg = getattr(_output_context, 'output_dir', DATA_DIR)
        output_dir = os.path.join(
            output_dir_arg, operator_test_type, test_file + '.' + test_function)

        if os.path.exists(output_dir):
            return output_dir

        # Fall back to pwd, deriving the package-relative path from this
        # module's dotted name (minus the module component itself).
        cwd = os.getcwd()
        serialized_util_module_components = __name__.split('.')
        serialized_util_module_components.pop()
        serialized_dir = '/'.join(serialized_util_module_components)
        output_dir_fallback = os.path.join(cwd, serialized_dir, DATA_SUFFIX)
        return os.path.join(
            output_dir_fallback,
            operator_test_type,
            test_file + '.' + test_function)

    def serialize_test(self, inputs, outputs, grad_ops, op, device_option):
        """Write inputs/outputs (compressed npz) plus the operator and its
        gradient operators (binary protobuf) into a freshly-recreated
        output directory.

        The operator filename encodes the device type so compare_test can
        reconstruct the DeviceOption it was generated under.
        """
        output_dir = self.get_output_dir()
        # Start from a clean directory so stale files from a previous
        # generation run cannot leak into later comparisons.
        if os.path.exists(output_dir):
            shutil.rmtree(output_dir)
        os.makedirs(output_dir)

        for i, grad in enumerate(grad_ops):
            grad_path = os.path.join(output_dir, 'gradient_{}.pb'.format(i))
            with open(grad_path, 'wb') as f:
                f.write(grad.SerializeToString())

        device_type = int(device_option.device_type)
        op_path = os.path.join(output_dir, 'operator_{}.pb'.format(device_type))
        with open(op_path, 'wb') as f:
            f.write(op.SerializeToString())

        numpy.savez_compressed(
            os.path.join(output_dir, 'inputs'), inputs=inputs)
        numpy.savez_compressed(
            os.path.join(output_dir, 'outputs'), outputs=outputs)

    def compare_test(self, inputs, outputs, grad_ops, atol=1e-7, rtol=1e-7):
        """Compare this run's outputs and gradient ops against the
        serialized files in the output directory.

        If the runtime inputs differ from the serialized ones (e.g. a
        hypothesis version change produced different examples), the
        serialized inputs are run through the serialized operator to
        obtain comparable runtime outputs.
        """
        def parse_proto(x):
            proto = caffe2_pb2.OperatorDef()
            proto.ParseFromString(x)
            return proto

        source_dir = self.get_output_dir()

        # Load serialized inputs and outputs.
        loaded_inputs = numpy.load(
            os.path.join(source_dir, 'inputs.npz'), encoding='bytes')['inputs']
        inputs_equal = all(
            numpy.array_equal(x, y) for x, y in zip(inputs, loaded_inputs))
        loaded_outputs = numpy.load(os.path.join(
            source_dir, 'outputs.npz'), encoding='bytes')['outputs']

        # Locate and load the serialized operator; the filename encodes
        # the device type it was generated on.
        found_op = False
        op_proto = None
        device_option = None
        # Raw string: avoids the invalid-escape-sequence warning for '\.'.
        op_file_pattern = re.compile(r'operator_(.+?)\.pb')
        for entry in os.listdir(source_dir):
            op_file = os.path.join(source_dir, entry)
            match = op_file_pattern.search(entry)
            if os.path.isfile(op_file) and match:
                with open(op_file, 'rb') as f:
                    op_proto = parse_proto(f.read())
                device_type = int(match.group(1))
                device_option = caffe2_pb2.DeviceOption(device_type=device_type)
                # Compare against the gradients of the *serialized* op.
                grad_ops, _ = gradient_checker.getGradientForOp(op_proto)
                found_op = True
                break

        # If inputs differ, run the serialized inputs through the
        # serialized op to get runtime outputs to compare.
        if not inputs_equal:
            self.assertTrue(found_op)
            outputs = hu.runOpOnInput(device_option, op_proto, loaded_inputs)

        # Assert outputs are equal (within tolerance).
        for x, y in zip(outputs, loaded_outputs):
            numpy.testing.assert_allclose(x, y, atol=atol, rtol=rtol)

        # Assert gradient ops are equal. assertEqual (not assertTrue on
        # ==) so a mismatch reports the differing protos.
        for i, expected_grad in enumerate(grad_ops):
            grad_path = os.path.join(source_dir, 'gradient_{}.pb'.format(i))
            with open(grad_path, 'rb') as f:
                grad_proto = parse_proto(f.read())
            self.assertEqual(grad_proto, expected_grad)

    def assertSerializedOperatorChecks(
            self,
            inputs,
            outputs,
            gradient_operator,
            op,
            device_option,
    ):
        """During the seeded pass, either serialize (with -g) or compare
        against the checked-in serialized files; no-op otherwise."""
        if self.should_serialize:
            if getattr(_output_context, 'should_write_output', False):
                self.serialize_test(
                    inputs, outputs, gradient_operator, op, device_option)
            else:
                self.compare_test(inputs, outputs, gradient_operator)

    def assertReferenceChecks(
        self,
        device_option,
        op,
        inputs,
        reference,
        input_device_options=None,
        threshold=1e-4,
        output_to_grad=None,
        grad_reference=None,
        atol=None,
        outputs_to_check=None,
    ):
        """Run the base reference checks, then hook in serialized checks.

        Returns the outputs from the base-class check (the original
        override dropped them; returning keeps parity with the base API
        and is backward-compatible).
        """
        outs = super(SerializedTestCase, self).assertReferenceChecks(
            device_option,
            op,
            inputs,
            reference,
            input_device_options,
            threshold,
            output_to_grad,
            grad_reference,
            atol,
            outputs_to_check,
        )
        grad_ops, _ = gradient_checker.getGradientForOp(op)
        self.assertSerializedOperatorChecks(
            inputs,
            outs,
            grad_ops,
            op,
            device_option,
        )
        return outs
def testWithArgs():
    """Entry point replacing unittest.main() for serialized tests.

    Parses the serialization flags (-g to generate output files, -o to
    choose the output directory), stashes them on the thread-local
    _output_context, then hands any remaining arguments to unittest.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-g', '--generate-serialized', action='store_true', dest='write',
        help='generate output files (default=false, compares to current files)')
    parser.add_argument(
        '-o', '--output', default=DATA_DIR,
        help='output directory (default: %(default)s)')
    parser.add_argument('unittest_args', nargs='*')
    args = parser.parse_args()
    # Strip our flags so unittest.main() sees only its own arguments.
    sys.argv[1:] = args.unittest_args
    # Plain attribute assignment — __setattr__ calls were unidiomatic.
    _output_context.should_write_output = args.write
    _output_context.output_dir = args.output

    import unittest
    unittest.main()