from caffe2.proto import caffe2_pb2 from caffe2.python import core, workspace import caffe2.python.hypothesis_test_util as hu import caffe2.python.serialized_test.serialized_test_util as serial from hypothesis import given, settings import hypothesis.strategies as st import numpy as np def _fill_diagonal(shape, value): result = np.zeros(shape) np.fill_diagonal(result, value) return (result,) class TestFillerOperator(serial.SerializedTestCase): @given(**hu.gcs) @settings(deadline=10000) def test_shape_error(self, gc, dc): op = core.CreateOperator( 'GaussianFill', [], 'out', shape=32, # illegal parameter mean=0.0, std=1.0, ) exception = False try: workspace.RunOperatorOnce(op) except Exception: exception = True self.assertTrue(exception, "Did not throw exception on illegal shape") op = core.CreateOperator( 'ConstantFill', [], 'out', shape=[], # scalar value=2.0, ) exception = False self.assertTrue(workspace.RunOperatorOnce(op)) self.assertEqual(workspace.FetchBlob('out'), [2.0]) @given(**hu.gcs) @settings(deadline=10000) def test_int64_shape(self, gc, dc): large_dim = 2 ** 31 + 1 net = core.Net("test_shape_net") net.UniformFill( [], 'out', shape=[0, large_dim], min=0.0, max=1.0, ) self.assertTrue(workspace.CreateNet(net)) self.assertTrue(workspace.RunNet(net.Name())) self.assertEqual(workspace.blobs['out'].shape, (0, large_dim)) @given( shape=hu.dims().flatmap( lambda dims: hu.arrays( [dims], dtype=np.int64, elements=st.integers(min_value=0, max_value=20) ) ), a=st.integers(min_value=0, max_value=100), b=st.integers(min_value=0, max_value=100), **hu.gcs ) @settings(deadline=10000) def test_uniform_int_fill_op_blob_input(self, shape, a, b, gc, dc): net = core.Net('test_net') with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)): shape_blob = net.Const(shape, dtype=np.int64) a_blob = net.Const(a, dtype=np.int32) b_blob = net.Const(b, dtype=np.int32) uniform_fill = net.UniformIntFill([shape_blob, a_blob, b_blob], 1, input_as_shape=1) workspace.RunNetOnce(net) blob_out = workspace.FetchBlob(uniform_fill) if b < a: new_shape = shape[:] new_shape[0] = 0 np.testing.assert_array_equal(new_shape, blob_out.shape) else: np.testing.assert_array_equal(shape, blob_out.shape) self.assertTrue((blob_out >= a).all()) self.assertTrue((blob_out <= b).all()) @given( **hu.gcs ) def test_uniform_fill_using_arg(self, gc, dc): net = core.Net('test_net') shape = [2**3, 5] # uncomment this to test filling large blob # shape = [2**30, 5] min_v = -100 max_v = 100 output_blob = net.UniformIntFill( [], ['output_blob'], shape=shape, min=min_v, max=max_v, ) workspace.RunNetOnce(net) output_data = workspace.FetchBlob(output_blob) np.testing.assert_array_equal(shape, output_data.shape) min_data = np.min(output_data) max_data = np.max(output_data) self.assertGreaterEqual(min_data, min_v) self.assertLessEqual(max_data, max_v) self.assertNotEqual(min_data, max_data) @serial.given( shape=st.sampled_from( [ [3, 3], [5, 5, 5], [7, 7, 7, 7], ] ), **hu.gcs ) def test_diagonal_fill_op_float(self, shape, gc, dc): value = 2.5 op = core.CreateOperator( 'DiagonalFill', [], 'out', shape=shape, # scalar value=value, ) for device_option in dc: op.device_option.CopyFrom(device_option) # Check against numpy reference self.assertReferenceChecks(gc, op, [shape, value], _fill_diagonal) @given(**hu.gcs) def test_diagonal_fill_op_int(self, gc, dc): value = 2 shape = [3, 3] op = core.CreateOperator( 'DiagonalFill', [], 'out', shape=shape, dtype=core.DataType.INT32, value=value, ) # Check against numpy reference self.assertReferenceChecks(gc, op, [shape, value], _fill_diagonal) @serial.given(lengths=st.lists(st.integers(min_value=0, max_value=10), min_size=0, max_size=10), **hu.gcs) def test_lengths_range_fill(self, lengths, gc, dc): op = core.CreateOperator( "LengthsRangeFill", ["lengths"], ["increasing_seq"]) def _len_range_fill(lengths): sids = [] for _, l in enumerate(lengths): sids.extend(list(range(l))) return (np.array(sids, dtype=np.int32), ) self.assertReferenceChecks( device_option=gc, op=op, inputs=[np.array(lengths, dtype=np.int32)], reference=_len_range_fill) @given(**hu.gcs) def test_gaussian_fill_op(self, gc, dc): op = core.CreateOperator( 'GaussianFill', [], 'out', shape=[17, 3, 3], # sample odd dimensions mean=0.0, std=1.0, ) for device_option in dc: op.device_option.CopyFrom(device_option) assert workspace.RunOperatorOnce(op), "GaussianFill op did not run " "successfully" blob_out = workspace.FetchBlob('out') assert np.count_nonzero(blob_out) > 0, "All generated elements are " "zeros. Is the random generator functioning correctly?" @given(**hu.gcs) def test_msra_fill_op(self, gc, dc): op = core.CreateOperator( 'MSRAFill', [], 'out', shape=[15, 5, 3], # sample odd dimensions ) for device_option in dc: op.device_option.CopyFrom(device_option) assert workspace.RunOperatorOnce(op), "MSRAFill op did not run " "successfully" blob_out = workspace.FetchBlob('out') assert np.count_nonzero(blob_out) > 0, "All generated elements are " "zeros. Is the random generator functioning correctly?" @given( min=st.integers(min_value=0, max_value=5), range=st.integers(min_value=1, max_value=10), emb_size=st.sampled_from((10000, 20000, 30000)), dim_size=st.sampled_from((16, 32, 64)), **hu.gcs) @settings(deadline=None) def test_fp16_uniformfill_op(self, min, range, emb_size, dim_size, gc, dc): op = core.CreateOperator( 'Float16UniformFill', [], 'out', shape=[emb_size, dim_size], min=float(min), max=float(min + range), ) for device_option in dc: op.device_option.CopyFrom(device_option) assert workspace.RunOperatorOnce(op), "Float16UniformFill op did not run successfully" self.assertEqual(workspace.blobs['out'].shape, (emb_size, dim_size)) blob_out = workspace.FetchBlob('out') expected_type = "float16" expected_mean = min + range / 2.0 expected_var = range * range / 12.0 expected_min = min expected_max = min + range self.assertEqual(blob_out.dtype.name, expected_type) self.assertAlmostEqual(np.mean(blob_out, dtype=np.float32), expected_mean, delta=0.1) self.assertAlmostEqual(np.var(blob_out, dtype=np.float32), expected_var, delta=0.1) self.assertGreaterEqual(np.min(blob_out), expected_min) self.assertLessEqual(np.max(blob_out), expected_max) if __name__ == "__main__": import unittest unittest.main()