Explicit semantic for TSan [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-03-03 21:00:46 +03:00
parent 07d8db9f2a
commit 91a6a88102
3 changed files with 13 additions and 10 deletions

View File

@ -35,7 +35,7 @@ MergeInfo MergeListElement::getInfo() const
res.table = table; res.table = table;
res.result_part_name = result_part_name; res.result_part_name = result_part_name;
res.elapsed = watch.elapsedSeconds(); res.elapsed = watch.elapsedSeconds();
res.progress = progress; res.progress = progress.load(std::memory_order_relaxed);
res.num_parts = num_parts; res.num_parts = num_parts;
res.total_size_bytes_compressed = total_size_bytes_compressed; res.total_size_bytes_compressed = total_size_bytes_compressed;
res.total_size_marks = total_size_marks; res.total_size_marks = total_size_marks;

View File

@ -50,7 +50,7 @@ struct MergeListElement : boost::noncopyable
const std::string table; const std::string table;
const std::string result_part_name; const std::string result_part_name;
Stopwatch watch; Stopwatch watch;
Float64 progress{}; std::atomic<Float64> progress{};
UInt64 num_parts{}; UInt64 num_parts{};
Names source_part_names; Names source_part_names;
UInt64 total_size_bytes_compressed{}; UInt64 total_size_bytes_compressed{};

View File

@ -466,7 +466,7 @@ public:
merge_entry->bytes_read_uncompressed += value.bytes; merge_entry->bytes_read_uncompressed += value.bytes;
merge_entry->rows_read += value.rows; merge_entry->rows_read += value.rows;
merge_entry->progress = average_elem_progress * merge_entry->rows_read; merge_entry->progress.store(average_elem_progress * merge_entry->rows_read, std::memory_order_relaxed);
}; };
}; };
@ -476,10 +476,9 @@ public:
class MergeProgressCallbackVerticalStep : public MergeProgressCallback class MergeProgressCallbackVerticalStep : public MergeProgressCallback
{ {
public: public:
MergeProgressCallbackVerticalStep(MergeList::Entry & merge_entry_, size_t num_total_rows_exact, MergeProgressCallbackVerticalStep(MergeList::Entry & merge_entry_, size_t num_total_rows_exact,
const ColumnSizeEstimator & column_sizes, const String & column_name, UInt64 & watch_prev_elapsed_) const ColumnSizeEstimator & column_sizes, const String & column_name, UInt64 & watch_prev_elapsed_)
: MergeProgressCallback(merge_entry_, watch_prev_elapsed_), initial_progress(merge_entry->progress) : MergeProgressCallback(merge_entry_, watch_prev_elapsed_), initial_progress(merge_entry->progress.load(std::memory_order_relaxed))
{ {
average_elem_progress = column_sizes.columnProgress(column_name, 1, num_total_rows_exact); average_elem_progress = column_sizes.columnProgress(column_name, 1, num_total_rows_exact);
updateWatch(); updateWatch();
@ -496,7 +495,9 @@ public:
rows_read_internal += value.rows; rows_read_internal += value.rows;
Float64 local_progress = average_elem_progress * rows_read_internal; Float64 local_progress = average_elem_progress * rows_read_internal;
merge_entry->progress = initial_progress + local_progress;
/// NOTE: 'progress' is modified by single thread, but it may be concurrently read from MergeListElement::getInfo() (StorageSystemMerges).
merge_entry->progress.store(initial_progress + local_progress, std::memory_order_relaxed);
}; };
}; };
@ -678,7 +679,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
/// But now we are using inaccurate row-based estimation in Horizontal case for backward compability /// But now we are using inaccurate row-based estimation in Horizontal case for backward compability
Float64 progress = (merge_alg == MergeAlgorithm::Horizontal) Float64 progress = (merge_alg == MergeAlgorithm::Horizontal)
? std::min(1., 1. * rows_written / sum_input_rows_upper_bound) ? std::min(1., 1. * rows_written / sum_input_rows_upper_bound)
: std::min(1., merge_entry->progress); : std::min(1., merge_entry->progress.load(std::memory_order_relaxed));
disk_reservation->update(static_cast<size_t>((1. - progress) * initial_reservation)); disk_reservation->update(static_cast<size_t>((1. - progress) * initial_reservation));
} }
@ -696,7 +697,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
{ {
size_t sum_input_rows_exact = merge_entry->rows_read; size_t sum_input_rows_exact = merge_entry->rows_read;
merge_entry->columns_written = merging_column_names.size(); merge_entry->columns_written = merging_column_names.size();
merge_entry->progress = column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact); merge_entry->progress.store(column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact), std::memory_order_relaxed);
BlockInputStreams column_part_streams(parts.size()); BlockInputStreams column_part_streams(parts.size());
NameSet offset_columns_written; NameSet offset_columns_written;
@ -715,7 +716,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
const DataTypePtr & column_type = it_name_and_type->type; const DataTypePtr & column_type = it_name_and_type->type;
const String offset_column_name = Nested::extractTableName(column_name); const String offset_column_name = Nested::extractTableName(column_name);
Names column_name_{column_name}; Names column_name_{column_name};
Float64 progress_before = merge_entry->progress; Float64 progress_before = merge_entry->progress.load(std::memory_order_relaxed);
bool offset_written = offset_columns_written.count(offset_column_name); bool offset_written = offset_columns_written.count(offset_column_name);
for (size_t part_num = 0; part_num < parts.size(); ++part_num) for (size_t part_num = 0; part_num < parts.size(); ++part_num)
@ -753,9 +754,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
if (typeid_cast<const DataTypeArray *>(column_type.get())) if (typeid_cast<const DataTypeArray *>(column_type.get()))
offset_columns_written.emplace(offset_column_name); offset_columns_written.emplace(offset_column_name);
/// NOTE: 'progress' is modified by single thread, but it may be concurrently read from MergeListElement::getInfo() (StorageSystemMerges).
merge_entry->columns_written = merging_column_names.size() + column_num; merge_entry->columns_written = merging_column_names.size() + column_num;
merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes; merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes;
merge_entry->progress = progress_before + column_sizes.columnProgress(column_name, sum_input_rows_exact, sum_input_rows_exact); merge_entry->progress.store(progress_before + column_sizes.columnProgress(column_name, sum_input_rows_exact, sum_input_rows_exact), std::memory_order_relaxed);
if (merges_blocker.isCancelled()) if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);