diff --git a/.gitignore b/.gitignore
index 85585b2fcd..4adf00c749 100644
--- a/.gitignore
+++ b/.gitignore
@@ -46,3 +46,7 @@ java/gradlew.bat
 java/gradle
 java/.gradle
 java/hs_*.log
+/tools/perf_util/target/classes/com/msft/send_perf_metrics
+/tools/perf_util/send_perf_metrics.iml
+/tools/perf_util/target/classes
+/tools/perf_util/src/main/resources
diff --git a/cmake/onnxruntime_training.cmake b/cmake/onnxruntime_training.cmake
index 9b55424c5b..a2d81e0056 100644
--- a/cmake/onnxruntime_training.cmake
+++ b/cmake/onnxruntime_training.cmake
@@ -46,10 +46,9 @@ add_dependencies(onnxruntime_training_runner ${onnxruntime_EXTERNAL_DEPENDENCIES
 onnxruntime_add_include_to_target(onnxruntime_training_runner onnxruntime_common onnx onnx_proto protobuf::libprotobuf onnxruntime_training)
 
+target_include_directories(onnxruntime_training_runner PRIVATE ${ONNXRUNTIME_ROOT} ${ORTTRAINING_ROOT} ${eigen_INCLUDE_DIRS} ${PROJECT_SOURCE_DIR}/external/json PUBLIC ${onnxruntime_graph_header})
 if (onnxruntime_USE_CUDA)
-  target_include_directories(onnxruntime_training_runner PRIVATE ${ONNXRUNTIME_ROOT} ${ORTTRAINING_ROOT} ${eigen_INCLUDE_DIRS} PUBLIC ${onnxruntime_graph_header} ${onnxruntime_CUDNN_HOME}/include ${CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES})
-else()
-  target_include_directories(onnxruntime_training_runner PRIVATE ${ONNXRUNTIME_ROOT} ${ORTTRAINING_ROOT} ${eigen_INCLUDE_DIRS} PUBLIC ${onnxruntime_graph_header})
+  target_include_directories(onnxruntime_training_runner PUBLIC ${onnxruntime_CUDNN_HOME}/include ${CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES})
 endif()
 if(UNIX AND NOT APPLE)
   target_compile_options(onnxruntime_training_runner PUBLIC "-Wno-maybe-uninitialized")
diff --git a/orttraining/orttraining/models/bert/main.cc b/orttraining/orttraining/models/bert/main.cc
index d828600cfb..9d48b920c8 100644
--- a/orttraining/orttraining/models/bert/main.cc
+++ b/orttraining/orttraining/models/bert/main.cc
@@ -67,6 +67,8 @@ Status ParseArguments(int argc, char* argv[], BertParameters& params, OrtParamet
         cxxopts::value<std::string>()->default_value(""))
       ("output_dir", "The output directory where the trained model files will be written.",
         cxxopts::value<std::string>()->default_value(""))
+      ("perf_output_dir", "The output directory where the trained perf metrics files will be written.",
+        cxxopts::value<std::string>()->default_value(""))
       ("checkpoints_dir", "The output directory where the checkpoint files will be written.",
         cxxopts::value<std::string>()->default_value(""))
       ("checkpoint_to_load_path",
@@ -239,6 +241,10 @@ Status ParseArguments(int argc, char* argv[], BertParameters& params, OrtParamet
   if (params.output_dir.empty()) {
     printf("No output directory specified. Trained model files will not be saved.\n");
   }
+  params.perf_output_dir = ToPathString(flags["perf_output_dir"].as<std::string>());
+  if (params.perf_output_dir.empty()) {
+    printf("No perf output directory specified. Trained perf metrics will not be saved.\n");
+  }
   params.checkpoints_dir = ToPathString(flags["checkpoints_dir"].as<std::string>());
   if (params.checkpoints_dir.empty()) {
     printf("No checkpoints directory specified. Checkpoint files will not be saved.\n");
@@ -346,16 +352,14 @@ Status ParseArguments(int argc, char* argv[], BertParameters& params, OrtParamet
         {"lambda", zero_lambda ? 0.f : lambda},
         {"epsilon", epsilon},
         {"ratio_min", ratio_min},
-        {"ratio_max", ratio_max}
-    };
+        {"ratio_max", ratio_max}};
   };
 
   // Optimizer's int attributes.
   params.optimizer_int_attributes = [=](const std::string& /*weight*/) {
     return std::unordered_map<std::string, int64_t>{
         {"do_bias_correction", do_bias_correction ? static_cast<int64_t>(1) : static_cast<int64_t>(0)},
-        {"weight_decay_mode", weight_decay_mode}
-    };
+        {"weight_decay_mode", weight_decay_mode}};
   };
 
   params.data_parallel_size = flags["data_parallel_size"].as<int>();
@@ -377,7 +381,7 @@ Status ParseArguments(int argc, char* argv[], BertParameters& params, OrtParamet
   int64_t seed = flags["seed"].as<int64_t>();
   if (params.horizontal_parallel_size > 1 && seed <= 0) {
-    seed = 8211; // Megatron needs a random seed.
+    seed = 8211;  // Megatron needs a random seed.
   }
   if (seed > 0) {
     utils::SetRandomSeed(seed);
@@ -428,6 +432,12 @@ float GetLossValue(const Tensor& loss_tensor) {
   return loss;
 }
 
+// mapping of max_sequence_length and max_predictions_per_sequence position derived from training data
+std::map<std::string, std::pair<std::string, size_t>> input_to_dimension_mapping;
+
+// generic properties for storing perf metrics
+MapStringToString mapped_dimensions;
+
 void setup_training_params(BertParameters& params) {
   params.model_path = ToPathString(params.model_name) + ORT_TSTR(".onnx");
   params.model_with_loss_func_path = ToPathString(params.model_name) + ORT_TSTR("_with_cost.onnx");
@@ -444,7 +454,7 @@ void setup_training_params(BertParameters& params) {
   }
   auto data_group_size = params.mpi_context.world_size / (params.horizontal_parallel_size * params.pipeline_parallel_size);
-  ORT_ENFORCE(data_group_size > 0, "Insufficient processes lead to zero-way data parallelism, which should be at least one-way."); 
+  ORT_ENFORCE(data_group_size > 0, "Insufficient processes lead to zero-way data parallelism, which should be at least one-way.");
   if (data_group_size != params.data_parallel_size) {
     LOGS_DEFAULT(WARNING) << "WARNING: data_parallel_size is not correct, tuned automatically to "
                           << data_group_size << std::endl;
@@ -479,7 +489,7 @@ void setup_training_params(BertParameters& params) {
   params.weights_not_to_train = {
       "position_01",  // Slice's dat input
       "op_min_ends_expand_10",  //op_min_ends_expand_10
-      "72", // [BERT-tiny only] input of expand
+      "72",  // [BERT-tiny only] input of expand
   };
 
   params.fetch_names = {"total_loss", "mlm_loss", "nsp_loss"};
@@ -506,6 +516,19 @@ void setup_training_params(BertParameters& params) {
       {"masked_lm_weights", "masked_lm_weights"},
       {"next_sentence_label", "next_sentence_labels"}};
 
+  // Use this table to define what gets stored in mapped_dimensions, and ultimately in the json structure.
+  // Be mindful of the position: if it is invalid or out of bounds, the property population will either
+  // be incorrect or aborted. Also make sure to subtract 1 from the graph index position to get the valid
+  // corresponding value: in the graph, sequence is at position 1, but in the initial tensor shape vector
+  // loaded from training data it is at position 0 (batch is not part of that vector until later).
+  // See GetTensorDimensionsFromInputs() in training_util.h and training_runner.cc for more details.
+  input_to_dimension_mapping = {
+      {"input1", {"SeqLen", 0}},                   // int64[batch,sequence]  "sequence" -> "SeqLen", 0
+      {"masked_lm_ids", {"PredictionsPerSeq", 0}}  // int64[batch,dynamic_prediction_count]
+  };
+
+  params.model_type = "bert";
+
   params.skip_evaluation = params.is_perf_test;
 
   params.error_function = [params](const std::vector<std::string>& /*feed_names*/,
@@ -655,14 +678,20 @@ static Status RunTraining(const BertParameters& params, const Environment& env)
                                                       max_num_files_preload);
     }
 
-    ORT_RETURN_IF_ERROR(runner->Run(training_data_loader.get(), test_data_loader.get()));
+    if (!params.perf_output_dir.empty()) {
+      // collecting Bert related params from training data
+      auto training_data = training_data_loader->CurrentDataSet();
+      ORT_RETURN_IF_ERROR(training_data->GetTensorDimensionsFromInputs(input_to_dimension_mapping, mapped_dimensions));
+    }
+
+    ORT_RETURN_IF_ERROR(runner->Run(training_data_loader.get(), test_data_loader.get(), mapped_dimensions));
     ORT_RETURN_IF_ERROR(runner->ResetLossScaler());
   }
 
   auto test_data_loader = onnxruntime::make_unique<DataLoader>(params_for_phase.input_name_map,
-                                                               params_for_phase.test_data_dir,
-                                                               max_num_files_preload);
+                                                                params_for_phase.test_data_dir,
+                                                                max_num_files_preload);
 
   ORT_RETURN_IF_ERROR(runner->EndTraining(test_data_loader.get()));
 
   return Status::OK();
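To make the index convention in the added comment concrete: in the graph, `input1` is declared as `int64[batch,sequence]`, so "sequence" sits at position 1, but the shape vector loaded from training data has no batch dimension yet, which is why the mapping above uses position 0. A minimal standalone sketch (not part of the patch; the shape value 128 is a hypothetical max_sequence_length):

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  // Graph declaration: input1 : int64[batch,sequence] -> "sequence" is at index 1.
  // Loaded training data: the batch dimension is absent, so the same dimension
  // lands at index 0 - hence {"input1", {"SeqLen", 0}}, not index 1.
  std::vector<int64_t> loaded_input1_shape = {128};  // hypothetical first-sample shape
  const size_t seq_len_index = 0;                    // graph position 1 minus the missing batch dim
  std::cout << "SeqLen -> " << loaded_input1_shape[seq_len_index] << "\n";  // prints: SeqLen -> 128
}
```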
diff --git a/orttraining/orttraining/models/runner/training_runner.cc b/orttraining/orttraining/models/runner/training_runner.cc
index f5381cb6a0..dc408c9440 100644
--- a/orttraining/orttraining/models/runner/training_runner.cc
+++ b/orttraining/orttraining/models/runner/training_runner.cc
@@ -16,6 +16,9 @@
 #include "orttraining/core/framework/checkpointing.h"
 #include "orttraining/core/graph/optimizer_graph_builder.h"
 #include "orttraining/models/runner/training_util.h"
+#include "single_include/nlohmann/json.hpp"
+
+using json = nlohmann::json;
 
 namespace onnxruntime {
 namespace training {
@@ -263,7 +266,8 @@ Status TrainingRunner::Initialize() {
   return Status::OK();
 }
 
-Status TrainingRunner::Run(IDataLoader* training_data_loader, IDataLoader* test_data_loader) {
+Status TrainingRunner::Run(IDataLoader* training_data_loader, IDataLoader* test_data_loader,
+                           const MapStringToString& mapped_dimensions) {
   if (params_.mpi_context.world_rank == 0 && !params_.model_actual_running_graph_path.empty()) {
     session_.Save(params_.model_actual_running_graph_path, TrainingSession::SaveOption::NO_RELOAD);
   }
@@ -274,7 +278,7 @@ Status TrainingRunner::Run(IDataLoader* training_data_loader, IDataLoader* test_
     return Status::OK();
   }
 
-  ORT_RETURN_IF_ERROR(TrainingLoop(*training_data_loader, test_data_loader));
+  ORT_RETURN_IF_ERROR(TrainingLoop(*training_data_loader, test_data_loader, mapped_dimensions));
 
   // after successful Run(), update counters
   ++round_;
@@ -571,7 +575,8 @@ Status TrainingRunner::RunWithoutUpdate(VectorString& feed_names,
   return Status::OK();
 }
 
-Status TrainingRunner::TrainingLoop(IDataLoader& training_data_loader, IDataLoader* test_data_loader) {
+Status TrainingRunner::TrainingLoop(IDataLoader& training_data_loader, IDataLoader* test_data_loader,
+                                    const MapStringToString& mapped_dimensions) {
   const bool enable_checkpoint_saving =
       params_.mpi_context.world_rank == 0 &&
       checkpoint_registry_ && params_.checkpoint_period > 0;
@@ -712,16 +717,109 @@ Status TrainingRunner::TrainingLoop(IDataLoader& training_data_loader, IDataLoad
     ++epoch;
   }
 
+  const size_t number_of_batches = step_ - step_start;
+  const size_t weight_update_steps = weight_update_step_count_ - weight_update_step_count_start;
+  const double avg_time_per_batch = total_time / (step_ - step_start) * 1000;
+  const double throughput = params_.batch_size * (step_ - step_start) / total_time;
+  const double stabilized_throughput = params_.batch_size / (stabilized_total_time / stabilized_perf_total_step_count);
+
+  if (params_.perf_output_dir.empty()) {
+    printf("No perf output directory specified, skipping save of trained perf metrics.\n");
+  } else {
+    ORT_RETURN_IF_ERROR(Env::Default().CreateFolder(params_.perf_output_dir));
+    // saving json file
+    ORT_RETURN_IF_ERROR(SavePerfMetrics(number_of_batches, gradient_accumulation_step_count, weight_update_steps,
+                                        total_time, avg_time_per_batch, throughput, stabilized_throughput,
+                                        mapped_dimensions));
+  }
+
   std::cout << "Round: " << round_ << "\n"
             << "Batch size: " << params_.batch_size << "\n"
-            << "Number of Batches: " << (step_ - step_start) << "\n"
+            << "Number of Batches: " << number_of_batches << "\n"
             << "Gradient Accumulation Steps: " << gradient_accumulation_step_count << "\n"
-            << "Weight Update Steps: " << (weight_update_step_count_ - weight_update_step_count_start) << "\n"
+            << "Weight Update Steps: " << weight_update_steps << "\n"
             << "Total Running Time: " << total_time << " Seconds \n"
-            << "Average Running Time Per Batch: " << total_time / (step_ - step_start) * 1000 << " ms\n"
-            << "Throughput: " << params_.batch_size * (step_ - step_start) / total_time << " Examples / Second\n"
-            << "Stabilized Throughput: " << params_.batch_size / (stabilized_total_time / stabilized_perf_total_step_count)
-            << " Examples / Second\n";
+            << "Average Running Time Per Batch: " << avg_time_per_batch << " ms\n"
+            << "Throughput: " << throughput << " Examples / Second\n"
+            << "Stabilized Throughput: " << stabilized_throughput << " Examples / Second\n";
+
+  return Status::OK();
+}
+
+Status TrainingRunner::SavePerfMetrics(const size_t number_of_batches, const size_t gradient_accumulation_steps,
+                                       const size_t weight_update_steps, const double total_time,
+                                       const double avg_time_per_batch, const double throughput, const double stabilized_throughput,
+                                       const MapStringToString& mapped_dimensions) {
+  // populate metrics for reporting
+  json perf_metrics;
+  perf_metrics["Model"] = params_.model_type;
+
+  // loop through mapped_dimensions and put the entries in a json sub-structure
+  std::string seq_len;
+  for (auto const& it : mapped_dimensions) {
+    if (it.first == "SeqLen") {
+      seq_len = it.second;
+    }
+    perf_metrics["DerivedProperties"][it.first] = it.second;
+  }
+
+  perf_metrics["Round"] = round_;
+  perf_metrics["BatchSize"] = params_.batch_size;
+  perf_metrics["NumOfBatches"] = number_of_batches;
+  perf_metrics["GradAccSteps"] = gradient_accumulation_steps;
+  perf_metrics["WeightUpdateSteps"] = weight_update_steps;
+  perf_metrics["TotalTime"] = total_time;
+  perf_metrics["AvgTimePerBatch"] = avg_time_per_batch;
+  perf_metrics["Throughput"] = throughput;
+  perf_metrics["StabilizedThroughput"] = stabilized_throughput;
+  perf_metrics["UseMixedPrecision"] = params_.use_mixed_precision;
+
+  std::string optimizer = params_.training_optimizer_name;
+  std::size_t pos = optimizer.find("Optimizer");
+  if (pos != std::string::npos)
+    optimizer = optimizer.substr(0, pos);
+  perf_metrics["Optimizer"] = optimizer;
+
+  Path model_path{};
+  ORT_RETURN_IF_ERROR(Path::Parse(params_.model_path, model_path));
+  PathString leaf = model_path.GetComponents().back();
+  std::string model_name = ToMBString(leaf.c_str());
+  perf_metrics["ModelName"] = model_name;
+
+  std::string display_name = model_name + "_" + params_.model_type + "_" + (params_.use_mixed_precision ? "fp16" : "fp32") +
+                             (seq_len.empty() ? "" : "_" + seq_len) + "_" + optimizer;
+  perf_metrics["DisplayName"] = display_name;
+
+  // TODO - add memory/cpu
+  //perf_metrics["Memory"] = ;
+  //perf_metrics["AvgCPU"] = ;
+
+  //
+  // we will get date/time and commitId in the post-run pipeline
+  //
+
+  // populate other basic params for bookkeeping - add more as needed
+  json bookkeeping_params;
+  bookkeeping_params["LearningRate"] = params_.lr_params.initial_lr;
+  bookkeeping_params["WarmupRatio"] = params_.lr_params.warmup_ratio;
+  bookkeeping_params["WarmupMode"] = params_.lr_params.warmup_mode;
+  bookkeeping_params["TrainSteps"] = params_.num_train_steps;
+  bookkeeping_params["ModelPath"] = ToMBString(params_.model_path.c_str());
+  bookkeeping_params["TrainDataDir"] = ToMBString(params_.train_data_dir.c_str());
+  bookkeeping_params["TestDataDir"] = ToMBString(params_.test_data_dir.c_str());
+
+  perf_metrics["RunConfig"] = bookkeeping_params.dump();  // serialize the params as a json string
+
+  std::string json_string = perf_metrics.dump();
+
+  // write to a file - the next task in CI will pick up all files with the same prefix
+  const PathString perf_metrics_path =
+      params_.perf_output_dir + GetPathSep<PathChar>() + ORT_TSTR("onnxruntime_perf_metrics_") + ToPathString(display_name) + ORT_TSTR(".json");
+
+  std::ofstream perf_metrics_stream;
+  perf_metrics_stream.open(perf_metrics_path, std::ios::out | std::ios::trunc);
+  ORT_RETURN_IF_NOT(perf_metrics_stream << json_string << "\n", "Failed to write to output file.");
 
   return Status::OK();
 }
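For reference, a minimal standalone sketch of the nlohmann::json pattern that `SavePerfMetrics` builds on; it is not part of the patch, all values are hypothetical, and the include path may differ from the `single_include/nlohmann/json.hpp` used above. One design choice worth noting: `RunConfig` is stored as a serialized string (a nested `dump()`), so the downstream uploader can pass it through as an opaque column rather than a nested object:

```cpp
#include <fstream>
#include <nlohmann/json.hpp>  // the patch includes "single_include/nlohmann/json.hpp"

using json = nlohmann::json;

int main() {
  json perf_metrics;
  perf_metrics["Model"] = "bert";
  perf_metrics["DerivedProperties"]["SeqLen"] = "128";  // dimensions arrive as strings
  perf_metrics["BatchSize"] = 64;                       // hypothetical
  perf_metrics["Throughput"] = 168.5;                   // hypothetical examples/sec

  json run_config;                                      // bookkeeping params
  run_config["LearningRate"] = 5e-5;
  perf_metrics["RunConfig"] = run_config.dump();        // nested JSON kept as a string

  // Same naming scheme as the patch: a fixed prefix the next CI task can glob for.
  std::ofstream out("onnxruntime_perf_metrics_example.json", std::ios::out | std::ios::trunc);
  out << perf_metrics.dump() << "\n";
}
```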
diff --git a/orttraining/orttraining/models/runner/training_runner.h b/orttraining/orttraining/models/runner/training_runner.h
index e5e0d88a59..c0be37949e 100644
--- a/orttraining/orttraining/models/runner/training_runner.h
+++ b/orttraining/orttraining/models/runner/training_runner.h
@@ -31,6 +31,8 @@ class TrainingRunner {
     PathString train_data_dir;
     PathString test_data_dir;
     PathString output_dir;  // Output of training, e.g., trained model files.
+    PathString perf_output_dir;  // training perf metrics
+    std::string model_type;      // bert/gpt2/...
 
     LossFunctionInfo loss_func_info;
@@ -167,7 +169,8 @@ class TrainingRunner {
 
   common::Status Initialize();
 
-  common::Status Run(IDataLoader* training_data_loader, IDataLoader* test_data_loader);
+  common::Status Run(IDataLoader* training_data_loader, IDataLoader* test_data_loader,
+                     const MapStringToString& mapped_dimensions = {});
 
   common::Status EndTraining(IDataLoader* data_loader);
 
@@ -198,7 +201,8 @@ class TrainingRunner {
                           VectorString& fetch_names,
                           std::vector<OrtValue>& feeds,
                           size_t& gradient_accumulation_step_count);
-  Status TrainingLoop(IDataLoader& training_data_loader, IDataLoader* test_data_loader);
+  Status TrainingLoop(IDataLoader& training_data_loader, IDataLoader* test_data_loader,
+                      const MapStringToString& mapped_dimensions);
   Status Evaluate(InferenceSession& session, IDataLoader& data_loader);
 
   Status SaveCheckpoint(const PathString& checkpoint_path);
@@ -206,6 +210,11 @@ class TrainingRunner {
   Status SaveCheckpointProperties(std::unordered_map<std::string, std::string>& properties) const;
   Status LoadCheckpointProperties(const std::unordered_map<std::string, std::string>& properties);
 
+  Status SavePerfMetrics(const size_t number_of_batches, const size_t gradient_accumulation_steps,
+                         const size_t weight_update_steps, const double total_time,
+                         const double avg_time_per_batch, const double throughput, const double stabilized_throughput,
+                         const MapStringToString& mapped_dimensions);
+
   size_t step_;
   size_t round_;
   size_t weight_update_step_count_;
diff --git a/orttraining/orttraining/models/runner/training_util.cc b/orttraining/orttraining/models/runner/training_util.cc
index c351301789..d2d122c036 100644
--- a/orttraining/orttraining/models/runner/training_util.cc
+++ b/orttraining/orttraining/models/runner/training_util.cc
@@ -72,6 +72,29 @@ size_t DataSet::TotalBatch(size_t batch_size) const {
   return NumSamples() / batch_size + ((NumSamples() % batch_size > 0) ? 1 : 0);
 }
 
+// gather additional training params from tensor dimensions
+// see input_to_dimension_mapping in bert/main.cc for an example, and training_util.h for more explanation
+common::Status DataSet::GetTensorDimensionsFromInputs(const std::map<std::string, std::pair<std::string, size_t>>& input_to_dimension_mapping,
+                                                      MapStringToString& mapped_dimensions) const {
+  if (input_to_dimension_mapping.size() == 0) return Status::OK();
+
+  for (size_t input_index = 0; input_index < NumInputs(); ++input_index) {
+    std::string input_name = GetInputName(input_index);
+    const auto it = input_to_dimension_mapping.find(input_name);
+    if (it == input_to_dimension_mapping.end()) continue;
+    auto metric = it->second;
+
+    const Tensor& first_tensor = data_[0]->at(input_index).Get<Tensor>();
+    std::vector<int64_t> shape_vector = first_tensor.Shape().GetDims();
+
+    ORT_RETURN_IF_NOT(metric.second < shape_vector.size(), "Index out of bounds for input: ", input_name.c_str(),
+                      "; requested index: ", metric.second, ", actual size: ", shape_vector.size());
+
+    mapped_dimensions.insert({metric.first, std::to_string(shape_vector[metric.second])});
+  }
+  return Status::OK();
+}
+
 std::vector<OrtValue> DataSet::GetKthBatch(size_t batch_size, size_t k_th, AllocatorPtr allocator) const {
   batch_size = min(batch_size, data_.size());
diff --git a/orttraining/orttraining/models/runner/training_util.h b/orttraining/orttraining/models/runner/training_util.h
index ab87c2e9af..8dcdffe23f 100644
--- a/orttraining/orttraining/models/runner/training_util.h
+++ b/orttraining/orttraining/models/runner/training_util.h
@@ -41,6 +41,8 @@ class DataSet {
 
   size_t NumInputs() const { return tensor_names_.size(); }
 
+  std::string GetInputName(size_t input_index) const { return tensor_names_[input_index]; }
+
   common::Status AddData(SampleType&& single_sample);
 
   common::Status AddData(const std::vector<ONNX_NAMESPACE::TensorProto>& features);
@@ -54,6 +56,26 @@ class DataSet {
 
   void RandomShuffle();
 
+  /**
+   * This method obtains model training params that are part of the training data.
+   * First load the .onnx model in Netron to get the mapping between input data and the graph;
+   * for example, a bert model (see input_name_map in bert/main.cc) requires 7 inputs,
+   * and each input may have a different tensor shape, like so:
+   *   input1:        int64[batch,sequence]
+   *   masked_lm_ids: int64[batch,dynamic_prediction_count]
+   * When loading training data, the actual shape vector of a tensor does not include "batch", so the caller
+   * needs to adjust the index position (i.e., subtract 1) to get the corresponding value. For example,
+   * to get the sequence length, look up input name "input1" and read position 0 (NOT 1) of its shape vector.
+   * input_to_dimension_mapping (see the example in bert/main.cc) maps the input name to the vector position,
+   * like so:
+   *   {"input1", {"SeqLen", 0}} => sequence->SeqLen, where SeqLen becomes a key in mapped_dimensions
+   * @param input_to_dimension_mapping tensor shape dimension mapping from training data, e.g., {"input1", {"SeqLen", 0}}
+   *        maps input1's "sequence" dimension at position 0 to the mapped_dimensions key "SeqLen"
+   * @param mapped_dimensions perf properties populated from training data, e.g., SeqLen->128
+   */
+  common::Status GetTensorDimensionsFromInputs(const std::map<std::string, std::pair<std::string, size_t>>& input_to_dimension_mapping,
+                                               MapStringToString& mapped_dimensions) const;
+
 private:
   // The names of the tensors.
  std::vector<std::string> tensor_names_;
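To summarize the contract of the new `DataSet` methods, here is a self-contained approximation. `FakeDataSet` and its fields are hypothetical stand-ins for the real class, which returns a `common::Status` and reads shapes from the first sample's tensors; the skip/bounds-check behavior mirrors the implementation above:

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for onnxruntime::training::DataSet.
struct FakeDataSet {
  std::vector<std::string> tensor_names_{"input1", "masked_lm_ids"};
  std::vector<std::vector<int64_t>> first_sample_shapes_{{128}, {20}};  // batch dim already stripped

  size_t NumInputs() const { return tensor_names_.size(); }
  std::string GetInputName(size_t i) const { return tensor_names_[i]; }

  // Mirrors DataSet::GetTensorDimensionsFromInputs: inputs not in the mapping are
  // skipped, out-of-bounds indices are an error, matches are stringified.
  bool GetTensorDimensionsFromInputs(const std::map<std::string, std::pair<std::string, size_t>>& mapping,
                                     std::map<std::string, std::string>& mapped_dimensions) const {
    for (size_t i = 0; i < NumInputs(); ++i) {
      const auto it = mapping.find(GetInputName(i));
      if (it == mapping.end()) continue;
      const auto& metric = it->second;
      const auto& shape = first_sample_shapes_[i];
      if (metric.second >= shape.size()) return false;  // the real code returns a failed Status here
      mapped_dimensions.insert({metric.first, std::to_string(shape[metric.second])});
    }
    return true;
  }
};

int main() {
  FakeDataSet data_set;
  std::map<std::string, std::string> mapped_dimensions;
  data_set.GetTensorDimensionsFromInputs({{"input1", {"SeqLen", 0}},
                                          {"masked_lm_ids", {"PredictionsPerSeq", 0}}},
                                         mapped_dimensions);
  for (const auto& kv : mapped_dimensions)
    std::cout << kv.first << " -> " << kv.second << "\n";  // PredictionsPerSeq -> 20, SeqLen -> 128
}
```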
diff --git a/tools/perf_util/pom.xml b/tools/perf_util/pom.xml
new file mode 100644
index 0000000000..1a2d79079b
--- /dev/null
+++ b/tools/perf_util/pom.xml
@@ -0,0 +1,56 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.msft</groupId>
+  <artifactId>send_perf_metrics</artifactId>
+  <version>0.0.1-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <name>send_perf_metrics</name>
+  <url>http://maven.apache.org</url>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>3.1.1</version>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <maven.compiler.source>1.8</maven.compiler.source>
+    <maven.compiler.target>1.8</maven.compiler.target>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.googlecode.json-simple</groupId>
+      <artifactId>json-simple</artifactId>
+      <version>1.1.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>8.0.15</version>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/tools/perf_util/src/main/java/com/msft/send_perf_metrics/App.java b/tools/perf_util/src/main/java/com/msft/send_perf_metrics/App.java
new file mode 100644
index 0000000000..4948b785d8
--- /dev/null
+++ b/tools/perf_util/src/main/java/com/msft/send_perf_metrics/App.java
@@ -0,0 +1,178 @@
+package com.msft.send_perf_metrics;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+
+import java.io.*;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+
+import java.sql.Connection;
+import java.sql.Types;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class App {
+
+    static String exec_command(Path source_dir, String... commands) throws Exception {
+        ProcessBuilder sb = new ProcessBuilder(commands).directory(source_dir.toFile()).redirectErrorStream(true);
+        Process p = sb.start();
+        if (p.waitFor() != 0)
+            throw new RuntimeException("execute " + String.join(" ", commands) + " failed");
+        try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
+            return r.readLine();
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        final Path source_dir = Paths.get(args[0]);
+        final List<Path> perf_metrics = new ArrayList<Path>();
+        Files.walkFileTree(source_dir, new SimpleFileVisitor<Path>() {
+
+            @Override
+            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
+                String dirname = dir.getFileName().toString();
+                if (!dirname.equals(".") && dirname.startsWith("."))
+                    return FileVisitResult.SKIP_SUBTREE;
+                return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                String filename = file.getFileName().toString();
+
+                if (!filename.startsWith(".") && filename.endsWith(".json")) {
+                    perf_metrics.add(file);
+                }
+                return FileVisitResult.CONTINUE;
+            }
+
+        });
+        System.out.println(perf_metrics);
+
+        final Path cwd_dir = Paths.get(System.getProperty("user.dir"));
+        // git rev-parse HEAD
+        String commit_id = exec_command(cwd_dir, "git", "rev-parse", "HEAD");
+        String date = exec_command(cwd_dir, "git", "show", "-s", "--format=%ci", commit_id);
+        final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z");
+        java.util.Date commitDate = sdf.parse(date);
+        final SimpleDateFormat simple_date_format = new SimpleDateFormat("yyyy-MM-dd");
+        String batch_id = simple_date_format.format(commitDate);
+        System.out.println(String.format("Commit change date: %s", batch_id));
+
+        // process the collected json files
+        processPerfMetrics(perf_metrics, commit_id, batch_id);
+
+        // TODO - add e2e tests later, run it w/ process command
+    }
+
+    private static void processPerfMetrics(final List<Path> perf_metrics, String commit_id,
+            String batch_id) throws Exception {
+        try {
+            Connection conn = JdbcUtil.GetConn();
+            // go through each json file
+            JSONParser jsonParser = new JSONParser();
+            for (Path metrics_json : perf_metrics) {
+                try (FileReader reader = new FileReader(metrics_json.toAbsolutePath().toString())) {
+                    // Read JSON file
+                    Object obj = jsonParser.parse(reader);
+                    loadMetricsIntoMySQL(conn, commit_id, batch_id, (JSONObject) obj);
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    private static void loadMetricsIntoMySQL(java.sql.Connection conn, String commit_id, String batch_id,
+            JSONObject json_object) throws Exception {
+
+        try (java.sql.PreparedStatement st = conn.prepareStatement(
+                "INSERT INTO perf_test_training_data (BatchId,CommitId,Model,ModelName,DisplayName,UseMixedPrecision,Optimizer,BatchSize,SeqLen,PredictionsPerSeq,"
+                        + "NumOfBatches,WeightUpdateSteps,Round,GradAccSteps,AvgTimePerBatch,Throughput,StabilizedThroughput,TotalTime,AvgCPU,Memory,RunConfig,Time) "
+                        + "values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,Now())"
+                        + " ON DUPLICATE KEY UPDATE AvgTimePerBatch=?,Throughput=?,StabilizedThroughput=?,TotalTime=?,AvgCPU=?,Memory=?")) {
+
+            int i = 0;
+
+            // unique key section
+            st.setString(++i, batch_id);
+            st.setString(++i, commit_id.substring(0, 8));
+            st.setString(++i, (String) json_object.get("Model"));
+            st.setString(++i, (String) json_object.get("ModelName"));
+            st.setString(++i, (String) json_object.get("DisplayName"));
+            st.setBoolean(++i, (Boolean) json_object.get("UseMixedPrecision"));
+            st.setString(++i, (String) json_object.get("Optimizer"));
+            st.setInt(++i, (int) (long) json_object.get("BatchSize"));
+
+            // non-key section
+            JSONObject properties = (JSONObject) json_object.get("DerivedProperties");
+            if (properties != null) {
+                if (properties.get("SeqLen") == null) // mysql allows null value in unique key column
+                    st.setNull(++i, Types.INTEGER);
+                else
+                    st.setInt(++i, Integer.parseInt((String) properties.get("SeqLen")));
+
+                if (properties.get("PredictionsPerSeq") == null) // mysql allows null value in unique key column
+                    st.setNull(++i, Types.INTEGER);
+                else
+                    st.setInt(++i, Integer.parseInt((String) properties.get("PredictionsPerSeq")));
+            } else {
+                st.setNull(++i, Types.INTEGER);
+                st.setNull(++i, Types.INTEGER);
+            }
+
+            st.setInt(++i, (int) (long) json_object.get("NumOfBatches"));
+            st.setInt(++i, (int) (long) json_object.get("WeightUpdateSteps"));
+            st.setInt(++i, (int) (long) json_object.get("Round"));
+            st.setInt(++i, (int) (long) json_object.get("GradAccSteps"));
+            st.setFloat(++i, (float) (double) json_object.get("AvgTimePerBatch")); // ms
+            st.setFloat(++i, (float) (double) json_object.get("Throughput")); // examples/sec
+            st.setFloat(++i, (float) (double) json_object.get("StabilizedThroughput")); // examples/sec
+            st.setFloat(++i, (float) (double) json_object.get("TotalTime")); // secs
+
+            // TODO - remove "if" check later
+            if (json_object.get("AvgCPU") == null)
+                st.setNull(++i, Types.FLOAT);
+            else
+                st.setFloat(++i, (float) (double) json_object.get("AvgCPU"));
+
+            if (json_object.get("Memory") == null)
+                st.setNull(++i, Types.INTEGER);
+            else
+                st.setInt(++i, (int) (long) json_object.get("Memory")); // mb
+
+            st.setString(++i, (String) json_object.get("RunConfig"));
+
+            // update section
+            st.setFloat(++i, (float) (double) json_object.get("AvgTimePerBatch")); // ms
+            st.setFloat(++i, (float) (double) json_object.get("Throughput")); // examples/sec
+            st.setFloat(++i, (float) (double) json_object.get("StabilizedThroughput")); // examples/sec
+            st.setFloat(++i, (float) (double) json_object.get("TotalTime")); // secs
+            if (json_object.get("AvgCPU") == null)
+                st.setNull(++i, Types.FLOAT);
+            else
+                st.setFloat(++i, (float) (double) json_object.get("AvgCPU"));
+
+            if (json_object.get("Memory") == null)
+                st.setNull(++i, Types.INTEGER);
+            else
+                st.setInt(++i, (int) (long) json_object.get("Memory")); // mb
+
+            st.executeUpdate();
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+
+    }
+
+}
diff --git a/tools/perf_util/src/main/java/com/msft/send_perf_metrics/JdbcUtil.java b/tools/perf_util/src/main/java/com/msft/send_perf_metrics/JdbcUtil.java
new file mode 100644
index 0000000000..da88f5fe64
--- /dev/null
+++ b/tools/perf_util/src/main/java/com/msft/send_perf_metrics/JdbcUtil.java
@@ -0,0 +1,17 @@
+package com.msft.send_perf_metrics;
+
+import java.sql.DriverManager;
+import java.util.Properties;
+
+public class JdbcUtil {
+    static java.sql.Connection GetConn() throws Exception {
+        try (java.io.InputStream in = App.class.getResourceAsStream("/jdbc.properties")) {
+            if (in == null)
+                throw new RuntimeException("Failed to load jdbc.properties from the classpath.");
+            Properties props = new Properties();
+            props.load(in);
+            return DriverManager.getConnection(props.getProperty("url"), props.getProperty("user"),
+                    props.getProperty("password"));
+        }
+    }
+}