From 6d03470587412f63dbb343be3a130e178a2f189b Mon Sep 17 00:00:00 2001 From: pengwa Date: Fri, 29 May 2020 10:08:29 +0800 Subject: [PATCH] Add e2e measurement for training (#4049) * add e2e measurement --- .../models/runner/training_runner.cc | 239 ++++++++++-------- 1 file changed, 138 insertions(+), 101 deletions(-) diff --git a/orttraining/orttraining/models/runner/training_runner.cc b/orttraining/orttraining/models/runner/training_runner.cc index 1870237bf3..a5bc149564 100644 --- a/orttraining/orttraining/models/runner/training_runner.cc +++ b/orttraining/orttraining/models/runner/training_runner.cc @@ -68,9 +68,9 @@ TrainingRunner::TrainingRunner(Parameters params, const Environment& env, Sessio ORT_ENFORCE(!params_.training_optimizer_name.empty()); if (params.partition_optimizer) ORT_ENFORCE(params.use_nccl, - "Optimizer partitioning is only supported with NCCL distributed training."); + "Optimizer partitioning is only supported with NCCL distributed training."); ORT_ENFORCE(params.num_train_steps % params.gradient_accumulation_steps == 0, - "Number of training steps must be a multiple of number of gradient accumulation step."); + "Number of training steps must be a multiple of number of gradient accumulation step."); } Status TrainingRunner::Initialize() { @@ -323,7 +323,7 @@ Status TrainingRunner::Initialize() { } Status TrainingRunner::Run(IDataLoader* training_data_loader, IDataLoader* test_data_loader, - const MapStringToString& mapped_dimensions) { + const MapStringToString& mapped_dimensions) { if (params_.mpi_context.world_rank == 0 && !params_.model_actual_running_graph_path.empty()) { session_.Save(params_.model_actual_running_graph_path, TrainingSession::SaveOption::NO_RELOAD); } @@ -401,13 +401,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode, ORT_ENFORCE(params_.pipeline_parallel_size > 1); feed_names.push_back(pipeline_context_.forward_waited_event_name); OrtValue event_id; - const int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetForwardWaitedEventId( - pipeline_context_.pipeline_stage_id, - static_cast(step_) % pipeline_context_.num_pipeline_batches); + const int64_t id = + (mode == EvaluateStep) ? -1 + : pipeline_schedule_.GetForwardWaitedEventId( + pipeline_context_.pipeline_stage_id, + static_cast(step_) % pipeline_context_.num_pipeline_batches); TrainingUtil::CreateCpuMLScalar( - id, - &event_id, - input_allocator_); + id, + &event_id, + input_allocator_); feeds.push_back(event_id); } @@ -416,13 +418,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode, ORT_ENFORCE(params_.pipeline_parallel_size > 1); feed_names.push_back(pipeline_context_.forward_waited_event_after_recv_name); OrtValue event_id; - const int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetForwardWaitedEventIdAfterRecv( - pipeline_context_.pipeline_stage_id, - static_cast(step_) % pipeline_context_.num_pipeline_batches); + const int64_t id = + (mode == EvaluateStep) ? -1 + : pipeline_schedule_.GetForwardWaitedEventIdAfterRecv( + pipeline_context_.pipeline_stage_id, + static_cast(step_) % pipeline_context_.num_pipeline_batches); TrainingUtil::CreateCpuMLScalar( - id, - &event_id, - input_allocator_); + id, + &event_id, + input_allocator_); feeds.push_back(event_id); } @@ -431,13 +435,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode, ORT_ENFORCE(params_.pipeline_parallel_size > 1); feed_names.push_back(pipeline_context_.forward_recorded_event_before_send_name); OrtValue event_id; - const int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetForwardRecordedEventIdBeforeSend( - pipeline_context_.pipeline_stage_id, - static_cast(step_) % pipeline_context_.num_pipeline_batches); + const int64_t id = + (mode == EvaluateStep) ? -1 + : pipeline_schedule_.GetForwardRecordedEventIdBeforeSend( + pipeline_context_.pipeline_stage_id, + static_cast(step_) % pipeline_context_.num_pipeline_batches); TrainingUtil::CreateCpuMLScalar( - id, - &event_id, - input_allocator_); + id, + &event_id, + input_allocator_); feeds.push_back(event_id); } @@ -446,13 +452,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode, ORT_ENFORCE(params_.pipeline_parallel_size > 1); feed_names.push_back(pipeline_context_.forward_recorded_event_name); OrtValue event_id; - const int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetForwardRecordedEventId( - pipeline_context_.pipeline_stage_id, - static_cast(step_) % pipeline_context_.num_pipeline_batches); + const int64_t id = + (mode == EvaluateStep) ? -1 + : pipeline_schedule_.GetForwardRecordedEventId( + pipeline_context_.pipeline_stage_id, + static_cast(step_) % pipeline_context_.num_pipeline_batches); TrainingUtil::CreateCpuMLScalar( - id, - &event_id, - input_allocator_); + id, + &event_id, + input_allocator_); feeds.push_back(event_id); } @@ -461,13 +469,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode, ORT_ENFORCE(params_.pipeline_parallel_size > 1); feed_names.push_back(pipeline_context_.backward_waited_event_name); OrtValue event_id; - const int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetBackwardWaitedEventId( - pipeline_context_.pipeline_stage_id, - static_cast(step_) % pipeline_context_.num_pipeline_batches); + const int64_t id = + (mode == EvaluateStep) ? -1 + : pipeline_schedule_.GetBackwardWaitedEventId( + pipeline_context_.pipeline_stage_id, + static_cast(step_) % pipeline_context_.num_pipeline_batches); TrainingUtil::CreateCpuMLScalar( - id, - &event_id, - input_allocator_); + id, + &event_id, + input_allocator_); feeds.push_back(event_id); } @@ -476,13 +486,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode, ORT_ENFORCE(params_.pipeline_parallel_size > 1); feed_names.push_back(pipeline_context_.backward_waited_event_after_recv_name); OrtValue event_id; - const int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetBackwardWaitedEventIdAfterRecv( - pipeline_context_.pipeline_stage_id, - static_cast(step_) % pipeline_context_.num_pipeline_batches); + const int64_t id = + (mode == EvaluateStep) ? -1 + : pipeline_schedule_.GetBackwardWaitedEventIdAfterRecv( + pipeline_context_.pipeline_stage_id, + static_cast(step_) % pipeline_context_.num_pipeline_batches); TrainingUtil::CreateCpuMLScalar( - id, - &event_id, - input_allocator_); + id, + &event_id, + input_allocator_); feeds.push_back(event_id); } @@ -491,13 +503,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode, ORT_ENFORCE(params_.pipeline_parallel_size > 1); feed_names.push_back(pipeline_context_.backward_recorded_event_before_send_name); OrtValue event_id; - int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetBackwardRecordedEventIdBeforeSend( - pipeline_context_.pipeline_stage_id, - static_cast(step_) % pipeline_context_.num_pipeline_batches); + int64_t id = + (mode == EvaluateStep) ? -1 + : pipeline_schedule_.GetBackwardRecordedEventIdBeforeSend( + pipeline_context_.pipeline_stage_id, + static_cast(step_) % pipeline_context_.num_pipeline_batches); TrainingUtil::CreateCpuMLScalar( - id, - &event_id, - input_allocator_); + id, + &event_id, + input_allocator_); feeds.push_back(event_id); } @@ -506,13 +520,15 @@ Status TrainingRunner::PrepareFeedNamesAndFeeds(const SessionMode mode, ORT_ENFORCE(params_.pipeline_parallel_size > 1); feed_names.push_back(pipeline_context_.backward_recorded_event_name); OrtValue event_id; - int64_t id = (mode == EvaluateStep) ? -1 : pipeline_schedule_.GetBackwardRecordedEventId( - pipeline_context_.pipeline_stage_id, - static_cast(step_) % pipeline_context_.num_pipeline_batches); + int64_t id = + (mode == EvaluateStep) ? -1 + : pipeline_schedule_.GetBackwardRecordedEventId( + pipeline_context_.pipeline_stage_id, + static_cast(step_) % pipeline_context_.num_pipeline_batches); TrainingUtil::CreateCpuMLScalar( - id, - &event_id, - input_allocator_); + id, + &event_id, + input_allocator_); feeds.push_back(event_id); } @@ -569,18 +585,18 @@ Status TrainingRunner::PrepareFetchNamesAndFetches(const SessionMode mode, // Always execute event operators to avoid deadlock if pipeline is used. // TODO: create a list of must-to-fetch tensors and pass it to all graph transformer. if (params_.pipeline_parallel_size) { - if (!pipeline_context_.forward_wait_output_name.empty()) { - fetch_names.push_back(pipeline_context_.forward_wait_output_name); - } - if (!pipeline_context_.forward_record_output_name.empty()) { - fetch_names.push_back(pipeline_context_.forward_record_output_name); - } - if (!pipeline_context_.backward_wait_output_name.empty()) { - fetch_names.push_back(pipeline_context_.backward_wait_output_name); - } - if (!pipeline_context_.backward_record_output_name.empty()) { - fetch_names.push_back(pipeline_context_.backward_record_output_name); - } + if (!pipeline_context_.forward_wait_output_name.empty()) { + fetch_names.push_back(pipeline_context_.forward_wait_output_name); + } + if (!pipeline_context_.forward_record_output_name.empty()) { + fetch_names.push_back(pipeline_context_.forward_record_output_name); + } + if (!pipeline_context_.backward_wait_output_name.empty()) { + fetch_names.push_back(pipeline_context_.backward_wait_output_name); + } + if (!pipeline_context_.backward_record_output_name.empty()) { + fetch_names.push_back(pipeline_context_.backward_record_output_name); + } } } else if (mode == EvaluateStep) { // Set up tensor to be fetched when doing model evaluation. @@ -647,7 +663,7 @@ Status TrainingRunner::RunWithUpdate(VectorString& feed_names, // Assume that only the last pipeline stage can see loss, predicted value, and so on. // Thus, the error function should only be called when we are at the last stage. const bool session_can_see_loss = params_.pipeline_parallel_size == 1 || - pipeline_context_.pipeline_stage_id == params_.pipeline_parallel_size - 1; + pipeline_context_.pipeline_stage_id == params_.pipeline_parallel_size - 1; if (session_can_see_loss && !params_.is_perf_test && weight_update_step_count_ % params_.display_loss_steps == 0) { @@ -692,25 +708,26 @@ Status TrainingRunner::RunWithoutUpdate(VectorString& feed_names, // Async launch of a session. pipeline_worker_pool_.workers[worker_id] = std::thread([&]( - const size_t worker_id, const size_t step) { + const size_t worker_id, const size_t step) { #if !defined(NDEBUG) && defined(USE_CUDA) && !defined(_WIN32) // Store the tag for the thread which runs session_.Run(...). // It will be used to name range in Nvidia's visual profiler. auto& profile_context = profile::Context::GetInstance(); profile_context.SetThreadTag( - std::this_thread::get_id(), std::to_string(step)); + std::this_thread::get_id(), std::to_string(step)); #endif - // Dummy use of step to avoid warning when the code above is disabled. + // Dummy use of step to avoid warning when the code above is disabled. ORT_ENFORCE(step + 1 > 0); RunOptions run_options; run_options.only_execute_path_to_fetches = true; ORT_ENFORCE(session_.Run( - run_options, - pipeline_worker_pool_.worker_states[worker_id].feed_names, - pipeline_worker_pool_.worker_states[worker_id].feeds, - pipeline_worker_pool_.worker_states[worker_id].fetch_names, - &(pipeline_worker_pool_.worker_states[worker_id].fetches)) == Status::OK()); - }, worker_id, step_); + run_options, + pipeline_worker_pool_.worker_states[worker_id].feed_names, + pipeline_worker_pool_.worker_states[worker_id].feeds, + pipeline_worker_pool_.worker_states[worker_id].fetch_names, + &(pipeline_worker_pool_.worker_states[worker_id].fetches)) == Status::OK()); + }, + worker_id, step_); // Add one after process one batch. ++step_; @@ -749,6 +766,9 @@ Status TrainingRunner::TrainingLoop(IDataLoader& training_data_loader, IDataLoad const size_t stabilized_perf_total_step_count = std::min(static_cast(128), params_.num_train_steps); const size_t stabilized_perf_start_step = params_.num_train_steps - stabilized_perf_total_step_count; double stabilized_total_time{0}; + const size_t end_to_end_perf_start_step = 128; + auto end_to_end_start = std::chrono::high_resolution_clock::now(); + bool end_to_end_measurement_started = false; while (step_ < params_.num_train_steps) { for (size_t shard_it = 0; shard_it < num_shards_to_visit; ++shard_it) { @@ -772,6 +792,12 @@ Status TrainingRunner::TrainingLoop(IDataLoader& training_data_loader, IDataLoad for (size_t batch = 0; batch < batch_num_cur_shard && step_ < params_.num_train_steps; ++batch) { const bool is_weight_update_step = (step_ + 1) % params_.gradient_accumulation_steps == 0; + const bool stablized_perf_measurement_started = step_ >= stabilized_perf_start_step; + if (!end_to_end_measurement_started && step_ >= end_to_end_perf_start_step) { + end_to_end_start = std::chrono::high_resolution_clock::now(); + end_to_end_measurement_started = true; + } + VectorString feed_names; VectorString fetch_names; std::vector feeds; @@ -781,53 +807,53 @@ Status TrainingRunner::TrainingLoop(IDataLoader& training_data_loader, IDataLoad if (is_weight_update_step) { PrepareFeedNamesAndFeeds(ModelUpdateStep, - training_data_loader, - *training_data, - lr_scheduler.get(), - batch, - feed_names, - feeds); + training_data_loader, + *training_data, + lr_scheduler.get(), + batch, + feed_names, + feeds); PrepareFetchNamesAndFetches(ModelUpdateStep, fetch_names, fetches); RunWithUpdate(feed_names, fetch_names, feeds, fetches); } else { PrepareFeedNamesAndFeeds(GradientAccumulateStep, - training_data_loader, - *training_data, - lr_scheduler.get(), - batch, - feed_names, - feeds); + training_data_loader, + *training_data, + lr_scheduler.get(), + batch, + feed_names, + feeds); PrepareFetchNamesAndFetches(GradientAccumulateStep, fetch_names, fetches); RunWithoutUpdate(feed_names, fetch_names, feeds, gradient_accumulation_step_count); - } + // at this point, step_ already be increased by 1. auto end = std::chrono::high_resolution_clock::now(); std::chrono::duration duration_seconds = end - start; total_time += duration_seconds.count(); - if (step_ >= stabilized_perf_start_step) { + if (stablized_perf_measurement_started) { stabilized_total_time += duration_seconds.count(); } printf("Stage %d, Round %d, Step: %d, epoch: %d, batch: %d/%d, shard_iteration: %d/%d, time: %.2f ms, throughput: %.2f ex/sec \n", - pipeline_context_.pipeline_stage_id, - static_cast(round_), - static_cast(step_), - static_cast(epoch), - static_cast(batch), - static_cast(batch_num_cur_shard), - static_cast(shard_it + 1), - static_cast(num_shards_to_visit), - duration_seconds.count() * 1000, - params_.batch_size * (step_ - step_start) / total_time); + pipeline_context_.pipeline_stage_id, + static_cast(round_), + static_cast(step_), + static_cast(epoch), + static_cast(batch), + static_cast(batch_num_cur_shard), + static_cast(shard_it + 1), + static_cast(num_shards_to_visit), + duration_seconds.count() * 1000, + params_.batch_size * (step_ - step_start) / total_time); printf("Training data range: [%d - %d)\n", - static_cast(batch * params_.batch_size), - static_cast((batch + 1) * params_.batch_size - 1)); + static_cast(batch * params_.batch_size), + static_cast((batch + 1) * params_.batch_size - 1)); if (test_data_loader && params_.do_eval && step_ % params_.evaluation_period == 0) { @@ -887,6 +913,15 @@ Status TrainingRunner::TrainingLoop(IDataLoader& training_data_loader, IDataLoad average_cpu_usage, peak_workingset_size)); } + double e2e_throughput{0}; + if (end_to_end_perf_start_step < params_.num_train_steps) { + auto end = std::chrono::high_resolution_clock::now(); + std::chrono::duration duration_seconds = end - end_to_end_start; + const double total_e2e_time = duration_seconds.count(); + const size_t end_to_end_step_count = params_.num_train_steps - std::max(step_start, end_to_end_perf_start_step); + e2e_throughput = params_.batch_size * end_to_end_step_count / total_e2e_time; + } + std::cout << "Round: " << round_ << "\n" << "Batch size: " << params_.batch_size << "\n" << "Number of Batches: " << number_of_batches << "\n" @@ -895,7 +930,8 @@ Status TrainingRunner::TrainingLoop(IDataLoader& training_data_loader, IDataLoad << "Total Running Time: " << total_time << " Seconds \n" << "Average Running Time Per Batch: " << avg_time_per_batch << " ms\n" << "Throughput: " << throughput << " Examples / Second\n" - << "Stabilized Throughput: " << stabilized_throughput << " Examples / Second\n"; + << "Stabilized Throughput: " << stabilized_throughput << " Examples / Second\n" + << "EndToEnd Throughput: " << e2e_throughput << " Examples / Second\n"; return Status::OK(); } @@ -968,7 +1004,8 @@ Status TrainingRunner::SavePerfMetrics(const size_t number_of_batches, const siz // write to a file - the next task in CI will pick up all files with the same prefix const PathString perf_metrics_path = - params_.perf_output_dir + GetPathSep() + ORT_TSTR("onnxruntime_perf_metrics_") + ToPathString(display_name) + ORT_TSTR(".json"); + params_.perf_output_dir + GetPathSep() + ORT_TSTR("onnxruntime_perf_metrics_") + + ToPathString(display_name) + ORT_TSTR(".json"); std::ofstream perf_metrics_stream; perf_metrics_stream.open(perf_metrics_path, std::ios::out | std::ios::trunc);