2016-10-07 20:08:53 +00:00
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
2017-08-16 02:04:12 +00:00
from future . utils import viewkeys
from multiprocessing import Process , Queue
2016-10-07 20:08:53 +00:00
import numpy as np
2017-08-16 02:04:12 +00:00
import os
2017-07-06 16:04:29 +00:00
import shutil
2017-08-16 02:04:12 +00:00
import tempfile
2016-10-07 20:08:53 +00:00
import unittest
[Caffe2][fbcode=>GH sync] Update from facebook 4323b18ce13c (#7116)
* [fix] Re-enable events in RNN ops
We have earlier added event disabling in RNN ops as back then we didn't use
events, with current use cases this is no longer true
(https://fburl.com/8vd0lp8y)
* use ops with cude impl
* Revert D7729695: [caffe2][fix] Re-enable events in RNN ops
This reverts commit 4b215c7496fb724656ff4c776933a15bdbbcde5e
@bypass-lint
An infra SEV is better than not reverting this diff.
If you copy this password, see you in SEV Review!
@cause_a_sev_many_files
* [observer] Clean up observer_config.h
#accept2ship
* [1/n] Refactor dataio_test.py
Replace code duplication with a common function
* Add barrier net that runs before training nets
Add a synchonize barrier net that is run before training nets. With this net, shards that are faster will wait for other shards before start training. This reduce chances of the faster shards timing out during GLOO AllReduce.
Removed explicit data_parallel_model.py.synchronize call in holmes workflow. Similar change in speech/asr_training workflow will come in another diff.
* Support the dnnlowp backend in caffe2_benchmark
This is for SHARE operator latency evaluation
* Migrate integral_image_op to main caffe2
migrate integral_image_op(GPU version) given by https://fburl.com/yvqezigi
to caffe2/caffe2/operators and implement its CPU version. Write up a test
using the hypothesis_test mechanism
* [pos_disc, fbcode] Implement unjoined lr loss
As explained in https://our.intern.facebook.com/intern/wiki/Model_Based_Calibration/, when the dataset is an joined data set, where labels might change later, we need to use unjoined logloss.
The implementation is almost the same as in Sigrid (https://fburl.com/1trngsls), where
loss = y (log(p) - log(1-p)) + (1-y)(log(1-p)) = xy - (1-y)x - (1-y)log(1+exp(-x))
For x < 0, to ensure stability and avoid overflow, we reformulate the above exp as
loss = xy - (1-y)x - (1-y)x + (1-y)log(1+exp(x)) = xy + (1-y)log(1+exp(x))
Then the final expression becomes
loss = xy + (y - 1) x (x >= 0) - (1 - y) log(1 + exp(x - 2 x (x >= 0)))
where y is the true label, x is the dot product and p = logistic(x).
This kind of implementation is align with the current implementation of the original cross entropy in
https://phabricator.intern.facebook.com/diffusion/FBS/browse/master/fbcode/caffe2/caffe2/operators/cross_entropy_op.cc;0bae3b5d0f825897c5e0dd0ff10f489d7271bf25$7-13
* Keep the array to fix the conflict
* [C2] Compute Adagrad effective LR
The AdagradWithLR op outputs an extra blob which is contains the average effective learning rate across all weights in this blob.
* Open-source extractMetaNetDef & runGlobalInitialization, add new Predictor constructor from db file, and add run_map_outputs
1. Open-source extractMetaNetDef and runGlobalInitialization, for use in
2. new Predictor constructor from db file.
3. Add new run function that returns outputs as TensorMap
* Disable eigen cpu
Disable eigen cpu in transpose and reduce
* Introduce request_only/object_only property of ModelLayer
by default this is False
* A simple TC Caffe2 benchmark
We can run tunner, get MappingOptions and then use them to
compare against cuBLAS
currently broken due to LLVM issues. How to run:
hg checkout eec1ab31b59c03b8deded1c755a9abaf8c45be01
add D7401202
add D7434625
add D7506031
add D7540728
buck run @mode/dev-nosan tc/tc/benchmarks_python:caffe2_benchmark
* Move Caffe2 feature_maps_ops to open source
Need feature maps operators in open source project facebookresearch/BlueWhale
* Manually fix the conflicts in channel shuffle op
* Fix the inconsistency between different gh and fbcode
* Skip Adagrad GPU Test (Because some gpu implementation is missing)
* Fix another test to make sure it won't run on gpu when implementation is not available yet
2018-05-02 03:49:00 +00:00
import time
[Caffe2] Changes done inside Facebook (#6378)
* fix unit test for sqrt op
From the error logging:
[idx, grad, grad_estimate] are:
[[ 146. 0.5 0.45776367]
[ 147. 0.5 0.45776367]
The gradient == 0.5 is correct, which means the SqrtOp and its gradient is doing right job. (Because y = sqrt(x), loss = y^2/2 = x/2, and then d(loss)/dx = 1/2 = 0.5; )
The test failed because of numerical problem of grad_estimate (in unit test). It can be because the step_size is small, and float precision is not high (when there are multiple elements in the tensor, we do sum(y^2) to compute loss)
This diff
- increase the step size, and also move the test cases to be further away from 0 (where sqrt(x) is not well defined) to be safe :)
- also clean up, and merge the test case for inplace Vs. non-inplace
Tested with:
`CAFFE2_HYPOTHESIS_PROFILE=debug ai_bt caffe2/caffe2/python/operator_test:elementwise_ops_test -- "test_sqrt"`
* CompositeReader & CompositeReaderBuilder
A new type of reader gluing multiple readers together.
* Back out "Revert D7394363: [GanH]: Log D Trick for Cross Entropy with Sigmoid"
Original commit changeset: 9325a4356dbe
* [dai][WIP] convert params to int8 on ps before sending to trainer
Add float->uint8 conversion in addition to float->fp16 conversion in model_saver.
* [easy] improve unit test for sparse length sum ops
as desc.
#accept2ship
* Update GitHub upstream to 771fcb3455cbfe69c2abcc4cb3bd7ef92d59af24
* move sparse hash unique ops to OOS and add unit tests
- move the SparseHash version to OOS, since 'sparsehash' is already deps of caffe2 OOS: https://fburl.com/arssw4n1
- The 'SparseHash' engine is also being used in OOS, so the SparseHash version shall be in OOS to reduce confusion: https://fburl.com/o5ea7ah2
- fix the CUDA UniqueOp for the case when batch is empty.
- add unit test
* group_norm_op for caffe2
This is the cuda op for Group Normalization (GN): https://arxiv.org/abs/1803.08494
This code implements GN in one op that computes Y=gamma * (X-mu) / sigma + beta and also its gradients. It is expected to have minimal memory consumption (similar to the BN op), without creating new blobs if GN were implemented as several ops (e.g., reshape, norm_mean/std, affine_channel).
* Resubmit D7405233: disappeared in D7464958
OOS publish causes the op missing -- however, test was still there
* [c2] add sparse hash engine for cuda unique op
The SparseHash version of UniqueOp copy input tensor to CPU, and make use of sparse hash map to get unique output, and then copy back to GPU.
* [dper][gpu] enable unit testing gpu trainer for sparse nn
to debug the GPU trainer using mock data in unit test.
make it easier to develop GPU trainer for new models.
* Reuse Gloo context for Synchronize() calls
Previously we were creating (and leaking) the Gloo context on each call to Synchronize(). Now only run the common world op and create the barrier net once, then run the barrier net on each Synchronize() call. Since timeout is associated with the Gloo context, assert that the timeout is fixed instead of trying to handle the complexity of multiple timeouts (and associated contexts).
* [GanH/WGAN][1/n]: add FC param clipping
as titled
* [mobile] minimizing changes between caffe2_benchmark and speed_benchmark
* [GanH]: enable diagnose within model
avoid finding blob names but to directly enable inside the model
* Add `net_transformer_fun` option to DPM
This callback allows for various transformations to be made to the
model after gradient operators have been added. The immediate motivation for
this is to allow transformations such has "checkpoint-and-recompute" which
allow trading off memory for additional compute.
Adding several callbacks like this has made DPM's API less than ideal at this
stage. However, I could not find any reasonable alternative.
* [DT] [33/n] Compile flow task groups
task groups need to compiled in order to pickle the object in fblearner. However I also changed the Job's compile function as creating new object is not necessary.
* Initial commit for sparse_normalize vectorization and benchmark
* [GanH]: LB Calibration for JSD
as titled
* Tracing event in async executor
Adding event tracing through TRACE_EVENT macro in async executor
* [Resubmit] D7409751 Reseting book-keeping blobs when the reservoir is reset
D7409751 got lost in D7464958
* Visualizing realtime weights values
we want to visualize the weights values as optimizer is iterating. This diff supports to visual the weights at an assigned index.
Currently, we assume the blob to be 2 dimensional.
* [GanH][Easy]: Fix Homotopy Weighting
apparantely, there was a bug in homotopy weight (alpha, beta) update
* [c2] move sparse hash unique op out of oss
so that oss do not need to depend on google hash map.
* Get rid of std::round as it's not supported on Android
* Revert changes on setup.py
* Skip shaky test on Dataio
* fix
2018-04-11 04:11:43 +00:00
from mock import Mock
2017-11-20 07:28:40 +00:00
from hypothesis import assume , given
import hypothesis . strategies as st
2017-08-16 02:04:12 +00:00
2016-10-07 20:08:53 +00:00
from caffe2 . proto import caffe2_pb2
2017-11-20 07:28:40 +00:00
from caffe2 . python import brew , core , cnn , data_parallel_model , dyndep , \
Update from Facebook (#6692)
* [GanH][Easy]: Add assertion to adaptive weighting layer
0 weight causes numeric instability and exploding ne
* [Easy] Add cast op before computing norm in diagnose options
As LpNorm only takes floats we add a manual casting here.
* Introduce a new caching device allocator
`cudaMalloc` and `cudaFree` calls are slow, and become slower the
more GPUs there are. Essentially, they grab a host-wide (not device-wide) lock
because GPU memory is transparently shared across all GPUs. Normally, this
isn't much of a concern since workloads allocate memory upfront, and reuse it
during later computation.
However, under some computation models (specifically, memory conserving
approaches like checkpoint-and-recompute, see
https://medium.com/@yaroslavvb/fitting-larger-networks-into-memory-583e3c758ff9)
this assumption is no longer true. In these situations, `cudaMalloc` and
`cudaFree` are common and frequent. Furthermore, in data parallel contexts,
these calls happen at nearly the same time from all GPUs worsening lock
contention.
A common solution to this problem is to add a custom allocator. In fact,
nVIDIA provides one out of the box: CUB, which Caffe2 already supports.
Unfortunately, the CUB allocator suffers from very high fragmentation. This is
primarily because it is a "buddy" allocator which neither splits nor merges
free cached blocks. Study
https://github.com/NVlabs/cub/blob/1.8.0/cub/util_allocator.cuh#L357 if you
want to convince yourself.
This diff adapts a caching allocator from the Torch codebase
https://github.com/torch/cutorch/blob/master/lib/THC/THCCachingAllocator.cpp
which does splitting and merging and ends up working really well, at least for
workloads like the checkpoint-and-recompute computation models noted above.
I simplified the implementation a little bit, made it a bit more C++-like. I
also removed a bunch of stream synchronization primitives for this diff. I
plan to add them back in subsequent diffs.
* Report reader progress in fblearner workflows
Integrate with fblearner progress reporting API and add support to report training progress from reader nodes.
If reader is constructed with batch limits, report based on finished batch vs total batch. The finished batch may be more than total batch because we evaludate if we should stop processing everytime we dequeue a split.
If no limit for the reader, report based on finished splits (Hive files) vs total splits. This is fairly accurate.
* [GanH][Diagnose]: fix plotting
1. ganh diagnose needs to set plot options
2. modifier's blob name is used for metric field can need to be fixed before
generating net
* Automatic update of fbcode/onnx to 985af3f5a0f7e7d29bc0ee6b13047e7ead9c90c8
* Make CompositeReader stops as soon as one reader finishes
Previously, CompositeReader calls all readers before stopping. It results in flaky test since the last batch may be read by different threads; resulting in dropped data.
* [dper] make sure loss is not nan
as desc.
* [rosetta2] [mobile-vision] Option to export NHWC order for RoIWarp/RoIAlign
Thanks for finding this @stzpz and @wangyanghan. Looks like NHWC is more
optimized. For OCR though it doesn't yet help since NHWC uses more mem b/w but
will soon become important.
* Intra-op parallel FC operator
Intra-op parallel FC operator
* [C2 Proto] extra info in device option
passing extra information in device option
design doc: https://fb.quip.com/yAiuAXkRXZGx
* Unregister MKL fallbacks for NCHW conversions
* Tracing for more executors
Modified Tracer to work with other executors and add more tracing
* Remove ShiftActivationDevices()
* Check for blob entry iff it is present
When processing the placeholders ops, ignore if the blob is not present in the blob_to_device.
* Internalize use of eigen tensor
Move use of eigen tensor out of the header file so we don't get template partial specialization errors when building other libraries.
* feature importance for transformed features.
* - Fix unused parameter warnings
The changes in this diff comments out unused parameters.
This will allow us to enable -Wunused-parameter as error.
#accept2ship
* add opencv dependencies to caffe2
The video input op requires additional opencv packages. This is to add them to
cmake so that it can build
* Add clip_by_value option in gradient clipping
Add clip_by_value option in gradient clipping
when the value is bigger than max or smaller than min, do the clip
* std::round compat
2018-04-18 06:36:40 +00:00
model_helper , optimizer , rnn_cell , workspace
2016-10-07 20:08:53 +00:00
from caffe2 . python . test_util import TestCase
2017-06-29 23:52:01 +00:00
2016-10-07 20:08:53 +00:00
2017-07-06 16:04:29 +00:00
dyndep . InitOpsLibrary ( " @/caffe2/caffe2/distributed:file_store_handler_ops " )
class TemporaryDirectory :
def __enter__ ( self ) :
self . tmpdir = tempfile . mkdtemp ( )
return self . tmpdir
def __exit__ ( self , type , value , traceback ) :
shutil . rmtree ( self . tmpdir )
2017-08-16 02:04:12 +00:00
# Note(jiayq): we are yet to find out why Travis gives out an error in gloo
# like:
# RuntimeError: [enforce fail at /home/travis/build/caffe2/caffe2/third_party/gloo/gloo/transport/tcp/device.cc:113] ifa != nullptr. Unable to find interface for: [127.0.1.1]
# See for example https://travis-ci.org/caffe2/caffe2/jobs/262433866
# As a result, we will check if this is travis, and if yes, disable it.
@unittest.skipIf ( os . environ . get ( " TRAVIS " ) , " DPMTest has a known issue with Travis. " )
2017-06-21 06:10:51 +00:00
class DataParallelModelTest ( TestCase ) :
2016-10-07 20:08:53 +00:00
2017-06-21 06:10:51 +00:00
def run_model ( self , devices , gpu ) :
2016-11-30 23:06:32 +00:00
'''
Helper function for test_equiv
'''
2016-11-14 22:58:04 +00:00
def input_builder_fun ( model ) :
return None
2016-10-07 20:08:53 +00:00
2017-02-03 15:37:59 +00:00
def model_build_fun ( model , loss_scale ) :
2016-11-30 23:06:32 +00:00
fc = model . FC ( " data " , " fc " , 16 , 1 ,
( " ConstantFill " , { } ) , ( " ConstantFill " , { } ) )
fc_fl = model . FlattenToVec ( fc , " fc_fl " )
sigm = model . Sigmoid ( fc_fl , " sigm " )
sq = model . SquaredL2Distance ( [ sigm , " label " ] , " sq " )
2016-11-14 22:58:04 +00:00
loss = model . AveragedLoss ( sq , " loss " )
2017-02-03 15:37:59 +00:00
loss = model . Scale ( loss , scale = loss_scale )
2017-09-12 17:20:22 +00:00
# For testing explicit sync
model . param_init_net . UniformFill ( [ ] , [ " sync_num " ] , shape = [ 1 ] )
2016-11-14 22:58:04 +00:00
return [ loss ]
2016-10-07 20:08:53 +00:00
2017-05-30 19:44:50 +00:00
def add_optimizer ( model ) :
2017-08-24 17:10:58 +00:00
return optimizer . build_sgd (
2017-08-26 01:53:20 +00:00
model ,
0.1 ,
policy = " fixed " ,
max_gradient_norm = 5.0 ,
allow_lr_injection = True ,
2017-08-24 17:10:58 +00:00
)
2016-11-14 22:58:04 +00:00
2016-11-30 23:06:32 +00:00
workspace . ResetWorkspace ( )
model = cnn . CNNModelHelper (
order = " NHWC " ,
2017-06-21 06:10:51 +00:00
name = " test {} " . format ( devices ) ,
2016-11-30 23:06:32 +00:00
)
2017-06-21 06:10:51 +00:00
data_parallel_model . Parallelize (
2016-11-14 22:58:04 +00:00
model ,
input_builder_fun = input_builder_fun ,
forward_pass_builder_fun = model_build_fun ,
2017-05-30 19:44:50 +00:00
optimizer_builder_fun = add_optimizer ,
2017-06-21 06:10:51 +00:00
devices = devices ,
cpu_device = not gpu ,
2017-09-15 23:05:58 +00:00
shared_model = not gpu ,
2018-01-24 21:10:56 +00:00
combine_spatial_bn = not gpu ,
2016-11-14 22:58:04 +00:00
)
2017-09-12 17:20:22 +00:00
data_parallel_model . AddBlobSync ( model , [ " sync_num " ] )
2016-11-14 22:58:04 +00:00
2017-10-13 00:17:29 +00:00
# Light test for LR names
lr_names = data_parallel_model . GetLearningRateBlobNames ( model )
self . assertGreater ( len ( lr_names ) , 0 )
2016-11-30 23:06:32 +00:00
np . random . seed ( 2603 )
# Each run has same input, independent of number of gpus
batch_size = 64
for i in range ( 0 , 10 ) :
full_data = np . random . rand ( batch_size , 16 )
full_labels = np . round ( full_data [ : , 0 ] )
2017-06-21 06:10:51 +00:00
batch_per_device = batch_size / / len ( devices )
2016-11-30 23:06:32 +00:00
2017-06-21 06:10:51 +00:00
for ( j , g ) in enumerate ( devices ) :
2016-11-30 23:06:32 +00:00
st = j * batch_per_device
en = st + batch_per_device
data = full_data [ st : en , : ] . astype ( np . float32 )
labels = full_labels [ st : en ] . astype ( np . float32 )
2017-06-21 06:10:51 +00:00
with core . DeviceScope ( core . DeviceOption ( model . _device_type , g ) ) :
workspace . FeedBlob (
" {} _ {} /data " . format ( model . _device_prefix , g ) , data
)
workspace . FeedBlob (
" {} _ {} /label " . format ( model . _device_prefix , g ) , labels
)
2016-11-30 23:06:32 +00:00
if i == 0 :
workspace . RunNetOnce ( model . param_init_net )
workspace . CreateNet ( model . net )
2017-09-12 17:20:22 +00:00
workspace . FeedBlob (
model . _device_prefix + " _0/sync_num " ,
np . array ( [ i * 2 ] ) . astype ( np . float32 ) ,
device_option = core . DeviceOption ( model . _device_type , 0 ) )
2016-11-30 23:06:32 +00:00
workspace . RunNet ( model . net . Proto ( ) . name )
2017-09-12 17:20:22 +00:00
# Test AddBlobSync
for j in model . _devices :
sync = workspace . FetchBlob (
model . _device_prefix + " _ {} /sync_num " . format ( j ) ) [ 0 ]
self . assertTrue ( abs ( sync - i * 2 ) < 0.01 )
2017-06-21 06:10:51 +00:00
return workspace . FetchBlob ( " {} _0/fc_w " . format ( model . _device_prefix ) )
2016-11-30 23:06:32 +00:00
2017-07-06 16:04:29 +00:00
def run_test_locally ( self , fn , device_option = None , * * kwargs ) :
# Queue for assertion errors on subprocesses
queue = Queue ( )
# Capture any exception thrown by the subprocess
def run_fn ( * args , * * kwargs ) :
try :
2017-07-12 17:38:51 +00:00
if device_option is None :
2017-07-06 16:04:29 +00:00
fn ( * args , * * kwargs )
workspace . ResetWorkspace ( )
2017-07-12 17:38:51 +00:00
else :
with core . DeviceScope ( device_option ) :
fn ( * args , * * kwargs )
workspace . ResetWorkspace ( )
2017-07-06 16:04:29 +00:00
except Exception as ex :
queue . put ( ex )
# Start N processes in the background
procs = [ ]
for i in range ( kwargs [ ' comm_size ' ] ) :
kwargs [ ' comm_rank ' ] = i
proc = Process (
target = run_fn ,
kwargs = kwargs )
proc . start ( )
procs . append ( proc )
# Test complete, join background processes
while len ( procs ) > 0 :
proc = procs . pop ( 0 )
while proc . is_alive ( ) :
proc . join ( 1 )
# Raise exception if we find any.
# Note that the following is executed ALSO after
# the last process was joined, so if ANY exception
# was raised, it will be re-raised here.
if not queue . empty ( ) :
raise queue . get ( )
2016-11-30 23:06:32 +00:00
def test_equiv ( self ) :
'''
Test that the model produces exactly same results given
total batchsize , independent of number of GPUs .
'''
2017-06-21 06:10:51 +00:00
for gpu in [ True , False ] :
2017-07-05 21:19:40 +00:00
if gpu and ( not workspace . has_gpu_support or
workspace . NumCudaDevices ( ) < 2 ) :
2017-06-21 06:10:51 +00:00
continue
result_2gpus = self . run_model ( [ 0 , 1 ] , gpu = gpu )
result_1gpus = self . run_model ( [ 0 ] , gpu = gpu )
2016-11-30 23:06:32 +00:00
2017-06-21 06:10:51 +00:00
self . assertTrue ( np . allclose ( result_1gpus , result_2gpus ) )
2016-11-30 23:06:32 +00:00
2017-06-21 06:10:51 +00:00
if not gpu or workspace . NumCudaDevices ( ) > = 4 :
result_4gpus = self . run_model ( list ( range ( 4 ) ) , gpu = gpu )
self . assertTrue ( np . allclose ( result_1gpus , result_4gpus ) )
2016-11-30 23:06:32 +00:00
2017-06-21 06:10:51 +00:00
if not gpu or workspace . NumCudaDevices ( ) > = 8 :
result_8gpus = self . run_model ( list ( range ( 8 ) ) , gpu = gpu )
self . assertTrue ( np . allclose ( result_1gpus , result_8gpus ) )
2017-03-09 21:41:36 +00:00
2017-09-07 06:54:20 +00:00
if not gpu or workspace . NumCudaDevices ( ) > = 16 :
result_16gpus = self . run_model ( list ( range ( 16 ) ) , gpu = gpu )
self . assertTrue ( np . allclose ( result_1gpus , result_16gpus ) )
2017-05-19 19:46:18 +00:00
def test_checkpoint_params ( self ) :
def add_input_ops ( model ) :
pass
2017-05-20 16:04:37 +00:00
def add_model_ops ( model , loss_scale ) :
2017-05-19 19:46:18 +00:00
model . NHWC2NCHW ( " data " , " data_nchw " )
model . Conv ( " data_nchw " , ' conv1 ' , 3 , 64 ,
weight_init = ( " MSRAFill " , { } ) , kernel = 7 ,
stride = 2 , pad = 3 , no_bias = 0 )
2017-09-26 06:32:11 +00:00
model . SpatialBN ( ' conv1 ' , ' conv1_spatbn_relu ' , 64 , epsilon = 1e-3 , is_test = False )
2017-05-19 19:46:18 +00:00
model . Relu ( ' conv1_spatbn_relu ' , ' conv1_spatbn_relu ' )
model . MaxPool ( ' conv1_spatbn_relu ' , ' pool1 ' , kernel = 3 , stride = 2 )
model . FC ( ' pool1 ' , ' fc ' , dim_in = ( 64 * 56 * 56 ) , dim_out = 100 )
model . Sigmoid ( ' fc ' , ' fc_sigm ' )
model . Softmax ( ' fc_sigm ' , ' softmax ' )
model . LabelCrossEntropy ( [ ' softmax ' , ' label ' ] , ' xent ' )
loss = model . AveragedLoss ( ' xent ' , ' loss ' )
2017-05-20 16:04:37 +00:00
# Add a duplicate param init to ensure it does not cause issues
model . param_init_net . ConstantFill (
[ ] , [ " fc_w " ] , shape = ( ( 64 * 56 * 56 ) , 1000 )
)
return [ loss ]
2017-05-19 19:46:18 +00:00
2017-05-30 19:44:50 +00:00
def add_optimizer ( model ) :
optimizer . build_sgd ( model , 0.1 , policy = " fixed " , momentum = 0.9 )
2017-05-19 19:46:18 +00:00
2017-05-20 16:04:37 +00:00
model = cnn . CNNModelHelper (
order = " NHWC " ,
name = " test " ,
)
2017-06-21 06:10:51 +00:00
data_parallel_model . Parallelize_CPU (
2017-05-20 16:04:37 +00:00
model ,
input_builder_fun = add_input_ops ,
forward_pass_builder_fun = add_model_ops ,
2017-05-30 19:44:50 +00:00
optimizer_builder_fun = add_optimizer ,
2017-05-20 16:04:37 +00:00
devices = [ 1 , 2 , 3 ] ,
)
# Only gpu_1 params should be returned (gpu_1 is the first gpu)
checkpoint_params = data_parallel_model . GetCheckpointParams ( model )
2017-06-21 06:10:51 +00:00
for p in model . GetParams ( " cpu_1/ " ) :
2017-05-20 16:04:37 +00:00
self . assertTrue ( p in checkpoint_params )
self . assertTrue ( p + " _momentum " in checkpoint_params )
2017-06-21 06:10:51 +00:00
for p in model . GetParams ( " cpu_2/ " ) :
2017-05-20 16:04:37 +00:00
self . assertFalse ( p in checkpoint_params )
2017-05-30 19:44:50 +00:00
self . assertTrue (
2017-06-21 06:10:51 +00:00
core . BlobReference ( " cpu_1/fc_w_momentum " ) in checkpoint_params )
for c in model . GetComputedParams ( " cpu_1/ " ) :
2017-05-20 16:04:37 +00:00
self . assertTrue ( c in checkpoint_params )
2017-06-21 06:10:51 +00:00
for c in model . GetComputedParams ( " cpu_2/ " ) :
2017-05-20 16:04:37 +00:00
self . assertFalse ( c in checkpoint_params )
2017-06-21 06:10:51 +00:00
self . assertFalse ( core . BlobReference ( " cpu_1/data " ) in checkpoint_params )
2017-05-30 19:44:50 +00:00
self . assertTrue ( core . BlobReference ( " optimizer_iteration " ) in checkpoint_params )
2017-05-19 19:46:18 +00:00
2017-07-27 17:53:51 +00:00
def test_net_conversion_and_append_net ( self ) :
other = model_helper . ModelHelper ( )
fc1 = brew . fc ( other , " data " , " other_fc1 " , dim_in = 3 * 227 * 227 , dim_out = 10 )
fc2 = brew . fc ( other , fc1 , " other_fc2 " , dim_in = 10 , dim_out = 10 )
brew . fc ( other , fc2 , " other_fc3 " , dim_in = 10 , dim_out = 10 )
def add_input_ops ( model ) :
model . net . UniformFill ( [ ] , [ " data " ] , shape = [ 4 , 227 , 227 , 3 ] )
model . net . UniformFill ( [ ] , [ " label " ] , shape = [ 4 ] )
def add_model_ops ( model , loss_scale ) :
model . NHWC2NCHW ( " data " , " data_nchw " )
model . Conv ( " data_nchw " , ' conv1 ' , 3 , 64 ,
weight_init = ( " MSRAFill " , { } ) , kernel = 7 ,
stride = 2 , pad = 3 , no_bias = 0 )
2017-09-26 06:32:11 +00:00
model . SpatialBN ( ' conv1 ' , ' conv1_spatbn_relu ' , 64 , epsilon = 1e-3 , is_test = False )
2017-07-27 17:53:51 +00:00
model . Relu ( ' conv1_spatbn_relu ' , ' conv1_spatbn_relu ' )
model . MaxPool ( ' conv1_spatbn_relu ' , ' pool1 ' , kernel = 3 , stride = 2 )
model . FC ( ' pool1 ' , ' fc ' , dim_in = ( 64 * 56 * 56 ) , dim_out = 10 )
# Append the net and param_init_net of the other model
appendnet = data_parallel_model . ConvertNetForDevice ( other . net )
model . net . AppendNet ( appendnet )
model . param_init_net . AppendNet (
data_parallel_model . ConvertNetForDevice ( other . param_init_net ) )
model . Sigmoid ( ' fc ' , ' fc_sigm ' )
model . Softmax ( ' fc_sigm ' , ' softmax ' )
loss = model . AveragedLoss ( ' softmax ' , ' loss ' )
return [ loss ]
def add_optimizer ( model ) :
optimizer . build_sgd ( model , 0.1 , policy = " fixed " , momentum = 0.9 )
model = cnn . CNNModelHelper (
order = " NCHW " ,
name = " test " ,
)
data_parallel_model . Parallelize_CPU (
model ,
input_builder_fun = add_input_ops ,
forward_pass_builder_fun = add_model_ops ,
optimizer_builder_fun = add_optimizer ,
devices = range ( 4 )
)
# Just create and run net and confirm no exception is thrown
workspace . RunNetOnce ( model . param_init_net )
workspace . CreateNet ( model . net )
workspace . RunNet ( model . net )
2017-07-06 16:04:29 +00:00
def test_synchronization_barrier ( self ) :
def run ( comm_rank , comm_size , tmpdir ) :
def add_input_ops ( model ) :
pass
def add_model_ops ( model , loss_scale ) :
return [ ]
def add_optimizer ( model ) :
pass
store_handler = " store_handler "
workspace . RunOperatorOnce (
core . CreateOperator (
" FileStoreHandlerCreate " ,
[ ] ,
[ store_handler ] ,
path = tmpdir ) )
rendezvous = dict (
kv_handler = store_handler ,
shard_id = comm_rank ,
num_shards = comm_size ,
engine = ' GLOO ' ,
)
model = cnn . CNNModelHelper (
order = " NHWC " ,
name = " test " ,
)
data_parallel_model . Parallelize_CPU (
model ,
input_builder_fun = add_input_ops ,
forward_pass_builder_fun = add_model_ops ,
optimizer_builder_fun = add_optimizer ,
devices = [ 1 , 2 , 3 ] ,
rendezvous = rendezvous
)
data_parallel_model . RunInitNet ( model )
for _ in range ( 2 ) :
data_parallel_model . Synchronize ( model )
with TemporaryDirectory ( ) as tmpdir :
self . run_test_locally (
run ,
comm_size = 2 ,
2017-07-12 17:38:51 +00:00
device_option = None ,
2017-07-06 16:04:29 +00:00
tmpdir = tmpdir )
[Caffe2][fbcode=>GH sync] Update from facebook 4323b18ce13c (#7116)
* [fix] Re-enable events in RNN ops
We have earlier added event disabling in RNN ops as back then we didn't use
events, with current use cases this is no longer true
(https://fburl.com/8vd0lp8y)
* use ops with cude impl
* Revert D7729695: [caffe2][fix] Re-enable events in RNN ops
This reverts commit 4b215c7496fb724656ff4c776933a15bdbbcde5e
@bypass-lint
An infra SEV is better than not reverting this diff.
If you copy this password, see you in SEV Review!
@cause_a_sev_many_files
* [observer] Clean up observer_config.h
#accept2ship
* [1/n] Refactor dataio_test.py
Replace code duplication with a common function
* Add barrier net that runs before training nets
Add a synchonize barrier net that is run before training nets. With this net, shards that are faster will wait for other shards before start training. This reduce chances of the faster shards timing out during GLOO AllReduce.
Removed explicit data_parallel_model.py.synchronize call in holmes workflow. Similar change in speech/asr_training workflow will come in another diff.
* Support the dnnlowp backend in caffe2_benchmark
This is for SHARE operator latency evaluation
* Migrate integral_image_op to main caffe2
migrate integral_image_op(GPU version) given by https://fburl.com/yvqezigi
to caffe2/caffe2/operators and implement its CPU version. Write up a test
using the hypothesis_test mechanism
* [pos_disc, fbcode] Implement unjoined lr loss
As explained in https://our.intern.facebook.com/intern/wiki/Model_Based_Calibration/, when the dataset is an joined data set, where labels might change later, we need to use unjoined logloss.
The implementation is almost the same as in Sigrid (https://fburl.com/1trngsls), where
loss = y (log(p) - log(1-p)) + (1-y)(log(1-p)) = xy - (1-y)x - (1-y)log(1+exp(-x))
For x < 0, to ensure stability and avoid overflow, we reformulate the above exp as
loss = xy - (1-y)x - (1-y)x + (1-y)log(1+exp(x)) = xy + (1-y)log(1+exp(x))
Then the final expression becomes
loss = xy + (y - 1) x (x >= 0) - (1 - y) log(1 + exp(x - 2 x (x >= 0)))
where y is the true label, x is the dot product and p = logistic(x).
This kind of implementation is align with the current implementation of the original cross entropy in
https://phabricator.intern.facebook.com/diffusion/FBS/browse/master/fbcode/caffe2/caffe2/operators/cross_entropy_op.cc;0bae3b5d0f825897c5e0dd0ff10f489d7271bf25$7-13
* Keep the array to fix the conflict
* [C2] Compute Adagrad effective LR
The AdagradWithLR op outputs an extra blob which is contains the average effective learning rate across all weights in this blob.
* Open-source extractMetaNetDef & runGlobalInitialization, add new Predictor constructor from db file, and add run_map_outputs
1. Open-source extractMetaNetDef and runGlobalInitialization, for use in
2. new Predictor constructor from db file.
3. Add new run function that returns outputs as TensorMap
* Disable eigen cpu
Disable eigen cpu in transpose and reduce
* Introduce request_only/object_only property of ModelLayer
by default this is False
* A simple TC Caffe2 benchmark
We can run tunner, get MappingOptions and then use them to
compare against cuBLAS
currently broken due to LLVM issues. How to run:
hg checkout eec1ab31b59c03b8deded1c755a9abaf8c45be01
add D7401202
add D7434625
add D7506031
add D7540728
buck run @mode/dev-nosan tc/tc/benchmarks_python:caffe2_benchmark
* Move Caffe2 feature_maps_ops to open source
Need feature maps operators in open source project facebookresearch/BlueWhale
* Manually fix the conflicts in channel shuffle op
* Fix the inconsistency between different gh and fbcode
* Skip Adagrad GPU Test (Because some gpu implementation is missing)
* Fix another test to make sure it won't run on gpu when implementation is not available yet
2018-05-02 03:49:00 +00:00
def test_pre_train_synchronization_barrier ( self ) :
def run ( comm_rank , comm_size , tmpdir ) :
def add_input_ops ( model ) :
pass
def add_model_ops ( model , loss_scale ) :
return [ ]
def add_optimizer ( model ) :
pass
workspace . ResetWorkspace ( )
store_handler = " store_handler "
workspace . RunOperatorOnce (
core . CreateOperator (
" FileStoreHandlerCreate " ,
[ ] ,
[ store_handler ] ,
path = tmpdir ) )
rendezvous = dict (
kv_handler = store_handler ,
shard_id = comm_rank ,
num_shards = comm_size ,
engine = ' GLOO ' ,
)
model = cnn . CNNModelHelper (
order = " NHWC " ,
name = " test " ,
)
# Set network timeout to 2 seconds, and add a 3 seconds
# sleep for 1 host. Make sure there is no timeout on the
# second RunNet.
Update from facebook (#7451)
* [bootcamp] Improve "Shape" operator to support axes specification
To improve .shape operator of Caffe2 to support x.shape(tensor, axes), which takes an optional int array "axes" as input. For example, x.shape(tensor, [1, 0]) will return the dimension for axis 1 and 0 following the specified order. For current version, "axes" input allows duplications and can have arbitrary length.
* Back out "Add barrier net that runs before training nets"
Original commit changeset: b373fdc9c30f. Need additional changes to some callers to support barrier failures.
* Change warning to verbose log to reduce log spam
The `LOG(WARNING)` was a bit spammy for regular use so lets just make it a `VLOG`.
* Extract the shared code from different caffe2_benchmark binaries
The OSS benchmark and Internal benchmark will share most functions in the benchmark.
* Support MFR in sequence training
As titled.
* Make knowledge distillation work with using logged prediction feature as teacher label.
1) Add loading raw dense feature as teacher label.
2) Optional calibration function for teacher label
3) Add teacher label into generic unit test
4) Deprecated TTSN workflow version using feature_options to config teacher label
* [C2/CUDA]: unjoined cross entropy sigmoid
as desc
* Add async_scheduling executor into deferrable_net_exec_test
Add async_scheduling into tests and fix some exception cases
* Fix Event disabled error
When disabling event in RNN ops make sure we don't call Finish on disabled
event from op's RunAsync
* cuda ensure cpu output op can handle both TensorCPU and TensorCUDA
as desc.
* [C2 Core] Infer input device option in C2 hypothesis_test checkers
Improve how we default input blob device options.
Previously it defaults as where op lives but it is not necessarily the case.
For example:
CopyCPUToGPU
* [C2 Op]SplitByLengthsOp CPU/GPU implementation
[C2 Op]SplitByLengthsOp CPU/GPU implementation
* fix undefined symbol error
not sure why we're getting undefined symbol even with link_whole = True
Need to figure out why but need this workaround for now
* Add tools in DAIPlayground platform to help debugging models
Add additional tools to allow Plauground override individual method defined in AnyExp. This will allow user to create module that specificly change certain default method behavior. An example included in this diff is deactivating test model and checkpointing. When debugging any model problems, switching off components helps me quickly narrow down the location of the bug. The technique is extensively used in task T27038712 (Steady memory increase in EDPM, eventually resulting in gloo/cuda.cu:34: out of memory)
* add shape and type inference for int8 conversion operator
* Fix flaky test for group_norm
Fix flaky test for group_norm
* Fix group_norm_op_test flaky
Fix group_norm_op_test flaky
* Implementation of composite learning rate policy
In many state-of-the-arts deep learning works, people use a simple trick to
schedule the learning rate: use a fixed learning rate until error plateaus
and then switch to a different fixed learning rate, and so on. In this diff,
we implemented a simple version of the composite learning rate. The user gives
a set of learning rates policies and corresponding iteration nums, and the
optimizer will change the learning rate policy based on the number of iterations so far.
For example, the user give two learning rate policies, one is FixedLearningRate
and PolyLearningRate, with an iteration number of 1k. Then the first 1k iteration,
we use FixedLearningRate. For the following iterations, we use PolyLearningRate.
* Split two use cases of CachedReader into two classes, DBFileReader and CachedReader
# Use Cases:
1). input: DB file -> output: DatasetReader.
Use DBFileReader.
2). input: Reader -> build cache DB file -> output: DatasetReader.
Use CachedReader.
# Changes to CachedReader:
1). Move db_path to the constructor.
Because in mock reader. cache will always be built ahead.
# Changes to tests:
1). Make a separate TestCase class for CachedReader and DBFileReader.
2). Make it possible to add more test functions by adding setUp, tearDown and _make_temp_path.
3). Make delete db_path more general. `db_path` could be a file for `log_file_db`, but could also be a directory for `leveldb`.
* Back out "On Mobile phones, call GlobalInit with no arguments in predictor in case we need to perform initialization"
Original commit changeset: 4489c6133f11
* Fix LARS bug
Fixed a bug in the LARS implementation which caused all subsequent blobs not using LARS to have the LARS learning rate multiplier applied to them.
* [tum] support sparse init & add uniformFill option
as title
* Propagate exception for async nets
Capture the exception when an exception is thrown in async nets and re-throw it after wait(). This allows exceptions to be propagated up to the caller.
This diff was a part of D7752068. We split the diff so that C2 core files changes are in a separate diff.
* Automatic update of fbcode/onnx to 69894f207dfcd72d1e70497d387201cec327efbc
Previous import was 403ccfbd0161c38f0834413d790bad0874afbf9a
Included changes:
- **[69894f2](https://github.com/onnx/onnx/commit/69894f2)**: Use op schema.all tensor types in random like definitions (#865) <Scott McKay>
- **[b9d6b90](https://github.com/onnx/onnx/commit/b9d6b90)**: Clarify random like operators (#846) <Scott McKay>
- **[fc6b5fb](https://github.com/onnx/onnx/commit/fc6b5fb)**: Refactor shape inference implementation (#855) <anderspapitto>
- **[b7d8dc8](https://github.com/onnx/onnx/commit/b7d8dc8)**: fix cmake warning message (#863) <Eric S. Yu>
- **[f585c5d](https://github.com/onnx/onnx/commit/f585c5d)**: add pytorch-operator test for tile (#831) <Wenhao Hu>
- **[993fe70](https://github.com/onnx/onnx/commit/993fe70)**: add install step (#832) <Eric S. Yu>
- **[68bc26c](https://github.com/onnx/onnx/commit/68bc26c)**: add type inference for traditional ml ops except classifier ops. (#857) <Ke Zhang>
- **[9cc0cda](https://github.com/onnx/onnx/commit/9cc0cda)**: fix string representation of scalar types (#858) <G. Ramalingam>
- **[1078925](https://github.com/onnx/onnx/commit/1078925)**: fix y in pow test case to scalar (#852) <Wenhao Hu>
- **[c66fb6f](https://github.com/onnx/onnx/commit/c66fb6f)**: Add some math function shape inference (#845) <anderspapitto>
- **[ff667d1](https://github.com/onnx/onnx/commit/ff667d1)**: Refactor return type and docs for ONNXIFI_BACKEND_DIRECTX_ID (#853) <Marat Dukhan>
- **[11c6876](https://github.com/onnx/onnx/commit/11c6876)**: clear initializer names when clear initializer (#849) <Wenhao Hu>
- **[73c34ae](https://github.com/onnx/onnx/commit/73c34ae)**: Clarify FeatureVectorizer description. (#843) <Scott McKay>
- **[1befb9b](https://github.com/onnx/onnx/commit/1befb9b)**: Remove useless text in docs (#850) <Lu Fang>
- **[e84788f](https://github.com/onnx/onnx/commit/e84788f)**: Fix SELU attributes' default values (#839) <Lu Fang>
- **[ebac046](https://github.com/onnx/onnx/commit/ebac046)**: Add tile test case (#823) <Wenhao Hu>
- **[8b7a925](https://github.com/onnx/onnx/commit/8b7a925)**: a few more shape inference functions (#772) <anderspapitto>
- **[9718f42](https://github.com/onnx/onnx/commit/9718f42)**: Make the coefficient non optional for LinearClassifier (#836) <Jaliya Ekanayake>
- **[ef083d0](https://github.com/onnx/onnx/commit/ef083d0)**: Add save_tensor and load_tensor functions for Protos (#770) <Lu Fang>
- **[45ceb55](https://github.com/onnx/onnx/commit/45ceb55)**: Check if CMAKE_BUILD_TYPE set before project(). (#812) <Sergii Dymchenko>
- **[4b3d2b0](https://github.com/onnx/onnx/commit/4b3d2b0)**: [WIP] reenable shape inference tests (#834) <anderspapitto>
- **[22d17ee](https://github.com/onnx/onnx/commit/22d17ee)**: RNN tests: LSTM, GRU, SimpleRNN (#739) <Peyman Manikashani>
- **[de65b95](https://github.com/onnx/onnx/commit/de65b95)**: dimension denotation (#443) <Tian Jin>
- **[eccc76e](https://github.com/onnx/onnx/commit/eccc76e)**: fix field number issue in onnx operator proto and enable its build (#829) <Ke Zhang>
- **[d582beb](https://github.com/onnx/onnx/commit/d582beb)**: disable shape inference test to unbreak ci (#830) <Lu Fang>
- **[485b787](https://github.com/onnx/onnx/commit/485b787)**: function proto for composite op. (#802) <Ke Zhang>
- **[cd58928](https://github.com/onnx/onnx/commit/cd58928)**: specify defaults for attributes of Affine op (#820) <G. Ramalingam>
- **[7ee2cf9](https://github.com/onnx/onnx/commit/7ee2cf9)**: merge the dummy backend back into the main one (#743) <anderspapitto>
- **[1c03a5a](https://github.com/onnx/onnx/commit/1c03a5a)**: [Proposal] ONNX Interface for Framework Integration (previously ONNX Backend API) header and docs (#551) <Marat Dukhan>
- **[3769a98](https://github.com/onnx/onnx/commit/3769a98)**: Rename real model test case from VGG-16 to ZFNet (#821) <Lu Fang>
* [C2]ReluN Op
relu n op.
tf reference: https://www.tensorflow.org/api_docs/python/tf/nn/relu6
* Call destructor when assigning a blob value
* Add executor overrides
Add executor overrides flag to enable migration to async_scheduling executor
* Add barrier net that runs before training nets - attempt #2
Add a synchonize barrier net that is run before training nets. With this net, shards that are faster will wait for other shards before start training. This reduce chances of the faster shards timing out during GLOO AllReduce.
Removed explicit data_parallel_model.py.synchronize call in holmes workflow.
This change was landed previously but caused errors for some EDPM workflows - See https://fb.facebook.com/groups/1426530000692545/permalink/1906766366002237/ - because EDPM assumes any call to CreateOrCloneCommonWorld and Gloo ops are wrapped in exception handlers but in this case exception thrown in the barrier init net is not handled.
To address this issue, we add _CreateOrCloneCommonWorld to the param_init_net instead of a new barrier init net. Since errors for param_init_net run is handled gracefully and re-rendezvous, it should fixes the problem.
* Handle empty nets in async_scheduling
Make sure we don't get stuck on empty nets
* use CUDA_ARCH for conditional compile
* [C2 fix] infer function for ensure_cpu_output_op
* Update group_norm test to reduce flaky test
* Fix lr_multiplier for GPU
2018-05-11 06:14:27 +00:00
data_parallel_model . _DEFAULT_TIMEOUT_SEC = 2
[Caffe2][fbcode=>GH sync] Update from facebook 4323b18ce13c (#7116)
* [fix] Re-enable events in RNN ops
We have earlier added event disabling in RNN ops as back then we didn't use
events, with current use cases this is no longer true
(https://fburl.com/8vd0lp8y)
* use ops with cude impl
* Revert D7729695: [caffe2][fix] Re-enable events in RNN ops
This reverts commit 4b215c7496fb724656ff4c776933a15bdbbcde5e
@bypass-lint
An infra SEV is better than not reverting this diff.
If you copy this password, see you in SEV Review!
@cause_a_sev_many_files
* [observer] Clean up observer_config.h
#accept2ship
* [1/n] Refactor dataio_test.py
Replace code duplication with a common function
* Add barrier net that runs before training nets
Add a synchonize barrier net that is run before training nets. With this net, shards that are faster will wait for other shards before start training. This reduce chances of the faster shards timing out during GLOO AllReduce.
Removed explicit data_parallel_model.py.synchronize call in holmes workflow. Similar change in speech/asr_training workflow will come in another diff.
* Support the dnnlowp backend in caffe2_benchmark
This is for SHARE operator latency evaluation
* Migrate integral_image_op to main caffe2
migrate integral_image_op(GPU version) given by https://fburl.com/yvqezigi
to caffe2/caffe2/operators and implement its CPU version. Write up a test
using the hypothesis_test mechanism
* [pos_disc, fbcode] Implement unjoined lr loss
As explained in https://our.intern.facebook.com/intern/wiki/Model_Based_Calibration/, when the dataset is an joined data set, where labels might change later, we need to use unjoined logloss.
The implementation is almost the same as in Sigrid (https://fburl.com/1trngsls), where
loss = y (log(p) - log(1-p)) + (1-y)(log(1-p)) = xy - (1-y)x - (1-y)log(1+exp(-x))
For x < 0, to ensure stability and avoid overflow, we reformulate the above exp as
loss = xy - (1-y)x - (1-y)x + (1-y)log(1+exp(x)) = xy + (1-y)log(1+exp(x))
Then the final expression becomes
loss = xy + (y - 1) x (x >= 0) - (1 - y) log(1 + exp(x - 2 x (x >= 0)))
where y is the true label, x is the dot product and p = logistic(x).
This kind of implementation is align with the current implementation of the original cross entropy in
https://phabricator.intern.facebook.com/diffusion/FBS/browse/master/fbcode/caffe2/caffe2/operators/cross_entropy_op.cc;0bae3b5d0f825897c5e0dd0ff10f489d7271bf25$7-13
* Keep the array to fix the conflict
* [C2] Compute Adagrad effective LR
The AdagradWithLR op outputs an extra blob which is contains the average effective learning rate across all weights in this blob.
* Open-source extractMetaNetDef & runGlobalInitialization, add new Predictor constructor from db file, and add run_map_outputs
1. Open-source extractMetaNetDef and runGlobalInitialization, for use in
2. new Predictor constructor from db file.
3. Add new run function that returns outputs as TensorMap
* Disable eigen cpu
Disable eigen cpu in transpose and reduce
* Introduce request_only/object_only property of ModelLayer
by default this is False
* A simple TC Caffe2 benchmark
We can run tunner, get MappingOptions and then use them to
compare against cuBLAS
currently broken due to LLVM issues. How to run:
hg checkout eec1ab31b59c03b8deded1c755a9abaf8c45be01
add D7401202
add D7434625
add D7506031
add D7540728
buck run @mode/dev-nosan tc/tc/benchmarks_python:caffe2_benchmark
* Move Caffe2 feature_maps_ops to open source
Need feature maps operators in open source project facebookresearch/BlueWhale
* Manually fix the conflicts in channel shuffle op
* Fix the inconsistency between different gh and fbcode
* Skip Adagrad GPU Test (Because some gpu implementation is missing)
* Fix another test to make sure it won't run on gpu when implementation is not available yet
2018-05-02 03:49:00 +00:00
data_parallel_model . Parallelize_CPU (
model ,
input_builder_fun = add_input_ops ,
forward_pass_builder_fun = add_model_ops ,
optimizer_builder_fun = add_optimizer ,
devices = [ 1 , 2 , 3 ] ,
rendezvous = rendezvous ,
barrier_net_timeout_sec = 5
)
data_parallel_model . RunInitNet ( model )
data_parallel_model . RunNet ( model , 2 )
if comm_rank == 0 :
time . sleep ( data_parallel_model . _DEFAULT_TIMEOUT_SEC )
data_parallel_model . RunNet ( model , 2 )
with TemporaryDirectory ( ) as tmpdir :
self . run_test_locally (
run ,
comm_size = 2 ,
device_option = None ,
tmpdir = tmpdir )
2017-07-12 17:38:51 +00:00
def test_device_scope_check ( self ) :
2017-09-13 18:26:41 +00:00
with self . assertRaises ( AssertionError ) :
2018-11-29 21:58:11 +00:00
with core . DeviceScope ( core . DeviceOption ( workspace . GpuDeviceType , 0 ) ) :
2017-09-13 18:26:41 +00:00
data_parallel_model . Parallelize_GPU ( None , None , None )
2017-07-12 17:38:51 +00:00
[Caffe2] Changes done inside Facebook (#6378)
* fix unit test for sqrt op
From the error logging:
[idx, grad, grad_estimate] are:
[[ 146. 0.5 0.45776367]
[ 147. 0.5 0.45776367]
The gradient == 0.5 is correct, which means the SqrtOp and its gradient is doing right job. (Because y = sqrt(x), loss = y^2/2 = x/2, and then d(loss)/dx = 1/2 = 0.5; )
The test failed because of numerical problem of grad_estimate (in unit test). It can be because the step_size is small, and float precision is not high (when there are multiple elements in the tensor, we do sum(y^2) to compute loss)
This diff
- increase the step size, and also move the test cases to be further away from 0 (where sqrt(x) is not well defined) to be safe :)
- also clean up, and merge the test case for inplace Vs. non-inplace
Tested with:
`CAFFE2_HYPOTHESIS_PROFILE=debug ai_bt caffe2/caffe2/python/operator_test:elementwise_ops_test -- "test_sqrt"`
* CompositeReader & CompositeReaderBuilder
A new type of reader gluing multiple readers together.
* Back out "Revert D7394363: [GanH]: Log D Trick for Cross Entropy with Sigmoid"
Original commit changeset: 9325a4356dbe
* [dai][WIP] convert params to int8 on ps before sending to trainer
Add float->uint8 conversion in addition to float->fp16 conversion in model_saver.
* [easy] improve unit test for sparse length sum ops
as desc.
#accept2ship
* Update GitHub upstream to 771fcb3455cbfe69c2abcc4cb3bd7ef92d59af24
* move sparse hash unique ops to OOS and add unit tests
- move the SparseHash version to OOS, since 'sparsehash' is already deps of caffe2 OOS: https://fburl.com/arssw4n1
- The 'SparseHash' engine is also being used in OOS, so the SparseHash version shall be in OOS to reduce confusion: https://fburl.com/o5ea7ah2
- fix the CUDA UniqueOp for the case when batch is empty.
- add unit test
* group_norm_op for caffe2
This is the cuda op for Group Normalization (GN): https://arxiv.org/abs/1803.08494
This code implements GN in one op that computes Y=gamma * (X-mu) / sigma + beta and also its gradients. It is expected to have minimal memory consumption (similar to the BN op), without creating new blobs if GN were implemented as several ops (e.g., reshape, norm_mean/std, affine_channel).
* Resubmit D7405233: disappeared in D7464958
OOS publish causes the op missing -- however, test was still there
* [c2] add sparse hash engine for cuda unique op
The SparseHash version of UniqueOp copy input tensor to CPU, and make use of sparse hash map to get unique output, and then copy back to GPU.
* [dper][gpu] enable unit testing gpu trainer for sparse nn
to debug the GPU trainer using mock data in unit test.
make it easier to develop GPU trainer for new models.
* Reuse Gloo context for Synchronize() calls
Previously we were creating (and leaking) the Gloo context on each call to Synchronize(). Now only run the common world op and create the barrier net once, then run the barrier net on each Synchronize() call. Since timeout is associated with the Gloo context, assert that the timeout is fixed instead of trying to handle the complexity of multiple timeouts (and associated contexts).
* [GanH/WGAN][1/n]: add FC param clipping
as titled
* [mobile] minimizing changes between caffe2_benchmark and speed_benchmark
* [GanH]: enable diagnose within model
avoid finding blob names but to directly enable inside the model
* Add `net_transformer_fun` option to DPM
This callback allows for various transformations to be made to the
model after gradient operators have been added. The immediate motivation for
this is to allow transformations such has "checkpoint-and-recompute" which
allow trading off memory for additional compute.
Adding several callbacks like this has made DPM's API less than ideal at this
stage. However, I could not find any reasonable alternative.
* [DT] [33/n] Compile flow task groups
task groups need to compiled in order to pickle the object in fblearner. However I also changed the Job's compile function as creating new object is not necessary.
* Initial commit for sparse_normalize vectorization and benchmark
* [GanH]: LB Calibration for JSD
as titled
* Tracing event in async executor
Adding event tracing through TRACE_EVENT macro in async executor
* [Resubmit] D7409751 Reseting book-keeping blobs when the reservoir is reset
D7409751 got lost in D7464958
* Visualizing realtime weights values
we want to visualize the weights values as optimizer is iterating. This diff supports to visual the weights at an assigned index.
Currently, we assume the blob to be 2 dimensional.
* [GanH][Easy]: Fix Homotopy Weighting
apparantely, there was a bug in homotopy weight (alpha, beta) update
* [c2] move sparse hash unique op out of oss
so that oss do not need to depend on google hash map.
* Get rid of std::round as it's not supported on Android
* Revert changes on setup.py
* Skip shaky test on Dataio
* fix
2018-04-11 04:11:43 +00:00
def test_net_transformer_function ( self ) :
devices = [ 1 , 2 , 3 ]
def add_input_ops ( model ) :
model . param_init_net . UniformFill ( [ ] , [ " data " ] , shape = [ 32 , 8 ] )
def add_optimizer ( model ) :
optimizer . build_sgd ( model , 0.1 )
def add_model_ops ( model , loss_scale ) :
fc1 = brew . fc ( model , " data " , " fc1 " , dim_in = 8 , dim_out = 8 )
return [ fc1 ]
kwargs = {
' input_builder_fun ' : add_input_ops ,
' forward_pass_builder_fun ' : add_model_ops ,
' devices ' : devices ,
}
# assert that the transformer is called for both train and test cases
transform = Mock ( )
kwargs [ ' net_transformer_fun ' ] = transform
model = model_helper . ModelHelper ( name = " r " , init_params = False )
data_parallel_model . Parallelize_CPU ( model , * * kwargs )
self . assertTrue ( transform . called )
self . assertEqual ( transform . call_count , 1 )
transform = Mock ( )
kwargs [ ' net_transformer_fun ' ] = transform
kwargs [ ' optimizer_builder_fun ' ] = add_optimizer
model = model_helper . ModelHelper ( name = " r " , init_params = True )
data_parallel_model . Parallelize_CPU ( model , * * kwargs )
self . assertTrue ( transform . called )
self . assertEqual ( transform . call_count , 1 )
2018-09-06 21:24:16 +00:00
@given ( seed = st . integers ( 0 , 65535 ) , batch_size = st . integers ( 1 , 20 ) )
2018-10-18 18:19:28 +00:00
def test_multi_device_bn_op_level_cpu ( self , seed , batch_size ) :
2018-09-20 02:41:23 +00:00
self . _bn_check_op_level ( " cpu " , seed , batch_size )
2018-09-06 21:24:16 +00:00
2018-10-18 18:19:28 +00:00
@unittest.skipIf ( not workspace . has_gpu_support , " No gpu support. " )
@unittest.skipIf ( workspace . NumCudaDevices ( ) < 2 , " Need at least 2 GPUs. " )
@given ( seed = st . integers ( 0 , 65535 ) , batch_size = st . integers ( 1 , 20 ) )
def test_multi_device_bn_op_level_gpu ( self , seed , batch_size ) :
self . _bn_check_op_level ( " gpu " , seed , batch_size )
2018-09-20 02:41:23 +00:00
def _bn_check_op_level ( self , device_type , seed , batch_size ) :
'''
Test multi device batch normalization at the operation level . This is
done by checking the outputs of batch normalization and its gradient
operator . We compare values produced with our manually calculated
batch normalization values and gradients .
'''
2018-10-18 18:19:28 +00:00
devices = [ 0 , 1 ]
2018-09-06 21:24:16 +00:00
epsilon = 1e-3
tolerance = 1e-3
def _test_forward_pass ( x , devices , device_type , scale , bias , epsilon ) :
x_concat = np . concatenate ( x )
mean = np . mean ( x_concat , axis = 0 )
var = np . var ( x_concat , axis = 0 )
for device in devices :
x_i = x [ device ]
x_hat = ( x_i - mean ) / ( np . sqrt ( var + epsilon ) )
expected_out = scale * x_hat + bias
spatial_out = workspace . FetchBlob (
2018-09-20 02:41:23 +00:00
" {} _ {} /bn_out " . format ( device_type , device ) )
2018-09-06 21:24:16 +00:00
rel_error = np . linalg . norm ( spatial_out - expected_out ) \
/ np . linalg . norm ( expected_out )
self . assertTrue ( rel_error < 0.005 )
def _test_backward_pass ( x , devices , device_type , scale , tolerance ) :
dBias_arr = [ ]
dY_arr = [ ]
dGamma_arr = [ ]
num_devices = len ( devices )
mean = np . array ( workspace . FetchBlob (
2018-09-20 02:41:23 +00:00
" {} _0/bn_out_sm " . format ( device_type ) ) , dtype = np . float32 )
2018-09-06 21:24:16 +00:00
inv_var = np . array ( workspace . FetchBlob (
2018-09-20 02:41:23 +00:00
" {} _0/bn_out_siv " . format ( device_type ) ) , dtype = np . float32 )
2018-09-06 21:24:16 +00:00
# dBias
# Sum dBias values over all devices to find the average gradient
for device in devices :
dY_blob = workspace . FetchBlob (
2018-09-20 02:41:23 +00:00
" {} _ {} /bn_out_grad " . format ( device_type , device ) )
2018-09-06 21:24:16 +00:00
dY = np . array ( dY_blob , dtype = np . float32 )
dY_arr . append ( dY )
dBias_arr . append ( np . array ( np . sum ( dY , axis = 0 ) , dtype = np . float32 ) )
dBias = np . sum ( dBias_arr , dtype = np . float32 )
dBias_avg = dBias / num_devices
for device in devices :
2018-09-20 02:41:23 +00:00
dBiasActual = np . sum ( workspace . FetchBlob ( " {} _ {} /bn_out_b_grad "
2018-09-06 21:24:16 +00:00
. format ( device_type , device ) ) , dtype = np . float32 )
self . assertTrue ( np . isclose ( [ dBiasActual ] , [ dBias ] , atol = tolerance ) )
# dGamma
# Sum dGamma values over all devices to find the average gradient
for device in devices :
dGamma = np . sum ( ( x [ device ] - mean ) * inv_var * dY_arr [ device ] ,
axis = 0 , dtype = np . float32 )
dGamma_arr . append ( dGamma )
dGamma = np . sum ( dGamma_arr , axis = 0 , dtype = np . float32 )
dGamma_avg = dGamma / num_devices
for device in devices :
dGammaActual = workspace . FetchBlob (
2018-09-20 02:41:23 +00:00
" {} _ {} /bn_out_s_grad " . format ( device_type , device ) )
2018-09-06 21:24:16 +00:00
self . assertTrue ( np . isclose ( [ dGamma ] , [ dGammaActual ] , atol = tolerance ) )
# dX
scale_inv_var = scale * inv_var / batch_size
for device in devices :
dX = scale_inv_var * ( dY_arr [ device ] * batch_size - dBias_avg
- ( x [ device ] - mean ) * dGamma_avg * inv_var )
dX_actual = workspace . FetchBlob (
" {} _ {} /tanh_grad " . format ( device_type , device ) )
self . assertTrue ( np . isclose ( [ dX ] , [ dX_actual ] , atol = tolerance ) . all ( ) )
def add_input_ops ( model ) :
for device in devices :
data = np . random . rand ( batch_size , 1 , 1 , 1 ) . astype ( np . float32 )
workspace . FeedBlob ( " {} _ {} /data " . format ( device_type , device ) , data )
def add_model_ops ( model , loss_scale ) :
2018-10-18 18:19:28 +00:00
if device_type == " gpu " :
model . CopyCPUToGPU ( " data " , " device_data " )
model . Tanh ( " device_data " , " tanh " )
else :
model . Tanh ( " data " , " tanh " )
2018-09-20 02:41:23 +00:00
model . SpatialBN ( " tanh " , " bn_out " , 1 , epsilon = epsilon , is_test = False )
model . Sqr ( " bn_out " , " sqr " )
2018-09-06 21:24:16 +00:00
loss = model . SumElements ( " sqr " , " loss " )
return [ loss ]
def add_optimizer ( model ) :
return optimizer . build_sgd ( model , 0.1 )
np . random . seed ( seed )
workspace . ResetWorkspace ( )
model = cnn . CNNModelHelper (
order = " NCHW " ,
name = " test "
)
data_parallel_model . Parallelize (
model ,
input_builder_fun = add_input_ops ,
forward_pass_builder_fun = add_model_ops ,
optimizer_builder_fun = add_optimizer ,
devices = devices ,
cpu_device = device_type == " cpu " ,
shared_model = False ,
combine_spatial_bn = True ,
)
workspace . RunNetOnce ( model . param_init_net )
2018-09-20 02:41:23 +00:00
scale = workspace . FetchBlob ( " {} _0/bn_out_s " . format ( device_type ) )
bias = workspace . FetchBlob ( " {} _0/bn_out_b " . format ( device_type ) )
2018-09-06 21:24:16 +00:00
workspace . RunNetOnce ( model . net )
x = [ ]
for device in devices :
x_blob = workspace . FetchBlob ( " {} _ {} /tanh " . format ( device_type , device ) )
x_i = np . array ( x_blob , dtype = np . float32 )
x . append ( x_i )
_test_forward_pass ( x , devices , device_type , scale , bias , epsilon )
_test_backward_pass ( x , devices , device_type , scale , tolerance )
2017-03-09 21:41:36 +00:00
2018-09-20 02:41:23 +00:00
@given ( seed = st . integers ( 0 , 65535 ) , batch_size = st . integers ( 1 , 20 ) )
def test_multi_device_bn_net_lvl_cpu ( self , seed , batch_size ) :
if batch_size % 2 == 1 :
batch_size + = 1
self . _test_multi_device_bn_net_lvl ( " cpu " , seed , batch_size )
2018-10-18 18:19:28 +00:00
@unittest.skipIf ( not workspace . has_gpu_support , " No gpu support. " )
@unittest.skipIf ( workspace . NumCudaDevices ( ) < 2 , " Need at least 2 GPUs. " )
@given ( seed = st . integers ( 0 , 65535 ) , batch_size = st . integers ( 1 , 20 ) )
def test_multi_device_bn_net_lvl_gpu ( self , seed , batch_size ) :
if batch_size % 2 == 1 :
batch_size + = 1
self . _test_multi_device_bn_net_lvl ( " gpu " , seed , batch_size )
2018-09-20 02:41:23 +00:00
def _test_multi_device_bn_net_lvl ( self , device_type , seed , batch_size ) :
'''
Test multi device batch normalization at the net level . This is done
by verifying that the final batch normalization outputs and the
gradient outputs from multiple devices are the same as those produced
from a single device
'''
# Verify that the gradients calculated over multiple devices are the
# same as the gradients calculated over one device. These values should
# be equivalent because combine_spatial_bn sums values over all devices
def _verify_bn_outputs (
devices ,
device_type ,
tolerance ,
single_device_bn_out ,
two_device_bn_out_vals ,
single_device_grads ,
two_device_grads ,
) :
two_device_bn_out = np . concatenate ( two_device_bn_out_vals )
self . assertTrue ( np . isclose (
[ single_device_bn_out ] , [ two_device_bn_out ] , atol = tolerance ) . all ( ) )
# Scalar and Bias gradients should be the same across devices
gradient_names = [ " bn_out_s_grad " , " bn_out_b_grad " ]
for name in gradient_names :
expected_grad = single_device_grads [ name ]
for device in devices :
actual_grad = two_device_grads [ device ] [ name ]
self . assertTrue (
np . isclose ( [ actual_grad ] , [ expected_grad ] , atol = tolerance ) )
# Expected tanh_grad should be the combined tanh_grad vectors
# across the devices
first_grad = two_device_grads [ 0 ] [ " tanh_grad " ]
second_grad = two_device_grads [ 1 ] [ " tanh_grad " ]
actual_grad = np . concatenate ( [ first_grad , second_grad ] )
expected_grad = single_device_grads [ " tanh_grad " ]
rel_error = np . linalg . norm ( actual_grad - expected_grad ) \
/ np . linalg . norm ( expected_grad )
self . assertTrue ( rel_error < 1e-3 )
def _create_model ( multiple_devices ) :
def add_input_ops_no_combine ( model ) :
workspace . FeedBlob ( " {} _0/data " . format ( device_type ) , data )
def add_input_ops_combine ( model ) :
half = int ( batch_size / 2 )
workspace . FeedBlob ( " {} _0/data " . format ( device_type ) , data [ : half ] )
workspace . FeedBlob ( " {} _1/data " . format ( device_type ) , data [ half : ] )
def add_model_ops ( model , loss_scale ) :
2018-10-18 18:19:28 +00:00
if device_type == " gpu " :
model . CopyCPUToGPU ( " data " , " device_data " )
model . Tanh ( " device_data " , " tanh " )
else :
model . Tanh ( " data " , " tanh " )
2018-09-20 02:41:23 +00:00
model . SpatialBN ( " tanh " , " bn_out " , 1 , epsilon = epsilon , is_test = False )
model . Sqr ( " bn_out " , " sqr " )
loss = model . SumElements ( " sqr " , " loss " )
return [ loss ]
def add_optimizer ( model ) :
return optimizer . build_sgd ( model , 0.1 )
if multiple_devices :
input_fun = add_input_ops_combine
devices = [ 0 , 1 ]
combine_spatial_bn = True
else :
input_fun = add_input_ops_no_combine
devices = [ 0 ]
combine_spatial_bn = False
model = cnn . CNNModelHelper (
order = " NCHW " ,
name = " test "
)
data_parallel_model . Parallelize (
model ,
input_builder_fun = input_fun ,
forward_pass_builder_fun = add_model_ops ,
optimizer_builder_fun = add_optimizer ,
devices = devices ,
cpu_device = device_type == " cpu " ,
shared_model = False ,
combine_spatial_bn = combine_spatial_bn ,
)
return model
devices = [ 0 , 1 ]
epsilon = 1e-3
tolerance = 1e-3
# We are generating random data
np . random . seed ( seed )
data = np . random . rand ( batch_size , 1 , 1 , 1 ) . astype ( np . float32 )
data = np . reshape ( data , ( batch_size , 1 , 1 , 1 ) )
# Get values calculated without combine_spatial_bn
workspace . ResetWorkspace ( )
model_no_combine = _create_model ( multiple_devices = False )
workspace . RunNetOnce ( model_no_combine . param_init_net )
workspace . RunNetOnce ( model_no_combine . net )
single_device_bn_out = workspace . FetchBlob ( " {} _0/bn_out " . format ( device_type ) )
single_device_grads = { }
single_device_grads [ " bn_out_s_grad " ] = workspace . FetchBlob (
" {} _0/bn_out_s_grad " . format ( device_type ) )
single_device_grads [ " bn_out_b_grad " ] = workspace . FetchBlob (
" {} _0/bn_out_b_grad " . format ( device_type ) )
single_device_grads [ " tanh_grad " ] = workspace . FetchBlob (
" {} _0/tanh_grad " . format ( device_type ) )
# Get values calculated over multiple devices with combine_spatial_bn true
workspace . ResetWorkspace ( )
model_combine = _create_model ( multiple_devices = True )
workspace . RunNetOnce ( model_combine . param_init_net )
workspace . RunNetOnce ( model_combine . net )
two_device_bn_out_vals = [ ]
two_device_grads = { }
for device in devices :
bn_out_blob = " {} _ {} /bn_out " . format ( device_type , device )
two_device_bn_out_vals . append ( workspace . FetchBlob ( bn_out_blob ) )
two_device_grads [ device ] = { }
two_device_grads [ device ] [ " bn_out_s_grad " ] = workspace . FetchBlob (
" {} _ {} /bn_out_s_grad " . format ( device_type , device ) )
two_device_grads [ device ] [ " bn_out_b_grad " ] = workspace . FetchBlob (
" {} _ {} /bn_out_b_grad " . format ( device_type , device ) )
two_device_grads [ device ] [ " tanh_grad " ] = workspace . FetchBlob (
" {} _ {} /tanh_grad " . format ( device_type , device ) )
# Check to see if the combined values are equivalent
_verify_bn_outputs (
devices ,
device_type ,
tolerance ,
single_device_bn_out ,
two_device_bn_out_vals ,
single_device_grads ,
two_device_grads
)
2017-03-14 22:43:42 +00:00
class RecurrentNetworkParallelTest ( TestCase ) :
2017-06-21 06:10:51 +00:00
def run_model ( self , devices , gpu ) :
2017-03-14 22:43:42 +00:00
'''
Helper function for test_equiv
'''
def input_builder_fun ( model ) :
return None
def model_build_fun ( model , loss_scale ) :
workspace . FeedBlob (
core . ScopedBlobReference ( " seq_lengths " ) ,
np . array ( [ self . T ] * self . batch_per_device , dtype = np . int32 )
)
model . param_init_net . ConstantFill (
[ ] ,
" hidden_init " ,
value = 0.0 ,
shape = [ 1 , self . batch_per_device , self . hidden_dim ]
)
model . param_init_net . ConstantFill (
[ ] ,
" cell_init " ,
value = 0.0 ,
shape = [ 1 , self . batch_per_device , self . hidden_dim ]
)
2017-04-18 07:33:06 +00:00
output , _last_hidden , _ , _last_state , = rnn_cell . LSTM (
2017-03-14 22:43:42 +00:00
model = model ,
input_blob = " data " ,
seq_lengths = " seq_lengths " ,
initial_states = ( " hidden_init " , " cell_init " ) ,
dim_in = self . input_dim ,
dim_out = self . hidden_dim ,
scope = " partest " ,
)
# A silly loss function
loss = model . AveragedLoss (
model . Sub ( [ output , " target " ] , " dist " ) ,
" loss " ,
)
loss = model . Scale ( loss , " loss_scaled " , scale = loss_scale )
return [ loss ]
def param_update_fun ( model ) :
ITER = model . Iter ( " ITER " )
LR = model . net . LearningRate (
[ ITER ] ,
" LR " ,
base_lr = ( - 0.1 ) ,
policy = " fixed " ,
)
ONE = model . param_init_net . ConstantFill (
[ ] , " ONE " , shape = [ 1 ] , value = 1.0 ,
)
for param in model . GetParams ( ) :
param_grad = model . param_to_grad [ param ]
model . WeightedSum ( [ param , ONE , param_grad , LR ] , param )
assert len ( model . GetParams ( ) ) == len ( model . params ) / / len ( model . _devices )
workspace . ResetWorkspace ( )
model = cnn . CNNModelHelper (
2017-06-21 06:10:51 +00:00
name = " recurrent_test {} " . format ( devices ) ,
2017-03-14 22:43:42 +00:00
)
self . T = 8
self . batch_size = 64
self . input_dim = 8
self . hidden_dim = 31
2017-06-21 06:10:51 +00:00
self . batch_per_device = self . batch_size / / len ( devices )
2017-03-14 22:43:42 +00:00
2017-06-21 06:10:51 +00:00
data_parallel_model . Parallelize (
2017-03-14 22:43:42 +00:00
model ,
input_builder_fun = input_builder_fun ,
forward_pass_builder_fun = model_build_fun ,
param_update_builder_fun = param_update_fun ,
2017-06-21 06:10:51 +00:00
devices = devices ,
2017-04-05 16:44:10 +00:00
optimize_gradient_memory = True ,
2017-06-21 06:10:51 +00:00
cpu_device = not gpu ,
2017-03-14 22:43:42 +00:00
)
# Change all initialization to be ConstantFills so that
# the everything is deterministic
for op in model . param_init_net . Proto ( ) . op :
if op . type . endswith ( ' Fill ' ) :
op . type = ' ConstantFill '
# Each run has same input, independent of number of gpus
np . random . seed ( 20150210 )
for i in range ( 0 , 10 ) :
full_data = np . random . rand ( self . T , self . batch_size , self . input_dim )
full_target = np . random . rand (
self . T , self . batch_size , self . hidden_dim
)
2017-06-21 06:10:51 +00:00
for ( j , g ) in enumerate ( devices ) :
2017-03-14 22:43:42 +00:00
st = j * self . batch_per_device
en = st + self . batch_per_device
data = full_data [ : , st : en , : ] . astype ( np . float32 )
targets = full_target [ : , st : en , : ] . astype ( np . float32 )
2017-06-21 06:10:51 +00:00
with core . DeviceScope ( core . DeviceOption ( model . _device_type , g ) ) :
workspace . FeedBlob (
" {} _ {} /data " . format ( model . _device_prefix , g ) , data
)
workspace . FeedBlob (
" {} _ {} /target " . format ( model . _device_prefix , g ) , targets
)
2017-03-14 22:43:42 +00:00
if i == 0 :
workspace . RunNetOnce ( model . param_init_net )
workspace . CreateNet ( model . net )
workspace . RunNet ( model . net . Proto ( ) . name )
2017-06-21 06:10:51 +00:00
return workspace . FetchBlob ( " {} _0/partest/i2h_w " . format ( model . _device_prefix ) )
2017-03-14 22:43:42 +00:00
2019-11-06 21:27:55 +00:00
@unittest.skip ( " Test is flaky: https://github.com/pytorch/pytorch/issues/10322 " )
2017-03-14 22:43:42 +00:00
def test_equiv_recurrent ( self ) :
'''
Test that the model produces exactly same results given
2017-06-21 06:10:51 +00:00
total batchsize , independent of number of GPUs / CPUs .
2017-03-14 22:43:42 +00:00
'''
2017-06-21 06:10:51 +00:00
for gpu in [ True , False ] :
if gpu and not workspace . has_gpu_support :
continue
result_2gpus = self . run_model ( [ 0 , 1 ] , gpu )
result_1gpus = self . run_model ( [ 0 ] , gpu )
2017-03-14 22:43:42 +00:00
2017-06-21 06:10:51 +00:00
self . assertTrue ( np . allclose ( result_1gpus , result_2gpus ) )
2017-03-14 22:43:42 +00:00
2017-06-21 06:10:51 +00:00
if not gpu or workspace . NumCudaDevices ( ) > = 4 :
result_4gpus = self . run_model ( list ( range ( 4 ) ) , gpu )
self . assertTrue ( np . allclose ( result_1gpus , result_4gpus ) )
2017-03-14 22:43:42 +00:00
2017-06-21 06:10:51 +00:00
if not gpu or workspace . NumCudaDevices ( ) > = 8 :
result_8gpus = self . run_model ( list ( range ( 8 ) ) , gpu )
self . assertTrue ( np . allclose ( result_1gpus , result_8gpus ) )
2017-03-14 22:43:42 +00:00
2017-03-09 21:41:36 +00:00
@unittest.skipIf ( not workspace . has_gpu_support , " No gpu support. " )
@unittest.skipIf ( workspace . NumCudaDevices ( ) < 2 , " Need at least 2 GPUs. " )
class SparseDataParallelModelTest ( TestCase ) :
2017-04-14 00:15:51 +00:00
'''
Create and run the model . We try with both storing indices for gather
on CPU and on GPU
'''
def run_model ( self , V , gpu_devices , cpu_indices ) :
2017-03-09 21:41:36 +00:00
2017-03-14 22:43:42 +00:00
def input_builder_fun ( model ) :
return None
2017-03-09 21:41:36 +00:00
2017-03-14 22:43:42 +00:00
def model_build_fun ( model , loss_scale ) :
2017-04-14 00:15:51 +00:00
if cpu_indices :
with core . DeviceScope ( core . DeviceOption ( caffe2_pb2 . CPU ) ) :
gathered_cpu = model . net . Gather (
[ self . vecs , ' indices ' ] , ' gathered_cpu ' )
gathered = model . CopyCPUToGPU ( gathered_cpu , " gathered " )
else :
gpu_vecs = model . param_init_net . CopyCPUToGPU (
self . vecs , " gpuvecs " ,
)
model . params . append ( gpu_vecs )
gathered = model . net . Gather ( [ gpu_vecs , ' indices ' ] , ' gathered ' )
2017-03-14 22:43:42 +00:00
flattened = model . Flatten ( gathered , " flattened " )
fc = model . FC ( flattened , " fc " , 16 * 16 , 1 ,
( " ConstantFill " , { } ) , ( " ConstantFill " , { } ) )
fc_fl = model . FlattenToVec ( fc , " fc_fl " )
sigm = model . Sigmoid ( fc_fl , " sigm " )
sq = model . SquaredL2Distance ( [ sigm , " label " ] , " sq " )
loss = model . AveragedLoss ( sq , " loss " )
loss = model . Scale ( loss , scale = loss_scale )
return [ loss ]
def param_update_fun ( model ) :
ONE = model . param_init_net . ConstantFill (
[ ] , " ONE " , shape = [ 1 ] , value = 1.0 ,
)
LR = model . CopyCPUToGPU ( self . LR , " LR " )
for param in model . GetParams ( ) :
param_grad = model . param_to_grad [ param ]
2017-04-14 00:15:51 +00:00
if not isinstance ( param_grad , core . GradientSlice ) :
model . WeightedSum ( [ param , ONE , param_grad , LR ] , param )
else :
param_momentum = model . param_init_net . ConstantFill (
[ param ] ,
2017-05-19 19:46:18 +00:00
param + ' _momentum ' ,
2017-04-14 00:15:51 +00:00
value = 0.0 ,
)
model . net . SparseMomentumSGDUpdate (
[
param_grad . values ,
param_momentum ,
LR ,
param ,
param_grad . indices ,
] ,
[
param_grad . values , param_momentum , param
] ,
momentum = 0.1 ,
nesterov = 0 ,
)
2017-03-14 22:43:42 +00:00
workspace . ResetWorkspace ( )
model = cnn . CNNModelHelper (
order = " NHWC " ,
name = " sparse_test {} " . format ( gpu_devices ) ,
)
with core . NameScope ( " cpu " ) :
with core . DeviceScope ( core . DeviceOption ( caffe2_pb2 . CPU ) ) :
self . ITER = model . Iter ( " ITER " )
self . LR = model . net . LearningRate (
[ self . ITER ] ,
" LR " ,
base_lr = ( - 0.1 ) ,
policy = " fixed " ,
2017-03-09 21:41:36 +00:00
)
2017-03-14 22:43:42 +00:00
self . vecs = model . param_init_net . UniformFill (
[ ] , " vecs " , shape = [ V , 16 ] )
2017-04-14 00:15:51 +00:00
if cpu_indices :
model . params . append ( self . vecs )
self . ONE_CPU = model . param_init_net . ConstantFill (
[ ] , " ONE_CPU " , shape = [ 1 ] , value = 1.0 ,
)
2017-03-14 22:43:42 +00:00
data_parallel_model . Parallelize_GPU (
model ,
input_builder_fun = input_builder_fun ,
forward_pass_builder_fun = model_build_fun ,
param_update_builder_fun = param_update_fun ,
devices = gpu_devices ,
)
# Update the vecs
2017-04-14 00:15:51 +00:00
if cpu_indices :
with core . NameScope ( " cpu " ) :
with core . DeviceScope ( core . DeviceOption ( caffe2_pb2 . CPU ) ) :
for param in model . GetParams ( ) :
param_grad = model . param_to_grad [ param ]
model . ScatterWeightedSum ( [ param , self . ONE_CPU ,
param_grad . indices ,
param_grad . values ,
self . LR ] ,
self . vecs )
else :
2018-11-29 21:58:11 +00:00
with core . DeviceScope ( core . DeviceOption ( workspace . GpuDeviceType , 0 ) ) :
2017-04-14 00:15:51 +00:00
model . CopyGPUToCPU ( " gpu_0/gpuvecs " , self . vecs )
2017-03-09 21:41:36 +00:00
2017-03-14 22:43:42 +00:00
np . random . seed ( 2603 )
2017-03-09 21:41:36 +00:00
2017-03-14 22:43:42 +00:00
# Each run has same input, independent of number of gpus
batch_size = 64
for i in range ( 0 , 10 ) :
2017-04-14 00:15:51 +00:00
full_indices = np . random . permutation ( V ) [ : batch_size * 16 ] . reshape (
batch_size , 16
)
2017-03-14 22:43:42 +00:00
full_labels = full_indices [ : , 0 ] % 2
batch_per_device = batch_size / / len ( gpu_devices )
2017-03-09 21:41:36 +00:00
2017-03-14 22:43:42 +00:00
for ( j , g ) in enumerate ( gpu_devices ) :
st = j * batch_per_device
en = st + batch_per_device
indices = full_indices [ st : en , : ] . astype ( np . int32 )
labels = full_labels [ st : en ] . astype ( np . float32 )
2017-03-09 21:41:36 +00:00
2017-04-14 00:15:51 +00:00
device_for_indices = core . DeviceOption ( caffe2_pb2 . CPU )
if not cpu_indices :
2018-11-29 21:58:11 +00:00
device_for_indices = core . DeviceOption ( workspace . GpuDeviceType , g )
2017-04-14 00:15:51 +00:00
with core . DeviceScope ( device_for_indices ) :
2017-03-14 22:43:42 +00:00
workspace . FeedBlob ( " gpu_ {} /indices " . format ( g ) , indices )
2018-11-29 21:58:11 +00:00
with core . DeviceScope ( core . DeviceOption ( workspace . GpuDeviceType , g ) ) :
2017-03-14 22:43:42 +00:00
workspace . FeedBlob ( " gpu_ {} /label " . format ( g ) , labels )
if i == 0 :
workspace . RunNetOnce ( model . param_init_net )
# Force vecs to be same on all runs
orig_vecs = np . random . rand ( V , 16 ) . astype ( np . float32 )
workspace . FeedBlob (
self . vecs ,
orig_vecs
)
2017-04-14 00:15:51 +00:00
if not cpu_indices :
for g in gpu_devices :
workspace . FeedBlob (
" gpu_ {} /gpuvecs " . format ( g ) ,
orig_vecs ,
2018-11-29 21:58:11 +00:00
device_option = core . DeviceOption ( workspace . GpuDeviceType , g ) ,
2017-04-14 00:15:51 +00:00
)
2017-03-14 22:43:42 +00:00
workspace . CreateNet ( model . net )
workspace . RunNet ( model . net . Proto ( ) . name )
2017-04-14 00:15:51 +00:00
if len ( gpu_devices ) == 2 :
if not cpu_indices :
idx = workspace . FetchBlob ( " gpu_0/indices " )
idx = list ( idx . flatten ( ) )
n = len ( idx )
nu = len ( set ( idx ) )
assert n == nu , " We cannot have duplicate indices "
2017-03-14 22:43:42 +00:00
# Sanity check to see the vecs were updated
self . assertFalse (
np . allclose ( workspace . FetchBlob ( self . vecs ) , orig_vecs ) )
2017-04-14 00:15:51 +00:00
return [ workspace . FetchBlob ( self . vecs if cpu_indices else " gpu_0/gpuvecs " ) ,
2017-03-14 22:43:42 +00:00
workspace . FetchBlob ( " gpu_0/fc_w " ) ]
2017-04-14 00:15:51 +00:00
def _test_equiv_sparse ( self , cpu_indices ) :
2017-03-14 22:43:42 +00:00
'''
2017-03-09 21:41:36 +00:00
Test that the model produces exactly same results given
total batchsize , independent of number of GPUs .
2017-04-14 00:15:51 +00:00
'''
2017-03-14 22:43:42 +00:00
V = 10000
2017-04-14 00:15:51 +00:00
result_2gpus = self . run_model ( V , [ 0 , 1 ] , cpu_indices )
result_1gpus = self . run_model ( V , [ 0 ] , cpu_indices )
2017-03-14 22:43:42 +00:00
self . assertTrue ( np . allclose ( result_1gpus [ 0 ] , result_2gpus [ 0 ] ) )
self . assertTrue ( np . allclose ( result_1gpus [ 1 ] , result_2gpus [ 1 ] ) )
if workspace . NumCudaDevices ( ) > = 4 :
2017-06-07 06:59:46 +00:00
result_4gpus = self . run_model ( V , list ( range ( 4 ) ) , cpu_indices )
2017-03-14 22:43:42 +00:00
self . assertTrue ( np . allclose ( result_1gpus [ 0 ] , result_4gpus [ 0 ] ) )
self . assertTrue ( np . allclose ( result_1gpus [ 1 ] , result_4gpus [ 1 ] ) )
if workspace . NumCudaDevices ( ) > = 8 :
2017-06-07 06:59:46 +00:00
result_8gpus = self . run_model ( V , list ( range ( 8 ) ) , cpu_indices )
2017-03-14 22:43:42 +00:00
self . assertTrue ( np . allclose ( result_1gpus [ 0 ] , result_8gpus [ 0 ] ) )
self . assertTrue ( np . allclose ( result_1gpus [ 1 ] , result_8gpus [ 1 ] ) )
2017-04-14 00:15:51 +00:00
def test_equiv_sparse ( self ) :
self . _test_equiv_sparse ( True )
self . _test_equiv_sparse ( False )
2017-05-16 01:02:16 +00:00
2018-11-29 21:58:11 +00:00
@unittest.skipIf ( not workspace . has_gpu_support , " No gpu support. " )
@unittest.skipIf ( workspace . NumGpuDevices ( ) < 2 , " Need at least 2 GPUs. " )
2017-11-20 07:28:40 +00:00
class ParallelizeBMUFTest ( TestCase ) :
2017-05-16 01:02:16 +00:00
def _run_model ( self , gpu_devices ) :
'''
Helper function for test_equiv
'''
def input_builder_fun ( model ) :
return None
def _model_build_fun ( self , model , loss_scale ) :
fc = model . FC (
" data " , " fc " , 16 , 1 , ( " ConstantFill " , { } ) , ( " ConstantFill " , { } )
)
fc_fl = model . FlattenToVec ( fc , " fc_fl " )
sigm = model . Sigmoid ( fc_fl , " sigm " )
sq = model . SquaredL2Distance ( [ sigm , " label " ] , " sq " )
loss = model . AveragedLoss ( sq , " loss " )
loss = model . Scale ( loss , scale = loss_scale )
2017-09-12 17:20:22 +00:00
2017-05-16 01:02:16 +00:00
return [ loss ]
def _param_update_fun ( self , model ) :
ITER = model . Iter ( " ITER " )
LR = model . net . LearningRate (
[ ITER ] ,
" LR " ,
base_lr = ( - 0.1 ) ,
policy = " fixed " ,
)
ONE = model . param_init_net . ConstantFill (
[ ] , " ONE " , shape = [ 1 ] , value = 1.0 ,
)
for param in model . GetParams ( ) :
grad = model . param_to_grad [ param ]
model . WeightedSum ( [ param , ONE , grad , LR ] , param )
2017-11-20 07:28:40 +00:00
def _generate_data ( self , devices , device_type , device_prefix ) :
2017-05-16 01:02:16 +00:00
np . random . seed ( 26 )
# Each run has same input, independent of number of gpus
batch_size = 64
for _ in range ( 0 , 10 ) :
full_data = np . random . rand ( batch_size , 16 )
full_labels = np . round ( full_data [ : , 0 ] )
2017-11-20 07:28:40 +00:00
batch_per_device = batch_size / / len ( devices )
2017-05-16 01:02:16 +00:00
2017-11-20 07:28:40 +00:00
for ( j , g ) in enumerate ( devices ) :
2017-05-16 01:02:16 +00:00
st = j * batch_per_device
en = st + batch_per_device
data = full_data [ st : en , : ] . astype ( np . float32 )
labels = full_labels [ st : en ] . astype ( np . float32 )
2017-11-20 07:28:40 +00:00
with core . DeviceScope ( core . DeviceOption ( device_type , g ) ) :
workspace . FeedBlob ( " {} _ {} /data " . format ( device_prefix , g ) , data )
workspace . FeedBlob ( " {} _ {} /label " . format ( device_prefix , g ) , labels )
@given (
cpu_device = st . booleans ( )
)
def test_parallelize_bmuf ( self , cpu_device ) :
2018-11-29 21:58:11 +00:00
assume ( cpu_device or workspace . has_gpu_support or workspace . has_hip_support )
2017-11-20 07:28:40 +00:00
workspace . ResetWorkspace ( )
2017-05-16 01:02:16 +00:00
model = cnn . CNNModelHelper (
order = " NHWC " ,
name = " test "
)
2017-11-20 07:28:40 +00:00
devices = [ 0 , 1 ]
2017-05-16 01:02:16 +00:00
def input_builder_fun ( model ) :
return None
2017-11-20 07:28:40 +00:00
if not cpu_device :
2018-11-29 21:58:11 +00:00
device_type = workspace . GpuDeviceType
2017-11-20 07:28:40 +00:00
device_prefix = " gpu "
else :
device_type = caffe2_pb2 . CPU
device_prefix = " cpu "
self . _generate_data ( devices , device_type , device_prefix )
2017-05-16 01:02:16 +00:00
2017-11-20 07:28:40 +00:00
data_parallel_model . Parallelize_BMUF (
2017-05-16 01:02:16 +00:00
model ,
input_builder_fun ,
self . _model_build_fun ,
self . _param_update_fun ,
2017-11-20 07:28:40 +00:00
devices = devices ,
cpu_device = cpu_device
2017-05-16 01:02:16 +00:00
)
data_parallel_model . RunInitNet ( model )
# Check initial momentum params are zeros
2017-06-29 23:52:01 +00:00
self . assertEqual (
list ( viewkeys ( model . _device_grouped_blobs ) ) , [ ' fc_w ' , ' fc_b ' ]
)
2017-11-20 07:28:40 +00:00
self . assertEqual ( workspace . FetchBlob ( ' {} _0/fc_b_v ' . format ( device_prefix ) ) , 0 )
2017-05-16 01:02:16 +00:00
np . testing . assert_equal (
2017-11-20 07:28:40 +00:00
workspace . FetchBlob ( ' {} _0/fc_w_v ' . format ( device_prefix ) ) ,
2017-05-16 01:02:16 +00:00
np . zeros ( 16 ) . astype ( np . float32 ) . reshape ( 1 , 16 )
)
# Run the algorithm for one iteration to have non-zero params.
data_parallel_model . RunNet ( model , 1 )
# Save iteration momentum and post local update params
2017-11-20 07:28:40 +00:00
v_b_ = workspace . FetchBlob ( ' {} _0/fc_b_v ' . format ( device_prefix ) )
v_w_ = workspace . FetchBlob ( ' {} _0/fc_w_v ' . format ( device_prefix ) )
2017-05-16 01:02:16 +00:00
workspace . RunNetOnce ( model . net )
2017-11-20 07:28:40 +00:00
b_0_ = workspace . FetchBlob ( ' {} _0/fc_b ' . format ( device_prefix ) )
w_0_ = workspace . FetchBlob ( ' {} _0/fc_w ' . format ( device_prefix ) )
b_1_ = workspace . FetchBlob ( ' {} _1/fc_b ' . format ( device_prefix ) )
w_1_ = workspace . FetchBlob ( ' {} _1/fc_w ' . format ( device_prefix ) )
2017-05-16 01:02:16 +00:00
# Compute block gradients.
2017-11-20 07:28:40 +00:00
b_g_ = workspace . FetchBlob ( ' {} _0/fc_b_g ' . format ( device_prefix ) )
w_g_ = workspace . FetchBlob ( ' {} _0/fc_w_g ' . format ( device_prefix ) )
2017-05-16 01:02:16 +00:00
workspace . RunNetOnce ( model . _global_model_param_updates_net )
g_b = ( b_0_ + b_1_ ) / 2 - b_g_
g_w = ( w_0_ + w_1_ ) / 2 - w_g_
2017-11-20 07:28:40 +00:00
v_b = workspace . FetchBlob ( ' {} _0/fc_b_v ' . format ( device_prefix ) )
v_w = workspace . FetchBlob ( ' {} _0/fc_w_v ' . format ( device_prefix ) )
w_g = workspace . FetchBlob ( ' {} _0/fc_w_g ' . format ( device_prefix ) )
b_g = workspace . FetchBlob ( ' {} _0/fc_b_g ' . format ( device_prefix ) )
w_0 = workspace . FetchBlob ( ' {} _0/fc_w ' . format ( device_prefix ) )
b_0 = workspace . FetchBlob ( ' {} _0/fc_b ' . format ( device_prefix ) )
w_1 = workspace . FetchBlob ( ' {} _1/fc_w ' . format ( device_prefix ) )
b_1 = workspace . FetchBlob ( ' {} _1/fc_b ' . format ( device_prefix ) )
2017-05-16 01:02:16 +00:00
# Check momentum update step
np . testing . assert_equal ( v_b , 0.5 * v_b_ + g_b )
np . testing . assert_equal ( v_w , 0.5 * v_w_ + g_w )
np . testing . assert_equal ( w_g , w_0 )
np . testing . assert_equal ( w_g , w_1 )
np . testing . assert_equal ( b_g , b_0 )
np . testing . assert_equal ( b_g , b_1 )
# Check params update step
np . testing . assert_equal ( w_0 , w_g_ + v_w )
np . testing . assert_equal ( b_0 , b_g_ + v_b )
2017-05-25 05:10:04 +00:00
@unittest.skipIf ( not workspace . has_gpu_support , " No gpu support. " )
2018-11-29 21:58:11 +00:00
@unittest.skipIf ( workspace . NumGpuDevices ( ) < 2 , " Need at least 2 GPUs. " )
2017-05-25 05:10:04 +00:00
class SparseDataParallelModelTestWithSharedIndices ( TestCase ) :
'''
Create and run the model . We try with both storing indices for gather
on CPU and on GPU
'''
def run_model ( self , V , gpu_devices ) :
def input_builder_fun ( model ) :
return None
def model_build_fun ( model , loss_scale ) :
gpu_vecs_gathered = [ ]
gpu_vecs = [ ]
for num , vec in enumerate ( self . vecs ) :
gpu_vec = model . param_init_net . CopyCPUToGPU (
vec , ' gpuvec_ {} ' . format ( num ) ,
)
if num != 2 :
model . params . append ( gpu_vec )
gpu_vecs . append ( gpu_vec )
for num , gpu_vec in enumerate ( gpu_vecs ) :
gpu_vec_gathered = model . net . Gather (
[ gpu_vec , ' indices ' ] ,
[ ' gpu_vec_gathered_ {} ' . format ( num ) ]
)
gpu_vecs_gathered . append ( gpu_vec_gathered )
assert len ( gpu_vecs_gathered ) == 3
fc = model . net . FC (
[
gpu_vecs_gathered [ 2 ] ,
gpu_vecs_gathered [ 0 ] ,
gpu_vecs_gathered [ 1 ] ,
] ,
[ ' fc ' ] ,
)
_ , loss = model . net . SoftmaxWithLoss (
[ fc , ' label ' ] ,
[ ' ce_loss ' , ' avg_loss ' ] ,
only_loss = True ,
)
loss = model . Scale ( loss , scale = loss_scale )
model . net . Print ( loss , [ ] , limit = 10 )
return [ loss ]
def param_update_fun ( model ) :
ONE = model . param_init_net . ConstantFill (
[ ] , " ONE " , shape = [ 1 ] , value = 1.0 ,
)
LR = model . CopyCPUToGPU ( self . LR , " LR " )
for param in model . GetParams ( ) :
param_grad = model . param_to_grad [ param ]
if not isinstance ( param_grad , core . GradientSlice ) :
model . WeightedSum ( [ param , ONE , param_grad , LR ] , param )
else :
model . net . ScatterWeightedSum (
[
param ,
ONE ,
param_grad . indices ,
param_grad . values ,
ONE ,
] ,
param ,
)
workspace . ResetWorkspace ( )
model = cnn . CNNModelHelper (
order = " NHWC " ,
name = " sparse_test {} " . format ( gpu_devices ) ,
)
batch_size = 32
batch_per_device = batch_size / / len ( gpu_devices )
with core . NameScope ( " cpu " ) :
with core . DeviceScope ( core . DeviceOption ( caffe2_pb2 . CPU ) ) :
self . ITER = model . Iter ( " ITER " )
self . LR = model . net . LearningRate (
[ self . ITER ] ,
" LR " ,
base_lr = ( - 0.1 ) ,
policy = " fixed " ,
)
'''
self . vecs consists of 3 big blobs on which we call Gather :
1 ) FC weights , shape = ( V , 16 )
2 ) FC bias , shape = ( V )
3 ) FC input , shape = ( batch_per_device , 16 )
'''
self . vecs = [
model . param_init_net . UniformFill (
[ ] , " vec_ {} " . format ( num ) , shape = [ V , 16 ] )
for num in range ( 2 )
]
self . vecs . append (
model . param_init_net . UniformFill (
[ ] ,
" vec_2 " , shape = [ batch_per_device , 16 ]
)
)
self . ONE_CPU = model . param_init_net . ConstantFill (
[ ] , " ONE_CPU " , shape = [ 1 ] , value = 1.0 ,
)
data_parallel_model . Parallelize_GPU (
model ,
input_builder_fun = input_builder_fun ,
forward_pass_builder_fun = model_build_fun ,
param_update_builder_fun = param_update_fun ,
devices = gpu_devices ,
)
# Update the vecs
2018-11-29 21:58:11 +00:00
with core . DeviceScope ( core . DeviceOption ( workspace . GpuDeviceType , 0 ) ) :
2017-05-25 05:10:04 +00:00
for num , vec in enumerate ( self . vecs [ : - 1 ] ) :
model . CopyGPUToCPU ( " gpu_0/gpuvec_ {} " . format ( num ) , vec )
# Each run has same input, independent of number of gpus
for i in range ( 0 , 10 ) :
np . random . seed ( 2603 )
full_indices = np . random . permutation ( V ) [ : batch_size ] . reshape (
batch_size
)
full_labels = full_indices [ : ] % batch_per_device
for ( j , g ) in enumerate ( gpu_devices ) :
st = j * batch_per_device
en = st + batch_per_device
indices = full_indices [ st : en ] . astype ( np . int32 )
labels = full_labels [ st : en ] . astype ( np . int32 )
2018-11-29 21:58:11 +00:00
with core . DeviceScope ( core . DeviceOption ( workspace . GpuDeviceType , g ) ) :
2017-05-25 05:10:04 +00:00
workspace . FeedBlob ( " gpu_ {} /indices " . format ( g ) , indices )
workspace . FeedBlob ( " gpu_ {} /label " . format ( g ) , labels )
if i == 0 :
workspace . RunNetOnce ( model . param_init_net )
# Force vecs to be same on all runs
orig_vecs = [
np . random . rand ( V , 16 ) . astype ( np . float32 ) ,
np . random . rand ( V ) . astype ( np . float32 ) ,
np . random . rand ( V , 16 ) . astype ( np . float32 ) ,
]
for vec , orig_vec in zip ( self . vecs , orig_vecs ) :
workspace . FeedBlob (
vec ,
orig_vec
)
for g in gpu_devices :
for num , orig_vec in enumerate ( orig_vecs ) :
workspace . FeedBlob (
" gpu_ {} /gpuvec_ {} " . format ( g , num ) ,
orig_vec ,
device_option = core . DeviceOption (
2018-11-29 21:58:11 +00:00
workspace . GpuDeviceType , g ) ,
2017-05-25 05:10:04 +00:00
)
workspace . CreateNet ( model . net )
workspace . RunNet ( model . net . Proto ( ) . name )
idx = workspace . FetchBlob ( ' gpu_0/indices ' )
grad_slices = [
workspace . FetchBlob (
' gpu_ {} /gpu_vec_gathered_ {} _grad ' . format ( g , num ) )
for g in gpu_devices for num in range ( 2 )
]
for grad_slice in grad_slices :
# print (len(idx), len(grad_slice))
assert len ( idx ) == len ( grad_slice ) , (
' Number of indices {} is not same as number of gradient '
' slices {} . This might lead to illegal memory access ' . format (
len ( idx ) , len ( grad_slice )
)
)
def test_sparse_shared_indices_gpu ( self ) :
'''
Test that the model has same number of indices and gradient rows
given total batchsize , independent of number of GPUs .
'''
V = 10000
self . run_model ( V , [ 0 , 1 ] )
self . run_model ( V , [ 0 ] )
2018-11-29 21:58:11 +00:00
if workspace . NumGpuDevices ( ) > = 4 :
2017-06-07 06:59:46 +00:00
self . run_model ( V , list ( range ( 4 ) ) )
2017-05-25 05:10:04 +00:00
2018-11-29 21:58:11 +00:00
if workspace . NumGpuDevices ( ) > = 8 :
2017-06-07 06:59:46 +00:00
self . run_model ( V , list ( range ( 8 ) ) )
2017-09-06 20:18:06 +00:00
if __name__ == " __main__ " :
import unittest
unittest . main ( )