Add e2e measurement for training (#4049)

* add e2e measurement
This commit is contained in:
pengwa 2020-05-29 10:08:29 +08:00 committed by GitHub
parent 26be762b35
commit 6d03470587
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -68,9 +68,9 @@ TrainingRunner::TrainingRunner(Parameters params, const Environment& env, Sessio
ORT_ENFORCE(!params_.training_optimizer_name.empty());
if (params.partition_optimizer)
ORT_ENFORCE(params.use_nccl,
"Optimizer partitioning is only supported with NCCL distributed training.");
"Optimizer partitioning is only supported with NCCL distributed training.");
ORT_ENFORCE(params.num_train_steps % params.gradient_accumulation_steps == 0,
"Number of training steps must be a multiple of number of gradient accumulation step.");
"Number of training steps must be a multiple of number of gradient accumulation step.");
}
Status TrainingRunner::Initialize() {
@ -323,7 +323,7 @@ Status TrainingRunner::Initialize() {
}
Status TrainingRunner::Run(IDataLoader* training_data_loader, IDataLoader* test_data_loader,
const MapStringToString& mapped_dimensions) {
const MapStringToString& mapped_dimensions) {
if (params_.mpi_context.world_rank == 0 && !params_.model_actual_running_graph_path.empty()) {
session_.Save(params_.model_actual_running_graph_path, TrainingSession::SaveOption::NO_RELOAD);
}
@ -401,13 +401,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode,
ORT_ENFORCE(params_.pipeline_parallel_size > 1);
feed_names.push_back(pipeline_context_.forward_waited_event_name);
OrtValue event_id;
const int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetForwardWaitedEventId(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
const int64_t id =
(mode == EvaluateStep) ? -1
: pipeline_schedule_.GetForwardWaitedEventId(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
TrainingUtil::CreateCpuMLScalar(
id,
&event_id,
input_allocator_);
id,
&event_id,
input_allocator_);
feeds.push_back(event_id);
}
@ -416,13 +418,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode,
ORT_ENFORCE(params_.pipeline_parallel_size > 1);
feed_names.push_back(pipeline_context_.forward_waited_event_after_recv_name);
OrtValue event_id;
const int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetForwardWaitedEventIdAfterRecv(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
const int64_t id =
(mode == EvaluateStep) ? -1
: pipeline_schedule_.GetForwardWaitedEventIdAfterRecv(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
TrainingUtil::CreateCpuMLScalar(
id,
&event_id,
input_allocator_);
id,
&event_id,
input_allocator_);
feeds.push_back(event_id);
}
@ -431,13 +435,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode,
ORT_ENFORCE(params_.pipeline_parallel_size > 1);
feed_names.push_back(pipeline_context_.forward_recorded_event_before_send_name);
OrtValue event_id;
const int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetForwardRecordedEventIdBeforeSend(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
const int64_t id =
(mode == EvaluateStep) ? -1
: pipeline_schedule_.GetForwardRecordedEventIdBeforeSend(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
TrainingUtil::CreateCpuMLScalar(
id,
&event_id,
input_allocator_);
id,
&event_id,
input_allocator_);
feeds.push_back(event_id);
}
@ -446,13 +452,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode,
ORT_ENFORCE(params_.pipeline_parallel_size > 1);
feed_names.push_back(pipeline_context_.forward_recorded_event_name);
OrtValue event_id;
const int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetForwardRecordedEventId(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
const int64_t id =
(mode == EvaluateStep) ? -1
: pipeline_schedule_.GetForwardRecordedEventId(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
TrainingUtil::CreateCpuMLScalar(
id,
&event_id,
input_allocator_);
id,
&event_id,
input_allocator_);
feeds.push_back(event_id);
}
@ -461,13 +469,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode,
ORT_ENFORCE(params_.pipeline_parallel_size > 1);
feed_names.push_back(pipeline_context_.backward_waited_event_name);
OrtValue event_id;
const int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetBackwardWaitedEventId(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
const int64_t id =
(mode == EvaluateStep) ? -1
: pipeline_schedule_.GetBackwardWaitedEventId(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
TrainingUtil::CreateCpuMLScalar(
id,
&event_id,
input_allocator_);
id,
&event_id,
input_allocator_);
feeds.push_back(event_id);
}
@ -476,13 +486,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode,
ORT_ENFORCE(params_.pipeline_parallel_size > 1);
feed_names.push_back(pipeline_context_.backward_waited_event_after_recv_name);
OrtValue event_id;
const int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetBackwardWaitedEventIdAfterRecv(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
const int64_t id =
(mode == EvaluateStep) ? -1
: pipeline_schedule_.GetBackwardWaitedEventIdAfterRecv(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
TrainingUtil::CreateCpuMLScalar(
id,
&event_id,
input_allocator_);
id,
&event_id,
input_allocator_);
feeds.push_back(event_id);
}
@ -491,13 +503,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode,
ORT_ENFORCE(params_.pipeline_parallel_size > 1);
feed_names.push_back(pipeline_context_.backward_recorded_event_before_send_name);
OrtValue event_id;
int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetBackwardRecordedEventIdBeforeSend(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
int64_t id =
(mode == EvaluateStep) ? -1
: pipeline_schedule_.GetBackwardRecordedEventIdBeforeSend(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
TrainingUtil::CreateCpuMLScalar(
id,
&event_id,
input_allocator_);
id,
&event_id,
input_allocator_);
feeds.push_back(event_id);
}
@ -506,13 +520,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode,
ORT_ENFORCE(params_.pipeline_parallel_size > 1);
feed_names.push_back(pipeline_context_.backward_recorded_event_name);
OrtValue event_id;
int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetBackwardRecordedEventId(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
int64_t id =
(mode == EvaluateStep) ? -1
: pipeline_schedule_.GetBackwardRecordedEventId(
pipeline_context_.pipeline_stage_id,
static_cast<int>(step_) % pipeline_context_.num_pipeline_batches);
TrainingUtil::CreateCpuMLScalar(
id,
&event_id,
input_allocator_);
id,
&event_id,
input_allocator_);
feeds.push_back(event_id);
}
@ -569,18 +585,18 @@ Status TrainingRunner::PrepareFetchNamesAndFetches(const SessionMode mode,
// Always execute event operators to avoid deadlock if pipeline is used.
// TODO: create a list of must-fetch tensors and pass it to all graph transformers.
if (params_.pipeline_parallel_size) {
if (!pipeline_context_.forward_wait_output_name.empty()) {
fetch_names.push_back(pipeline_context_.forward_wait_output_name);
}
if (!pipeline_context_.forward_record_output_name.empty()) {
fetch_names.push_back(pipeline_context_.forward_record_output_name);
}
if (!pipeline_context_.backward_wait_output_name.empty()) {
fetch_names.push_back(pipeline_context_.backward_wait_output_name);
}
if (!pipeline_context_.backward_record_output_name.empty()) {
fetch_names.push_back(pipeline_context_.backward_record_output_name);
}
if (!pipeline_context_.forward_wait_output_name.empty()) {
fetch_names.push_back(pipeline_context_.forward_wait_output_name);
}
if (!pipeline_context_.forward_record_output_name.empty()) {
fetch_names.push_back(pipeline_context_.forward_record_output_name);
}
if (!pipeline_context_.backward_wait_output_name.empty()) {
fetch_names.push_back(pipeline_context_.backward_wait_output_name);
}
if (!pipeline_context_.backward_record_output_name.empty()) {
fetch_names.push_back(pipeline_context_.backward_record_output_name);
}
}
} else if (mode == EvaluateStep) {
// Set up tensor to be fetched when doing model evaluation.
@ -647,7 +663,7 @@ Status TrainingRunner::RunWithUpdate(VectorString& feed_names,
// Assume that only the last pipeline stage can see loss, predicted value, and so on.
// Thus, the error function should only be called when we are at the last stage.
const bool session_can_see_loss = params_.pipeline_parallel_size == 1 ||
pipeline_context_.pipeline_stage_id == params_.pipeline_parallel_size - 1;
pipeline_context_.pipeline_stage_id == params_.pipeline_parallel_size - 1;
if (session_can_see_loss &&
!params_.is_perf_test &&
weight_update_step_count_ % params_.display_loss_steps == 0) {
@ -692,25 +708,26 @@ Status TrainingRunner::RunWithoutUpdate(VectorString& feed_names,
// Async launch of a session.
pipeline_worker_pool_.workers[worker_id] = std::thread([&](
const size_t worker_id, const size_t step) {
const size_t worker_id, const size_t step) {
#if !defined(NDEBUG) && defined(USE_CUDA) && !defined(_WIN32)
// Store the tag for the thread which runs session_.Run(...).
// It will be used to name range in Nvidia's visual profiler.
auto& profile_context = profile::Context::GetInstance();
profile_context.SetThreadTag(
std::this_thread::get_id(), std::to_string(step));
std::this_thread::get_id(), std::to_string(step));
#endif
// Dummy use of step to avoid warning when the code above is disabled.
// Dummy use of step to avoid warning when the code above is disabled.
ORT_ENFORCE(step + 1 > 0);
RunOptions run_options;
run_options.only_execute_path_to_fetches = true;
ORT_ENFORCE(session_.Run(
run_options,
pipeline_worker_pool_.worker_states[worker_id].feed_names,
pipeline_worker_pool_.worker_states[worker_id].feeds,
pipeline_worker_pool_.worker_states[worker_id].fetch_names,
&(pipeline_worker_pool_.worker_states[worker_id].fetches)) == Status::OK());
}, worker_id, step_);
run_options,
pipeline_worker_pool_.worker_states[worker_id].feed_names,
pipeline_worker_pool_.worker_states[worker_id].feeds,
pipeline_worker_pool_.worker_states[worker_id].fetch_names,
&(pipeline_worker_pool_.worker_states[worker_id].fetches)) == Status::OK());
},
worker_id, step_);
// Increment the step counter after processing one batch.
++step_;
@ -749,6 +766,9 @@ Status TrainingRunner::TrainingLoop(IDataLoader& training_data_loader, IDataLoad
const size_t stabilized_perf_total_step_count = std::min(static_cast<size_t>(128), params_.num_train_steps);
const size_t stabilized_perf_start_step = params_.num_train_steps - stabilized_perf_total_step_count;
double stabilized_total_time{0};
const size_t end_to_end_perf_start_step = 128;
auto end_to_end_start = std::chrono::high_resolution_clock::now();
bool end_to_end_measurement_started = false;
while (step_ < params_.num_train_steps) {
for (size_t shard_it = 0; shard_it < num_shards_to_visit; ++shard_it) {
@ -772,6 +792,12 @@ Status TrainingRunner::TrainingLoop(IDataLoader& training_data_loader, IDataLoad
for (size_t batch = 0; batch < batch_num_cur_shard && step_ < params_.num_train_steps; ++batch) {
const bool is_weight_update_step = (step_ + 1) % params_.gradient_accumulation_steps == 0;
const bool stablized_perf_measurement_started = step_ >= stabilized_perf_start_step;
if (!end_to_end_measurement_started && step_ >= end_to_end_perf_start_step) {
end_to_end_start = std::chrono::high_resolution_clock::now();
end_to_end_measurement_started = true;
}
VectorString feed_names;
VectorString fetch_names;
std::vector<MLValue> feeds;
@ -781,53 +807,53 @@ Status TrainingRunner::TrainingLoop(IDataLoader& training_data_loader, IDataLoad
if (is_weight_update_step) {
PrepareFeedNamesAndFeeds(ModelUpdateStep,
training_data_loader,
*training_data,
lr_scheduler.get(),
batch,
feed_names,
feeds);
training_data_loader,
*training_data,
lr_scheduler.get(),
batch,
feed_names,
feeds);
PrepareFetchNamesAndFetches(ModelUpdateStep,
fetch_names,
fetches);
RunWithUpdate(feed_names, fetch_names, feeds, fetches);
} else {
PrepareFeedNamesAndFeeds(GradientAccumulateStep,
training_data_loader,
*training_data,
lr_scheduler.get(),
batch,
feed_names,
feeds);
training_data_loader,
*training_data,
lr_scheduler.get(),
batch,
feed_names,
feeds);
PrepareFetchNamesAndFetches(GradientAccumulateStep,
fetch_names,
fetches);
RunWithoutUpdate(feed_names, fetch_names, feeds,
gradient_accumulation_step_count);
}
// At this point, step_ has already been incremented by 1.
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> duration_seconds = end - start;
total_time += duration_seconds.count();
if (step_ >= stabilized_perf_start_step) {
if (stablized_perf_measurement_started) {
stabilized_total_time += duration_seconds.count();
}
printf("Stage %d, Round %d, Step: %d, epoch: %d, batch: %d/%d, shard_iteration: %d/%d, time: %.2f ms, throughput: %.2f ex/sec \n",
pipeline_context_.pipeline_stage_id,
static_cast<int>(round_),
static_cast<int>(step_),
static_cast<int>(epoch),
static_cast<int>(batch),
static_cast<int>(batch_num_cur_shard),
static_cast<int>(shard_it + 1),
static_cast<int>(num_shards_to_visit),
duration_seconds.count() * 1000,
params_.batch_size * (step_ - step_start) / total_time);
pipeline_context_.pipeline_stage_id,
static_cast<int>(round_),
static_cast<int>(step_),
static_cast<int>(epoch),
static_cast<int>(batch),
static_cast<int>(batch_num_cur_shard),
static_cast<int>(shard_it + 1),
static_cast<int>(num_shards_to_visit),
duration_seconds.count() * 1000,
params_.batch_size * (step_ - step_start) / total_time);
printf("Training data range: [%d - %d)\n",
static_cast<int>(batch * params_.batch_size),
static_cast<int>((batch + 1) * params_.batch_size - 1));
static_cast<int>(batch * params_.batch_size),
static_cast<int>((batch + 1) * params_.batch_size - 1));
if (test_data_loader &&
params_.do_eval && step_ % params_.evaluation_period == 0) {
@ -887,6 +913,15 @@ Status TrainingRunner::TrainingLoop(IDataLoader& training_data_loader, IDataLoad
average_cpu_usage, peak_workingset_size));
}
double e2e_throughput{0};
if (end_to_end_perf_start_step < params_.num_train_steps) {
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> duration_seconds = end - end_to_end_start;
const double total_e2e_time = duration_seconds.count();
const size_t end_to_end_step_count = params_.num_train_steps - std::max(step_start, end_to_end_perf_start_step);
e2e_throughput = params_.batch_size * end_to_end_step_count / total_e2e_time;
}
std::cout << "Round: " << round_ << "\n"
<< "Batch size: " << params_.batch_size << "\n"
<< "Number of Batches: " << number_of_batches << "\n"
@ -895,7 +930,8 @@ Status TrainingRunner::TrainingLoop(IDataLoader& training_data_loader, IDataLoad
<< "Total Running Time: " << total_time << " Seconds \n"
<< "Average Running Time Per Batch: " << avg_time_per_batch << " ms\n"
<< "Throughput: " << throughput << " Examples / Second\n"
<< "Stabilized Throughput: " << stabilized_throughput << " Examples / Second\n";
<< "Stabilized Throughput: " << stabilized_throughput << " Examples / Second\n"
<< "EndToEnd Throughput: " << e2e_throughput << " Examples / Second\n";
return Status::OK();
}
@ -968,7 +1004,8 @@ Status TrainingRunner::SavePerfMetrics(const size_t number_of_batches, const siz
// write to a file - the next task in CI will pick up all files with the same prefix
const PathString perf_metrics_path =
params_.perf_output_dir + GetPathSep<PathChar>() + ORT_TSTR("onnxruntime_perf_metrics_") + ToPathString(display_name) + ORT_TSTR(".json");
params_.perf_output_dir + GetPathSep<PathChar>() + ORT_TSTR("onnxruntime_perf_metrics_") +
ToPathString(display_name) + ORT_TSTR(".json");
std::ofstream perf_metrics_stream;
perf_metrics_stream.open(perf_metrics_path, std::ios::out | std::ios::trunc);