From e954fa6d00bcee265e22344a073cb6ec768bcd84 Mon Sep 17 00:00:00 2001
From: Andrey Mironov
Date: Thu, 18 Sep 2014 14:23:12 +0400
Subject: [PATCH] add `elapsed` and `progress` rows to system.merges.
 [#METR-12574]

Fix data-race introduced due to misconception of atomic instructions on
integers.
---
 dbms/include/DB/Interpreters/ProcessList.h     |  8 +++++---
 dbms/include/DB/Storages/MergeTree/MergeList.h |  4 ++++
 .../Storages/MergeTree/MergeTreeDataMerger.cpp |  9 ++++++---
 dbms/src/Storages/StorageSystemMerges.cpp      | 16 ++++++++++++----
 4 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/dbms/include/DB/Interpreters/ProcessList.h b/dbms/include/DB/Interpreters/ProcessList.h
index 2769310da3e..db7fa99a7e5 100644
--- a/dbms/include/DB/Interpreters/ProcessList.h
+++ b/dbms/include/DB/Interpreters/ProcessList.h
@@ -58,9 +58,11 @@ public:
 
     bool update(size_t rows, size_t bytes) volatile
     {
-        /// x86 atomic operations on properly aligned integral values
-        rows_processed += rows;
-        rows_processed += bytes;
+        /** operator+= may not be atomic (in contrary to operator++)
+          * because it first requires to read original value from the memory,
+          * add increment value and then store new value back to memory. */
+        __sync_add_and_fetch(&rows_processed, rows);
+        __sync_add_and_fetch(&bytes_processed, bytes);
         return !is_cancelled;
     }
 };
diff --git a/dbms/include/DB/Storages/MergeTree/MergeList.h b/dbms/include/DB/Storages/MergeTree/MergeList.h
index c14c9d349fa..1eeb1cea2e6 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeList.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeList.h
@@ -1,5 +1,6 @@
 #pragma once
 
+#include
 #include
 #include
 #include
@@ -18,6 +19,8 @@ class MergeList
     const std::string database;
     const std::string table;
     const std::string result_part_name;
+    Stopwatch watch;
+    Float64 progress{};
     std::uint64_t num_parts{};
     std::uint64_t total_size_bytes_compressed{};
     std::uint64_t total_size_marks{};
@@ -26,6 +29,7 @@ class MergeList
     std::uint64_t bytes_written_uncompressed{};
    std::uint64_t rows_written{};
 
+
     MergeInfo(const std::string & database, const std::string & table, const std::string & result_part_name)
         : database{database}, table{table}, result_part_name{result_part_name}
     {
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
index 744efdd6cce..9cefa266a70 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
@@ -341,6 +341,8 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
 
     size_t sum_rows_approx = 0;
 
+    const auto rows_total = merge_entry->total_size_marks * data.index_granularity;
+
     for (size_t i = 0; i < parts.size(); ++i)
     {
         MarkRanges ranges(1, MarkRange(0, parts[i]->size));
@@ -348,9 +350,10 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
         auto input = stdext::make_unique<MergeTreeBlockInputStream>(
             data.getFullPath() + parts[i]->name + '/', DEFAULT_MERGE_BLOCK_SIZE, union_column_names, data,
             parts[i], ranges, false, nullptr, "");
-        input->setProgressCallback([&merge_entry] (const std::size_t rows, const std::size_t bytes) {
-            merge_entry->rows_read += rows;
-            merge_entry->bytes_read_uncompressed += bytes;
+        input->setProgressCallback([&merge_entry, rows_total] (const std::size_t rows, const std::size_t bytes) {
+            const auto new_rows_read = __sync_add_and_fetch(&merge_entry->rows_read, rows);
+            merge_entry->progress = static_cast<Float64>(new_rows_read) / rows_total;
+            __sync_add_and_fetch(&merge_entry->bytes_read_uncompressed, bytes);
         });
 
         src_streams.push_back(new ExpressionBlockInputStream(input.release(), data.getPrimaryExpression()));
diff --git a/dbms/src/Storages/StorageSystemMerges.cpp b/dbms/src/Storages/StorageSystemMerges.cpp
index 2ba7c9d831a..a7de97573b9 100644
--- a/dbms/src/Storages/StorageSystemMerges.cpp
+++ b/dbms/src/Storages/StorageSystemMerges.cpp
@@ -13,6 +13,8 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name, const Context
     , columns{
         { "database", new DataTypeString },
         { "table", new DataTypeString },
+        { "elapsed", new DataTypeFloat64 },
+        { "progress", new DataTypeFloat64 },
         { "num_parts", new DataTypeUInt64 },
         { "result_part_name", new DataTypeString },
         { "total_size_bytes_compressed", new DataTypeUInt64 },
@@ -39,6 +41,8 @@ BlockInputStreams StorageSystemMerges::read(
 
     ColumnWithNameAndType col_database{new ColumnString, new DataTypeString, "database"};
     ColumnWithNameAndType col_table{new ColumnString, new DataTypeString, "table"};
+    ColumnWithNameAndType col_elapsed{new ColumnFloat64, new DataTypeFloat64, "elapsed"};
+    ColumnWithNameAndType col_progress{new ColumnFloat64, new DataTypeFloat64, "progress"};
     ColumnWithNameAndType col_num_parts{new ColumnUInt64, new DataTypeUInt64, "num_parts"};
     ColumnWithNameAndType col_result_part_name{new ColumnString, new DataTypeString, "result_part_name"};
     ColumnWithNameAndType col_total_size_bytes_compressed{new ColumnUInt64, new DataTypeUInt64, "total_size_bytes_compressed"};
@@ -52,6 +56,8 @@ BlockInputStreams StorageSystemMerges::read(
     {
         col_database.column->insert(merge.database);
         col_table.column->insert(merge.table);
+        col_elapsed.column->insert(merge.watch.elapsedSeconds());
+        col_progress.column->insert(merge.progress);
         col_num_parts.column->insert(merge.num_parts);
         col_result_part_name.column->insert(merge.result_part_name);
         col_total_size_bytes_compressed.column->insert(merge.total_size_bytes_compressed);
@@ -64,14 +70,16 @@ BlockInputStreams StorageSystemMerges::read(
 
     Block block{
         col_database,
-        col_num_parts,
         col_table,
-        col_total_size_bytes_compressed,
+        col_elapsed,
+        col_progress,
+        col_num_parts,
         col_result_part_name,
-        col_bytes_read_uncompressed,
+        col_total_size_bytes_compressed,
         col_total_size_marks,
-        col_bytes_written_uncompressed,
+        col_bytes_read_uncompressed,
         col_rows_read,
+        col_bytes_written_uncompressed,
         col_rows_written
     };
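
Note on the data-race part of this patch: a plain `x += y` on a shared integer compiles to a separate load, add and store, so two concurrent writers can lose updates. An aligned 64-bit load or store does not tear on x86, but the read-modify-write is only atomic when a lock-prefixed instruction is emitted, which is what `__sync_add_and_fetch` (and `std::atomic::fetch_add`) guarantees. The following is a minimal standalone sketch, not part of the patch, with invented names, that contrasts the racy counter with the builtin used in the patch and the `std::atomic` equivalent; it assumes GCC or Clang for the `__sync` builtin.

// Standalone illustration only; not part of the patch above.
// Build: g++ -std=c++11 -O2 -pthread counter_race.cpp
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

int main()
{
    const std::size_t num_threads = 4;
    const std::size_t iterations = 1000000;

    std::uint64_t plain = 0;                      // updated with +=: load, add, store (racy)
    std::uint64_t builtin_counter = 0;            // updated with the GCC builtin used in the patch
    std::atomic<std::uint64_t> atomic_counter{0}; // the C++11 equivalent

    std::vector<std::thread> pool;
    for (std::size_t t = 0; t < num_threads; ++t)
        pool.emplace_back([&]
        {
            for (std::size_t i = 0; i < iterations; ++i)
            {
                plain += 1;                                     // lost updates are likely here
                __sync_add_and_fetch(&builtin_counter, 1);      // atomic read-modify-write
                atomic_counter.fetch_add(1, std::memory_order_relaxed);
            }
        });

    for (auto & th : pool)
        th.join();

    // `plain` typically ends up below num_threads * iterations; the other two always reach it.
    std::cout << "plain:   " << plain << "\n"
              << "builtin: " << builtin_counter << "\n"
              << "atomic:  " << atomic_counter.load() << "\n";
}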
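The `progress` value exposed in system.merges is rows read so far divided by an estimated row total, where the total is approximated as `total_size_marks * index_granularity` because the merge reads whole marks. A condensed sketch of that bookkeeping follows; `MergeInfoStub`, `on_progress` and the numeric values are invented for illustration, only the field names and the formula mirror the patch.

// Standalone illustration only; MergeInfoStub is a stand-in for MergeList::MergeInfo.
// Build: g++ -std=c++11 progress_sketch.cpp   (GCC/Clang for the __sync builtin)
#include <cstdint>
#include <cstdio>

struct MergeInfoStub
{
    std::uint64_t rows_read = 0;
    double progress = 0.0;        // Float64 in the patch
};

int main()
{
    // Example values; in the patch they come from the parts selected for the merge.
    const std::uint64_t total_size_marks = 128;
    const std::uint64_t index_granularity = 8192;

    // Approximate row total: every mark covers index_granularity rows.
    const std::uint64_t rows_total = total_size_marks * index_granularity;

    MergeInfoStub merge_entry;

    // What the progress callback does for every block of `rows` it has read.
    auto on_progress = [&merge_entry, rows_total](std::uint64_t rows)
    {
        const auto new_rows_read = __sync_add_and_fetch(&merge_entry.rows_read, rows);
        merge_entry.progress = static_cast<double>(new_rows_read) / rows_total;
    };

    on_progress(65536);
    std::printf("progress: %.4f\n", merge_entry.progress);  // 65536 / (128 * 8192) = 0.0625
}

Since the last mark of a part may hold fewer than index_granularity rows, the estimate can slightly overstate the total, so progress may top out just under 1.0.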