from caffe2.python import core
from hypothesis import assume, given, settings
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np


class TestReduceFrontSum(hu.HypothesisTestCase):
    """Hypothesis-driven tests for the ``Im2Col`` and ``Col2Im`` operators.

    NOTE(review): the class name refers to ReduceFrontSum, but every test
    below creates ``Im2Col``/``Col2Im`` operators -- confirm whether the
    name is a leftover from another test file.
    """

    @given(batch_size=st.integers(1, 3),
           stride=st.integers(1, 3),
           pad=st.integers(0, 3),
           kernel=st.integers(1, 5),
           dilation=st.integers(1, 3),
           size=st.integers(7, 10),
           channels=st.integers(1, 8),
           **hu.gcs)
    def test_im2col_layout(self, batch_size, stride, pad, kernel, dilation,
                           size, channels, gc, dc):
        """Check that the NCHW and NHWC code paths agree with each other.

        The same random square image is fed in both layouts. The ``Im2Col``
        outputs must match once the NHWC columns are permuted into the NCHW
        column arrangement, and the ``Col2Im`` outputs must match once the
        NHWC image is permuted back to NCHW.
        """
        # Extent covered by a dilated kernel; discard examples where it
        # would not fit inside the (unpadded) image.
        effective_kernel = dilation * (kernel - 1) + 1
        assume(size >= effective_kernel)

        # Axis permutations between the two layouts.
        nchw_to_nhwc = (0, 2, 3, 1)
        nhwc_to_nchw = (0, 3, 1, 2)
        # Per-sample column permutation:
        # (out_h, out_w, k, k, C) -> (C, k, k, out_h, out_w).
        col_nhwc_to_nchw = (4, 2, 3, 0, 1)

        # The image is square, so both output extents are the same number.
        out_h = out_w = int(
            (size + (2 * pad) - effective_kernel) / stride + 1)

        tolerance = dict(atol=1e-4, rtol=1e-4)

        def make_op(op_type, inputs, outputs, layout):
            # All four operators share the convolution geometry and device;
            # only the type, the blob names and the layout differ.
            return core.CreateOperator(
                op_type, inputs, outputs,
                stride=stride,
                kernel=kernel,
                dilation=dilation,
                pad=pad,
                order=layout,
                device_option=gc)

        # Random input centred on zero, plus its NHWC view.
        im_nchw = np.random.rand(
            batch_size, channels, size, size).astype(np.float32) - 0.5
        im_nhwc = np.transpose(im_nchw, nchw_to_nhwc)

        im2col_ops = [
            make_op("Im2Col", ["im_nchw"], ["col_nchw"], "NCHW"),
            make_op("Im2Col", ["im_nhwc"], ["col_nhwc"], "NHWC"),
        ]

        for blob_name, image in (("im_nchw", im_nchw),
                                 ("im_nhwc", im_nhwc)):
            self.ws.create_blob(blob_name).feed(image, device_option=gc)
        for op in im2col_ops:
            self.ws.run(op)

        # Expose the individual axes of the column buffers so they can be
        # permuted against each other.
        cols_nchw = self.ws.blobs["col_nchw"].fetch().reshape(
            batch_size, channels, kernel, kernel, out_h, out_w)
        cols_nhwc = self.ws.blobs["col_nhwc"].fetch().reshape(
            batch_size, out_h, out_w, kernel, kernel, channels)

        # Compare one sample of the batch at a time.
        for sample_nchw, sample_nhwc in zip(cols_nchw, cols_nhwc):
            np.testing.assert_allclose(
                sample_nchw,
                np.transpose(sample_nhwc, col_nhwc_to_nchw),
                **tolerance)

        # Map the columns back to images; the original image blob is passed
        # as the second input of each Col2Im operator.
        col2im_ops = [
            make_op("Col2Im", ["col_nchw", "im_nchw"], ["out_nchw"], "NCHW"),
            make_op("Col2Im", ["col_nhwc", "im_nhwc"], ["out_nhwc"], "NHWC"),
        ]
        for op in col2im_ops:
            self.ws.run(op)

        restored_nchw = self.ws.blobs["out_nchw"].fetch()
        restored_nhwc = self.ws.blobs["out_nhwc"].fetch()
        np.testing.assert_allclose(
            restored_nchw,
            np.transpose(restored_nhwc, nhwc_to_nchw),
            **tolerance)

    @given(batch_size=st.integers(1, 3),
           stride=st.integers(1, 3),
           pad=st.integers(0, 3),
           kernel=st.integers(1, 5),
           dilation=st.integers(1, 3),
           size=st.integers(7, 10),
           channels=st.integers(1, 8),
           order=st.sampled_from(["NCHW"]),
           **hu.gcs)
    @settings(deadline=10000)
    def test_col2im_gradients(self, batch_size, stride, pad, kernel,
                              dilation, size, channels, order, gc, dc):
        """Run the gradient checker on ``Im2Col`` for a random input.

        Only the ``"NCHW"`` order is sampled, and the input is built with
        shape ``(batch_size, channels, size, size)``.

        NOTE(review): despite the test name, the operator checked here is
        ``Im2Col``, not ``Col2Im`` -- confirm this is intended.
        """
        # Discard examples whose dilated kernel exceeds the image size.
        assume(size >= dilation * (kernel - 1) + 1)

        im2col = core.CreateOperator(
            "Im2Col",
            ["X"],
            ["Y"],
            stride=stride,
            kernel=kernel,
            dilation=dilation,
            pad=pad,
            order=order,
            device_option=gc)

        image = np.random.rand(
            batch_size, channels, size, size).astype(np.float32)

        # Check the gradient of output 0 with respect to input 0.
        self.assertGradientChecks(gc, im2col, [image], 0, [0])