From 0093425201b1532e6da17326d8984708ddda0fe8 Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Tue, 21 Jul 2020 22:31:27 +0300 Subject: [PATCH 01/25] CREATE USER IF NOT EXISTS now doesn't throw exception if the user exists. --- src/Access/IAccessStorage.cpp | 274 ++++++++++-------- .../0_stateless/01292_create_user.reference | 9 + .../queries/0_stateless/01292_create_user.sql | 18 ++ 3 files changed, 181 insertions(+), 120 deletions(-) diff --git a/src/Access/IAccessStorage.cpp b/src/Access/IAccessStorage.cpp index 6813b5eb558..ffedfb038a4 100644 --- a/src/Access/IAccessStorage.cpp +++ b/src/Access/IAccessStorage.cpp @@ -1,6 +1,4 @@ #include -#include -#include #include #include #include @@ -39,110 +37,71 @@ namespace } - template > - ResultType doTry(const Func & func) + template + bool tryCall(const Func & function) { try { - return func(); + function(); + return true; } - catch (Exception &) + catch (...) { - return {}; + return false; } } - template , - typename ResultType = std::conditional_t, void, std::vector>> - ResultType applyToMultipleEntities( - const std::vector & multiple_entities, - const ApplyFunc & apply_function, - const char * error_message_format [[maybe_unused]] = nullptr, - const GetNameFunc & get_name_function [[maybe_unused]] = nullptr) + class ErrorsTracker { - std::optional exception; - std::vector success; + public: + explicit ErrorsTracker(size_t count_) { succeed.reserve(count_); } - auto helper = [&](const auto & apply_and_store_result_function) + template + bool tryCall(const Func & func) { - for (size_t i = 0; i != multiple_entities.size(); ++i) + try { - try - { - apply_and_store_result_function(multiple_entities[i]); - if constexpr (!ignore_errors) - success[i] = true; - } - catch (Exception & e) - { - if (!ignore_errors && !exception) - exception.emplace(e); - } - catch (Poco::Exception & e) - { - if (!ignore_errors && !exception) - exception.emplace(Exception::CreateFromPocoTag{}, e); - } - catch (std::exception & e) - { - if (!ignore_errors && !exception) - exception.emplace(Exception::CreateFromSTDTag{}, e); - } + func(); } - }; - - if constexpr (std::is_same_v) - { - if (multiple_entities.empty()) - return; - - if (multiple_entities.size() == 1) + catch (Exception & e) { - apply_function(multiple_entities.front()); - return; + if (!exception) + exception.emplace(e); + succeed.push_back(false); + return false; } - - if constexpr (!ignore_errors) - success.resize(multiple_entities.size(), false); - - helper(apply_function); - - if (ignore_errors || !exception) - return; - } - else - { - ResultType result; - if (multiple_entities.empty()) - return result; - - if (multiple_entities.size() == 1) + catch (Poco::Exception & e) { - result.emplace_back(apply_function(multiple_entities.front())); - return result; + if (!exception) + exception.emplace(Exception::CreateFromPocoTag{}, e); + succeed.push_back(false); + return false; } - - result.reserve(multiple_entities.size()); - if constexpr (!ignore_errors) - success.resize(multiple_entities.size(), false); - - helper([&](const T & entity) { result.emplace_back(apply_function(entity)); }); - - if (ignore_errors || !exception) - return result; + catch (std::exception & e) + { + if (!exception) + exception.emplace(Exception::CreateFromSTDTag{}, e); + succeed.push_back(false); + return false; + } + succeed.push_back(true); + return true; } - if constexpr (!ignore_errors) + bool errors() const { return exception.has_value(); } + + void showErrors(const char * format, const std::function & get_name_function) { + 
if (!exception) + return; + Strings succeeded_names_list; Strings failed_names_list; - for (size_t i = 0; i != multiple_entities.size(); ++i) + for (size_t i = 0; i != succeed.size(); ++i) { - const auto & entity = multiple_entities[i]; - String name = get_name_function(entity); - if (success[i]) + String name = get_name_function(i); + if (succeed[i]) succeeded_names_list.emplace_back(name); else failed_names_list.emplace_back(name); @@ -152,14 +111,17 @@ namespace if (succeeded_names.empty()) succeeded_names = "none"; - String error_message = error_message_format; + String error_message = format; boost::replace_all(error_message, "{succeeded_names}", succeeded_names); boost::replace_all(error_message, "{failed_names}", failed_names); exception->addMessage(error_message); exception->rethrow(); } - __builtin_unreachable(); - } + + private: + std::vector succeed; + std::optional exception; + }; } @@ -216,7 +178,11 @@ bool IAccessStorage::exists(const UUID & id) const AccessEntityPtr IAccessStorage::tryReadBase(const UUID & id) const { - return doTry([&] { return readImpl(id); }); + AccessEntityPtr entity; + auto func = [&] { entity = readImpl(id); }; + if (!tryCall(func)) + return nullptr; + return entity; } @@ -228,7 +194,11 @@ String IAccessStorage::readName(const UUID & id) const std::optional IAccessStorage::tryReadName(const UUID & id) const { - return doTry([&] { return std::optional{readNameImpl(id)}; }); + String name; + auto func = [&] { name = readNameImpl(id); }; + if (!tryCall(func)) + return {}; + return name; } @@ -240,41 +210,77 @@ UUID IAccessStorage::insert(const AccessEntityPtr & entity) std::vector IAccessStorage::insert(const std::vector & multiple_entities) { - return applyToMultipleEntities( - multiple_entities, - [this](const AccessEntityPtr & entity) { return insertImpl(entity, /* replace_if_exists = */ false); }, - "Couldn't insert {failed_names}. Successfully inserted: {succeeded_names}", - [](const AccessEntityPtr & entity) { return entity->outputTypeAndName(); }); + ErrorsTracker tracker(multiple_entities.size()); + + std::vector ids; + for (const auto & entity : multiple_entities) + { + UUID id; + auto func = [&] { id = insertImpl(entity, /* replace_if_exists = */ false); }; + if (tracker.tryCall(func)) + ids.push_back(id); + } + + if (tracker.errors()) + { + auto get_name_function = [&](size_t i) { return multiple_entities[i]->outputTypeAndName(); }; + tracker.showErrors("Couldn't insert {failed_names}. 
Successfully inserted: {succeeded_names}", get_name_function); + } + + return ids; } std::optional IAccessStorage::tryInsert(const AccessEntityPtr & entity) { - return doTry([&] { return std::optional{insertImpl(entity, false)}; }); + UUID id; + auto func = [&] { id = insertImpl(entity, /* replace_if_exists = */ false); }; + if (!tryCall(func)) + return {}; + return id; } std::vector IAccessStorage::tryInsert(const std::vector & multiple_entities) { - return applyToMultipleEntities( - multiple_entities, - [this](const AccessEntityPtr & entity) { return insertImpl(entity, /* replace_if_exists = */ false); }); + std::vector ids; + for (const auto & entity : multiple_entities) + { + UUID id; + auto func = [&] { id = insertImpl(entity, /* replace_if_exists = */ false); }; + if (tryCall(func)) + ids.push_back(id); + } + return ids; } UUID IAccessStorage::insertOrReplace(const AccessEntityPtr & entity) { - return insertImpl(entity, true); + return insertImpl(entity, /* replace_if_exists = */ true); } std::vector IAccessStorage::insertOrReplace(const std::vector & multiple_entities) { - return applyToMultipleEntities( - multiple_entities, - [this](const AccessEntityPtr & entity) { return insertImpl(entity, /* replace_if_exists = */ true); }, - "Couldn't insert {failed_names}. Successfully inserted: {succeeded_names}", - [](const AccessEntityPtr & entity) -> String { return entity->outputTypeAndName(); }); + ErrorsTracker tracker(multiple_entities.size()); + + std::vector ids; + for (const auto & entity : multiple_entities) + { + UUID id; + auto func = [&] { id = insertImpl(entity, /* replace_if_exists = */ true); }; + if (tracker.tryCall(func)) + ids.push_back(id); + } + + if (tracker.errors()) + { + auto get_name_function = [&](size_t i) { return multiple_entities[i]->outputTypeAndName(); }; + tracker.showErrors("Couldn't insert {failed_names}. Successfully inserted: {succeeded_names}", get_name_function); + } + + return ids; } @@ -286,25 +292,39 @@ void IAccessStorage::remove(const UUID & id) void IAccessStorage::remove(const std::vector & ids) { - applyToMultipleEntities( - ids, - [this](const UUID & id) { removeImpl(id); }, - "Couldn't remove {failed_names}. Successfully removed: {succeeded_names}", - [this](const UUID & id) { return outputTypeAndNameOrID(*this, id); }); + ErrorsTracker tracker(ids.size()); + + for (const auto & id : ids) + { + auto func = [&] { removeImpl(id); }; + tracker.tryCall(func); + } + + if (tracker.errors()) + { + auto get_name_function = [&](size_t i) { return outputTypeAndNameOrID(*this, ids[i]); }; + tracker.showErrors("Couldn't remove {failed_names}. Successfully removed: {succeeded_names}", get_name_function); + } } bool IAccessStorage::tryRemove(const UUID & id) { - return doTry([&] { removeImpl(id); return true; }); + auto func = [&] { removeImpl(id); }; + return tryCall(func); } std::vector IAccessStorage::tryRemove(const std::vector & ids) { - return applyToMultipleEntities( - ids, - [this](const UUID & id) { removeImpl(id); return id; }); + std::vector removed_ids; + for (const auto & id : ids) + { + auto func = [&] { removeImpl(id); }; + if (tryCall(func)) + removed_ids.push_back(id); + } + return removed_ids; } @@ -316,25 +336,39 @@ void IAccessStorage::update(const UUID & id, const UpdateFunc & update_func) void IAccessStorage::update(const std::vector & ids, const UpdateFunc & update_func) { - applyToMultipleEntities( - ids, - [this, &update_func](const UUID & id) { updateImpl(id, update_func); }, - "Couldn't update {failed_names}. 
Successfully updated: {succeeded_names}", - [this](const UUID & id) { return outputTypeAndNameOrID(*this, id); }); + ErrorsTracker tracker(ids.size()); + + for (const auto & id : ids) + { + auto func = [&] { updateImpl(id, update_func); }; + tracker.tryCall(func); + } + + if (tracker.errors()) + { + auto get_name_function = [&](size_t i) { return outputTypeAndNameOrID(*this, ids[i]); }; + tracker.showErrors("Couldn't update {failed_names}. Successfully updated: {succeeded_names}", get_name_function); + } } bool IAccessStorage::tryUpdate(const UUID & id, const UpdateFunc & update_func) { - return doTry([&] { updateImpl(id, update_func); return true; }); + auto func = [&] { updateImpl(id, update_func); }; + return tryCall(func); } std::vector IAccessStorage::tryUpdate(const std::vector & ids, const UpdateFunc & update_func) { - return applyToMultipleEntities( - ids, - [this, &update_func](const UUID & id) { updateImpl(id, update_func); return id; }); + std::vector updated_ids; + for (const auto & id : ids) + { + auto func = [&] { updateImpl(id, update_func); }; + if (tryCall(func)) + updated_ids.push_back(id); + } + return updated_ids; } diff --git a/tests/queries/0_stateless/01292_create_user.reference b/tests/queries/0_stateless/01292_create_user.reference index 922ee54bef4..775bbaa6a26 100644 --- a/tests/queries/0_stateless/01292_create_user.reference +++ b/tests/queries/0_stateless/01292_create_user.reference @@ -81,6 +81,15 @@ CREATE USER u6_01292 DEFAULT ROLE NONE -- complex CREATE USER u1_01292 IDENTIFIED WITH plaintext_password HOST LOCAL SETTINGS readonly = 1 CREATE USER u1_01292 HOST LIKE \'%.%.myhost.com\' DEFAULT ROLE NONE SETTINGS PROFILE default +-- if not exists +CREATE USER u1_01292 +GRANT r1_01292 TO u1_01292 +-- if not exists-part2 +CREATE USER u1_01292 +GRANT r1_01292, r2_01292 TO u1_01292 +-- or replace +CREATE USER u1_01292 +CREATE USER u2_01292 -- multiple users in one command CREATE USER u1_01292 DEFAULT ROLE NONE CREATE USER u2_01292 DEFAULT ROLE NONE diff --git a/tests/queries/0_stateless/01292_create_user.sql b/tests/queries/0_stateless/01292_create_user.sql index 5ae7f3921e6..c8d408147e9 100644 --- a/tests/queries/0_stateless/01292_create_user.sql +++ b/tests/queries/0_stateless/01292_create_user.sql @@ -177,6 +177,24 @@ ALTER USER u1_01292 NOT IDENTIFIED HOST LIKE '%.%.myhost.com' DEFAULT ROLE NONE SHOW CREATE USER u1_01292; DROP USER u1_01292; +SELECT '-- if not exists'; +CREATE USER u1_01292; +GRANT r1_01292 TO u1_01292; +SHOW CREATE USER u1_01292; +SHOW GRANTS FOR u1_01292; +SELECT '-- if not exists-part2'; +CREATE USER IF NOT EXISTS u1_01292; +GRANT r2_01292 TO u1_01292; +SHOW CREATE USER u1_01292; +SHOW GRANTS FOR u1_01292; +SELECT '-- or replace'; +CREATE USER OR REPLACE u1_01292; +SHOW CREATE USER u1_01292; +SHOW GRANTS FOR u1_01292; +CREATE USER IF NOT EXISTS u2_01292; +SHOW CREATE USER u2_01292; +DROP USER u1_01292, u2_01292; + SELECT '-- multiple users in one command'; CREATE USER u1_01292, u2_01292 DEFAULT ROLE NONE; CREATE USER u3_01292, u4_01292 HOST LIKE '%.%.myhost.com'; From 72f403bb147b4823490693564412ae98a86f7a34 Mon Sep 17 00:00:00 2001 From: Vitaliy Zakaznikov Date: Fri, 24 Jul 2020 10:05:13 -0400 Subject: [PATCH 02/25] Increasing shell default timeout to 120 sec. 
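Both Cluster.shell() and Cluster.bash() now default to 120 s, and shell()
additionally accepts a per-call timeout. A minimal usage sketch (the node
name "clickhouse1" and the `cluster` instance are illustrative, not taken
from this patch):

```
# uses the new 120 s default
shell = cluster.shell("clickhouse1")
# per-call override for long-running commands
slow_shell = cluster.shell("clickhouse1", timeout=300)
```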
--- tests/testflows/helpers/cluster.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/testflows/helpers/cluster.py b/tests/testflows/helpers/cluster.py index 9f86d44124c..6d3ae97e000 100644 --- a/tests/testflows/helpers/cluster.py +++ b/tests/testflows/helpers/cluster.py @@ -167,17 +167,20 @@ class Cluster(object): self.docker_compose += f" --project-directory \"{docker_compose_project_dir}\" --file \"{docker_compose_file_path}\"" self.lock = threading.Lock() - def shell(self, node): + def shell(self, node, timeout=120): """Returns unique shell terminal to be used. """ if node is None: return Shell() - return Shell(command=[ + shell = Shell(command=[ "/bin/bash", "--noediting", "-c", f"{self.docker_compose} exec {node} bash --noediting" ], name=node) - def bash(self, node, timeout=60): + shell.timeout = timeout + return shell + + def bash(self, node, timeout=120): """Returns thread-local bash terminal to a specific node. From 67d4529783b602d8a3eabd7f0efef7b03ad06497 Mon Sep 17 00:00:00 2001 From: Ivan Babrou Date: Fri, 24 Jul 2020 14:48:16 -0700 Subject: [PATCH 03/25] Show total granules examined by skipping indices This change makes skipping index efficiency more obvious, changing this: ``` Index `idx_duration` has dropped 59 granules. ``` Into this: ``` Index `idx_duration` has dropped 59 / 61 granules. ``` --- src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 306bcd9000a..e70d2d3743a 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1516,6 +1516,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex( part->index_granularity_info.index_granularity_bytes); size_t granules_dropped = 0; + size_t total_granules = 0; size_t marks_count = part->getMarksCount(); size_t final_mark = part->index_granularity.hasFinalMark(); @@ -1542,6 +1543,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex( if (last_index_mark != index_range.begin || !granule) reader.seek(index_range.begin); + total_granules += index_range.end - index_range.begin; + for (size_t index_mark = index_range.begin; index_mark < index_range.end; ++index_mark) { if (index_mark != index_range.begin || !granule || last_index_mark != index_range.begin) @@ -1566,7 +1569,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex( last_index_mark = index_range.end - 1; } - LOG_DEBUG(log, "Index {} has dropped {} granules.", backQuote(index_helper->index.name), granules_dropped); + LOG_DEBUG(log, "Index {} has dropped {} / {} granules.", backQuote(index_helper->index.name), granules_dropped, total_granules); return res; } From 78d357f0d2c53305dd40306d2a264d3f54f48cd0 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sat, 25 Jul 2020 17:42:20 +0300 Subject: [PATCH 04/25] Add a test for sticking mutations bug --- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 23 +++++++++ .../MergeTree/MergeTreeDataPartWide.cpp | 13 +++++ .../01415_sticking_mutations.reference | 1 + .../0_stateless/01415_sticking_mutations.sh | 47 +++++++++++++++++++ 4 files changed, 84 insertions(+) create mode 100644 tests/queries/0_stateless/01415_sticking_mutations.reference create mode 100755 tests/queries/0_stateless/01415_sticking_mutations.sh diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp 
b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 1d04118643f..0c329299209 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -552,6 +552,29 @@ void IMergeTreeDataPart::loadRowsCount() auto buf = openForReading(volume->getDisk(), path); readIntText(rows_count, *buf); assertEOF(*buf); + +#ifndef NDEBUG + /// columns have to be loaded + for (const auto & column : getColumns()) + { + if (column.type->isValueRepresentedByNumber()) + { + auto size = getColumnSize(column.name, *column.type); + + if (size.data_uncompressed == 0) + continue; + + size_t rows_in_column = size.data_uncompressed / column.type->getSizeOfValueInMemory(); + if (rows_in_column != rows_count) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Column {} has rows count {} according to size in memory " + "and size of single value, but data part {} has {} rows", backQuote(column.name), rows_in_column, name, rows_count); + } + } + } +#endif } else { diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index b33a3d4645d..fb6784434a8 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -237,7 +237,20 @@ void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_col ColumnSize size = getColumnSizeImpl(column.name, *column.type, &processed_substreams); each_columns_size[column.name] = size; total_size.add(size); + +#ifndef NDEBUG + if (rows_count != 0 && column.type->isValueRepresentedByNumber()) + { + size_t rows_in_column = size.data_uncompressed / column.type->getSizeOfValueInMemory(); + if (rows_in_column != rows_count) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Column {} has rows count {} according to size in memory and size of single value, but data part {} has {} rows", backQuote(column.name), rows_in_column, name, rows_count); + } + } } +#endif } } diff --git a/tests/queries/0_stateless/01415_sticking_mutations.reference b/tests/queries/0_stateless/01415_sticking_mutations.reference new file mode 100644 index 00000000000..d00491fd7e5 --- /dev/null +++ b/tests/queries/0_stateless/01415_sticking_mutations.reference @@ -0,0 +1 @@ +1 diff --git a/tests/queries/0_stateless/01415_sticking_mutations.sh b/tests/queries/0_stateless/01415_sticking_mutations.sh new file mode 100755 index 00000000000..fdc9b580274 --- /dev/null +++ b/tests/queries/0_stateless/01415_sticking_mutations.sh @@ -0,0 +1,47 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. 
$CURDIR/../shell_config.sh + +$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS sticking_mutations" + +$CLICKHOUSE_CLIENT -n --query "CREATE TABLE sticking_mutations ( + key UInt64, + value1 String, + value2 UInt8 +) +ENGINE = MergeTree() +ORDER BY key;" + +$CLICKHOUSE_CLIENT --query "INSERT INTO sticking_mutations SELECT number, toString(number), number % 128 FROM numbers(1000)" + +# if merges stopped for normal merge tree mutations will stick +$CLICKHOUSE_CLIENT --query "SYSTEM STOP MERGES sticking_mutations" + +$CLICKHOUSE_CLIENT --query "ALTER TABLE sticking_mutations DELETE WHERE value2 % 32 == 0, MODIFY COLUMN value1 UInt64;" & + + +##### wait mutation to start ##### +check_query="SELECT count() FROM system.mutations WHERE table='sticking_mutations' and database='$CLICKHOUSE_DATABASE'" + +query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1` + +while [ "$query_result" == "0" ] +do + query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1` + sleep 0.5 +done +##### wait mutation to start ##### + +# Starting merges to execute sticked mutations + +$CLICKHOUSE_CLIENT --query "SYSTEM START MERGES sticking_mutations" + +# just to be sure, that previous mutations finished +$CLICKHOUSE_CLIENT --query "ALTER TABLE sticking_mutations DELETE WHERE value % 31 == 0 SETTINGS mutations_sync = 1" + +$CLICKHOUSE_CLIENT --query "OPTIMIZE TABLE sticking_mutations FINAL" + +$CLICKHOUSE_CLIENT --query "SELECT 1" + +$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS sticking_mutations" From 393a9195caac261f2a74da712a12a1eb74b182e5 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sat, 25 Jul 2020 17:45:46 +0300 Subject: [PATCH 05/25] Better exception message --- src/Storages/MergeTree/MergeTreeDataPartWide.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index fb6784434a8..b9454fcf818 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -246,7 +246,8 @@ void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_col { throw Exception( ErrorCodes::LOGICAL_ERROR, - "Column {} has rows count {} according to size in memory and size of single value, but data part {} has {} rows", backQuote(column.name), rows_in_column, name, rows_count); + "Column {} has rows count {} according to size in memory " + "and size of single value, but data part {} has {} rows", backQuote(column.name), rows_in_column, name, rows_count); } } } From eae7dc852ea2bfcd223c2ce3981b2687ed0ff987 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sat, 25 Jul 2020 23:54:33 +0300 Subject: [PATCH 06/25] Fix build --- src/Storages/MergeTree/MergeTreeDataPartWide.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index b9454fcf818..131f875e9f8 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -250,8 +250,8 @@ void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_col "and size of single value, but data part {} has {} rows", backQuote(column.name), rows_in_column, name, rows_count); } } - } #endif + } } } From c66dc23a47106003748b306b6a5601f84def4f5e Mon Sep 17 00:00:00 2001 From: alesapin Date: Sat, 25 Jul 2020 23:56:29 +0300 Subject: [PATCH 07/25] Fix error style --- src/Storages/MergeTree/MergeTreeDataPartWide.cpp | 1 + 1 file 
changed, 1 insertion(+) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index 131f875e9f8..a3e45a22d92 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -13,6 +13,7 @@ namespace ErrorCodes extern const int CANNOT_READ_ALL_DATA; extern const int NO_FILE_IN_DATA_PART; extern const int BAD_SIZE_OF_FILE_IN_DATA_PART; + extern const int LOGICAL_ERROR; } From e0bf5913e77e0360661fb400ee7333b92eaec375 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 26 Jul 2020 17:21:57 +0300 Subject: [PATCH 08/25] Fix sticking mutations --- src/Interpreters/ExpressionActions.cpp | 2 +- src/Interpreters/ExpressionActions.h | 2 +- src/Interpreters/MutationsInterpreter.cpp | 19 +++++ src/Interpreters/MutationsInterpreter.h | 8 +- .../MergeTree/MergeTreeDataMergerMutator.cpp | 2 +- .../01415_sticking_mutations.reference | 13 ++++ .../0_stateless/01415_sticking_mutations.sh | 77 ++++++++++++------- 7 files changed, 91 insertions(+), 32 deletions(-) diff --git a/src/Interpreters/ExpressionActions.cpp b/src/Interpreters/ExpressionActions.cpp index 32e3000a65d..0e1d0c51704 100644 --- a/src/Interpreters/ExpressionActions.cpp +++ b/src/Interpreters/ExpressionActions.cpp @@ -1309,7 +1309,7 @@ void ExpressionActionsChain::finalize() } } -std::string ExpressionActionsChain::dumpChain() +std::string ExpressionActionsChain::dumpChain() const { std::stringstream ss; diff --git a/src/Interpreters/ExpressionActions.h b/src/Interpreters/ExpressionActions.h index 1aae3f5e021..49da9a5f810 100644 --- a/src/Interpreters/ExpressionActions.h +++ b/src/Interpreters/ExpressionActions.h @@ -347,7 +347,7 @@ struct ExpressionActionsChain return steps.back(); } - std::string dumpChain(); + std::string dumpChain() const; }; } diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 94740ae0bd4..bb83001852c 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -764,4 +764,23 @@ std::optional MutationsInterpreter::getStorageSortDescriptionIf return sort_description; } +bool MutationsInterpreter::Stage::isAffectingAllColumns(const Names & storage_columns) const +{ + /// is subset + for (const auto & storage_column : storage_columns) + if (!output_columns.contains(storage_column)) + return false; + + return true; +} + +bool MutationsInterpreter::isAffectingAllColumns() const +{ + auto storage_columns = metadata_snapshot->getColumns().getNamesOfPhysical(); + for (const auto & stage : stages) + if (stage.isAffectingAllColumns(storage_columns)) + return true; + return false; +} + } diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 894d135a099..4ba96533210 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -42,6 +42,8 @@ public: /// Only changed columns. const Block & getUpdatedHeader() const; + bool isAffectingAllColumns() const; + private: ASTPtr prepare(bool dry_run); @@ -86,8 +88,8 @@ private: ASTs filters; std::unordered_map column_to_updated; - /// Contains columns that are changed by this stage, - /// columns changed by the previous stages and also columns needed by the next stages. + /// Contains columns that are changed by this stage, columns changed by + /// the previous stages and also columns needed by the next stages. 
NameSet output_columns; std::unique_ptr analyzer; @@ -97,6 +99,8 @@ private: /// then there is (possibly) an UPDATE step, and finally a projection step. ExpressionActionsChain expressions_chain; Names filter_column_names; + + bool isAffectingAllColumns(const Names & storage_columns) const; }; std::unique_ptr updated_header; diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index b59070ca070..cd14a35edba 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -1092,7 +1092,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor need_remove_expired_values = true; /// All columns from part are changed and may be some more that were missing before in part - if (!isWidePart(source_part) || source_part->getColumns().isSubsetOf(updated_header.getNamesAndTypesList())) + if (!isWidePart(source_part) || (interpreter && interpreter->isAffectingAllColumns())) { auto part_indices = getIndicesForNewDataPart(metadata_snapshot->getSecondaryIndices(), for_file_renames); mutateAllPartColumns( diff --git a/tests/queries/0_stateless/01415_sticking_mutations.reference b/tests/queries/0_stateless/01415_sticking_mutations.reference index d00491fd7e5..5d2e2b67958 100644 --- a/tests/queries/0_stateless/01415_sticking_mutations.reference +++ b/tests/queries/0_stateless/01415_sticking_mutations.reference @@ -1 +1,14 @@ 1 +CREATE TABLE default.sticking_mutations\n(\n `date` Date,\n `key` UInt64,\n `value1` UInt64,\n `value2` UInt8\n)\nENGINE = MergeTree()\nORDER BY key\nSETTINGS index_granularity = 8192 +1 +CREATE TABLE default.sticking_mutations\n(\n `date` Date,\n `key` UInt64,\n `value1` UInt64,\n `value2` UInt8\n)\nENGINE = MergeTree()\nORDER BY key\nSETTINGS index_granularity = 8192 +1 +CREATE TABLE default.sticking_mutations\n(\n `date` Date,\n `key` UInt64,\n `value1` String,\n `value2` UInt8\n)\nENGINE = MergeTree()\nORDER BY key\nSETTINGS index_granularity = 8192 +1 +CREATE TABLE default.sticking_mutations\n(\n `date` Date,\n `key` UInt64,\n `value1` String,\n `value2` UInt8\n)\nENGINE = MergeTree()\nORDER BY key\nSETTINGS index_granularity = 8192 +1 +CREATE TABLE default.sticking_mutations\n(\n `date` Date,\n `key` UInt64,\n `value2` UInt8\n)\nENGINE = MergeTree()\nORDER BY key\nSETTINGS index_granularity = 8192 +1 +CREATE TABLE default.sticking_mutations\n(\n `date` Date,\n `key` UInt64,\n `renamed_value1` String,\n `value2` UInt8\n)\nENGINE = MergeTree()\nORDER BY key\nSETTINGS index_granularity = 8192 +1 +CREATE TABLE default.sticking_mutations\n(\n `date` Date,\n `key` UInt64,\n `value1` UInt64,\n `value2` UInt8\n)\nENGINE = MergeTree()\nORDER BY key\nTTL date + toIntervalDay(1)\nSETTINGS index_granularity = 8192 diff --git a/tests/queries/0_stateless/01415_sticking_mutations.sh b/tests/queries/0_stateless/01415_sticking_mutations.sh index fdc9b580274..04846be06d6 100755 --- a/tests/queries/0_stateless/01415_sticking_mutations.sh +++ b/tests/queries/0_stateless/01415_sticking_mutations.sh @@ -1,47 +1,70 @@ #!/usr/bin/env bash +set -e + CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) . 
$CURDIR/../shell_config.sh $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS sticking_mutations" -$CLICKHOUSE_CLIENT -n --query "CREATE TABLE sticking_mutations ( - key UInt64, - value1 String, - value2 UInt8 -) -ENGINE = MergeTree() -ORDER BY key;" +function check_sticky_mutations() +{ + $CLICKHOUSE_CLIENT -n --query "CREATE TABLE sticking_mutations ( + date Date, + key UInt64, + value1 String, + value2 UInt8 + ) + ENGINE = MergeTree() + ORDER BY key;" -$CLICKHOUSE_CLIENT --query "INSERT INTO sticking_mutations SELECT number, toString(number), number % 128 FROM numbers(1000)" + $CLICKHOUSE_CLIENT --query "INSERT INTO sticking_mutations SELECT toDate('2020-07-10'), number, toString(number), number % 128 FROM numbers(1000)" -# if merges stopped for normal merge tree mutations will stick -$CLICKHOUSE_CLIENT --query "SYSTEM STOP MERGES sticking_mutations" + $CLICKHOUSE_CLIENT --query "INSERT INTO sticking_mutations SELECT toDate('2100-01-10'), number, toString(number), number % 128 FROM numbers(1000)" -$CLICKHOUSE_CLIENT --query "ALTER TABLE sticking_mutations DELETE WHERE value2 % 32 == 0, MODIFY COLUMN value1 UInt64;" & + # if merges stopped for normal merge tree mutations will stick + $CLICKHOUSE_CLIENT --query "SYSTEM STOP MERGES sticking_mutations" + $CLICKHOUSE_CLIENT --query "$1" & -##### wait mutation to start ##### -check_query="SELECT count() FROM system.mutations WHERE table='sticking_mutations' and database='$CLICKHOUSE_DATABASE'" + ##### wait mutation to start ##### + check_query="SELECT count() FROM system.mutations WHERE table='sticking_mutations' and database='$CLICKHOUSE_DATABASE' and is_done = 0" -query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1` - -while [ "$query_result" == "0" ] -do query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1` - sleep 0.5 -done -##### wait mutation to start ##### -# Starting merges to execute sticked mutations + while [ "$query_result" == "0" ] + do + query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1` + sleep 0.5 + done + ##### wait mutation to start ##### -$CLICKHOUSE_CLIENT --query "SYSTEM START MERGES sticking_mutations" + # Starting merges to execute sticked mutations -# just to be sure, that previous mutations finished -$CLICKHOUSE_CLIENT --query "ALTER TABLE sticking_mutations DELETE WHERE value % 31 == 0 SETTINGS mutations_sync = 1" + $CLICKHOUSE_CLIENT --query "SYSTEM START MERGES sticking_mutations" -$CLICKHOUSE_CLIENT --query "OPTIMIZE TABLE sticking_mutations FINAL" + # just to be sure, that previous mutations finished + $CLICKHOUSE_CLIENT --query "ALTER TABLE sticking_mutations DELETE WHERE value2 % 31 == 0 SETTINGS mutations_sync = 1" -$CLICKHOUSE_CLIENT --query "SELECT 1" + $CLICKHOUSE_CLIENT --query "OPTIMIZE TABLE sticking_mutations FINAL" -$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS sticking_mutations" + $CLICKHOUSE_CLIENT --query "SELECT sum(cityHash64(*)) > 1 FROM sticking_mutations WHERE key > 10" + + $CLICKHOUSE_CLIENT --query "SHOW CREATE TABLE sticking_mutations" + + $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS sticking_mutations" +} + +check_sticky_mutations "ALTER TABLE sticking_mutations DELETE WHERE value2 % 32 == 0, MODIFY COLUMN value1 UInt64" + +check_sticky_mutations "ALTER TABLE sticking_mutations MODIFY COLUMN value1 UInt64, DELETE WHERE value2 % 32 == 0" + +check_sticky_mutations "ALTER TABLE sticking_mutations UPDATE value1 = 15 WHERE key < 2000, DELETE WHERE value2 % 32 == 0" + +check_sticky_mutations "ALTER TABLE sticking_mutations DELETE WHERE value2 % 32 == 0, UPDATE 
value1 = 15 WHERE key < 2000" + +check_sticky_mutations "ALTER TABLE sticking_mutations DELETE WHERE value2 % 32 == 0, DROP COLUMN value1" + +check_sticky_mutations "ALTER TABLE sticking_mutations DELETE WHERE value2 % 32 == 0, RENAME COLUMN value1 TO renamed_value1" + +check_sticky_mutations "ALTER TABLE sticking_mutations MODIFY COLUMN value1 UInt64, MODIFY TTL date + INTERVAL 1 DAY" From 561906d5f47b50cae130c2fa33403272af17da55 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 26 Jul 2020 17:27:31 +0300 Subject: [PATCH 09/25] Better comments --- src/Interpreters/MutationsInterpreter.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 4ba96533210..8d189b121ee 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -42,6 +42,7 @@ public: /// Only changed columns. const Block & getUpdatedHeader() const; + /// At least one mutation stage affects all columns in storage bool isAffectingAllColumns() const; private: @@ -100,6 +101,7 @@ private: ExpressionActionsChain expressions_chain; Names filter_column_names; + /// Check that stage affects all storage columns bool isAffectingAllColumns(const Names & storage_columns) const; }; From 2d8e36ed5fd02ee3d8e464f51d10bec6d0eecfe2 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 26 Jul 2020 18:58:03 +0300 Subject: [PATCH 10/25] Add test for sticking mutations --- .../test_mutations_hardlinks/__init__.py | 0 .../test_mutations_hardlinks/test.py | 133 ++++++++++++++++++ 2 files changed, 133 insertions(+) create mode 100644 tests/integration/test_mutations_hardlinks/__init__.py create mode 100644 tests/integration/test_mutations_hardlinks/test.py diff --git a/tests/integration/test_mutations_hardlinks/__init__.py b/tests/integration/test_mutations_hardlinks/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_mutations_hardlinks/test.py b/tests/integration/test_mutations_hardlinks/test.py new file mode 100644 index 00000000000..56852f572ff --- /dev/null +++ b/tests/integration/test_mutations_hardlinks/test.py @@ -0,0 +1,133 @@ +import pytest + +import os +import time +from helpers.cluster import ClickHouseCluster +from helpers.test_tools import assert_eq_with_retry +from multiprocessing.dummy import Pool + + +cluster = ClickHouseCluster(__file__) + +node1 = cluster.add_instance('node1') + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def check_hardlinks(table, part_path, column_file, count): + column_path = os.path.join("/var/lib/clickhouse/data/default", table, part_path, column_file) + script = """ + export INODE=`ls -i {column_path} | awk '{{print $1}}'` + export COUNT=`find /var/lib/clickhouse -inum $INODE | wc -l` + test $COUNT = {count} + """.format(column_path=column_path, count=count) + + node1.exec_in_container(["bash", "-c", script]) + + +def check_exists(table, part_path, column_file): + column_path = os.path.join("/var/lib/clickhouse/data/default", table, part_path, column_file) + + node1.exec_in_container(["bash", "-c", "test -f {}".format(column_path)]) + + +def test_update_mutation(started_cluster): + node1.query("CREATE TABLE table_for_update(key UInt64, value1 UInt64, value2 String) ENGINE MergeTree() ORDER BY tuple()") + + node1.query("INSERT INTO table_for_update SELECT number, number, toString(number) from numbers(100)") + + assert int(node1.query("SELECT 
sum(value1) FROM table_for_update").strip()) == sum(range(100)) + + node1.query("ALTER TABLE table_for_update UPDATE value1 = value1 * value1 WHERE 1", settings={"mutations_sync" : "2"}) + assert int(node1.query("SELECT sum(value1) FROM table_for_update").strip()) == sum(i * i for i in range(100)) + + check_hardlinks("table_for_update", "all_1_1_0_2", "key.bin", 2) + check_hardlinks("table_for_update", "all_1_1_0_2", "value2.bin", 2) + check_hardlinks("table_for_update", "all_1_1_0_2", "value1.bin", 1) + + node1.query("ALTER TABLE table_for_update UPDATE key=key, value1=value1, value2=value2 WHERE 1", settings={"mutations_sync": "2"}) + + assert int(node1.query("SELECT sum(value1) FROM table_for_update").strip()) == sum(i * i for i in range(100)) + + check_hardlinks("table_for_update", "all_1_1_0_3", "key.bin", 1) + check_hardlinks("table_for_update", "all_1_1_0_3", "value1.bin", 1) + check_hardlinks("table_for_update", "all_1_1_0_3", "value2.bin", 1) + + +def test_modify_mutation(started_cluster): + node1.query("CREATE TABLE table_for_modify(key UInt64, value1 UInt64, value2 String) ENGINE MergeTree() ORDER BY tuple()") + + node1.query("INSERT INTO table_for_modify SELECT number, number, toString(number) from numbers(100)") + + assert int(node1.query("SELECT sum(value1) FROM table_for_modify").strip()) == sum(range(100)) + + node1.query("ALTER TABLE table_for_modify MODIFY COLUMN value2 UInt64", settings={"mutations_sync" : "2"}) + + assert int(node1.query("SELECT sum(value2) FROM table_for_modify").strip()) == sum(range(100)) + + check_hardlinks("table_for_modify", "all_1_1_0_2", "key.bin", 2) + check_hardlinks("table_for_modify", "all_1_1_0_2", "value1.bin", 2) + check_hardlinks("table_for_modify", "all_1_1_0_2", "value2.bin", 1) + + +def test_drop_mutation(started_cluster): + node1.query("CREATE TABLE table_for_drop(key UInt64, value1 UInt64, value2 String) ENGINE MergeTree() ORDER BY tuple()") + + node1.query("INSERT INTO table_for_drop SELECT number, number, toString(number) from numbers(100)") + + assert int(node1.query("SELECT sum(value1) FROM table_for_drop").strip()) == sum(range(100)) + + node1.query("ALTER TABLE table_for_drop DROP COLUMN value2", settings={"mutations_sync": "2"}) + + check_hardlinks("table_for_drop", "all_1_1_0_2", "key.bin", 2) + check_hardlinks("table_for_drop", "all_1_1_0_2", "value1.bin", 2) + + with pytest.raises(Exception): + check_exists("table_for_drop", "all_1_1_0_2", "value2.bin") + with pytest.raises(Exception): + check_exists("table_for_drop", "all_1_1_0_2", "value2.mrk") + + +def test_delete_and_drop_mutation(started_cluster): + node1.query("CREATE TABLE table_for_delete_and_drop(key UInt64, value1 UInt64, value2 String) ENGINE MergeTree() ORDER BY tuple()") + + node1.query("INSERT INTO table_for_delete_and_drop SELECT number, number, toString(number) from numbers(100)") + + assert int(node1.query("SELECT sum(value1) FROM table_for_delete_and_drop").strip()) == sum(range(100)) + + node1.query("SYSTEM STOP MERGES") + + def mutate(): + node1.query("ALTER TABLE table_for_delete_and_drop DELETE WHERE key % 2 == 0, DROP COLUMN value2") + + p = Pool(2) + p.apply_async(mutate) + + for _ in range(1, 100): + result = node1.query("SELECT COUNT() FROM system.mutations WHERE table = 'table_for_delete_and_drop' and is_done=0") + try: + if int(result.strip()) == 2: + break + except: + print "Result", result + pass + + time.sleep(0.5) + + node1.query("SYSTEM START MERGES") + + assert_eq_with_retry(node1, "SELECT COUNT() FROM table_for_delete_and_drop", str(sum(1 
for i in range(100) if i % 2 != 0))) + + check_hardlinks("table_for_delete_and_drop", "all_1_1_0_3", "key.bin", 1) + check_hardlinks("table_for_delete_and_drop", "all_1_1_0_3", "value1.bin", 1) + + with pytest.raises(Exception): + check_exists("table_for_delete_and_drop", "all_1_1_0_3", "value2.bin") + with pytest.raises(Exception): + check_exists("table_for_delete_and_drop", "all_1_1_0_3", "value2.mrk") From 1707f84a44ce010710aefa097684c8e85bf6d35b Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 27 Jul 2020 12:42:37 +0300 Subject: [PATCH 11/25] Less strict check and rare rename bug --- src/Interpreters/MutationsInterpreter.cpp | 8 +-- src/Interpreters/MutationsInterpreter.h | 2 +- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 3 +- .../MergeTree/MergeTreeDataMergerMutator.cpp | 55 +++++++++++++++---- .../MergeTree/MergeTreeDataPartWide.cpp | 3 +- 5 files changed, 53 insertions(+), 18 deletions(-) diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index bb83001852c..e502f517ad8 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -777,10 +777,10 @@ bool MutationsInterpreter::Stage::isAffectingAllColumns(const Names & storage_co bool MutationsInterpreter::isAffectingAllColumns() const { auto storage_columns = metadata_snapshot->getColumns().getNamesOfPhysical(); - for (const auto & stage : stages) - if (stage.isAffectingAllColumns(storage_columns)) - return true; - return false; + if (stages.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation interpreter has no stages"); + + return stages.back().isAffectingAllColumns(storage_columns); } } diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 8d189b121ee..c9130ad6613 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -42,7 +42,7 @@ public: /// Only changed columns. 
const Block & getUpdatedHeader() const; - /// At least one mutation stage affects all columns in storage + /// Latest mutation stage affects all columns in storage bool isAffectingAllColumns() const; private: diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 0c329299209..d5b59f3d7b6 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -557,7 +557,8 @@ void IMergeTreeDataPart::loadRowsCount() /// columns have to be loaded for (const auto & column : getColumns()) { - if (column.type->isValueRepresentedByNumber()) + /// Most trivial types + if (column.type->isValueRepresentedByNumber() && !column.type->haveSubtypes()) { auto size = getColumnSize(column.name, *column.type); diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index cd14a35edba..2b407cb7269 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -1478,13 +1478,14 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart( return updated_header.getNamesAndTypesList(); NameSet removed_columns; - NameToNameMap renamed_columns; + NameToNameMap renamed_columns_to_from; + /// All commands are validated in AlterCommand so we don't care about order for (const auto & command : commands_for_removes) { if (command.type == MutationCommand::DROP_COLUMN) removed_columns.insert(command.column_name); if (command.type == MutationCommand::RENAME_COLUMN) - renamed_columns.emplace(command.rename_to, command.column_name); + renamed_columns_to_from.emplace(command.rename_to, command.column_name); } Names source_column_names = source_part->getColumns().getNames(); NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end()); @@ -1497,17 +1498,49 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart( it->type = updated_type; ++it; } - else if (source_columns_name_set.count(it->name) && !removed_columns.count(it->name)) - { - ++it; - } - else if (renamed_columns.count(it->name) && source_columns_name_set.count(renamed_columns[it->name])) - { - ++it; - } else { - it = storage_columns.erase(it); + if (!source_columns_name_set.count(it->name)) + { + /// Source part doesn't have column but some other column + /// was renamed to it's name. + auto renamed_it = renamed_columns_to_from.find(it->name); + if (renamed_it != renamed_columns_to_from.end() + && source_columns_name_set.count(renamed_it->second)) + ++it; + else + it = storage_columns.erase(it); + } + else + { + bool was_renamed = false; + bool was_removed = removed_columns.count(it->name); + + /// Check that this column was renamed to some other name + for (const auto & [rename_to, rename_from] : renamed_columns_to_from) + { + if (rename_from == it->name) + { + was_renamed = true; + break; + } + } + + /// If we want to rename this column to some other name, than it + /// should it's previous version should be dropped or removed + if (renamed_columns_to_from.count(it->name) && !was_renamed && !was_removed) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Incorrect mutation commands, trying to rename column {} to {}, but part {} already has column {}", renamed_columns_to_from[it->name], it->name, source_part->name, it->name); + + + /// Column was renamed and no other column renamed to it's name + /// or column is dropped. 
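+                /// I.e. erase this column only when it was renamed away or
+                /// dropped and nothing else is being renamed onto its name;
+                /// keep it in every other case.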
+ if (!renamed_columns_to_from.count(it->name) && (was_renamed || was_removed)) + it = storage_columns.erase(it); + else + ++it; + } } } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index a3e45a22d92..f133d438866 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -240,7 +240,8 @@ void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_col total_size.add(size); #ifndef NDEBUG - if (rows_count != 0 && column.type->isValueRepresentedByNumber()) + /// Most trivial types + if (rows_count != 0 && column.type->isValueRepresentedByNumber() && !column.type->haveSubtypes()) { size_t rows_in_column = size.data_uncompressed / column.type->getSizeOfValueInMemory(); if (rows_in_column != rows_count) From 3d743c5f75c1a05123a88c7b99be73d0d0fbacec Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 27 Jul 2020 12:52:04 +0300 Subject: [PATCH 12/25] Enable 00992 back --- tests/queries/skip_list.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json index 31cb38cc6bf..955c67b0b96 100644 --- a/tests/queries/skip_list.json +++ b/tests/queries/skip_list.json @@ -55,8 +55,7 @@ "01200_mutations_memory_consumption", "01103_check_cpu_instructions_at_startup", "01037_polygon_dicts_", - "hyperscan", - "00992_system_parts_race_condition_zookeeper" + "hyperscan" ], "unbundled-build": [ "00429", From 4b75be44a3734bd48d370f9caf8803e43aa3a38d Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 27 Jul 2020 16:04:13 +0300 Subject: [PATCH 13/25] Fix arcadia build --- src/Interpreters/MutationsInterpreter.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index e502f517ad8..759543950de 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -768,7 +768,7 @@ bool MutationsInterpreter::Stage::isAffectingAllColumns(const Names & storage_co { /// is subset for (const auto & storage_column : storage_columns) - if (!output_columns.contains(storage_column)) + if (!output_columns.count(storage_column)) return false; return true; From 4bd6261dc164c64172c641adafec271df4c12301 Mon Sep 17 00:00:00 2001 From: Yatsishin Ilya <2159081+qoega@users.noreply.github.com> Date: Mon, 27 Jul 2020 20:00:14 +0300 Subject: [PATCH 14/25] Use prebuilt helper_container for integration tests --- tests/integration/helpers/network.py | 22 ++++------------------ 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/tests/integration/helpers/network.py b/tests/integration/helpers/network.py index 5d738126f07..f5c2b4f8d19 100644 --- a/tests/integration/helpers/network.py +++ b/tests/integration/helpers/network.py @@ -155,8 +155,6 @@ class _NetworkManager: def __init__( self, - image_name='clickhouse_tests_helper', - image_path=p.join(CLICKHOUSE_ROOT_DIR, 'docker', 'test', 'integration', 'helper_container'), container_expire_timeout=50, container_exit_timeout=60): self.container_expire_timeout = container_expire_timeout @@ -164,14 +162,6 @@ class _NetworkManager: self._docker_client = docker.from_env(version=os.environ.get("DOCKER_API_VERSION")) - try: - self._image = self._docker_client.images.get(image_name) - except docker.errors.ImageNotFound: - # Use docker console client instead of python API to work around https://github.com/docker/docker-py/issues/1397 - 
subprocess.check_call( - ['docker', 'build', '--force-rm', '--tag', image_name, '--network', 'host', image_path]) - self._image = self._docker_client.images.get(image_name) - self._container = None self._ensure_container() @@ -185,15 +175,11 @@ class _NetworkManager: except docker.errors.NotFound: pass - # Work around https://github.com/docker/docker-py/issues/1477 - host_config = self._docker_client.api.create_host_config(network_mode='host', auto_remove=True) - container_id = self._docker_client.api.create_container( - self._image.id, command=('sleep %s' % self.container_exit_timeout), - detach=True, host_config=host_config)['Id'] - + self._container = self._docker_client.containers.run('yandex/clickhouse-integration-helper', auto_remove=True, + command=('sleep %s' % self.container_exit_timeout), + detach=True, network_mode='host') + container_id = self._container.id self._container_expire_time = time.time() + self.container_expire_timeout - self._docker_client.api.start(container_id) - self._container = self._docker_client.containers.get(container_id) return self._container From 1f8fe725eca2db3a59e0b715f992c9021f206af0 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 27 Jul 2020 22:35:38 +0300 Subject: [PATCH 15/25] Add test for ability to restore from backup --- .../__init__.py | 0 .../test.py | 130 ++++++++++++++++++ 2 files changed, 130 insertions(+) create mode 100644 tests/integration/test_backup_with_other_granularity/__init__.py create mode 100644 tests/integration/test_backup_with_other_granularity/test.py diff --git a/tests/integration/test_backup_with_other_granularity/__init__.py b/tests/integration/test_backup_with_other_granularity/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_backup_with_other_granularity/test.py b/tests/integration/test_backup_with_other_granularity/test.py new file mode 100644 index 00000000000..43aa9562245 --- /dev/null +++ b/tests/integration/test_backup_with_other_granularity/test.py @@ -0,0 +1,130 @@ +import pytest + + +from helpers.cluster import ClickHouseCluster +cluster = ClickHouseCluster(__file__) + + +node1 = cluster.add_instance('node1', with_zookeeper=True, image='yandex/clickhouse-server:19.4.5.35', stay_alive=True, with_installed_binary=True) +node2 = cluster.add_instance('node2', with_zookeeper=True, image='yandex/clickhouse-server:19.4.5.35', stay_alive=True, with_installed_binary=True) +node3 = cluster.add_instance('node3', with_zookeeper=True, image='yandex/clickhouse-server:19.4.5.35', stay_alive=True, with_installed_binary=True) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def test_backup_from_old_version(started_cluster): + node1.query("CREATE TABLE source_table(A Int64, B String) Engine = MergeTree order by tuple()") + + node1.query("INSERT INTO source_table VALUES(1, '1')") + + assert node1.query("SELECT COUNT() FROM source_table") == "1\n" + + node1.query("ALTER TABLE source_table ADD COLUMN Y String") + + node1.query("ALTER TABLE source_table FREEZE PARTITION tuple();") + + node1.restart_with_latest_version() + + node1.query("CREATE TABLE dest_table (A Int64, B String, Y String) ENGINE = ReplicatedMergeTree('/test/dest_table1', '1') ORDER BY tuple()") + + node1.query("INSERT INTO dest_table VALUES(2, '2', 'Hello')") + + assert node1.query("SELECT COUNT() FROM dest_table") == "1\n" + + node1.exec_in_container(['bash', '-c', 'cp -r 
/var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/ /var/lib/clickhouse/data/default/dest_table/detached']) + + assert node1.query("SELECT COUNT() FROM dest_table") == "1\n" + + node1.query("ALTER TABLE dest_table ATTACH PARTITION tuple()") + + assert node1.query("SELECT sum(A) FROM dest_table") == "3\n" + + node1.query("ALTER TABLE dest_table DETACH PARTITION tuple()") + + node1.query("ALTER TABLE dest_table ATTACH PARTITION tuple()") + + assert node1.query("SELECT sum(A) FROM dest_table") == "3\n" + + assert node1.query("CHECK TABLE dest_table") == "1\n" + + +def test_backup_from_old_version_setting(started_cluster): + node2.query("CREATE TABLE source_table(A Int64, B String) Engine = MergeTree order by tuple()") + + node2.query("INSERT INTO source_table VALUES(1, '1')") + + assert node2.query("SELECT COUNT() FROM source_table") == "1\n" + + node2.query("ALTER TABLE source_table ADD COLUMN Y String") + + node2.query("ALTER TABLE source_table FREEZE PARTITION tuple();") + + node2.restart_with_latest_version() + + node2.query("CREATE TABLE dest_table (A Int64, B String, Y String) ENGINE = ReplicatedMergeTree('/test/dest_table2', '1') ORDER BY tuple() SETTINGS enable_mixed_granularity_parts = 1") + + node2.query("INSERT INTO dest_table VALUES(2, '2', 'Hello')") + + assert node2.query("SELECT COUNT() FROM dest_table") == "1\n" + + node2.exec_in_container(['bash', '-c', 'cp -r /var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/ /var/lib/clickhouse/data/default/dest_table/detached']) + + assert node2.query("SELECT COUNT() FROM dest_table") == "1\n" + + node2.query("ALTER TABLE dest_table ATTACH PARTITION tuple()") + + assert node2.query("SELECT sum(A) FROM dest_table") == "3\n" + + node2.query("ALTER TABLE dest_table DETACH PARTITION tuple()") + + node2.query("ALTER TABLE dest_table ATTACH PARTITION tuple()") + + assert node2.query("SELECT sum(A) FROM dest_table") == "3\n" + + assert node1.query("CHECK TABLE dest_table") == "1\n" + + +def test_backup_from_old_version_config(started_cluster): + node3.query("CREATE TABLE source_table(A Int64, B String) Engine = MergeTree order by tuple()") + + node3.query("INSERT INTO source_table VALUES(1, '1')") + + assert node3.query("SELECT COUNT() FROM source_table") == "1\n" + + node3.query("ALTER TABLE source_table ADD COLUMN Y String") + + node3.query("ALTER TABLE source_table FREEZE PARTITION tuple();") + + def callback(n): + n.replace_config("/etc/clickhouse-server/merge_tree_settings.xml", "1") + + node3.restart_with_latest_version(callback_onstop=callback) + + node3.query("CREATE TABLE dest_table (A Int64, B String, Y String) ENGINE = ReplicatedMergeTree('/test/dest_table3', '1') ORDER BY tuple() SETTINGS enable_mixed_granularity_parts = 1") + + node3.query("INSERT INTO dest_table VALUES(2, '2', 'Hello')") + + assert node3.query("SELECT COUNT() FROM dest_table") == "1\n" + + node3.exec_in_container(['bash', '-c', 'cp -r /var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/ /var/lib/clickhouse/data/default/dest_table/detached']) + + assert node3.query("SELECT COUNT() FROM dest_table") == "1\n" + + node3.query("ALTER TABLE dest_table ATTACH PARTITION tuple()") + + assert node3.query("SELECT sum(A) FROM dest_table") == "3\n" + + node3.query("ALTER TABLE dest_table DETACH PARTITION tuple()") + + node3.query("ALTER TABLE dest_table ATTACH PARTITION tuple()") + + assert node3.query("SELECT sum(A) FROM dest_table") == "3\n" + + assert node1.query("CHECK TABLE dest_table") == "1\n" From 
1bece3de0aa7e431fbf2ba86ce161b0d46430f00 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 28 Jul 2020 12:36:08 +0300 Subject: [PATCH 16/25] Remove strange code and test --- src/Storages/MergeTree/MergeTreeData.cpp | 5 ----- .../test.py | 22 +++++++++++++++++++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index c71172850ba..abd88ab36a8 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -2408,11 +2408,6 @@ static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part) auto disk = part->volume->getDisk(); String full_part_path = part->getFullRelativePath(); - /// Earlier the list of columns was written incorrectly. Delete it and re-create. - /// But in compact parts we can't get list of columns without this file. - if (isWidePart(part)) - disk->removeIfExists(full_part_path + "columns.txt"); - part->loadColumnsChecksumsIndexes(false, true); part->modification_time = disk->getLastModified(full_part_path).epochTime(); } diff --git a/tests/integration/test_backup_with_other_granularity/test.py b/tests/integration/test_backup_with_other_granularity/test.py index 43aa9562245..d4ca9bd1bac 100644 --- a/tests/integration/test_backup_with_other_granularity/test.py +++ b/tests/integration/test_backup_with_other_granularity/test.py @@ -8,6 +8,7 @@ cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance('node1', with_zookeeper=True, image='yandex/clickhouse-server:19.4.5.35', stay_alive=True, with_installed_binary=True) node2 = cluster.add_instance('node2', with_zookeeper=True, image='yandex/clickhouse-server:19.4.5.35', stay_alive=True, with_installed_binary=True) node3 = cluster.add_instance('node3', with_zookeeper=True, image='yandex/clickhouse-server:19.4.5.35', stay_alive=True, with_installed_binary=True) +node4 = cluster.add_instance('node4') @pytest.fixture(scope="module") @@ -128,3 +129,24 @@ def test_backup_from_old_version_config(started_cluster): assert node3.query("SELECT sum(A) FROM dest_table") == "3\n" assert node1.query("CHECK TABLE dest_table") == "1\n" + + +def test_backup_and_alter(started_cluster): + node4.query("CREATE TABLE backup_table(A Int64, B String, C Date) Engine = MergeTree order by tuple()") + + node4.query("INSERT INTO backup_table VALUES(2, '2', toDate('2019-10-01'))") + + node4.query("ALTER TABLE backup_table FREEZE PARTITION tuple();") + + node4.query("ALTER TABLE backup_table DROP COLUMN C") + + node4.query("ALTER TABLE backup_table MODIFY COLUMN B UInt64") + + node4.query("ALTER TABLE backup_table DROP PARTITION tuple()") + + node4.exec_in_container(['bash', '-c', 'cp -r /var/lib/clickhouse/shadow/1/data/default/backup_table/all_1_1_0/ /var/lib/clickhouse/data/default/backup_table/detached']) + + node4.query("ALTER TABLE backup_table ATTACH PARTITION tuple()") + + assert node4.query("SELECT sum(A) FROM backup_table") == "2\n" + assert node4.query("SELECT B + 2 FROM backup_table") == "4\n" From eebb0233d93f033722ac8df370f9db14ec5ab763 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 28 Jul 2020 13:41:32 +0300 Subject: [PATCH 17/25] Fix header for totals and extremes in QueryPipeline::unitePipelines. 
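The extremes/totals ports were united against `current_header`, which is
assigned `common_header` only after this point, so the header of the united
totals/extremes port could disagree with the `common_header` used for the
rest of the united pipeline (e.g. when UNION ALL branches differ in a
column's nullability, as in the regression test added by the next commit).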
--- src/Processors/QueryPipeline.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Processors/QueryPipeline.cpp b/src/Processors/QueryPipeline.cpp index f3635ac5408..f063b82e1b2 100644 --- a/src/Processors/QueryPipeline.cpp +++ b/src/Processors/QueryPipeline.cpp @@ -654,7 +654,7 @@ void QueryPipeline::unitePipelines( if (extremes.size() == 1) extremes_port = extremes.back(); else - extremes_port = uniteExtremes(extremes, current_header, processors); + extremes_port = uniteExtremes(extremes, common_header, processors); } if (!totals.empty()) @@ -662,7 +662,7 @@ void QueryPipeline::unitePipelines( if (totals.size() == 1) totals_having_port = totals.back(); else - totals_having_port = uniteTotals(totals, current_header, processors); + totals_having_port = uniteTotals(totals, common_header, processors); } current_header = common_header; From f047194e11e8b53beaa1d1f13599158e2c2172fb Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 28 Jul 2020 13:45:10 +0300 Subject: [PATCH 18/25] Added test. --- .../01416_join_totals_header_bug.reference | 0 .../01416_join_totals_header_bug.sql | 63 +++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 tests/queries/0_stateless/01416_join_totals_header_bug.reference create mode 100644 tests/queries/0_stateless/01416_join_totals_header_bug.sql diff --git a/tests/queries/0_stateless/01416_join_totals_header_bug.reference b/tests/queries/0_stateless/01416_join_totals_header_bug.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01416_join_totals_header_bug.sql b/tests/queries/0_stateless/01416_join_totals_header_bug.sql new file mode 100644 index 00000000000..089a1d4b72f --- /dev/null +++ b/tests/queries/0_stateless/01416_join_totals_header_bug.sql @@ -0,0 +1,63 @@ +DROP TABLE IF EXISTS tableCommon; +DROP TABLE IF EXISTS tableTrees; +DROP TABLE IF EXISTS tableFlowers; + +CREATE TABLE tableCommon (`key` FixedString(15), `value` Nullable(Int8)) ENGINE = Log(); +CREATE TABLE tableTrees (`key` FixedString(15), `name` Nullable(Int8), `name2` Nullable(Int8)) ENGINE = Log(); +CREATE TABLE tableFlowers (`key` FixedString(15), `name` Nullable(Int8)) ENGINE = Log(); + +SELECT * FROM ( + SELECT common.key, common.value, trees.name, trees.name2 + FROM ( + SELECT * + FROM tableCommon + ) as common + INNER JOIN ( + SELECT * + FROM tableTrees + ) trees ON (common.key = trees.key) +) +UNION ALL +( + SELECT common.key, common.value, + null as name, null as name2 + + FROM ( + SELECT * + FROM tableCommon + ) as common + INNER JOIN ( + SELECT * + FROM tableFlowers + ) flowers ON (common.key = flowers.key) +); + +SELECT * FROM ( + SELECT common.key, common.value, trees.name, trees.name2 + FROM ( + SELECT * + FROM tableCommon + ) as common + INNER JOIN ( + SELECT * + FROM tableTrees + ) trees ON (common.key = trees.key) +) +UNION ALL +( + SELECT common.key, common.value, + flowers.name, null as name2 + + FROM ( + SELECT * + FROM tableCommon + ) as common + INNER JOIN ( + SELECT * + FROM tableFlowers + ) flowers ON (common.key = flowers.key) +); + +DROP TABLE IF EXISTS tableCommon; +DROP TABLE IF EXISTS tableTrees; +DROP TABLE IF EXISTS tableFlowers; From 6dd731dd00e9afb3b3906a29559c2ac5689fef67 Mon Sep 17 00:00:00 2001 From: BayoNet Date: Tue, 28 Jul 2020 15:25:49 +0300 Subject: [PATCH 19/25] DOCS-590: parallel_distributed_insert_select (#12980) * DOCSUP-987: parallel_distributed_insert_select setting (#145) * add EN version * add EN version * changes after review in EN version * 
add RU version * CLICKHOUSEDOCS-590: Updated text Co-authored-by: Evgenia Sudarikova <56156889+otrazhenia@users.noreply.github.com> Co-authored-by: Sergei Shtykov --- docs/en/operations/settings/settings.md | 14 ++++++++++++++ docs/ru/operations/settings/settings.md | 13 +++++++++++++ 2 files changed, 27 insertions(+) diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index 829fedc8deb..033ee880860 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -1459,6 +1459,20 @@ Possible values: Default value: 16. +## parallel_distributed_insert_select {#parallel_distributed_insert_select} + +Enables parallel distributed `INSERT ... SELECT` queries. + +If an `INSERT INTO distributed_table_a SELECT ... FROM distributed_table_b` query is executed, both tables use the same cluster, and both tables are either [replicated](../../engines/table-engines/mergetree-family/replication.md) or non-replicated, then the query is processed locally on every shard. + +Possible values: + +- 0 — Disabled. +- 1 — Enabled. + +Default value: 0. + ## insert_distributed_sync {#insert_distributed_sync} Enables or disables synchronous data insertion into a [Distributed](../../engines/table-engines/special/distributed.md#distributed) table. diff --git a/docs/ru/operations/settings/settings.md b/docs/ru/operations/settings/settings.md index b8b64ae65c4..afb6a56ceb8 100644 --- a/docs/ru/operations/settings/settings.md +++ b/docs/ru/operations/settings/settings.md @@ -1278,6 +1278,19 @@ Default value: 0. Default value: 16. +## parallel_distributed_insert_select {#parallel_distributed_insert_select} + +Enables parallel processing of distributed `INSERT ... SELECT` queries. + +If, when executing an `INSERT INTO distributed_table_a SELECT ... FROM distributed_table_b` query, both tables turn out to be in the same cluster, then regardless of whether they are [replicated](../../engines/table-engines/mergetree-family/replication.md) or not, the query is executed locally on every shard. + +Possible values: + +- 0 — Disabled. +- 1 — Enabled. + +Default value: 0. + ## insert_distributed_sync {#insert_distributed_sync} Enables or disables synchronous insertion of data into distributed tables (tables with the [Distributed](../../engines/table-engines/special/distributed.md#distributed) engine). From cf503be085d72363f772ecf5a6d5295b50906781 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 28 Jul 2020 15:27:23 +0300 Subject: [PATCH 20/25] Fix crash in ColumnTuple::updatePermutation --- src/Columns/ColumnTuple.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Columns/ColumnTuple.cpp b/src/Columns/ColumnTuple.cpp index 4ce5ab7b2a3..87e5e37db51 100644 --- a/src/Columns/ColumnTuple.cpp +++ b/src/Columns/ColumnTuple.cpp @@ -349,7 +349,7 @@ void ColumnTuple::updatePermutation(bool reverse, size_t limit, int nan_direction_hint, for (const auto& column : columns) { column->updatePermutation(reverse, limit, nan_direction_hint, res, equal_range); - while (limit && limit <= equal_range.back().first) + while (limit && !equal_range.empty() && limit <= equal_range.back().first) equal_range.pop_back(); if (equal_range.empty()) From c509ba691947ac771a2400112358ebf49af090e5 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 28 Jul 2020 15:29:49 +0300 Subject: [PATCH 21/25] Added test.
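The crash fixed in the previous commit is driven by queries that sort by a tuple column: ColumnTuple::updatePermutation runs once per nested column and pops equal ranges that start at or beyond the limit, so with a small LIMIT over many rows the list of ranges can be drained completely, and the previously unguarded equal_range.back() dereferenced an empty vector. An illustrative query of the same shape (a hedged sketch only; the one-line test below is the verified reproducer):

-- ORDER BY on a tuple column exercises ColumnTuple::updatePermutation.
SELECT (number % 2, number) AS t
FROM numbers_mt(1000000)
ORDER BY t
LIMIT 10;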
--- .../queries/0_stateless/01417_update_permutation_crash.reference | 1 + tests/queries/0_stateless/01417_update_permutation_crash.sql | 1 + 2 files changed, 2 insertions(+) create mode 100644 tests/queries/0_stateless/01417_update_permutation_crash.reference create mode 100644 tests/queries/0_stateless/01417_update_permutation_crash.sql diff --git a/tests/queries/0_stateless/01417_update_permutation_crash.reference b/tests/queries/0_stateless/01417_update_permutation_crash.reference new file mode 100644 index 00000000000..2b4f3eda2e0 --- /dev/null +++ b/tests/queries/0_stateless/01417_update_permutation_crash.reference @@ -0,0 +1 @@ +(1,1,0) diff --git a/tests/queries/0_stateless/01417_update_permutation_crash.sql b/tests/queries/0_stateless/01417_update_permutation_crash.sql new file mode 100644 index 00000000000..f5923781734 --- /dev/null +++ b/tests/queries/0_stateless/01417_update_permutation_crash.sql @@ -0,0 +1 @@ +select tuple(1, 1, number) as t from numbers_mt(1000001) order by t, number limit 1; From e0c207d6b4f1977c9166081122f2dee1f351a646 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Tue, 28 Jul 2020 16:15:41 +0300 Subject: [PATCH 22/25] Update entrypoint.sh --- docker/test/performance-comparison/entrypoint.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docker/test/performance-comparison/entrypoint.sh b/docker/test/performance-comparison/entrypoint.sh index f9b90fce863..be087eb956c 100755 --- a/docker/test/performance-comparison/entrypoint.sh +++ b/docker/test/performance-comparison/entrypoint.sh @@ -4,6 +4,8 @@ set -ex # Use the packaged repository to find the revision we will compare to. function find_reference_sha { + git -C right/ch log -1 origin/master + git -C right/ch log -1 pr # Go back from the revision to be tested, trying to find the closest published # testing release. The PR branch may be either pull/*/head which is the # author's branch, or pull/*/merge, which is head merged with some master From 2bde39349978a5160eaf1d95349d5ef0786150f1 Mon Sep 17 00:00:00 2001 From: Alexander Kazakov Date: Tue, 28 Jul 2020 17:38:34 +0300 Subject: [PATCH 23/25] Merging #12548 - Correction to `merge_with_ttl_timeout` logic by @excitoon (#12982) * Fixed `merge_with_ttl_timeout` logic. 
* Separate TTL-merge timers for each partition Co-authored-by: Vladimir Chebotarev --- .../MergeTree/MergeTreeDataMergerMutator.cpp | 66 +++++++++---------- .../MergeTree/MergeTreeDataMergerMutator.h | 7 +- src/Storages/MergeTree/TTLMergeSelector.cpp | 28 ++++++-- src/Storages/MergeTree/TTLMergeSelector.h | 15 ++++- tests/integration/test_ttl_replicated/test.py | 38 +++++++++++ 5 files changed, 112 insertions(+), 42 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 2b407cb7269..9038e2a22f5 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -30,13 +30,15 @@ #include #include #include -#include -#include -#include +#include +#include +#include +#include #include + namespace ProfileEvents { extern const Event MergedRows; @@ -219,14 +221,13 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge( return false; } - time_t current_time = time(nullptr); + time_t current_time = std::time(nullptr); IMergeSelector::Partitions partitions; const String * prev_partition_id = nullptr; /// Previous part only in boundaries of partition frame const MergeTreeData::DataPartPtr * prev_part = nullptr; - bool has_part_with_expired_ttl = false; for (const MergeTreeData::DataPartPtr & part : data_parts) { /// Check predicate only for first part in each partition. @@ -258,11 +259,6 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge( part_info.min_ttl = part->ttl_infos.part_min_ttl; part_info.max_ttl = part->ttl_infos.part_max_ttl; - time_t ttl = data_settings->ttl_only_drop_parts ? part_info.max_ttl : part_info.min_ttl; - - if (ttl && ttl <= current_time) - has_part_with_expired_ttl = true; - partitions.back().emplace_back(part_info); /// Check for consistency of data parts. If assertion is failed, it requires immediate investigation. @@ -275,38 +271,38 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge( prev_part = &part; } - std::unique_ptr<IMergeSelector> merge_selector; + IMergeSelector::PartsInPartition parts_to_merge; - SimpleMergeSelector::Settings merge_settings; - if (aggressive) - merge_settings.base = 1; - - bool can_merge_with_ttl = - (current_time - last_merge_with_ttl > data_settings->merge_with_ttl_timeout); - - /// NOTE Could allow selection of different merge strategy.
- if (can_merge_with_ttl && has_part_with_expired_ttl && !ttl_merges_blocker.isCancelled()) + if (!ttl_merges_blocker.isCancelled()) { - merge_selector = std::make_unique<TTLMergeSelector>(current_time, data_settings->ttl_only_drop_parts); - last_merge_with_ttl = current_time; + TTLMergeSelector merge_selector( + next_ttl_merge_times_by_partition, + current_time, + data_settings->merge_with_ttl_timeout, + data_settings->ttl_only_drop_parts); + parts_to_merge = merge_selector.select(partitions, max_total_size_to_merge); } - else - merge_selector = std::make_unique<SimpleMergeSelector>(merge_settings); - - IMergeSelector::PartsInPartition parts_to_merge = merge_selector->select( - partitions, - max_total_size_to_merge); if (parts_to_merge.empty()) { - if (out_disable_reason) - *out_disable_reason = "There is no need to merge parts according to merge selector algorithm"; - return false; - } + SimpleMergeSelector::Settings merge_settings; + if (aggressive) + merge_settings.base = 1; - /// Allow to "merge" part with itself if we need remove some values with expired ttl - if (parts_to_merge.size() == 1 && !has_part_with_expired_ttl) - throw Exception("Logical error: merge selector returned only one part to merge", ErrorCodes::LOGICAL_ERROR); + parts_to_merge = SimpleMergeSelector(merge_settings) + .select(partitions, max_total_size_to_merge); + + /// Do not allow to "merge" part with itself for regular merges, unless it is a TTL-merge where it is ok to remove some values with expired ttl + if (parts_to_merge.size() == 1) + throw Exception("Logical error: merge selector returned only one part to merge", ErrorCodes::LOGICAL_ERROR); + + if (parts_to_merge.empty()) + { + if (out_disable_reason) + *out_disable_reason = "There is no need to merge parts according to merge selector algorithm"; + return false; + } + } MergeTreeData::DataPartsVector parts; parts.reserve(parts_to_merge.size()); diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 121cc770d51..3c7fcd99f95 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB @@ -242,8 +243,10 @@ private: /// The last time a warning about low disk space was written to the log (so as not to write about it too often).
time_t disk_space_warning_time = 0; - /// Last time when TTLMergeSelector has been used - time_t last_merge_with_ttl = 0; + /// Stores the next TTL merge due time for each partition (used only by TTLMergeSelector) + TTLMergeSelector::PartitionIdToTTLs next_ttl_merge_times_by_partition; + /// Performing TTL merges independently for each partition guarantees that + /// there is only a limited number of TTL merges and no partition stores data that is too stale. }; diff --git a/src/Storages/MergeTree/TTLMergeSelector.cpp b/src/Storages/MergeTree/TTLMergeSelector.cpp index 0ba341fca64..1966f2a4f0a 100644 --- a/src/Storages/MergeTree/TTLMergeSelector.cpp +++ b/src/Storages/MergeTree/TTLMergeSelector.cpp @@ -1,12 +1,20 @@ #include +#include -#include #include +#include namespace DB { +const String & getPartitionIdForPart(const TTLMergeSelector::Part & part_info) +{ + const MergeTreeData::DataPartPtr & part = *static_cast<const MergeTreeData::DataPartPtr *>(part_info.data); + return part->info.partition_id; +} + + IMergeSelector::PartsInPartition TTLMergeSelector::select( const Partitions & partitions, const size_t max_total_size_to_merge) @@ -18,15 +26,24 @@ IMergeSelector::PartsInPartition TTLMergeSelector::select( for (size_t i = 0; i < partitions.size(); ++i) { - for (auto it = partitions[i].begin(); it != partitions[i].end(); ++it) + const auto & mergeable_parts_in_partition = partitions[i]; + if (mergeable_parts_in_partition.empty()) + continue; + + const auto & partition_id = getPartitionIdForPart(mergeable_parts_in_partition.front()); + const auto & next_merge_time_for_partition = merge_due_times[partition_id]; + if (next_merge_time_for_partition > current_time) + continue; + + for (Iterator part_it = mergeable_parts_in_partition.cbegin(); part_it != mergeable_parts_in_partition.cend(); ++part_it) { - time_t ttl = only_drop_parts ? it->max_ttl : it->min_ttl; + time_t ttl = only_drop_parts ? part_it->max_ttl : part_it->min_ttl; if (ttl && (partition_to_merge_index == -1 || ttl < partition_to_merge_min_ttl)) { partition_to_merge_min_ttl = ttl; partition_to_merge_index = i; - best_begin = it; + best_begin = part_it; } } } @@ -68,6 +85,9 @@ IMergeSelector::PartsInPartition TTLMergeSelector::select( ++best_end; } + const auto & best_partition_id = getPartitionIdForPart(best_partition.front()); + merge_due_times[best_partition_id] = current_time + merge_cooldown_time; + return PartsInPartition(best_begin, best_end); } diff --git a/src/Storages/MergeTree/TTLMergeSelector.h b/src/Storages/MergeTree/TTLMergeSelector.h index 2f03d5b9feb..5b7361d2d2b 100644 --- a/src/Storages/MergeTree/TTLMergeSelector.h +++ b/src/Storages/MergeTree/TTLMergeSelector.h @@ -1,7 +1,10 @@ #pragma once +#include #include +#include + namespace DB { @@ -10,17 +13,27 @@ namespace DB * It selects parts to merge by greedy algorithm: * 1. Finds the part with the earliest expired ttl and includes it into the result. * 2. Tries to find the longest range of parts with expired ttl that includes the part from step 1.
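+ * Partitions whose next TTL merge due time (tracked in merge_due_times) has not come yet are skipped entirely.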
* Finally, merge selector updates TTL merge timer for the selected partition */ class TTLMergeSelector : public IMergeSelector { public: - explicit TTLMergeSelector(time_t current_time_, bool only_drop_parts_) : current_time(current_time_), only_drop_parts(only_drop_parts_) {} + using PartitionIdToTTLs = std::map<String, time_t>; + + explicit TTLMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_, bool only_drop_parts_) + : merge_due_times(merge_due_times_), + current_time(current_time_), + merge_cooldown_time(merge_cooldown_time_), + only_drop_parts(only_drop_parts_) {} PartsInPartition select( const Partitions & partitions, const size_t max_total_size_to_merge) override; + private: + PartitionIdToTTLs & merge_due_times; time_t current_time; + Int64 merge_cooldown_time; bool only_drop_parts; }; diff --git a/tests/integration/test_ttl_replicated/test.py b/tests/integration/test_ttl_replicated/test.py index a458db07a23..0f201f569b3 100644 --- a/tests/integration/test_ttl_replicated/test.py +++ b/tests/integration/test_ttl_replicated/test.py @@ -5,10 +5,12 @@ import helpers.client as client from helpers.cluster import ClickHouseCluster from helpers.test_tools import TSV + cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance('node1', with_zookeeper=True) node2 = cluster.add_instance('node2', with_zookeeper=True) + @pytest.fixture(scope="module") def started_cluster(): try: cluster.start() yield cluster except Exception as ex: print(ex) raise finally: cluster.shutdown() + def drop_table(nodes, table_name): for node in nodes: node.query("DROP TABLE IF EXISTS {} NO DELAY".format(table_name)) time.sleep(1) + def test_ttl_columns(started_cluster): drop_table([node1, node2], "test_ttl") for node in [node1, node2]: node.query( ''' CREATE TABLE test_ttl(date DateTime, id UInt32, a Int32 TTL date + INTERVAL 1 DAY, b Int32 TTL date + INTERVAL 1 MONTH) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}') ORDER BY id PARTITION BY toDayOfMonth(date) SETTINGS merge_with_ttl_timeout=0; '''.format(replica=node.name)) node1.query("INSERT INTO test_ttl VALUES (toDateTime('2000-10-10 00:00:00'), 1, 1, 3)") node1.query("INSERT INTO test_ttl VALUES (toDateTime('2000-10-11 10:00:00'), 2, 2, 4)") time.sleep(1) # sleep to allow use ttl merge selector for second time node1.query("OPTIMIZE TABLE test_ttl FINAL") expected = "1\t0\t0\n2\t0\t0\n" assert TSV(node1.query("SELECT id, a, b FROM test_ttl ORDER BY id")) == TSV(expected) assert TSV(node2.query("SELECT id, a, b FROM test_ttl ORDER BY id")) == TSV(expected) +def test_merge_with_ttl_timeout(started_cluster): + table = "test_merge_with_ttl_timeout" + drop_table([node1, node2], table) + for node in [node1, node2]: + node.query( + ''' + CREATE TABLE {table}(date DateTime, id UInt32, a Int32 TTL date + INTERVAL 1 DAY, b Int32 TTL date + INTERVAL 1 MONTH) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{table}', '{replica}') + ORDER BY id PARTITION BY toDayOfMonth(date); + '''.format(replica=node.name, table=table)) + + node1.query("SYSTEM STOP TTL MERGES {table}".format(table=table)) + node2.query("SYSTEM STOP TTL MERGES {table}".format(table=table)) + + for i in range(1, 4): + node1.query("INSERT INTO {table} VALUES (toDateTime('2000-10-{day:02d} 10:00:00'), 1, 2, 3)".format(day=i, table=table)) + + assert node1.query("SELECT countIf(a = 0) FROM {table}".format(table=table)) == "0\n" + assert node2.query("SELECT countIf(a = 0) FROM {table}".format(table=table)) == "0\n" + + node1.query("SYSTEM START TTL MERGES {table}".format(table=table)) + node2.query("SYSTEM START TTL MERGES {table}".format(table=table)) + + time.sleep(15) # TTL merges shall happen. + + for i in range(1, 4): + node1.query("INSERT INTO {table} VALUES (toDateTime('2000-10-{day:02d} 10:00:00'), 1, 2, 3)".format(day=i, table=table)) + + time.sleep(15) # TTL merges shall not happen.
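+ # The first TTL-merge round recorded a due time of current_time + merge_with_ttl_timeout for each partition,
+ # so the selector now skips those partitions and the rows inserted above keep their original values.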
+ + assert node1.query("SELECT countIf(a = 0) FROM {table}".format(table=table)) == "3\n" + assert node2.query("SELECT countIf(a = 0) FROM {table}".format(table=table)) == "3\n" + + def test_ttl_many_columns(started_cluster): drop_table([node1, node2], "test_ttl_2") for node in [node1, node2]: From 8225bc13fe34aeeda5620f5eb1af2a232d5f58ce Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Tue, 28 Jul 2020 21:42:30 +0300 Subject: [PATCH 24/25] Fix terrible unbearable data rot. CC @blinkov --- docs/en/getting-started/example-datasets/metrica.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/getting-started/example-datasets/metrica.md b/docs/en/getting-started/example-datasets/metrica.md index a9cf80716ce..4131dca78fe 100644 --- a/docs/en/getting-started/example-datasets/metrica.md +++ b/docs/en/getting-started/example-datasets/metrica.md @@ -7,7 +7,7 @@ toc_title: Yandex.Metrica Data Dataset consists of two tables containing anonymized data about hits (`hits_v1`) and visits (`visits_v1`) of Yandex.Metrica. You can read more about Yandex.Metrica in [ClickHouse history](../../introduction/history.md) section. -The dataset consists of two tables, either of them can be downloaded as a compressed `tsv.xz` file or as prepared partitions. In addition to that, an extended version of the `hits` table containing 100 million rows is available as TSV at https://clickhouse-datasets.s3.yandex.net/hits/tsv/hits\_100m\_obfuscated\_v1.tsv.xz and as prepared partitions at https://clickhouse-datasets.s3.yandex.net/hits/partitions/hits\_100m\_obfuscated\_v1.tar.xz. +The dataset consists of two tables, either of them can be downloaded as a compressed `tsv.xz` file or as prepared partitions. In addition to that, an extended version of the `hits` table containing 100 million rows is available as TSV at https://clickhouse-datasets.s3.yandex.net/hits/tsv/hits_100m_obfuscated_v1.tsv.xz and as prepared partitions at https://clickhouse-datasets.s3.yandex.net/hits/partitions/hits_100m_obfuscated_v1.tar.xz. ## Obtaining Tables from Prepared Partitions {#obtaining-tables-from-prepared-partitions} From 9e62fa1c5ab29d31697be32dc144e515dabc99a0 Mon Sep 17 00:00:00 2001 From: Ivan Blinkov Date: Tue, 28 Jul 2020 22:38:52 +0300 Subject: [PATCH 25/25] [docs] maybe fix build --- docs/en/operations/system-tables/metric_log.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/operations/system-tables/metric_log.md b/docs/en/operations/system-tables/metric_log.md index 91d8553683e..1e38eb7e247 100644 --- a/docs/en/operations/system-tables/metric_log.md +++ b/docs/en/operations/system-tables/metric_log.md @@ -50,6 +50,6 @@ CurrentMetric_ReplicatedChecks: 0 **See also** - [system.asynchronous\_metrics](../../operations/system-tables/asynchronous_metrics.md) — Contains periodically calculated metrics. -- [system.events](../../operations/system-tables/events.md) — Contains a number of events that occurred. +- [system.events](../../operations/system-tables/events.md#system_tables-events) — Contains a number of events that occurred. - [system.metrics](../../operations/system-tables/metrics.md) — Contains instantly calculated metrics. - [Monitoring](../../operations/monitoring.md) — Base concepts of ClickHouse monitoring.