From fca64bb9da28836cf82b47546086862171955210 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 3 Mar 2018 17:16:56 +0300 Subject: [PATCH 01/12] Updated instruction [#CLICKHOUSE-2] --- dbms/tests/instructions/sanitizers.txt | 2 +- dbms/tests/instructions/tsan_suppressions | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) create mode 100644 dbms/tests/instructions/tsan_suppressions diff --git a/dbms/tests/instructions/sanitizers.txt b/dbms/tests/instructions/sanitizers.txt index 552a313e5e2..d2f95745236 100644 --- a/dbms/tests/instructions/sanitizers.txt +++ b/dbms/tests/instructions/sanitizers.txt @@ -35,4 +35,4 @@ scp ./dbms/src/Server/clickhouse yourserver:~/clickhouse-tsan # Start ClickHouse and run tests -sudo -u clickhouse TSAN_OPTIONS='halt_on_error=1' ./clickhouse-tsan --config /etc/clickhouse-server/config.xml +sudo -u clickhouse TSAN_OPTIONS='halt_on_error=1 suppressions=tsan_suppressions' ./clickhouse-tsan --config /etc/clickhouse-server/config.xml diff --git a/dbms/tests/instructions/tsan_suppressions b/dbms/tests/instructions/tsan_suppressions new file mode 100644 index 00000000000..1bc366e12be --- /dev/null +++ b/dbms/tests/instructions/tsan_suppressions @@ -0,0 +1,2 @@ +# ZooKeeper C library is a trash: +race:contrib/zookeeper From 1aad435d527018dd0ccd65c526440fec2867bae5 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 3 Mar 2018 17:39:16 +0300 Subject: [PATCH 02/12] Fixed race condition at startup [#CLICKHOUSE-2] --- dbms/src/Common/Config/ConfigReloader.cpp | 8 +++++++- dbms/src/Common/Config/ConfigReloader.h | 3 +++ dbms/src/Server/Server.cpp | 3 +++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/dbms/src/Common/Config/ConfigReloader.cpp b/dbms/src/Common/Config/ConfigReloader.cpp index 53a5aa61cfa..54cbf507055 100644 --- a/dbms/src/Common/Config/ConfigReloader.cpp +++ b/dbms/src/Common/Config/ConfigReloader.cpp @@ -24,7 +24,11 @@ ConfigReloader::ConfigReloader( { if (!already_loaded) reloadIfNewer(/* force = */ true, /* throw_on_error = */ true, /* fallback_to_preprocessed = */ true); +} + +void ConfigReloader::start() +{ thread = std::thread(&ConfigReloader::run, this); } @@ -35,7 +39,9 @@ ConfigReloader::~ConfigReloader() { quit = true; zk_node_cache.getChangedEvent().set(); - thread.join(); + + if (thread.joinable()) + thread.join(); } catch (...) { diff --git a/dbms/src/Common/Config/ConfigReloader.h b/dbms/src/Common/Config/ConfigReloader.h index 2dcbea7a8bc..94b2d3c629c 100644 --- a/dbms/src/Common/Config/ConfigReloader.h +++ b/dbms/src/Common/Config/ConfigReloader.h @@ -39,6 +39,9 @@ public: ~ConfigReloader(); + /// Call this method to run the backround thread. + void start(); + private: void run(); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 44d4bb602c5..9e957dce461 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -473,6 +473,9 @@ int Server::main(const std::vector & /*args*/) for (auto & server : servers) server->start(); + main_config_reloader->start(); + users_config_reloader->start(); + { std::stringstream message; message << "Available RAM = " << formatReadableSizeWithBinarySuffix(getMemoryAmount()) << ";" From 49bab773fddd849ec989962f9c49c34b83348416 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 3 Mar 2018 17:42:45 +0300 Subject: [PATCH 03/12] Updated instructions [#CLICKHOUSE-2] --- dbms/tests/instructions/sanitizers.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/tests/instructions/sanitizers.txt b/dbms/tests/instructions/sanitizers.txt index d2f95745236..9df261f5155 100644 --- a/dbms/tests/instructions/sanitizers.txt +++ b/dbms/tests/instructions/sanitizers.txt @@ -17,7 +17,7 @@ scp ./dbms/src/Server/clickhouse yourserver:~/clickhouse-asan # Start ClickHouse and run tests -sudo -u clickhouse ./clickhouse-asan --config /etc/clickhouse-server/config.xml +sudo -u clickhouse ./clickhouse-asan server --config /etc/clickhouse-server/config.xml # How to use Thread Sanitizer @@ -35,4 +35,4 @@ scp ./dbms/src/Server/clickhouse yourserver:~/clickhouse-tsan # Start ClickHouse and run tests -sudo -u clickhouse TSAN_OPTIONS='halt_on_error=1 suppressions=tsan_suppressions' ./clickhouse-tsan --config /etc/clickhouse-server/config.xml +sudo -u clickhouse TSAN_OPTIONS='halt_on_error=1 suppressions=tsan_suppressions' ./clickhouse-tsan server --config /etc/clickhouse-server/config.xml From c4138f64866b15d2069fdb1c667dca34a65d2a26 Mon Sep 17 00:00:00 2001 From: Marsel Arduanov Date: Sat, 3 Mar 2018 20:05:21 +0500 Subject: [PATCH 04/12] Correct MacOS version detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First version of MacOS Sierra have Darwin version 16.0.0, not 16.1.0 Wiki info https://ru.wikipedia.org/wiki/Darwin#Хронология_версий --- libs/libcommon/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/libcommon/CMakeLists.txt b/libs/libcommon/CMakeLists.txt index 44b3e4767d0..53133d69d76 100644 --- a/libs/libcommon/CMakeLists.txt +++ b/libs/libcommon/CMakeLists.txt @@ -1,5 +1,5 @@ if (APPLE) - if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Darwin" AND NOT "${CMAKE_SYSTEM_VERSION}" VERSION_LESS "16.1.0") + if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Darwin" AND NOT "${CMAKE_SYSTEM_VERSION}" VERSION_LESS "16.0.0") set (APPLE_SIERRA_OR_NEWER 1) else () set (APPLE_SIERRA_OR_NEWER 0) From 8c37547bfefd012b7927e01009e6d3d78e7d2b1f Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 3 Mar 2018 18:35:24 +0300 Subject: [PATCH 05/12] Fixed harmless data race [#CLICKHOUSE-2] --- dbms/src/Storages/StorageReplicatedMergeTree.cpp | 4 ++++ dbms/src/Storages/StorageReplicatedMergeTree.h | 2 -- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 1b692e0397a..b412921643e 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -140,6 +140,10 @@ static const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000; extern const int MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER = 5 * 60; +/** For randomized selection of replicas. */ +thread_local pcg64 rng{randomSeed()}; + + void StorageReplicatedMergeTree::setZooKeeper(zkutil::ZooKeeperPtr zookeeper) { std::lock_guard lock(current_zookeeper_mutex); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 21c77cfde5d..b1cbf630b1b 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -284,8 +284,6 @@ private: Logger * log; - pcg64 rng{randomSeed()}; - /// Initialization. /** Creates the minimum set of nodes in ZooKeeper. From c897e5ca393226f137b5e2952c970513023eefb2 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 3 Mar 2018 18:36:20 +0300 Subject: [PATCH 06/12] Better [#CLICKHOUSE-2] --- .../AggregateFunctionUniq.h | 8 +--- .../AggregateFunctionUniqUpTo.h | 4 +- dbms/src/Columns/ColumnArray.cpp | 2 +- dbms/src/Columns/ColumnNullable.cpp | 2 +- dbms/src/Columns/ColumnVector.cpp | 2 +- dbms/src/Common/FieldVisitors.cpp | 24 +++++------ dbms/src/Common/SipHash.h | 16 +++++++ dbms/src/Common/randomSeed.cpp | 11 ++++- .../tests/integer_hash_tables_and_hashes.cpp | 2 +- dbms/src/IO/UncompressedCache.h | 2 +- dbms/src/Interpreters/Compiler.cpp | 2 +- dbms/src/Parsers/IAST.cpp | 5 +-- .../Storages/MergeTree/MergeTreeDataPart.cpp | 42 +++++++++---------- .../Storages/MergeTree/MergeTreeDataPart.h | 28 ++++++------- 14 files changed, 82 insertions(+), 68 deletions(-) diff --git a/dbms/src/AggregateFunctions/AggregateFunctionUniq.h b/dbms/src/AggregateFunctions/AggregateFunctionUniq.h index 058e40ed73a..f5949a6d468 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionUniq.h +++ b/dbms/src/AggregateFunctions/AggregateFunctionUniq.h @@ -206,9 +206,7 @@ template <> struct AggregateFunctionUniqTraits { static UInt64 hash(UInt128 x) { - SipHash hash; - hash.update(reinterpret_cast(&x), sizeof(x)); - return hash.get64(); + return sipHash64(x); } }; @@ -243,9 +241,7 @@ template <> struct AggregateFunctionUniqCombinedTraits { static UInt32 hash(UInt128 x) { - SipHash hash; - hash.update(reinterpret_cast(&x), sizeof(x)); - return static_cast(hash.get64()); + return sipHash64(x); } }; diff --git a/dbms/src/AggregateFunctions/AggregateFunctionUniqUpTo.h b/dbms/src/AggregateFunctions/AggregateFunctionUniqUpTo.h index 0ff04d569f0..bb206bfa77e 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionUniqUpTo.h +++ b/dbms/src/AggregateFunctions/AggregateFunctionUniqUpTo.h @@ -118,9 +118,7 @@ struct AggregateFunctionUniqUpToData : AggregateFunctionUniqUpToData &>(column).getData()[row_num]; - SipHash hash; - hash.update(reinterpret_cast(&value), sizeof(value)); - insert(hash.get64(), threshold); + insert(sipHash64(value), threshold); } }; diff --git a/dbms/src/Columns/ColumnArray.cpp b/dbms/src/Columns/ColumnArray.cpp index dfbc7699072..4674c57eef6 100644 --- a/dbms/src/Columns/ColumnArray.cpp +++ b/dbms/src/Columns/ColumnArray.cpp @@ -202,7 +202,7 @@ void ColumnArray::updateHashWithValue(size_t n, SipHash & hash) const size_t array_size = sizeAt(n); size_t offset = offsetAt(n); - hash.update(reinterpret_cast(&array_size), sizeof(array_size)); + hash.update(array_size); for (size_t i = 0; i < array_size; ++i) getData().updateHashWithValue(offset + i, hash); } diff --git a/dbms/src/Columns/ColumnNullable.cpp b/dbms/src/Columns/ColumnNullable.cpp index b37670d2796..59f225c962f 100644 --- a/dbms/src/Columns/ColumnNullable.cpp +++ b/dbms/src/Columns/ColumnNullable.cpp @@ -36,7 +36,7 @@ ColumnNullable::ColumnNullable(const ColumnPtr & nested_column_, const ColumnPtr void ColumnNullable::updateHashWithValue(size_t n, SipHash & hash) const { const auto & arr = getNullMapData(); - hash.update(reinterpret_cast(&arr[n]), sizeof(arr[0])); + hash.update(arr[n]); if (arr[n] == 0) getNestedColumn().updateHashWithValue(n, hash); } diff --git a/dbms/src/Columns/ColumnVector.cpp b/dbms/src/Columns/ColumnVector.cpp index a43f4a2f1c1..b1a3e55cb86 100644 --- a/dbms/src/Columns/ColumnVector.cpp +++ b/dbms/src/Columns/ColumnVector.cpp @@ -48,7 +48,7 @@ const char * ColumnVector::deserializeAndInsertFromArena(const char * pos) template void ColumnVector::updateHashWithValue(size_t n, SipHash & hash) const { - hash.update(reinterpret_cast(&data[n]), sizeof(T)); + hash.update(data[n]); } template diff --git a/dbms/src/Common/FieldVisitors.cpp b/dbms/src/Common/FieldVisitors.cpp index 4a20b1e7e2e..2243a838a99 100644 --- a/dbms/src/Common/FieldVisitors.cpp +++ b/dbms/src/Common/FieldVisitors.cpp @@ -145,45 +145,43 @@ FieldVisitorHash::FieldVisitorHash(SipHash & hash) : hash(hash) {} void FieldVisitorHash::operator() (const Null &) const { UInt8 type = Field::Types::Null; - hash.update(reinterpret_cast(&type), sizeof(type)); + hash.update(type); } void FieldVisitorHash::operator() (const UInt64 & x) const { UInt8 type = Field::Types::UInt64; - hash.update(reinterpret_cast(&type), sizeof(type)); - hash.update(reinterpret_cast(&x), sizeof(x)); + hash.update(type); + hash.update(x); } void FieldVisitorHash::operator() (const Int64 & x) const { UInt8 type = Field::Types::Int64; - hash.update(reinterpret_cast(&type), sizeof(type)); - hash.update(reinterpret_cast(&x), sizeof(x)); + hash.update(type); + hash.update(x); } void FieldVisitorHash::operator() (const Float64 & x) const { UInt8 type = Field::Types::Float64; - hash.update(reinterpret_cast(&type), sizeof(type)); - hash.update(reinterpret_cast(&x), sizeof(x)); + hash.update(type); + hash.update(x); } void FieldVisitorHash::operator() (const String & x) const { UInt8 type = Field::Types::String; - hash.update(reinterpret_cast(&type), sizeof(type)); - size_t size = x.size(); - hash.update(reinterpret_cast(&size), sizeof(size)); + hash.update(type); + hash.update(x.size()); hash.update(x.data(), x.size()); } void FieldVisitorHash::operator() (const Array & x) const { UInt8 type = Field::Types::Array; - hash.update(reinterpret_cast(&type), sizeof(type)); - size_t size = x.size(); - hash.update(reinterpret_cast(&size), sizeof(size)); + hash.update(type); + hash.update(x.size()); for (const auto & elem : x) applyVisitor(*this, elem); diff --git a/dbms/src/Common/SipHash.h b/dbms/src/Common/SipHash.h index a3c20d4d743..a9f007f7aa4 100644 --- a/dbms/src/Common/SipHash.h +++ b/dbms/src/Common/SipHash.h @@ -14,6 +14,7 @@ */ #include +#include #define ROTL(x, b) static_cast(((x) << (b)) | ((x) >> (64 - (b)))) @@ -130,6 +131,13 @@ public: } } + /// NOTE: std::has_unique_object_representations is only available since clang 6. As of Mar 2017 we still use clang 5 sometimes. + template + std::enable_if_t, void> update(const T & x) + { + update(reinterpret_cast(&x), sizeof(x)); + } + /// Get the result in some form. This can only be done once! void get128(char * out) @@ -173,6 +181,14 @@ inline UInt64 sipHash64(const char * data, const size_t size) return hash.get64(); } +template +std::enable_if_t, UInt64> sipHash64(const T & x) +{ + SipHash hash; + hash.update(x); + return hash.get64(); +} + #include inline UInt64 sipHash64(const std::string & s) diff --git a/dbms/src/Common/randomSeed.cpp b/dbms/src/Common/randomSeed.cpp index 39ca7837ced..62925f55dd3 100644 --- a/dbms/src/Common/randomSeed.cpp +++ b/dbms/src/Common/randomSeed.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #ifdef __APPLE__ @@ -25,5 +26,13 @@ DB::UInt64 randomSeed() struct timespec times; if (clock_gettime(CLOCK_THREAD_CPUTIME_ID, ×)) DB::throwFromErrno("Cannot clock_gettime.", DB::ErrorCodes::CANNOT_CLOCK_GETTIME); - return times.tv_nsec + times.tv_sec + getpid(); + + /// Not cryptographically secure as time, pid and stack address can be predictable. + + SipHash hash; + hash.update(times.tv_nsec); + hash.update(times.tv_sec); + hash.update(getpid()); + hash.update(×); + return hash.get64(); } diff --git a/dbms/src/Common/tests/integer_hash_tables_and_hashes.cpp b/dbms/src/Common/tests/integer_hash_tables_and_hashes.cpp index d86d77c5144..59e64a407bc 100644 --- a/dbms/src/Common/tests/integer_hash_tables_and_hashes.cpp +++ b/dbms/src/Common/tests/integer_hash_tables_and_hashes.cpp @@ -201,7 +201,7 @@ namespace Hashes size_t operator()(Key x) const { ::SipHash hash; - hash.update(reinterpret_cast(&x), sizeof(x)); + hash.update(x); return hash.get64(); } }; diff --git a/dbms/src/IO/UncompressedCache.h b/dbms/src/IO/UncompressedCache.h index a4a7780321d..55d36a093c9 100644 --- a/dbms/src/IO/UncompressedCache.h +++ b/dbms/src/IO/UncompressedCache.h @@ -51,7 +51,7 @@ public: SipHash hash; hash.update(path_to_file.data(), path_to_file.size() + 1); - hash.update(reinterpret_cast(&offset), sizeof(offset)); + hash.update(offset); hash.get128(key.low, key.high); return key; diff --git a/dbms/src/Interpreters/Compiler.cpp b/dbms/src/Interpreters/Compiler.cpp index b932054910b..a76f18ae835 100644 --- a/dbms/src/Interpreters/Compiler.cpp +++ b/dbms/src/Interpreters/Compiler.cpp @@ -59,7 +59,7 @@ static Compiler::HashedKey getHash(const std::string & key) SipHash hash; auto revision = ClickHouseRevision::get(); - hash.update(reinterpret_cast(&revision), sizeof(revision)); + hash.update(revision); hash.update(key.data(), key.size()); Compiler::HashedKey res; diff --git a/dbms/src/Parsers/IAST.cpp b/dbms/src/Parsers/IAST.cpp index 8a687b3ad97..c47eb79324e 100644 --- a/dbms/src/Parsers/IAST.cpp +++ b/dbms/src/Parsers/IAST.cpp @@ -75,10 +75,7 @@ void IAST::getTreeHashImpl(SipHash & hash_state) const { auto id = getID(); hash_state.update(id.data(), id.size()); - - size_t num_children = children.size(); - hash_state.update(reinterpret_cast(&num_children), sizeof(num_children)); - + hash_state.update(children.size()); for (const auto & child : children) child->getTreeHashImpl(hash_state); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp index 96b27d66758..df273f2d6b5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp @@ -20,7 +20,7 @@ #include -#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t)) +#define MERGE_TREE_MARK_SIZE (2 * sizeof(UInt64)) namespace DB @@ -64,7 +64,7 @@ void MergeTreeDataPartChecksum::checkSize(const String & path) const Poco::File file(path); if (!file.exists()) throw Exception(path + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST); - size_t size = file.getSize(); + UInt64 size = file.getSize(); if (size != file_size) throw Exception(path + " has unexpected size: " + toString(size) + " instead of " + toString(file_size), ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART); @@ -223,7 +223,7 @@ void MergeTreeDataPartChecksums::write(WriteBuffer & to) const } } -void MergeTreeDataPartChecksums::addFile(const String & file_name, size_t file_size, MergeTreeDataPartChecksum::uint128 file_hash) +void MergeTreeDataPartChecksums::addFile(const String & file_name, UInt64 file_size, MergeTreeDataPartChecksum::uint128 file_hash) { files[file_name] = Checksum(file_size, file_hash); } @@ -248,11 +248,11 @@ void MergeTreeDataPartChecksums::summaryDataChecksum(SipHash & hash) const if (!endsWith(name, ".bin")) continue; - size_t len = name.size(); - hash.update(reinterpret_cast(&len), sizeof(len)); + UInt64 len = name.size(); + hash.update(len); hash.update(name.data(), len); - hash.update(reinterpret_cast(&sum.uncompressed_size), sizeof(sum.uncompressed_size)); - hash.update(reinterpret_cast(&sum.uncompressed_hash), sizeof(sum.uncompressed_hash)); + hash.update(sum.uncompressed_size); + hash.update(sum.uncompressed_hash); } } @@ -391,7 +391,7 @@ MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & na } /// Returns the size of .bin file for column `name` if found, zero otherwise. -size_t MergeTreeDataPart::getColumnCompressedSize(const String & name) const +UInt64 MergeTreeDataPart::getColumnCompressedSize(const String & name) const { const Checksum * checksum = tryGetBinChecksum(name); @@ -399,14 +399,14 @@ size_t MergeTreeDataPart::getColumnCompressedSize(const String & name) const return checksum ? checksum->file_size : 0; } -size_t MergeTreeDataPart::getColumnUncompressedSize(const String & name) const +UInt64 MergeTreeDataPart::getColumnUncompressedSize(const String & name) const { const Checksum * checksum = tryGetBinChecksum(name); return checksum ? checksum->uncompressed_size : 0; } -size_t MergeTreeDataPart::getColumnMrkSize(const String & name) const +UInt64 MergeTreeDataPart::getColumnMrkSize(const String & name) const { const Checksum * checksum = tryGetMrkChecksum(name); return checksum ? checksum->file_size : 0; @@ -420,7 +420,7 @@ String MergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const { const auto & columns = storage.getColumnsList(); const std::string * minimum_size_column = nullptr; - size_t minimum_size = std::numeric_limits::max(); + UInt64 minimum_size = std::numeric_limits::max(); for (const auto & column : columns) { @@ -507,14 +507,14 @@ MergeTreeDataPart::~MergeTreeDataPart() } } -size_t MergeTreeDataPart::calculateTotalSize(const String & from) +UInt64 MergeTreeDataPart::calculateTotalSize(const String & from) { Poco::File cur(from); if (cur.isFile()) return cur.getSize(); std::vector files; cur.list(files); - size_t res = 0; + UInt64 res = 0; for (const auto & file : files) res += calculateTotalSize(from + file); return res; @@ -886,7 +886,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata) /// Check that all marks are nonempty and have the same size. - std::optional marks_size; + std::optional marks_size; for (const NameAndTypePair & name_type : columns) { name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path) @@ -896,7 +896,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata) /// Missing file is Ok for case when new column was added. if (file.exists()) { - size_t file_size = file.getSize(); + UInt64 file_size = file.getSize(); if (!file_size) throw Exception("Part " + path + " is broken: " + file.path() + " is empty.", @@ -926,25 +926,25 @@ bool MergeTreeDataPart::hasColumnFiles(const String & column) const } -size_t MergeTreeDataPart::getIndexSizeInBytes() const +UInt64 MergeTreeDataPart::getIndexSizeInBytes() const { - size_t res = 0; + UInt64 res = 0; for (const ColumnPtr & column : index) res += column->byteSize(); return res; } -size_t MergeTreeDataPart::getIndexSizeInAllocatedBytes() const +UInt64 MergeTreeDataPart::getIndexSizeInAllocatedBytes() const { - size_t res = 0; + UInt64 res = 0; for (const ColumnPtr & column : index) res += column->allocatedBytes(); return res; } -size_t MergeTreeDataPart::getTotalMrkSizeInBytes() const +UInt64 MergeTreeDataPart::getTotalMrkSizeInBytes() const { - size_t res = 0; + UInt64 res = 0; for (const NameAndTypePair & it : columns) { const Checksum * checksum = tryGetMrkChecksum(it.name); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h index a1cff20a3a8..3671ee2cb6a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h @@ -21,16 +21,16 @@ struct MergeTreeDataPartChecksum { using uint128 = CityHash_v1_0_2::uint128; - size_t file_size {}; + UInt64 file_size {}; uint128 file_hash {}; bool is_compressed = false; - size_t uncompressed_size {}; + UInt64 uncompressed_size {}; uint128 uncompressed_hash {}; MergeTreeDataPartChecksum() {} - MergeTreeDataPartChecksum(size_t file_size_, uint128 file_hash_) : file_size(file_size_), file_hash(file_hash_) {} - MergeTreeDataPartChecksum(size_t file_size_, uint128 file_hash_, size_t uncompressed_size_, uint128 uncompressed_hash_) + MergeTreeDataPartChecksum(UInt64 file_size_, uint128 file_hash_) : file_size(file_size_), file_hash(file_hash_) {} + MergeTreeDataPartChecksum(UInt64 file_size_, uint128 file_hash_, UInt64 uncompressed_size_, uint128 uncompressed_hash_) : file_size(file_size_), file_hash(file_hash_), is_compressed(true), uncompressed_size(uncompressed_size_), uncompressed_hash(uncompressed_hash_) {} @@ -50,7 +50,7 @@ struct MergeTreeDataPartChecksums using FileChecksums = std::map; FileChecksums files; - void addFile(const String & file_name, size_t file_size, Checksum::uint128 file_hash); + void addFile(const String & file_name, UInt64 file_size, Checksum::uint128 file_hash); void add(MergeTreeDataPartChecksums && rhs_checksums); @@ -104,10 +104,10 @@ struct MergeTreeDataPart const Checksum * tryGetMrkChecksum(const String & name) const; /// Returns the size of .bin file for column `name` if found, zero otherwise - size_t getColumnCompressedSize(const String & name) const; - size_t getColumnUncompressedSize(const String & name) const; + UInt64 getColumnCompressedSize(const String & name) const; + UInt64 getColumnUncompressedSize(const String & name) const; /// Returns the size of .mrk file for column `name` if found, zero otherwise - size_t getColumnMrkSize(const String & name) const; + UInt64 getColumnMrkSize(const String & name) const; /// Returns the name of a column with minimum compressed size (as returned by getColumnSize()). /// If no checksums are present returns the name of the first physically existing column. @@ -136,7 +136,7 @@ struct MergeTreeDataPart size_t rows_count = 0; size_t marks_count = 0; - std::atomic size_in_bytes {0}; /// size in bytes, 0 - if not counted; + std::atomic size_in_bytes {0}; /// size in bytes, 0 - if not counted; /// is used from several threads without locks (it is changed with ALTER). time_t modification_time = 0; mutable time_t remove_time = std::numeric_limits::max(); /// When the part is removed from the working set. @@ -255,7 +255,7 @@ struct MergeTreeDataPart /// Columns description. NamesAndTypesList columns; - using ColumnToSize = std::map; + using ColumnToSize = std::map; /** It is blocked for writing when changing columns, checksums or any part files. * Locked to read when reading columns, checksums or any part files. @@ -275,7 +275,7 @@ struct MergeTreeDataPart ~MergeTreeDataPart(); /// Calculate the total size of the entire directory with all the files - static size_t calculateTotalSize(const String & from); + static UInt64 calculateTotalSize(const String & from); void remove() const; @@ -297,10 +297,10 @@ struct MergeTreeDataPart bool hasColumnFiles(const String & column) const; /// For data in RAM ('index') - size_t getIndexSizeInBytes() const; - size_t getIndexSizeInAllocatedBytes() const; + UInt64 getIndexSizeInBytes() const; + UInt64 getIndexSizeInAllocatedBytes() const; /// Total size of *.mrk files - size_t getTotalMrkSizeInBytes() const; + UInt64 getTotalMrkSizeInBytes() const; private: /// Reads columns names and types from columns.txt From ac6a2870b8b623b0664be2a3177d71c7161bf0ce Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 3 Mar 2018 18:55:30 +0300 Subject: [PATCH 07/12] Fix usage of atomic flag in Poco [#CLICKHOUSE-2] --- contrib/poco | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/poco b/contrib/poco index cf1ad2e9a30..8238852d7ab 160000 --- a/contrib/poco +++ b/contrib/poco @@ -1 +1 @@ -Subproject commit cf1ad2e9a30ee9161772dc7bc9bf6e165cc51768 +Subproject commit 8238852d7ab2a4abdf87adff233b3b83686f4fe4 From 3b0d71312043485f42cd282ad64d2c3c60c65ca5 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 3 Mar 2018 19:26:06 +0300 Subject: [PATCH 08/12] Cleanup semantics [#CLICKHOUSE-2] --- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 50 +++++++++---------- .../MergeTree/ReplicatedMergeTreeQueue.h | 26 +++++----- 2 files changed, 36 insertions(+), 40 deletions(-) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index c334093b4b1..ee625fe0df5 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -67,7 +67,7 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper) time_t prev_min_unprocessed_insert_time = min_unprocessed_insert_time; - insertUnlocked(entry); + insertUnlocked(entry, lock); updated = true; if (min_unprocessed_insert_time != prev_min_unprocessed_insert_time) @@ -96,7 +96,7 @@ void ReplicatedMergeTreeQueue::initialize( } -void ReplicatedMergeTreeQueue::insertUnlocked(LogEntryPtr & entry) +void ReplicatedMergeTreeQueue::insertUnlocked(LogEntryPtr & entry, std::lock_guard &) { virtual_parts.add(entry->new_part_name); queue.push_back(entry); @@ -118,7 +118,7 @@ void ReplicatedMergeTreeQueue::insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPt { std::lock_guard lock(mutex); prev_min_unprocessed_insert_time = min_unprocessed_insert_time; - insertUnlocked(entry); + insertUnlocked(entry, lock); } if (min_unprocessed_insert_time != prev_min_unprocessed_insert_time) @@ -129,7 +129,8 @@ void ReplicatedMergeTreeQueue::insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPt void ReplicatedMergeTreeQueue::updateTimesOnRemoval( const LogEntryPtr & entry, bool & min_unprocessed_insert_time_changed, - bool & max_processed_insert_time_changed) + bool & max_processed_insert_time_changed, + std::unique_lock &) { if (entry->type != LogEntry::GET_PART) return; @@ -158,7 +159,7 @@ void ReplicatedMergeTreeQueue::updateTimesOnRemoval( void ReplicatedMergeTreeQueue::updateTimesInZooKeeper( zkutil::ZooKeeperPtr zookeeper, bool min_unprocessed_insert_time_changed, - bool max_processed_insert_time_changed) + bool max_processed_insert_time_changed) const { /// Here there can be a race condition (with different remove at the same time). /// Consider it unimportant (for a short time, ZK will have a slightly different time value). @@ -197,7 +198,7 @@ void ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPt bool max_processed_insert_time_changed = false; { - std::lock_guard lock(mutex); + std::unique_lock lock(mutex); /// Remove the job from the queue in the RAM. /// You can not just refer to a pre-saved iterator, because someone else might be able to delete the task. @@ -213,7 +214,7 @@ void ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPt } } - updateTimesOnRemoval(entry, min_unprocessed_insert_time_changed, max_processed_insert_time_changed); + updateTimesOnRemoval(entry, min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock); } updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed); @@ -228,7 +229,7 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri bool max_processed_insert_time_changed = false; { - std::lock_guard lock(mutex); + std::unique_lock lock(mutex); for (Queue::iterator it = queue.begin(); it != queue.end();) { @@ -236,7 +237,7 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri { found = *it; queue.erase(it++); - updateTimesOnRemoval(found, min_unprocessed_insert_time_changed, max_processed_insert_time_changed); + updateTimesOnRemoval(found, min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock); break; } else @@ -384,7 +385,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z String path_created = dynamic_cast(*ops[i]).getPathCreated(); copied_entries[i]->znode_name = path_created.substr(path_created.find_last_of('/') + 1); - insertUnlocked(copied_entries[i]); + insertUnlocked(copied_entries[i], lock); } last_queue_update = time(nullptr); @@ -476,7 +477,7 @@ void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr z LOG_INFO(log, "Couldn't remove " << replica_path + "/queue/" + (*it)->znode_name << ": " << zkutil::ZooKeeper::error2string(code)); - updateTimesOnRemoval(*it, min_unprocessed_insert_time_changed, max_processed_insert_time_changed); + updateTimesOnRemoval(*it, min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock); queue.erase(it++); ++removed_entries; } @@ -496,7 +497,7 @@ void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr z ReplicatedMergeTreeQueue::Queue ReplicatedMergeTreeQueue::getConflictsForClearColumnCommand( - const LogEntry & entry, String * out_conflicts_description) + const LogEntry & entry, String * out_conflicts_description, std::lock_guard &) const { Queue conflicts; @@ -541,7 +542,7 @@ void ReplicatedMergeTreeQueue::disableMergesAndFetchesInRange(const LogEntry & e std::lock_guard lock(mutex); String conflicts_description; - if (!getConflictsForClearColumnCommand(entry, &conflicts_description).empty()) + if (!getConflictsForClearColumnCommand(entry, &conflicts_description, lock).empty()) throw Exception(conflicts_description, ErrorCodes::UNFINISHED); if (!future_parts.count(entry.new_part_name)) @@ -549,10 +550,8 @@ void ReplicatedMergeTreeQueue::disableMergesAndFetchesInRange(const LogEntry & e } -bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason) +bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason, std::lock_guard &) const { - /// mutex should been already acquired - /// Let's check if the same part is now being created by another action. if (future_parts.count(new_part_name)) { @@ -589,7 +588,7 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa { std::lock_guard lock(mutex); - if (isNotCoveredByFuturePartsImpl(part_name, reject_reason)) + if (isNotCoveredByFuturePartsImpl(part_name, reject_reason, lock)) { CurrentlyExecuting::setActualPartName(entry, part_name, *this); return true; @@ -603,13 +602,12 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( const LogEntry & entry, String & out_postpone_reason, MergeTreeDataMerger & merger, - MergeTreeData & data) + MergeTreeData & data, + std::lock_guard & lock) const { - /// mutex has already been acquired. The function is called only from `selectEntryToProcess`. - if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART) { - if (!isNotCoveredByFuturePartsImpl(entry.new_part_name, out_postpone_reason)) + if (!isNotCoveredByFuturePartsImpl(entry.new_part_name, out_postpone_reason, lock)) { if (!out_postpone_reason.empty()) LOG_DEBUG(log, out_postpone_reason); @@ -669,7 +667,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( if (entry.type == LogEntry::CLEAR_COLUMN) { String conflicts_description; - if (!getConflictsForClearColumnCommand(entry, &conflicts_description).empty()) + if (!getConflictsForClearColumnCommand(entry, &conflicts_description, lock).empty()) { LOG_DEBUG(log, conflicts_description); return false; @@ -740,7 +738,7 @@ ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToP if ((*it)->currently_executing) continue; - if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger, data)) + if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger, data, lock)) { entry = *it; queue.splice(queue.end(), queue, it); @@ -800,7 +798,7 @@ void ReplicatedMergeTreeQueue::disableMergesInRange(const String & part_name) -ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() +ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const { std::lock_guard lock(mutex); @@ -848,7 +846,7 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() } -void ReplicatedMergeTreeQueue::getEntries(LogEntriesData & res) +void ReplicatedMergeTreeQueue::getEntries(LogEntriesData & res) const { res.clear(); std::lock_guard lock(mutex); @@ -859,7 +857,7 @@ void ReplicatedMergeTreeQueue::getEntries(LogEntriesData & res) } -size_t ReplicatedMergeTreeQueue::countMerges() +size_t ReplicatedMergeTreeQueue::countMerges() const { size_t all_merges = 0; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 70f58c1a32e..0ec17ceda9e 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -87,27 +87,31 @@ private: /// Load (initialize) a queue from ZooKeeper (/replicas/me/queue/). bool load(zkutil::ZooKeeperPtr zookeeper); - void insertUnlocked(LogEntryPtr & entry); + void insertUnlocked(LogEntryPtr & entry, std::lock_guard &); void remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry); /** Can I now try this action. If not, you need to leave it in the queue and try another one. * Called under queue_mutex. */ - bool shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason, MergeTreeDataMerger & merger, MergeTreeData & data); + bool shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason, MergeTreeDataMerger & merger, MergeTreeData & data, + std::lock_guard &) const; /** Check that part isn't in currently generating parts and isn't covered by them. * Should be called under queue's mutex. */ - bool isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason); + bool isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason, std::lock_guard &) const; /// After removing the queue element, update the insertion times in the RAM. Running under queue_mutex. /// Returns information about what times have changed - this information can be passed to updateTimesInZooKeeper. - void updateTimesOnRemoval(const LogEntryPtr & entry, bool & min_unprocessed_insert_time_changed, bool & max_processed_insert_time_changed); + void updateTimesOnRemoval(const LogEntryPtr & entry, bool & min_unprocessed_insert_time_changed, bool & max_processed_insert_time_changed, + std::unique_lock &); /// Update the insertion times in ZooKeeper. - void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper, bool min_unprocessed_insert_time_changed, bool max_processed_insert_time_changed); + void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper, bool min_unprocessed_insert_time_changed, bool max_processed_insert_time_changed) const; + /// Returns list of currently executing entries blocking execution of specified CLEAR_COLUMN command + Queue getConflictsForClearColumnCommand(const LogEntry & entry, String * out_conflicts_description, std::lock_guard &) const; /// Marks the element of the queue as running. class CurrentlyExecuting @@ -165,12 +169,6 @@ public: */ void disableMergesAndFetchesInRange(const LogEntry & entry); - /** Returns list of currently executing entries blocking execution of specified CLEAR_COLUMN command - * Call it under mutex - */ - Queue getConflictsForClearColumnCommand(const LogEntry & entry, String * out_conflicts_description); - - /** In the case where there are not enough parts to perform the merge in part_name * - move actions with merged parts to the end of the queue * (in order to download a already merged part from another replica). @@ -203,7 +201,7 @@ public: bool addFuturePartIfNotCoveredByThem(const String & part_name, const LogEntry & entry, String & reject_reason); /// Count the number of merges in the queue. - size_t countMerges(); + size_t countMerges() const; struct Status { @@ -220,11 +218,11 @@ public: }; /// Get information about the queue. - Status getStatus(); + Status getStatus() const; /// Get the data of the queue elements. using LogEntriesData = std::vector; - void getEntries(LogEntriesData & res); + void getEntries(LogEntriesData & res) const; /// Get information about the insertion times. void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const; From 2e371822ad430c64ce4fd9dc4070b084aed9262a Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 3 Mar 2018 19:46:32 +0300 Subject: [PATCH 09/12] Changed low level data race to high level data race [#CLICKHOUSE-2] --- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 69 +++++++++---------- .../MergeTree/ReplicatedMergeTreeQueue.h | 12 +++- 2 files changed, 43 insertions(+), 38 deletions(-) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index ee625fe0df5..a2d43381cfc 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -31,7 +31,7 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper) LOG_DEBUG(log, "Loading queue from " << queue_path); bool updated = false; - bool min_unprocessed_insert_time_changed = false; + std::optional min_unprocessed_insert_time_changed; { std::lock_guard lock(mutex); @@ -65,17 +65,12 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper) LogEntryPtr entry = LogEntry::parse(res.value, res.stat); entry->znode_name = future.first; - time_t prev_min_unprocessed_insert_time = min_unprocessed_insert_time; - - insertUnlocked(entry, lock); - + insertUnlocked(entry, min_unprocessed_insert_time_changed, lock); updated = true; - if (min_unprocessed_insert_time != prev_min_unprocessed_insert_time) - min_unprocessed_insert_time_changed = true; } } - updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, false); + updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {}); LOG_TRACE(log, "Loaded queue"); return updated; @@ -96,7 +91,7 @@ void ReplicatedMergeTreeQueue::initialize( } -void ReplicatedMergeTreeQueue::insertUnlocked(LogEntryPtr & entry, std::lock_guard &) +void ReplicatedMergeTreeQueue::insertUnlocked(LogEntryPtr & entry, std::optional & min_unprocessed_insert_time_changed, std::lock_guard &) { virtual_parts.add(entry->new_part_name); queue.push_back(entry); @@ -106,30 +101,32 @@ void ReplicatedMergeTreeQueue::insertUnlocked(LogEntryPtr & entry, std::lock_gua inserts_by_time.insert(entry); if (entry->create_time && (!min_unprocessed_insert_time || entry->create_time < min_unprocessed_insert_time)) + { min_unprocessed_insert_time = entry->create_time; + min_unprocessed_insert_time_changed = min_unprocessed_insert_time; + } } } void ReplicatedMergeTreeQueue::insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry) { - time_t prev_min_unprocessed_insert_time; + std::optional min_unprocessed_insert_time_changed; { std::lock_guard lock(mutex); - prev_min_unprocessed_insert_time = min_unprocessed_insert_time; - insertUnlocked(entry, lock); + insertUnlocked(entry, min_unprocessed_insert_time_changed, lock); } - if (min_unprocessed_insert_time != prev_min_unprocessed_insert_time) - updateTimesInZooKeeper(zookeeper, true, false); + if (min_unprocessed_insert_time_changed) + updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {}); } void ReplicatedMergeTreeQueue::updateTimesOnRemoval( const LogEntryPtr & entry, - bool & min_unprocessed_insert_time_changed, - bool & max_processed_insert_time_changed, + std::optional & min_unprocessed_insert_time_changed, + std::optional & max_processed_insert_time_changed, std::unique_lock &) { if (entry->type != LogEntry::GET_PART) @@ -140,39 +137,40 @@ void ReplicatedMergeTreeQueue::updateTimesOnRemoval( if (inserts_by_time.empty()) { min_unprocessed_insert_time = 0; - min_unprocessed_insert_time_changed = true; + min_unprocessed_insert_time_changed = min_unprocessed_insert_time; } else if ((*inserts_by_time.begin())->create_time > min_unprocessed_insert_time) { min_unprocessed_insert_time = (*inserts_by_time.begin())->create_time; - min_unprocessed_insert_time_changed = true; + min_unprocessed_insert_time_changed = min_unprocessed_insert_time; } if (entry->create_time > max_processed_insert_time) { max_processed_insert_time = entry->create_time; - max_processed_insert_time_changed = true; + max_processed_insert_time_changed = max_processed_insert_time; } } void ReplicatedMergeTreeQueue::updateTimesInZooKeeper( zkutil::ZooKeeperPtr zookeeper, - bool min_unprocessed_insert_time_changed, - bool max_processed_insert_time_changed) const + std::optional min_unprocessed_insert_time_changed, + std::optional max_processed_insert_time_changed) const { - /// Here there can be a race condition (with different remove at the same time). + /// Here there can be a race condition (with different remove at the same time) + /// because we update times in ZooKeeper with unlocked mutex, while these times may change. /// Consider it unimportant (for a short time, ZK will have a slightly different time value). - /// We also read values of `min_unprocessed_insert_time`, `max_processed_insert_time` without synchronization. + zkutil::Ops ops; if (min_unprocessed_insert_time_changed) ops.emplace_back(std::make_unique( - replica_path + "/min_unprocessed_insert_time", toString(min_unprocessed_insert_time), -1)); + replica_path + "/min_unprocessed_insert_time", toString(*min_unprocessed_insert_time_changed), -1)); if (max_processed_insert_time_changed) ops.emplace_back(std::make_unique( - replica_path + "/max_processed_insert_time", toString(max_processed_insert_time), -1)); + replica_path + "/max_processed_insert_time", toString(*max_processed_insert_time_changed), -1)); if (!ops.empty()) { @@ -194,8 +192,8 @@ void ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPt LOG_ERROR(log, "Couldn't remove " << replica_path << "/queue/" << entry->znode_name << ": " << zkutil::ZooKeeper::error2string(code) << ". This shouldn't happen often."); - bool min_unprocessed_insert_time_changed = false; - bool max_processed_insert_time_changed = false; + std::optional min_unprocessed_insert_time_changed; + std::optional max_processed_insert_time_changed; { std::unique_lock lock(mutex); @@ -225,8 +223,8 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri { LogEntryPtr found; - bool min_unprocessed_insert_time_changed = false; - bool max_processed_insert_time_changed = false; + std::optional min_unprocessed_insert_time_changed; + std::optional max_processed_insert_time_changed; { std::unique_lock lock(mutex); @@ -330,7 +328,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z std::vector copied_entries; copied_entries.reserve(end - begin); - bool min_unprocessed_insert_time_changed = false; + std::optional min_unprocessed_insert_time_changed; for (auto & future : futures) { @@ -347,7 +345,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z if (entry.create_time && (!min_unprocessed_insert_time || entry.create_time < min_unprocessed_insert_time)) { min_unprocessed_insert_time = entry.create_time; - min_unprocessed_insert_time_changed = true; + min_unprocessed_insert_time_changed = min_unprocessed_insert_time; } } } @@ -357,7 +355,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z if (min_unprocessed_insert_time_changed) ops.emplace_back(std::make_unique( - replica_path + "/min_unprocessed_insert_time", toString(min_unprocessed_insert_time), -1)); + replica_path + "/min_unprocessed_insert_time", toString(*min_unprocessed_insert_time_changed), -1)); try { @@ -385,7 +383,8 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z String path_created = dynamic_cast(*ops[i]).getPathCreated(); copied_entries[i]->znode_name = path_created.substr(path_created.find_last_of('/') + 1); - insertUnlocked(copied_entries[i], lock); + std::optional unused = false; + insertUnlocked(copied_entries[i], unused, lock); } last_queue_update = time(nullptr); @@ -460,8 +459,8 @@ void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr z { Queue to_wait; size_t removed_entries = 0; - bool min_unprocessed_insert_time_changed = false; - bool max_processed_insert_time_changed = false; + std::optional min_unprocessed_insert_time_changed; + std::optional max_processed_insert_time_changed; /// Remove operations with parts, contained in the range to be deleted, from the queue. std::unique_lock lock(mutex); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 0ec17ceda9e..4215a312ee3 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -87,7 +89,7 @@ private: /// Load (initialize) a queue from ZooKeeper (/replicas/me/queue/). bool load(zkutil::ZooKeeperPtr zookeeper); - void insertUnlocked(LogEntryPtr & entry, std::lock_guard &); + void insertUnlocked(LogEntryPtr & entry, std::optional & min_unprocessed_insert_time_changed, std::lock_guard &); void remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry); @@ -104,11 +106,15 @@ private: /// After removing the queue element, update the insertion times in the RAM. Running under queue_mutex. /// Returns information about what times have changed - this information can be passed to updateTimesInZooKeeper. - void updateTimesOnRemoval(const LogEntryPtr & entry, bool & min_unprocessed_insert_time_changed, bool & max_processed_insert_time_changed, + void updateTimesOnRemoval(const LogEntryPtr & entry, + std::optional & min_unprocessed_insert_time_changed, + std::optional & max_processed_insert_time_changed, std::unique_lock &); /// Update the insertion times in ZooKeeper. - void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper, bool min_unprocessed_insert_time_changed, bool max_processed_insert_time_changed) const; + void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper, + std::optional min_unprocessed_insert_time_changed, + std::optional max_processed_insert_time_changed) const; /// Returns list of currently executing entries blocking execution of specified CLEAR_COLUMN command Queue getConflictsForClearColumnCommand(const LogEntry & entry, String * out_conflicts_description, std::lock_guard &) const; From 368f46d764a4f3ee5ddfa6703a0cb6f6e0dfdddb Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 3 Mar 2018 19:48:24 +0300 Subject: [PATCH 10/12] Addition to prev. revision [#CLICKHOUSE-2] --- dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index a2d43381cfc..b2fe028f5d7 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -118,8 +118,7 @@ void ReplicatedMergeTreeQueue::insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPt insertUnlocked(entry, min_unprocessed_insert_time_changed, lock); } - if (min_unprocessed_insert_time_changed) - updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {}); + updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {}); } From 07d8db9f2a7422435881b5fac59c6c6ca5c362ab Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 3 Mar 2018 20:44:53 +0300 Subject: [PATCH 11/12] Explicit semantic for TSan [#CLICKHOUSE-2] --- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 16 +++++++++------- dbms/src/Storages/MergeTree/MergeTreeDataPart.h | 2 +- dbms/src/Storages/System/StorageSystemParts.cpp | 2 +- .../System/StorageSystemPartsColumns.cpp | 2 +- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index ad7c143b93e..db830d77bf5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -561,7 +561,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) auto deactivate_part = [&] (DataPartIteratorByStateAndInfo it) { - (*it)->remove_time = (*it)->modification_time; + (*it)->remove_time.store((*it)->modification_time, std::memory_order_relaxed); modifyPartState(it, DataPartState::Outdated); }; @@ -677,9 +677,11 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts() { const DataPartPtr & part = *it; + auto part_remove_time = part->remove_time.load(std::memory_order_relaxed); + if (part.unique() && /// Grab only parts that are not used by anyone (SELECTs for example). - part->remove_time < now && - now - part->remove_time > settings.old_parts_lifetime.totalSeconds()) + part_remove_time < now && + now - part_remove_time > settings.old_parts_lifetime.totalSeconds()) { parts_to_delete.emplace_back(it); } @@ -1519,7 +1521,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( auto current_time = time(nullptr); for (const DataPartPtr & covered_part : covered_parts) { - covered_part->remove_time = current_time; + covered_part->remove_time.store(current_time, std::memory_order_relaxed); modifyPartState(covered_part, DataPartState::Outdated); removePartContributionToColumnSizes(covered_part); } @@ -1550,7 +1552,7 @@ void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bo removePartContributionToColumnSizes(part); modifyPartState(part, DataPartState::Outdated); - part->remove_time = remove_time; + part->remove_time.store(remove_time, std::memory_order_relaxed); } } @@ -2175,7 +2177,7 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit() LOG_WARNING(data->log, "Tried to commit obsolete part " << part->name << " covered by " << covering_part->getNameWithState()); - part->remove_time = 0; /// The part will be removed without waiting for old_parts_lifetime seconds. + part->remove_time.store(0, std::memory_order_relaxed); /// The part will be removed without waiting for old_parts_lifetime seconds. data->modifyPartState(part, DataPartState::Outdated); } else @@ -2183,7 +2185,7 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit() total_covered_parts.insert(total_covered_parts.end(), covered_parts.begin(), covered_parts.end()); for (const DataPartPtr & covered_part : covered_parts) { - covered_part->remove_time = current_time; + covered_part->remove_time.store(current_time, std::memory_order_relaxed); data->modifyPartState(covered_part, DataPartState::Outdated); data->removePartContributionToColumnSizes(covered_part); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h index 3671ee2cb6a..43ddf7abfdd 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h @@ -139,7 +139,7 @@ struct MergeTreeDataPart std::atomic size_in_bytes {0}; /// size in bytes, 0 - if not counted; /// is used from several threads without locks (it is changed with ALTER). time_t modification_time = 0; - mutable time_t remove_time = std::numeric_limits::max(); /// When the part is removed from the working set. + mutable std::atomic remove_time { std::numeric_limits::max() }; /// When the part is removed from the working set. Changes once. /// If true, the destructor will delete the directory with the part. bool is_temp = false; diff --git a/dbms/src/Storages/System/StorageSystemParts.cpp b/dbms/src/Storages/System/StorageSystemParts.cpp index 927ff272dab..0914cae72da 100644 --- a/dbms/src/Storages/System/StorageSystemParts.cpp +++ b/dbms/src/Storages/System/StorageSystemParts.cpp @@ -66,7 +66,7 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns, const Stor columns[i++]->insert(static_cast(part->rows_count)); columns[i++]->insert(static_cast(part->size_in_bytes)); columns[i++]->insert(static_cast(part->modification_time)); - columns[i++]->insert(static_cast(part->remove_time)); + columns[i++]->insert(static_cast(part->remove_time.load(std::memory_order_relaxed))); /// For convenience, in returned refcount, don't add references that was due to local variables in this method: all_parts, active_parts. columns[i++]->insert(static_cast(part.use_count() - 1)); diff --git a/dbms/src/Storages/System/StorageSystemPartsColumns.cpp b/dbms/src/Storages/System/StorageSystemPartsColumns.cpp index fe9351323d9..b6acae5d5a3 100644 --- a/dbms/src/Storages/System/StorageSystemPartsColumns.cpp +++ b/dbms/src/Storages/System/StorageSystemPartsColumns.cpp @@ -112,7 +112,7 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns, con columns[j++]->insert(static_cast(part->rows_count)); columns[j++]->insert(static_cast(part->size_in_bytes)); columns[j++]->insert(static_cast(part->modification_time)); - columns[j++]->insert(static_cast(part->remove_time)); + columns[j++]->insert(static_cast(part->remove_time.load(std::memory_order_relaxed))); columns[j++]->insert(static_cast(use_count)); From 91a6a88102e9454280d9aa7b5d8057d8bb2dae6a Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 3 Mar 2018 21:00:46 +0300 Subject: [PATCH 12/12] Explicit semantic for TSan [#CLICKHOUSE-2] --- dbms/src/Storages/MergeTree/MergeList.cpp | 2 +- dbms/src/Storages/MergeTree/MergeList.h | 2 +- .../MergeTree/MergeTreeDataMerger.cpp | 19 +++++++++++-------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeList.cpp b/dbms/src/Storages/MergeTree/MergeList.cpp index eaca961fd88..9714d1b80da 100644 --- a/dbms/src/Storages/MergeTree/MergeList.cpp +++ b/dbms/src/Storages/MergeTree/MergeList.cpp @@ -35,7 +35,7 @@ MergeInfo MergeListElement::getInfo() const res.table = table; res.result_part_name = result_part_name; res.elapsed = watch.elapsedSeconds(); - res.progress = progress; + res.progress = progress.load(std::memory_order_relaxed); res.num_parts = num_parts; res.total_size_bytes_compressed = total_size_bytes_compressed; res.total_size_marks = total_size_marks; diff --git a/dbms/src/Storages/MergeTree/MergeList.h b/dbms/src/Storages/MergeTree/MergeList.h index 783ddb90b21..ed7241e4482 100644 --- a/dbms/src/Storages/MergeTree/MergeList.h +++ b/dbms/src/Storages/MergeTree/MergeList.h @@ -50,7 +50,7 @@ struct MergeListElement : boost::noncopyable const std::string table; const std::string result_part_name; Stopwatch watch; - Float64 progress{}; + std::atomic progress{}; UInt64 num_parts{}; Names source_part_names; UInt64 total_size_bytes_compressed{}; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 5d60f4bbb63..bc56bd61fe4 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -466,7 +466,7 @@ public: merge_entry->bytes_read_uncompressed += value.bytes; merge_entry->rows_read += value.rows; - merge_entry->progress = average_elem_progress * merge_entry->rows_read; + merge_entry->progress.store(average_elem_progress * merge_entry->rows_read, std::memory_order_relaxed); }; }; @@ -476,10 +476,9 @@ public: class MergeProgressCallbackVerticalStep : public MergeProgressCallback { public: - MergeProgressCallbackVerticalStep(MergeList::Entry & merge_entry_, size_t num_total_rows_exact, const ColumnSizeEstimator & column_sizes, const String & column_name, UInt64 & watch_prev_elapsed_) - : MergeProgressCallback(merge_entry_, watch_prev_elapsed_), initial_progress(merge_entry->progress) + : MergeProgressCallback(merge_entry_, watch_prev_elapsed_), initial_progress(merge_entry->progress.load(std::memory_order_relaxed)) { average_elem_progress = column_sizes.columnProgress(column_name, 1, num_total_rows_exact); updateWatch(); @@ -496,7 +495,9 @@ public: rows_read_internal += value.rows; Float64 local_progress = average_elem_progress * rows_read_internal; - merge_entry->progress = initial_progress + local_progress; + + /// NOTE: 'progress' is modified by single thread, but it may be concurrently read from MergeListElement::getInfo() (StorageSystemMerges). + merge_entry->progress.store(initial_progress + local_progress, std::memory_order_relaxed); }; }; @@ -678,7 +679,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart /// But now we are using inaccurate row-based estimation in Horizontal case for backward compability Float64 progress = (merge_alg == MergeAlgorithm::Horizontal) ? std::min(1., 1. * rows_written / sum_input_rows_upper_bound) - : std::min(1., merge_entry->progress); + : std::min(1., merge_entry->progress.load(std::memory_order_relaxed)); disk_reservation->update(static_cast((1. - progress) * initial_reservation)); } @@ -696,7 +697,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart { size_t sum_input_rows_exact = merge_entry->rows_read; merge_entry->columns_written = merging_column_names.size(); - merge_entry->progress = column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact); + merge_entry->progress.store(column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact), std::memory_order_relaxed); BlockInputStreams column_part_streams(parts.size()); NameSet offset_columns_written; @@ -715,7 +716,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart const DataTypePtr & column_type = it_name_and_type->type; const String offset_column_name = Nested::extractTableName(column_name); Names column_name_{column_name}; - Float64 progress_before = merge_entry->progress; + Float64 progress_before = merge_entry->progress.load(std::memory_order_relaxed); bool offset_written = offset_columns_written.count(offset_column_name); for (size_t part_num = 0; part_num < parts.size(); ++part_num) @@ -753,9 +754,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart if (typeid_cast(column_type.get())) offset_columns_written.emplace(offset_column_name); + /// NOTE: 'progress' is modified by single thread, but it may be concurrently read from MergeListElement::getInfo() (StorageSystemMerges). + merge_entry->columns_written = merging_column_names.size() + column_num; merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes; - merge_entry->progress = progress_before + column_sizes.columnProgress(column_name, sum_input_rows_exact, sum_input_rows_exact); + merge_entry->progress.store(progress_before + column_sizes.columnProgress(column_name, sum_input_rows_exact, sum_input_rows_exact), std::memory_order_relaxed); if (merges_blocker.isCancelled()) throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);