import numpy as np

from caffe2.python import core, workspace
from caffe2.python.test_util import TestCase, rand_array

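# These tests cover the Partition, LengthsPartition and GatherByKey operators.
# As encoded by the NumPy reference implementations below, Partition scatters
# each input row to shard (key % number_of_partitions). Illustrative example
# with 2 partitions:
#
#   keys = [3, 1, 4, 2]  ->  partition 0 gets the rows with even keys: [4, 2]
#                            partition 1 gets the rows with odd keys:  [3, 1]
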
class TestPartitionOps(TestCase):
    def test_configs(self):
        # (main dims, partitions, main type, [list of (extra dims, type)])
        configs = [
            ((10, ), 3),
            ((4, ), 10),
            ((10, 10), 4),
            ((100, ), 2),
            ((5, ), 1),
            ((1, ), 1),
            ((2, 10), 2),
        ]
        suffixes = [
            [],
            [((2, 2), np.float32)],
            [((3, ), np.int64), ((2, ), np.float32)],
        ]
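        # The cross product below yields 7 configs x 2 key dtypes x 3 suffix
        # sets x 2 pack modes = 84 cases per test run.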
        return [
            (main_dims, parts, main_type, extra, pack)
            for main_dims, parts in configs
            for main_type in [np.int32, np.int64]
            for extra in suffixes
            for pack in [False, True]
        ]

    def testPartition(self):
        for main_dims, parts, main_type, extra_ins, pack in self.test_configs():
            ins = ['in' + str(i) for i in range(1 + len(extra_ins))]
            outs = [
                'in{}_p{}'.format(j, i)
                for i in range(parts) for j in range(1 + len(extra_ins))
            ]
            op = core.CreateOperator(
                'Partition', ins, outs, pack_first_input=(1 if pack else 0))
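            # With pack_first_input=1 the operator stores key // parts instead
            # of the raw key (the shard id is implicit in the output slot);
            # the reference sharding() below mirrors this with data // parts.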
            x = []
            for i, (dims, t) in enumerate([((), main_type)] + extra_ins):
                if t in [np.float32, np.float64]:
                    d = rand_array(*(main_dims + dims))
                else:
                    d = np.random.randint(-100, 100, (main_dims + dims))
                d = d.astype(t)
                workspace.FeedBlob(ins[i], d)
                x.append(d)

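            # NumPy reference implementation: flatten every input to rows,
            # route row j to partition shards[j], and emit outputs grouped
            # per partition in the same order as `outs`.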
            def sharding(x):
                # numpy has a proper modulo op that yields non-negative results
                shards = (x[0] % parts).reshape([-1])
                out = []
                for i in range(parts):
                    for ind, v in enumerate(x):
                        suffix_shape = v.shape[len(x[0].shape):]
                        accum = []
                        data = v.reshape((-1, ) + suffix_shape)

                        if pack and ind == 0:
                            data = data // parts

                        for j, s in enumerate(shards):
                            if s == i:
                                accum.append(data[j])

                        def join(a):
                            if not a:
                                return np.empty(shape=(0, ) + suffix_shape)
                            return np.stack(a)

                        out.append(join(accum))
                return out

            workspace.RunOperatorOnce(op)
            ref = sharding(x)
            print(x)
            print(ref)
            for name, expected in zip(outs, ref):
                np.testing.assert_array_equal(
                    expected, workspace.FetchBlob(name)
                )

            # Test the inverse operation: GatherByKey should reassemble each
            # extra input from the key tensor and its per-partition pieces.
            if len(main_dims) == 1:
                # currently only a 1-D key tensor is supported
                for i in range(len(extra_ins)):
                    expected_out = ins[i + 1]
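                    # Output j of partition p sits at outs[len(ins) * p + j],
                    # so input i + 1's pieces are gathered at stride len(ins).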
                    gather_ins = [ins[0]] + [
                        outs[len(ins) * p + i + 1] for p in range(parts)]
                    actual_out = expected_out + '_actual'
                    op = core.CreateOperator(
                        'GatherByKey', gather_ins, actual_out)
                    workspace.RunOperatorOnce(op)
                    expected = workspace.FetchBlob(expected_out)
                    actual = workspace.FetchBlob(actual_out)
                    np.testing.assert_array_equal(expected, actual)

    def testLengthsPartition(self):
        for main_dims, parts, main_type, extra_ins, pack in self.test_configs():
            # LengthsPartition supports only 1-D tensors as the first input
            if len(main_dims) > 1:
                continue
            ins = ['in' + str(i) for i in range(2 + len(extra_ins))]
            outs = [
                'in{}_p{}'.format(j, i)
                for i in range(parts) for j in range(2 + len(extra_ins))
            ]
            op = core.CreateOperator(
                'LengthsPartition', ins, outs,
                pack_first_input=(1 if pack else 0)
            )
            x = []
            for i, (dims, t) in enumerate([((), main_type)] + extra_ins):
                if t in [np.float32, np.float64]:
                    d = rand_array(*(main_dims + dims))
                else:
                    d = np.random.randint(-100, 100, (main_dims + dims))
                d = d.astype(t)
                workspace.FeedBlob(ins[i + 1], d)
                x.append(d)

            # Randomly generate the lengths tensor as well: non-negative
            # entries that sum to main_dims[0]
            elements = np.random.randint(2, 10)
            lengths = []
            total_length = 0
            for _ in range(elements - 1):
                lengths.append(np.random.randint(main_dims[0] - total_length))
                total_length += lengths[-1]
            lengths.append(main_dims[0] - total_length)
            workspace.FeedBlob(ins[0], np.array(lengths, dtype=np.int32))
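            # e.g. main_dims == (10,) might give lengths == [3, 0, 5, 2].
            # LengthsPartition shards the data rows like Partition, but must
            # also emit per-partition lengths that preserve the segment
            # structure, which the reference below checks.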

            def sharding(x):
                # numpy has a proper modulo op that yields non-negative results
                shards = (x[0] % parts).reshape([-1])
                out = []
                for i in range(parts):
                    idx = 0
                    sharded_lengths = np.zeros(elements)
                    for ind, length in enumerate(lengths):
                        for _ in range(length):
                            if shards[idx] == i:
                                sharded_lengths[ind] += 1
                            idx += 1
                    out.append(sharded_lengths)

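                    # Data rows are then sharded exactly as in testPartition's
                    # reference implementation.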
                    for ind, v in enumerate(x):
                        suffix_shape = v.shape[len(x[0].shape):]
                        accum = []
                        data = v.reshape((-1, ) + suffix_shape)

                        if pack and ind == 0:
                            data = data // parts

                        for j, s in enumerate(shards):
                            if s == i:
                                accum.append(data[j])

                        def join(a):
                            if not a:
                                return np.empty(shape=(0, ) + suffix_shape)
                            return np.stack(a)

                        out.append(join(accum))
                return out

            workspace.RunOperatorOnce(op)
            ref = sharding(x)
            for name, expected in zip(outs, ref):
                np.testing.assert_array_equal(
                    expected, workspace.FetchBlob(name)
                )


if __name__ == "__main__":
    import unittest
    unittest.main()