import numpy as np
from caffe2.python import core, workspace
from caffe2.python.test_util import TestCase, rand_array


class TestPartitionOps(TestCase):
    """Check the Partition, GatherByKey and LengthsPartition operators.

    Each test feeds random tensors to the operator, computes the expected
    per-partition outputs with a small NumPy reference implementation and
    compares them blob by blob.
    """

    def test_configs(self):
        """Return every configuration the partition tests iterate over.

        Each entry is a tuple
        ``(main_dims, parts, main_type, extra, pack)`` where:

        * ``main_dims`` is the shape of the key tensor (first data input),
        * ``parts`` is the number of partitions,
        * ``main_type`` is the integer dtype of the key tensor,
        * ``extra`` is a list of ``(extra dims, dtype)`` pairs describing
          the additional inputs; their shape is ``main_dims + extra dims``,
        * ``pack`` is passed to the operator as ``pack_first_input``.

        NOTE: the name starts with ``test``, so unittest's default loader
        also collects this helper as a test case of its own. The name is
        kept because it is part of the class's existing interface.
        """
        # (main dims, partitions)
        configs = [
            ((10, ), 3),
            ((4, ), 10),
            ((10, 10), 4),
            ((100, ), 2),
            ((5, ), 1),
            ((1, ), 1),
            ((2, 10), 2),
        ]
        # [list of (extra dims, type)] for the additional inputs
        suffixes = [
            [],
            [((2, 2), np.float32)],
            [((3, ), np.int64), ((2, ), np.float32)],
        ]
        return [
            (main_dims, parts, main_type, extra, pack)
            for main_dims, parts in configs
            for main_type in [np.int32, np.int64]
            for extra in suffixes
            for pack in [False, True]
        ]

    @staticmethod
    def _random_inputs(main_dims, main_type, extra_ins):
        """Generate the key tensor followed by one tensor per extra input.

        Float tensors come from ``rand_array``; integer tensors are drawn
        from ``[-100, 100)`` so that negative keys are exercised too.
        """
        arrays = []
        for dims, dtype in [((), main_type)] + extra_ins:
            shape = main_dims + dims
            if dtype in [np.float32, np.float64]:
                data = rand_array(*shape)
            else:
                data = np.random.randint(-100, 100, shape)
            arrays.append(data.astype(dtype))
        return arrays

    @staticmethod
    def _reference_rows(x, shards, part, parts, pack):
        """Return, for each tensor in ``x``, the rows belonging to ``part``.

        ``x[0]`` is the key tensor and ``shards`` its flattened
        ``key % parts`` values. Every tensor is flattened over the key
        dimensions, keeping its trailing (suffix) dimensions, and the rows
        whose shard equals ``part`` are selected in their original order.
        When ``pack`` is set, the keys themselves are replaced by
        ``key // parts``.
        """
        key_rank = len(x[0].shape)
        selected = shards == part
        rows = []
        for ind, v in enumerate(x):
            suffix_shape = v.shape[key_rank:]
            data = v.reshape((-1, ) + suffix_shape)
            if pack and ind == 0:
                data = data // parts
            # Boolean-mask selection preserves row order and yields shape
            # (0, ) + suffix_shape when no row falls into this partition.
            rows.append(data[selected])
        return rows

    @staticmethod
    def _reference_lengths(lengths, shards, part):
        """Count, per segment of ``lengths``, the rows that fall in ``part``.

        ``lengths`` splits the flattened key tensor into consecutive
        segments; the result has one count per segment.
        """
        sharded_lengths = np.zeros(len(lengths))
        start = 0
        for ind, length in enumerate(lengths):
            stop = start + length
            sharded_lengths[ind] = (shards[start:stop] == part).sum()
            start = stop
        return sharded_lengths

    @staticmethod
    def _assert_blobs_equal(names, expected_arrays):
        """Compare each expected array with the workspace blob of that name."""
        for name, expected in zip(names, expected_arrays):
            np.testing.assert_array_equal(
                expected, workspace.FetchBlob(name)
            )

    def testPartition(self):
        """Partition matches the reference and GatherByKey inverts it."""
        for main_dims, parts, main_type, extra_ins, pack in self.test_configs():
            num_inputs = 1 + len(extra_ins)
            ins = ['in' + str(i) for i in range(num_inputs)]
            # Outputs are laid out partition-major: all inputs of
            # partition 0, then all inputs of partition 1, and so on.
            outs = [
                'in{}_p{}'.format(j, i)
                for i in range(parts)
                for j in range(num_inputs)
            ]
            op = core.CreateOperator(
                'Partition', ins, outs, pack_first_input=(1 if pack else 0))

            x = self._random_inputs(main_dims, main_type, extra_ins)
            for name, data in zip(ins, x):
                workspace.FeedBlob(name, data)

            workspace.RunOperatorOnce(op)

            # numpy has proper modulo op that yields non-negative results
            shards = (x[0] % parts).reshape([-1])
            ref = []
            for part in range(parts):
                ref.extend(
                    self._reference_rows(x, shards, part, parts, pack))
            self._assert_blobs_equal(outs, ref)

            # test inverse operation (GatherByKey)
            if len(main_dims) == 1:
                # currently only 1D key tensor supported
                for i, expected_out in enumerate(ins[1:], start=1):
                    gather_ins = [ins[0]] + [
                        outs[num_inputs * p + i] for p in range(parts)]
                    actual_out = expected_out + '_actual'
                    op = core.CreateOperator(
                        'GatherByKey', gather_ins, actual_out)
                    workspace.RunOperatorOnce(op)
                    expected = workspace.FetchBlob(expected_out)
                    actual = workspace.FetchBlob(actual_out)
                    np.testing.assert_array_equal(expected, actual)

    def testLengthsPartition(self):
        """LengthsPartition matches the reference lengths and rows."""
        for main_dims, parts, main_type, extra_ins, pack in self.test_configs():
            # For LengthsSharding only 1-D tensors supported as a first input
            if len(main_dims) > 1:
                continue
            # Input 0 is the lengths tensor; the key tensor and the extra
            # inputs follow it.
            num_inputs = 2 + len(extra_ins)
            ins = ['in' + str(i) for i in range(num_inputs)]
            outs = [
                'in{}_p{}'.format(j, i)
                for i in range(parts)
                for j in range(num_inputs)
            ]
            op = core.CreateOperator(
                'LengthsPartition', ins, outs,
                pack_first_input=(1 if pack else 0)
            )

            x = self._random_inputs(main_dims, main_type, extra_ins)
            for name, data in zip(ins[1:], x):
                workspace.FeedBlob(name, data)

            # Randomly generate length tensor as well. Each draw is strictly
            # smaller than the remaining row count, so the remainder stays
            # positive and the final segment absorbs whatever is left.
            elements = np.random.randint(2, 10)
            lengths = []
            total_length = 0
            for _ in range(elements - 1):
                lengths.append(np.random.randint(main_dims[0] - total_length))
                total_length += lengths[-1]
            lengths.append(main_dims[0] - total_length)
            workspace.FeedBlob(ins[0], np.array(lengths, dtype=np.int32))

            workspace.RunOperatorOnce(op)

            # numpy has proper modulo op that yields non-negative results
            shards = (x[0] % parts).reshape([-1])
            ref = []
            for part in range(parts):
                ref.append(self._reference_lengths(lengths, shards, part))
                ref.extend(
                    self._reference_rows(x, shards, part, parts, pack))
            self._assert_blobs_equal(outs, ref)


if __name__ == "__main__":
    import unittest
    unittest.main()