From 62c1bd5ae952d404d1e74680a6ec7588baee057b Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Tue, 1 Mar 2022 10:50:40 +0800 Subject: [PATCH 01/18] hive read columns pruning --- src/Storages/Hive/StorageHive.cpp | 37 ++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/src/Storages/Hive/StorageHive.cpp b/src/Storages/Hive/StorageHive.cpp index 3040ad23283..11296a0f565 100644 --- a/src/Storages/Hive/StorageHive.cpp +++ b/src/Storages/Hive/StorageHive.cpp @@ -104,6 +104,7 @@ public: String hdfs_namenode_url_, String format_, String compression_method_, + Block header_block_, Block sample_block_, ContextPtr context_, UInt64 max_block_size_, @@ -115,19 +116,31 @@ public: , format(std::move(format_)) , compression_method(compression_method_) , max_block_size(max_block_size_) - , sample_block(std::move(sample_block_)) + , header_block(header_block_) + , sample_block(sample_block_) , to_read_block(sample_block) , columns_description(getColumnsDescription(sample_block, source_info)) , text_input_field_names(text_input_field_names_) , format_settings(getFormatSettings(getContext())) { /// Initialize to_read_block, which is used to read data from HDFS. - to_read_block = sample_block; for (const auto & name_type : source_info->partition_name_types) { to_read_block.erase(name_type.name); } - + // hive table header doesn't contain partition columns, we need at least one column to read. + if (!to_read_block.columns()) + { + for (size_t i = 0; i < header_block.columns(); ++i) + { + auto & col = header_block.getByPosition(i); + if (source_info->partition_name_types.contains(col.name)) + continue; + to_read_block.insert(0, col); + all_to_read_are_partition_columns = true; + break; + } + } /// Initialize format settings format_settings.hive_text.input_field_names = text_input_field_names; } @@ -204,14 +217,18 @@ public: { Columns columns = res.getColumns(); UInt64 num_rows = res.rows(); + if (all_to_read_are_partition_columns) + columns.clear(); /// Enrich with partition columns. auto types = source_info->partition_name_types.getTypes(); for (size_t i = 0; i < types.size(); ++i) { + if (!sample_block.has(source_info->partition_name_types.getNames()[i])) + continue; auto column = types[i]->createColumnConst(num_rows, source_info->hive_files[current_idx]->getPartitionValues()[i]); auto previous_idx = sample_block.getPositionByName(source_info->partition_name_types.getNames()[i]); - columns.insert(columns.begin() + previous_idx, column->convertToFullColumnIfConst()); + columns.insert(columns.begin() + previous_idx, column); } /// Enrich with virtual columns. 
@@ -247,8 +264,10 @@ private: String format; String compression_method; UInt64 max_block_size; + Block header_block; // hive table header Block sample_block; Block to_read_block; + bool all_to_read_are_partition_columns = false; ColumnsDescription columns_description; const Names & text_input_field_names; FormatSettings format_settings; @@ -606,13 +625,20 @@ Pipe StorageHive::read( sources_info->table_name = hive_table; sources_info->hive_metastore_client = hive_metastore_client; sources_info->partition_name_types = partition_name_types; + + Block to_read_block; + const auto & sample_block = metadata_snapshot->getSampleBlock(); for (const auto & column : column_names) { + to_read_block.insert(sample_block.getByName(column)); if (column == "_path") sources_info->need_path_column = true; if (column == "_file") sources_info->need_file_column = true; } + // Columming pruning only support parque and orc which are used arrow to read + if (format_name != "Parquet" && format_name != "ORC") + to_read_block = sample_block; if (num_streams > sources_info->hive_files.size()) num_streams = sources_info->hive_files.size(); @@ -625,7 +651,8 @@ Pipe StorageHive::read( hdfs_namenode_url, format_name, compression_method, - metadata_snapshot->getSampleBlock(), + sample_block, + to_read_block, context_, max_block_size, text_input_field_names)); From f4d8fb46c584987c540b48096f398b760ea7b3d2 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Tue, 1 Mar 2022 11:03:43 +0800 Subject: [PATCH 02/18] update codes --- src/Storages/Hive/StorageHive.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/Hive/StorageHive.cpp b/src/Storages/Hive/StorageHive.cpp index 11296a0f565..03a1156fbd9 100644 --- a/src/Storages/Hive/StorageHive.cpp +++ b/src/Storages/Hive/StorageHive.cpp @@ -131,13 +131,13 @@ public: // hive table header doesn't contain partition columns, we need at least one column to read. 
if (!to_read_block.columns()) { + all_to_read_are_partition_columns = true; for (size_t i = 0; i < header_block.columns(); ++i) { auto & col = header_block.getByPosition(i); if (source_info->partition_name_types.contains(col.name)) continue; to_read_block.insert(0, col); - all_to_read_are_partition_columns = true; break; } } @@ -636,7 +636,7 @@ Pipe StorageHive::read( if (column == "_file") sources_info->need_file_column = true; } - // Columming pruning only support parque and orc which are used arrow to read + // Columms pruning only support parque and orc which are used arrow to read if (format_name != "Parquet" && format_name != "ORC") to_read_block = sample_block; From d907b70cc48a4049551cb9b1974b95781d98593f Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 2 Mar 2022 10:58:32 +0800 Subject: [PATCH 03/18] update codes: get actual read block --- src/Storages/Hive/StorageHive.cpp | 59 +++++++++++++++++-------------- src/Storages/Hive/StorageHive.h | 3 ++ 2 files changed, 36 insertions(+), 26 deletions(-) diff --git a/src/Storages/Hive/StorageHive.cpp b/src/Storages/Hive/StorageHive.cpp index 03a1156fbd9..2f0c3740ea8 100644 --- a/src/Storages/Hive/StorageHive.cpp +++ b/src/Storages/Hive/StorageHive.cpp @@ -104,7 +104,6 @@ public: String hdfs_namenode_url_, String format_, String compression_method_, - Block header_block_, Block sample_block_, ContextPtr context_, UInt64 max_block_size_, @@ -116,7 +115,6 @@ public: , format(std::move(format_)) , compression_method(compression_method_) , max_block_size(max_block_size_) - , header_block(header_block_) , sample_block(sample_block_) , to_read_block(sample_block) , columns_description(getColumnsDescription(sample_block, source_info)) @@ -128,19 +126,7 @@ public: { to_read_block.erase(name_type.name); } - // hive table header doesn't contain partition columns, we need at least one column to read. - if (!to_read_block.columns()) - { - all_to_read_are_partition_columns = true; - for (size_t i = 0; i < header_block.columns(); ++i) - { - auto & col = header_block.getByPosition(i); - if (source_info->partition_name_types.contains(col.name)) - continue; - to_read_block.insert(0, col); - break; - } - } + /// Initialize format settings format_settings.hive_text.input_field_names = text_input_field_names; } @@ -217,8 +203,6 @@ public: { Columns columns = res.getColumns(); UInt64 num_rows = res.rows(); - if (all_to_read_are_partition_columns) - columns.clear(); /// Enrich with partition columns. 
auto types = source_info->partition_name_types.getTypes(); @@ -264,10 +248,8 @@ private: String format; String compression_method; UInt64 max_block_size; - Block header_block; // hive table header Block sample_block; Block to_read_block; - bool all_to_read_are_partition_columns = false; ColumnsDescription columns_description; const Names & text_input_field_names; FormatSettings format_settings; @@ -561,7 +543,34 @@ HiveFilePtr StorageHive::createHiveFileIfNeeded( } return hive_file; } +bool StorageHive::isColumnOriented() +{ + return format_name == "Parquet" || format_name == "ORC"; +} +Block StorageHive::getActualColumnsToRead(Block sample_block, const Block & header_block, const NameSet & partition_columns) +{ + if (!isColumnOriented()) + return header_block; + Block result_block = sample_block; + for (const auto & column : partition_columns) + { + sample_block.erase(column); + } + if (!sample_block.columns()) + { + for (size_t i = 0; i < header_block.columns(); ++i) + { + const auto & col = header_block.getByPosition(i); + if (!partition_columns.count(col.name)) + { + result_block.insert(col); + break; + } + } + } + return result_block; +} Pipe StorageHive::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, @@ -626,19 +635,18 @@ Pipe StorageHive::read( sources_info->hive_metastore_client = hive_metastore_client; sources_info->partition_name_types = partition_name_types; - Block to_read_block; - const auto & sample_block = metadata_snapshot->getSampleBlock(); + const auto & header_block = metadata_snapshot->getSampleBlock(); + Block sample_block; for (const auto & column : column_names) { - to_read_block.insert(sample_block.getByName(column)); + sample_block.insert(header_block.getByName(column)); if (column == "_path") sources_info->need_path_column = true; if (column == "_file") sources_info->need_file_column = true; } - // Columms pruning only support parque and orc which are used arrow to read - if (format_name != "Parquet" && format_name != "ORC") - to_read_block = sample_block; + + sample_block = getActualColumnsToRead(sample_block, header_block, NameSet{partition_names.begin(), partition_names.end()}); if (num_streams > sources_info->hive_files.size()) num_streams = sources_info->hive_files.size(); @@ -652,7 +660,6 @@ Pipe StorageHive::read( format_name, compression_method, sample_block, - to_read_block, context_, max_block_size, text_input_field_names)); diff --git a/src/Storages/Hive/StorageHive.h b/src/Storages/Hive/StorageHive.h index 9629629e057..9f8d2fe907a 100644 --- a/src/Storages/Hive/StorageHive.h +++ b/src/Storages/Hive/StorageHive.h @@ -88,6 +88,9 @@ private: HiveFilePtr createHiveFileIfNeeded(const FileInfo & file_info, const FieldVector & fields, SelectQueryInfo & query_info, ContextPtr context_); + bool isColumnOriented(); + Block getActualColumnsToRead(Block sample_block, const Block & header_block, const NameSet & partition_columns); + String hive_metastore_url; /// Hive database and table From 75a50a30c417b81478b7f0d5ad6c89e3fd71e80e Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Mon, 7 Mar 2022 09:43:53 +0800 Subject: [PATCH 04/18] update codes --- src/Storages/Hive/StorageHive.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/Hive/StorageHive.h b/src/Storages/Hive/StorageHive.h index 9f8d2fe907a..f3381b09958 100644 --- a/src/Storages/Hive/StorageHive.h +++ b/src/Storages/Hive/StorageHive.h @@ -52,6 +52,8 @@ public: SinkToStoragePtr write(const ASTPtr & /*query*/, const StorageMetadataPtr & 
metadata_snapshot, ContextPtr /*context*/) override; NamesAndTypesList getVirtuals() const override; + + bool isColumnOriented() const override; protected: friend class StorageHiveSource; @@ -88,7 +90,6 @@ private: HiveFilePtr createHiveFileIfNeeded(const FileInfo & file_info, const FieldVector & fields, SelectQueryInfo & query_info, ContextPtr context_); - bool isColumnOriented(); Block getActualColumnsToRead(Block sample_block, const Block & header_block, const NameSet & partition_columns); String hive_metastore_url; From c37eedd8877d2ba431e4c1308af2bc9fc65c3db2 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Mon, 7 Mar 2022 10:30:54 +0800 Subject: [PATCH 05/18] update codes --- src/Storages/Hive/StorageHive.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/Hive/StorageHive.cpp b/src/Storages/Hive/StorageHive.cpp index 2f0c3740ea8..4db2b8b98d1 100644 --- a/src/Storages/Hive/StorageHive.cpp +++ b/src/Storages/Hive/StorageHive.cpp @@ -543,7 +543,7 @@ HiveFilePtr StorageHive::createHiveFileIfNeeded( } return hive_file; } -bool StorageHive::isColumnOriented() +bool StorageHive::isColumnOriented() const { return format_name == "Parquet" || format_name == "ORC"; } From cfeedd2cb53d6e781d60d3673855fd8f7ea54d21 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Mon, 7 Mar 2022 12:28:31 +0800 Subject: [PATCH 06/18] fixed code style --- src/Storages/Hive/StorageHive.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/Hive/StorageHive.h b/src/Storages/Hive/StorageHive.h index f3381b09958..90f339cb8ec 100644 --- a/src/Storages/Hive/StorageHive.h +++ b/src/Storages/Hive/StorageHive.h @@ -52,7 +52,7 @@ public: SinkToStoragePtr write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/) override; NamesAndTypesList getVirtuals() const override; - + bool isColumnOriented() const override; protected: From 8ae5296ee81d17a6a02e91c3acd7892c099c9fca Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Mon, 7 Mar 2022 17:26:48 +0800 Subject: [PATCH 07/18] fixed compile errors --- src/Storages/Hive/StorageHive.cpp | 2 +- src/Storages/Hive/StorageHive.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/Hive/StorageHive.cpp b/src/Storages/Hive/StorageHive.cpp index 4db2b8b98d1..540089ecf62 100644 --- a/src/Storages/Hive/StorageHive.cpp +++ b/src/Storages/Hive/StorageHive.cpp @@ -548,7 +548,7 @@ bool StorageHive::isColumnOriented() const return format_name == "Parquet" || format_name == "ORC"; } -Block StorageHive::getActualColumnsToRead(Block sample_block, const Block & header_block, const NameSet & partition_columns) +Block StorageHive::getActualColumnsToRead(Block sample_block, const Block & header_block, const NameSet & partition_columns) const { if (!isColumnOriented()) return header_block; diff --git a/src/Storages/Hive/StorageHive.h b/src/Storages/Hive/StorageHive.h index 90f339cb8ec..eccd04a6759 100644 --- a/src/Storages/Hive/StorageHive.h +++ b/src/Storages/Hive/StorageHive.h @@ -90,7 +90,7 @@ private: HiveFilePtr createHiveFileIfNeeded(const FileInfo & file_info, const FieldVector & fields, SelectQueryInfo & query_info, ContextPtr context_); - Block getActualColumnsToRead(Block sample_block, const Block & header_block, const NameSet & partition_columns); + Block getActualColumnsToRead(Block sample_block, const Block & header_block, const NameSet & partition_columns) const; String hive_metastore_url; From 202ac18e764c82497742bf5db1b9bae8801e8dc5 Mon Sep 17 00:00:00 2001 From: vdimir 
Date: Fri, 4 Mar 2022 15:50:21 +0000 Subject: [PATCH 08/18] Skip 01086_odbc_roundtrip for aarch, disable force_tests --- tests/ci/ci_config.py | 2 -- tests/queries/0_stateless/01086_odbc_roundtrip.sh | 3 ++- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/ci/ci_config.py b/tests/ci/ci_config.py index 000d3d9a000..b45a4ce90c6 100644 --- a/tests/ci/ci_config.py +++ b/tests/ci/ci_config.py @@ -231,7 +231,6 @@ CI_CONFIG = { }, "Stateful tests (aarch64, actions)": { "required_build": "package_aarch64", - "force_tests": True, }, "Stateful tests (release, DatabaseOrdinary, actions)": { "required_build": "package_release", @@ -259,7 +258,6 @@ CI_CONFIG = { }, "Stateless tests (aarch64, actions)": { "required_build": "package_aarch64", - "force_tests": True, }, "Stateless tests (release, wide parts enabled, actions)": { "required_build": "package_release", diff --git a/tests/queries/0_stateless/01086_odbc_roundtrip.sh b/tests/queries/0_stateless/01086_odbc_roundtrip.sh index 705746032f8..20066c6b34c 100755 --- a/tests/queries/0_stateless/01086_odbc_roundtrip.sh +++ b/tests/queries/0_stateless/01086_odbc_roundtrip.sh @@ -1,6 +1,7 @@ #!/usr/bin/env bash -# Tags: no-asan, no-msan, no-fasttest +# Tags: no-asan, no-msan, no-fasttest, no-cpu-aarch64 # Tag no-msan: can't pass because odbc libraries are not instrumented +# Tag no-cpu-aarch64: clickhouse-odbc is not setup for arm CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh From 84e22fb32bc6e4f54444d4356413865276712d57 Mon Sep 17 00:00:00 2001 From: tavplubix Date: Mon, 7 Mar 2022 18:59:00 +0300 Subject: [PATCH 09/18] Update DiskLocal.cpp --- src/Disks/DiskLocal.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index 57bfaf405e0..e49e9cf6726 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -325,7 +326,7 @@ DiskDirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path) void DiskLocal::moveFile(const String & from_path, const String & to_path) { - fs::rename(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path); + renameNoReplace(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path); } void DiskLocal::replaceFile(const String & from_path, const String & to_path) From a8cfc2458a8a5db2a946096fad0bc2299cf5eb10 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Tue, 8 Mar 2022 11:55:15 +0800 Subject: [PATCH 10/18] update codes --- src/Storages/Hive/StorageHive.cpp | 30 +++++++++++++++++------------- src/Storages/Hive/StorageHive.h | 2 +- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/Storages/Hive/StorageHive.cpp b/src/Storages/Hive/StorageHive.cpp index 540089ecf62..a11488cf3cf 100644 --- a/src/Storages/Hive/StorageHive.cpp +++ b/src/Storages/Hive/StorageHive.cpp @@ -115,12 +115,12 @@ public: , format(std::move(format_)) , compression_method(compression_method_) , max_block_size(max_block_size_) - , sample_block(sample_block_) - , to_read_block(sample_block) + , sample_block(std::move(sample_block_)) , columns_description(getColumnsDescription(sample_block, source_info)) , text_input_field_names(text_input_field_names_) , format_settings(getFormatSettings(getContext())) { + to_read_block = sample_block; /// Initialize to_read_block, which is used to read data from HDFS. 
for (const auto & name_type : source_info->partition_name_types) { @@ -206,12 +206,16 @@ public: /// Enrich with partition columns. auto types = source_info->partition_name_types.getTypes(); + auto names = source_info->partition_name_types.getNames(); + auto fields = source_info->hive_files[current_idx]->getPartitionValues(); for (size_t i = 0; i < types.size(); ++i) { - if (!sample_block.has(source_info->partition_name_types.getNames()[i])) + // Only add the required partition columns. partition columns are not read from readbuffer + // the column must be in sample_block, otherwise sample_block.getPositionByName(names[i]) will throw an exception + if (!sample_block.has(names[i])) continue; - auto column = types[i]->createColumnConst(num_rows, source_info->hive_files[current_idx]->getPartitionValues()[i]); - auto previous_idx = sample_block.getPositionByName(source_info->partition_name_types.getNames()[i]); + auto column = types[i]->createColumnConst(num_rows, fields[i]); + auto previous_idx = sample_block.getPositionByName(names[i]); columns.insert(columns.begin() + previous_idx, column); } @@ -548,28 +552,28 @@ bool StorageHive::isColumnOriented() const return format_name == "Parquet" || format_name == "ORC"; } -Block StorageHive::getActualColumnsToRead(Block sample_block, const Block & header_block, const NameSet & partition_columns) const +void StorageHive::getActualColumnsToRead(Block & sample_block, const Block & header_block, const NameSet & partition_columns) const { if (!isColumnOriented()) - return header_block; - Block result_block = sample_block; + sample_block = header_block; + UInt32 erased_columns = 0; for (const auto & column : partition_columns) { - sample_block.erase(column); + if (sample_block.has(column)) + erased_columns++; } - if (!sample_block.columns()) + if (erased_columns == sample_block.columns()) { for (size_t i = 0; i < header_block.columns(); ++i) { const auto & col = header_block.getByPosition(i); if (!partition_columns.count(col.name)) { - result_block.insert(col); + sample_block.insert(col); break; } } } - return result_block; } Pipe StorageHive::read( const Names & column_names, @@ -646,7 +650,7 @@ Pipe StorageHive::read( sources_info->need_file_column = true; } - sample_block = getActualColumnsToRead(sample_block, header_block, NameSet{partition_names.begin(), partition_names.end()}); + getActualColumnsToRead(sample_block, header_block, NameSet{partition_names.begin(), partition_names.end()}); if (num_streams > sources_info->hive_files.size()) num_streams = sources_info->hive_files.size(); diff --git a/src/Storages/Hive/StorageHive.h b/src/Storages/Hive/StorageHive.h index eccd04a6759..71d17750190 100644 --- a/src/Storages/Hive/StorageHive.h +++ b/src/Storages/Hive/StorageHive.h @@ -90,7 +90,7 @@ private: HiveFilePtr createHiveFileIfNeeded(const FileInfo & file_info, const FieldVector & fields, SelectQueryInfo & query_info, ContextPtr context_); - Block getActualColumnsToRead(Block sample_block, const Block & header_block, const NameSet & partition_columns) const; + void getActualColumnsToRead(Block & sample_block, const Block & header_block, const NameSet & partition_columns) const; String hive_metastore_url; From caffc144b5a1c31e93c3db7cae0eb22494217b45 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 24 Feb 2022 15:23:26 +0300 Subject: [PATCH 11/18] Fix possible "Part directory doesn't exist" during INSERT In #33291 final part commit had been defered, and now it can take significantly more time, that may lead to "Part directory doesn't exist" 
error during INSERT: 2022.02.21 18:18:06.979881 [ 11329 ] {insert} executeQuery: (from 127.1:24572, user: default) INSERT INTO db.table (...) VALUES 2022.02.21 20:58:03.933593 [ 11329 ] {insert} db.table: Renaming temporary part tmp_insert_20220214_18044_18044_0 to 20220214_270654_270654_0. 2022.02.21 21:16:50.961917 [ 11329 ] {insert} db.table: Renaming temporary part tmp_insert_20220214_18197_18197_0 to 20220214_270689_270689_0. ... 2022.02.22 21:16:57.632221 [ 64878 ] {} db.table: Removing temporary directory /clickhouse/data/db/table/tmp_insert_20220214_18232_18232_0/ ... 2022.02.23 12:23:56.277480 [ 11329 ] {insert} db.table: Renaming temporary part tmp_insert_20220214_18232_18232_0 to 20220214_273459_273459_0. 2022.02.23 12:23:56.299218 [ 11329 ] {insert} executeQuery: Code: 107. DB::Exception: Part directory /clickhouse/data/db/table/tmp_insert_20220214_18232_18232_0/ doesn't exist. Most likely it is a logical error. (FILE_DOESNT_EXIST) (version 22.2.1.1) (from 127.1:24572) (in query: INSERT INTO db.table (...) VALUES), Stack trace (when copying this message, always include the lines below): Follow-up for: #28760 Refs: #33291 Signed-off-by: Azat Khuzhin --- src/Storages/MergeTree/MergeTask.cpp | 8 ++--- src/Storages/MergeTree/MergeTreeData.cpp | 6 ++-- src/Storages/MergeTree/MergeTreeData.h | 28 ++++++++-------- .../MergeTree/MergeTreeDataMergerMutator.cpp | 6 ---- .../MergeTree/MergeTreeDataMergerMutator.h | 20 ----------- .../ReplicatedMergeTreeCleanupThread.cpp | 2 +- src/Storages/MergeTree/TemporaryParts.cpp | 24 ++++++++++++++ src/Storages/MergeTree/TemporaryParts.h | 33 +++++++++++++++++++ src/Storages/StorageMergeTree.cpp | 4 +-- src/Storages/StorageReplicatedMergeTree.cpp | 2 +- 10 files changed, 81 insertions(+), 52 deletions(-) create mode 100644 src/Storages/MergeTree/TemporaryParts.cpp create mode 100644 src/Storages/MergeTree/TemporaryParts.h diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 89fb27cc89c..8b5c2e0dc6e 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -126,13 +126,9 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() if (ctx->disk->exists(local_new_part_tmp_path)) throw Exception("Directory " + fullPath(ctx->disk, local_new_part_tmp_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS); - { - std::lock_guard lock(global_ctx->mutator->tmp_parts_lock); - global_ctx->mutator->tmp_parts.emplace(local_tmp_part_basename); - } + global_ctx->data->temporary_parts.add(local_tmp_part_basename); SCOPE_EXIT( - std::lock_guard lock(global_ctx->mutator->tmp_parts_lock); - global_ctx->mutator->tmp_parts.erase(local_tmp_part_basename); + global_ctx->data->temporary_parts.remove(local_tmp_part_basename); ); global_ctx->all_column_names = global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical(); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 8ea9d0a31d0..a15da7578e8 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1386,7 +1386,7 @@ static bool isOldPartDirectory(const DiskPtr & disk, const String & directory_pa } -size_t MergeTreeData::clearOldTemporaryDirectories(const MergeTreeDataMergerMutator & merger_mutator, size_t custom_directories_lifetime_seconds) +size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds) { /// If the method is already called from another thread, then we don't need to do anything. 
std::unique_lock lock(clear_old_temporary_directories_mutex, std::defer_lock); @@ -1418,9 +1418,9 @@ size_t MergeTreeData::clearOldTemporaryDirectories(const MergeTreeDataMergerMuta { if (disk->isDirectory(it->path()) && isOldPartDirectory(disk, it->path(), deadline)) { - if (merger_mutator.hasTemporaryPart(basename)) + if (temporary_parts.contains(basename)) { - LOG_WARNING(log, "{} is an active destination for one of merge/mutation (consider increasing temporary_directories_lifetime setting)", full_path); + LOG_WARNING(log, "{} is in use (by merge/mutation/INSERT) (consider increasing temporary_directories_lifetime setting)", full_path); continue; } else diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 1a04b2a389b..1e7f127c85b 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -3,30 +3,31 @@ #include #include #include -#include -#include -#include -#include -#include -#include -#include #include #include #include #include #include #include +#include +#include +#include +#include +#include +#include #include #include -#include #include #include #include -#include -#include +#include +#include +#include +#include #include #include -#include +#include +#include #include @@ -566,7 +567,7 @@ public: /// Delete all directories which names begin with "tmp" /// Must be called with locked lockForShare() because it's using relative_data_path. - size_t clearOldTemporaryDirectories(const MergeTreeDataMergerMutator & merger_mutator, size_t custom_directories_lifetime_seconds); + size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds); size_t clearEmptyParts(); @@ -906,7 +907,6 @@ public: mutable std::mutex currently_submerging_emerging_mutex; protected: - friend class IMergeTreeDataPart; friend class MergeTreeDataMergerMutator; friend struct ReplicatedMergeTreeTableMetadata; @@ -1200,6 +1200,8 @@ private: /// Create zero-copy exclusive lock for part and disk. Useful for coordination of /// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree. virtual std::optional tryCreateZeroCopyExclusiveLock(const String &, const DiskPtr &) { return std::nullopt; } + + TemporaryParts temporary_parts; }; /// RAII struct to record big parts that are submerging or emerging. diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 2f097b69fc4..a6cda0016a8 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -782,10 +782,4 @@ ExecuteTTLType MergeTreeDataMergerMutator::shouldExecuteTTL(const StorageMetadat } -bool MergeTreeDataMergerMutator::hasTemporaryPart(const std::string & basename) const -{ - std::lock_guard lock(tmp_parts_lock); - return tmp_parts.contains(basename); -} - } diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 82cad873dce..ae09e2c916c 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -192,26 +192,6 @@ private: ITTLMergeSelector::PartitionIdToTTLs next_recompress_ttl_merge_times_by_partition; /// Performing TTL merges independently for each partition guarantees that /// there is only a limited number of TTL merges and no partition stores data, that is too stale - -public: - /// Returns true if passed part name is active. 
- /// (is the destination for one of active mutation/merge). - /// - /// NOTE: that it accept basename (i.e. dirname), not the path, - /// since later requires canonical form. - bool hasTemporaryPart(const std::string & basename) const; - -private: - /// Set of active temporary paths that is used as the destination. - /// List of such paths is required to avoid trying to remove them during cleanup. - /// - /// NOTE: It is pretty short, so use STL is fine. - std::unordered_set tmp_parts; - /// Lock for "tmp_parts". - /// - /// NOTE: mutable is required to mark hasTemporaryPath() const - mutable std::mutex tmp_parts_lock; - }; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 26bfd951d3d..3b6c727cd02 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -64,7 +64,7 @@ void ReplicatedMergeTreeCleanupThread::iterate() /// Both use relative_data_path which changes during rename, so we /// do it under share lock storage.clearOldWriteAheadLogs(); - storage.clearOldTemporaryDirectories(storage.merger_mutator, storage.getSettings()->temporary_directories_lifetime.totalSeconds()); + storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds()); } /// This is loose condition: no problem if we actually had lost leadership at this moment diff --git a/src/Storages/MergeTree/TemporaryParts.cpp b/src/Storages/MergeTree/TemporaryParts.cpp new file mode 100644 index 00000000000..4239c8232e5 --- /dev/null +++ b/src/Storages/MergeTree/TemporaryParts.cpp @@ -0,0 +1,24 @@ +#include + +namespace DB +{ + +bool TemporaryParts::contains(const std::string & basename) const +{ + std::lock_guard lock(mutex); + return parts.contains(basename); +} + +void TemporaryParts::add(std::string basename) +{ + std::lock_guard lock(mutex); + parts.emplace(std::move(basename)); +} + +void TemporaryParts::remove(const std::string & basename) +{ + std::lock_guard lock(mutex); + parts.erase(basename); +} + +} diff --git a/src/Storages/MergeTree/TemporaryParts.h b/src/Storages/MergeTree/TemporaryParts.h new file mode 100644 index 00000000000..bc9d270856f --- /dev/null +++ b/src/Storages/MergeTree/TemporaryParts.h @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +/// Manages set of active temporary paths that should not be cleaned by background thread. +class TemporaryParts : private boost::noncopyable +{ +private: + /// To add const qualifier for contains() + mutable std::mutex mutex; + + /// NOTE: It is pretty short, so use STL is fine. + std::unordered_set parts; + +public: + /// Returns true if passed part name is active. + /// (is the destination for one of active mutation/merge). + /// + /// NOTE: that it accept basename (i.e. dirname), not the path, + /// since later requires canonical form. 
+ bool contains(const std::string & basename) const; + + void add(std::string basename); + void remove(const std::string & basename); +}; + +} diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 2db93def004..a05ed04a66c 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -108,7 +108,7 @@ void StorageMergeTree::startup() /// Temporary directories contain incomplete results of merges (after forced restart) /// and don't allow to reinitialize them, so delete each of them immediately - clearOldTemporaryDirectories(merger_mutator, 0); + clearOldTemporaryDirectories(0); /// NOTE background task will also do the above cleanups periodically. time_after_previous_cleanup_parts.restart(); @@ -1062,7 +1062,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign assignee.scheduleCommonTask(ExecutableLambdaAdapter::create( [this, share_lock] () { - return clearOldTemporaryDirectories(merger_mutator, getSettings()->temporary_directories_lifetime.totalSeconds()); + return clearOldTemporaryDirectories(getSettings()->temporary_directories_lifetime.totalSeconds()); }, common_assignee_trigger, getStorageID()), /* need_trigger */ false); scheduled = true; } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ab42396f8da..82bddddb32d 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -451,7 +451,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( } /// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart), /// don't allow to reinitialize them, delete each of them immediately. - clearOldTemporaryDirectories(merger_mutator, 0); + clearOldTemporaryDirectories(0); clearOldWriteAheadLogs(); } From 6499fc2c455289ec9e74689dd20d4ec0e8da8ab1 Mon Sep 17 00:00:00 2001 From: cnmade Date: Tue, 8 Mar 2022 17:03:46 +0800 Subject: [PATCH 12/18] Translate zh/sql-reference/statements/alter/settings-profile: rename old file --- .../alter/{settings-profile.md => settings-profile.md.bak} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename docs/zh/sql-reference/statements/alter/{settings-profile.md => settings-profile.md.bak} (100%) diff --git a/docs/zh/sql-reference/statements/alter/settings-profile.md b/docs/zh/sql-reference/statements/alter/settings-profile.md.bak similarity index 100% rename from docs/zh/sql-reference/statements/alter/settings-profile.md rename to docs/zh/sql-reference/statements/alter/settings-profile.md.bak From 80a8e4aa10c8a69901b31dc0eb93ad25953fe281 Mon Sep 17 00:00:00 2001 From: cnmade Date: Tue, 8 Mar 2022 17:10:54 +0800 Subject: [PATCH 13/18] Translate zh/sql-reference/statements/alter/settings-profile: reimport file --- .../statements/alter/settings-profile.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 docs/zh/sql-reference/statements/alter/settings-profile.md diff --git a/docs/zh/sql-reference/statements/alter/settings-profile.md b/docs/zh/sql-reference/statements/alter/settings-profile.md new file mode 100644 index 00000000000..57d12142c48 --- /dev/null +++ b/docs/zh/sql-reference/statements/alter/settings-profile.md @@ -0,0 +1,16 @@ +--- +toc_priority: 48 +toc_title: SETTINGS PROFILE +--- + +## ALTER SETTINGS PROFILE {#alter-settings-profile-statement} + +Changes settings profiles. 
+ +Syntax: + +``` sql +ALTER SETTINGS PROFILE [IF EXISTS] TO name1 [ON CLUSTER cluster_name1] [RENAME TO new_name1] + [, name2 [ON CLUSTER cluster_name2] [RENAME TO new_name2] ...] + [SETTINGS variable [= value] [MIN [=] min_value] [MAX [=] max_value] [READONLY|WRITABLE] | INHERIT 'profile_name'] [,...] +``` From 0d668e4b15caf9175ec809158f95f738c60d76fd Mon Sep 17 00:00:00 2001 From: cnmade Date: Tue, 8 Mar 2022 17:13:36 +0800 Subject: [PATCH 14/18] Translate zh/sql-reference/statements/alter/settings-profile: translate to zh --- .../zh/sql-reference/statements/alter/settings-profile.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/zh/sql-reference/statements/alter/settings-profile.md b/docs/zh/sql-reference/statements/alter/settings-profile.md index 57d12142c48..045b2461e8c 100644 --- a/docs/zh/sql-reference/statements/alter/settings-profile.md +++ b/docs/zh/sql-reference/statements/alter/settings-profile.md @@ -1,13 +1,13 @@ --- toc_priority: 48 -toc_title: SETTINGS PROFILE +toc_title: 配置文件设置 --- -## ALTER SETTINGS PROFILE {#alter-settings-profile-statement} +## 更改配置文件设置 {#alter-settings-profile-statement} -Changes settings profiles. +更改配置文件设置。 -Syntax: +语法: ``` sql ALTER SETTINGS PROFILE [IF EXISTS] TO name1 [ON CLUSTER cluster_name1] [RENAME TO new_name1] From e0ab2c7ca2a2dc235f6f1048d1cc5dccdb50daba Mon Sep 17 00:00:00 2001 From: cnmade Date: Tue, 8 Mar 2022 17:14:06 +0800 Subject: [PATCH 15/18] Translate zh/sql-reference/statements/alter/settings-profile: remove old file --- docs/zh/sql-reference/statements/alter/settings-profile.md.bak | 1 - 1 file changed, 1 deletion(-) delete mode 120000 docs/zh/sql-reference/statements/alter/settings-profile.md.bak diff --git a/docs/zh/sql-reference/statements/alter/settings-profile.md.bak b/docs/zh/sql-reference/statements/alter/settings-profile.md.bak deleted file mode 120000 index 0e71ac4e831..00000000000 --- a/docs/zh/sql-reference/statements/alter/settings-profile.md.bak +++ /dev/null @@ -1 +0,0 @@ -../../../../en/sql-reference/statements/alter/settings-profile.md \ No newline at end of file From c4b634285363093fb71a7e57fcd3273a0d52d91d Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 8 Mar 2022 17:24:39 +0300 Subject: [PATCH 16/18] Improvements for `parallel_distributed_insert_select` (and related) (#34728) * Add a warning if parallel_distributed_insert_select was ignored Signed-off-by: Azat Khuzhin * Respect max_distributed_depth for parallel_distributed_insert_select Signed-off-by: Azat Khuzhin * Print warning for non applied parallel_distributed_insert_select only for initial query Signed-off-by: Azat Khuzhin * Remove Cluster::getHashOfAddresses() Signed-off-by: Azat Khuzhin * Forbid parallel_distributed_insert_select for remote()/cluster() with different addresses Before it uses empty cluster name (getClusterName()) which is not correct, compare all addresses instead. Signed-off-by: Azat Khuzhin * Fix max_distributed_depth check max_distributed_depth=1 must mean not more then one distributed query, not two, since max_distributed_depth=0 means no limit, and distribute_depth is 0 for the first query. 
Signed-off-by: Azat Khuzhin * Fix INSERT INTO remote()/cluster() with parallel_distributed_insert_select Signed-off-by: Azat Khuzhin * Add a test for parallel_distributed_insert_select with cluster()/remote() Signed-off-by: Azat Khuzhin * Return instead of empty cluster name in Distributed engine Signed-off-by: Azat Khuzhin * Make user with sharding_key and w/o in remote()/cluster() identical Before with sharding_key the user was "default", while w/o it it was empty. Signed-off-by: Azat Khuzhin --- src/Interpreters/Cluster.h | 2 -- .../ClusterProxy/executeQuery.cpp | 2 +- src/Storages/Distributed/DistributedSink.cpp | 2 +- .../ExternalDataSourceConfiguration.h | 2 +- src/Storages/StorageDistributed.cpp | 33 ++++++++++++++++-- src/Storages/StorageDistributed.h | 3 +- ...istributed_insert_select_cluster.reference | 27 +++++++++++++++ ...llel_distributed_insert_select_cluster.sql | 34 +++++++++++++++++++ 8 files changed, 95 insertions(+), 10 deletions(-) create mode 100644 tests/queries/0_stateless/02224_parallel_distributed_insert_select_cluster.reference create mode 100644 tests/queries/0_stateless/02224_parallel_distributed_insert_select_cluster.sql diff --git a/src/Interpreters/Cluster.h b/src/Interpreters/Cluster.h index 3773dadaf13..248d212ebf0 100644 --- a/src/Interpreters/Cluster.h +++ b/src/Interpreters/Cluster.h @@ -207,7 +207,6 @@ public: using ShardsInfo = std::vector; - String getHashOfAddresses() const { return hash_of_addresses; } const ShardsInfo & getShardsInfo() const { return shards_info; } const AddressesWithFailover & getShardsAddresses() const { return addresses_with_failover; } @@ -263,7 +262,6 @@ private: /// Inter-server secret String secret; - String hash_of_addresses; /// Description of the cluster shards. ShardsInfo shards_info; /// Any remote shard. 
diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 0db07267231..884b8445732 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -116,7 +116,7 @@ void executeQuery( const Settings & settings = context->getSettingsRef(); - if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth) + if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth) throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH); std::vector plans; diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp index 9951fb436b5..aa703bcbb89 100644 --- a/src/Storages/Distributed/DistributedSink.cpp +++ b/src/Storages/Distributed/DistributedSink.cpp @@ -126,7 +126,7 @@ DistributedSink::DistributedSink( , log(&Poco::Logger::get("DistributedBlockOutputStream")) { const auto & settings = context->getSettingsRef(); - if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth) + if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth) throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH); context->getClientInfo().distributed_depth += 1; random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key; diff --git a/src/Storages/ExternalDataSourceConfiguration.h b/src/Storages/ExternalDataSourceConfiguration.h index 1e08b088b1d..cc3e136ba50 100644 --- a/src/Storages/ExternalDataSourceConfiguration.h +++ b/src/Storages/ExternalDataSourceConfiguration.h @@ -16,7 +16,7 @@ struct ExternalDataSourceConfiguration { String host; UInt16 port = 0; - String username; + String username = "default"; String password; String database; String table; diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index da648aa4e5c..fcbf22bbd33 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -118,6 +118,7 @@ namespace ErrorCodes extern const int ALTER_OF_COLUMN_IS_FORBIDDEN; extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES; extern const int ARGUMENT_OUT_OF_BOUND; + extern const int TOO_LARGE_DISTRIBUTED_DEPTH; } namespace ActionLocks @@ -705,6 +706,9 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context) { const Settings & settings = local_context->getSettingsRef(); + if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth) + throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH); + std::shared_ptr storage_src; auto & select = query.select->as(); auto new_query = std::dynamic_pointer_cast(query.clone()); @@ -733,14 +737,34 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer } } - if (!storage_src || storage_src->getClusterName() != getClusterName()) + const Cluster::AddressesWithFailover & src_addresses = storage_src ? 
storage_src->getCluster()->getShardsAddresses() : Cluster::AddressesWithFailover{}; + const Cluster::AddressesWithFailover & dst_addresses = getCluster()->getShardsAddresses(); + /// Compare addresses instead of cluster name, to handle remote()/cluster(). + /// (since for remote()/cluster() the getClusterName() is empty string) + if (src_addresses != dst_addresses) { + /// The warning should be produced only for root queries, + /// since in case of parallel_distributed_insert_select=1, + /// it will produce warning for the rewritten insert, + /// since destination table is still Distributed there. + if (local_context->getClientInfo().distributed_depth == 0) + { + LOG_WARNING(log, + "Parallel distributed INSERT SELECT is not possible " + "(source cluster={} ({} addresses), destination cluster={} ({} addresses))", + storage_src ? storage_src->getClusterName() : "", + src_addresses.size(), + getClusterName(), + dst_addresses.size()); + } return nullptr; } if (settings.parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL) { new_query->table_id = StorageID(getRemoteDatabaseName(), getRemoteTableName()); + /// Reset table function for INSERT INTO remote()/cluster() + new_query->table_function.reset(); } const auto & cluster = getCluster(); @@ -757,12 +781,15 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer new_query_str = buf.str(); } + ContextMutablePtr query_context = Context::createCopy(local_context); + ++query_context->getClientInfo().distributed_depth; + for (size_t shard_index : collections::range(0, shards_info.size())) { const auto & shard_info = shards_info[shard_index]; if (shard_info.isLocal()) { - InterpreterInsertQuery interpreter(new_query, local_context); + InterpreterInsertQuery interpreter(new_query, query_context); pipelines.emplace_back(std::make_unique()); pipelines.back()->init(interpreter.execute().pipeline); } @@ -776,7 +803,7 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer /// INSERT SELECT query returns empty block auto remote_query_executor - = std::make_shared(shard_info.pool, std::move(connections), new_query_str, Block{}, local_context); + = std::make_shared(shard_info.pool, std::move(connections), new_query_str, Block{}, query_context); pipelines.emplace_back(std::make_unique()); pipelines.back()->init(Pipe(std::make_shared(remote_query_executor, false, settings.async_socket_for_remote))); pipelines.back()->setSinks([](const Block & header, QueryPipelineBuilder::StreamType) -> ProcessorPtr diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index e47e0fddd6c..45b1cd640ee 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -114,8 +114,6 @@ public: /// Used by InterpreterInsertQuery std::string getRemoteDatabaseName() const { return remote_database; } std::string getRemoteTableName() const { return remote_table; } - /// Returns empty string if tables is used by TableFunctionRemote - std::string getClusterName() const { return cluster_name; } ClusterPtr getCluster() const; /// Used by InterpreterSystemQuery @@ -201,6 +199,7 @@ private: std::optional getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const; size_t getRandomShardIndex(const Cluster::ShardsInfo & shards); + std::string getClusterName() const { return cluster_name.empty() ? 
"" : cluster_name; } const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; } diff --git a/tests/queries/0_stateless/02224_parallel_distributed_insert_select_cluster.reference b/tests/queries/0_stateless/02224_parallel_distributed_insert_select_cluster.reference new file mode 100644 index 00000000000..05fbb680c65 --- /dev/null +++ b/tests/queries/0_stateless/02224_parallel_distributed_insert_select_cluster.reference @@ -0,0 +1,27 @@ +-- { echoOn } +truncate table dst_02224; +insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key) +select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=1, max_distributed_depth=1; -- { serverError TOO_LARGE_DISTRIBUTED_DEPTH } +select * from dst_02224; +truncate table dst_02224; +insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key) +select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=1, max_distributed_depth=2; +select * from dst_02224; +1 +1 +truncate table dst_02224; +insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key) +select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=2, max_distributed_depth=1; +select * from dst_02224; +1 +1 +truncate table dst_02224; +insert into function remote('127.{1,2}', currentDatabase(), dst_02224, key) +select * from remote('127.{1,2}', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=2, max_distributed_depth=1; +select * from dst_02224; +1 +1 diff --git a/tests/queries/0_stateless/02224_parallel_distributed_insert_select_cluster.sql b/tests/queries/0_stateless/02224_parallel_distributed_insert_select_cluster.sql new file mode 100644 index 00000000000..023f220e930 --- /dev/null +++ b/tests/queries/0_stateless/02224_parallel_distributed_insert_select_cluster.sql @@ -0,0 +1,34 @@ +drop table if exists dst_02224; +drop table if exists src_02224; +create table dst_02224 (key Int) engine=Memory(); +create table src_02224 (key Int) engine=Memory(); +insert into src_02224 values (1); + +-- { echoOn } +truncate table dst_02224; +insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key) +select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=1, max_distributed_depth=1; -- { serverError TOO_LARGE_DISTRIBUTED_DEPTH } +select * from dst_02224; + +truncate table dst_02224; +insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key) +select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=1, max_distributed_depth=2; +select * from dst_02224; + +truncate table dst_02224; +insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key) +select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=2, max_distributed_depth=1; +select * from dst_02224; + +truncate table dst_02224; +insert into function remote('127.{1,2}', currentDatabase(), dst_02224, key) +select * from remote('127.{1,2}', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=2, max_distributed_depth=1; +select * from dst_02224; +-- { echoOff } + 
+drop table src_02224; +drop table dst_02224; From 52ed751d58d228325d17b15961a3f3ea7c2d6ee8 Mon Sep 17 00:00:00 2001 From: "Mikhail f. Shiryaev" Date: Tue, 8 Mar 2022 16:29:42 +0100 Subject: [PATCH 17/18] Fix installation documentation typo --- docs/_includes/install/deb.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/_includes/install/deb.sh b/docs/_includes/install/deb.sh index 9dceef4c245..0daf12a132f 100644 --- a/docs/_includes/install/deb.sh +++ b/docs/_includes/install/deb.sh @@ -1,11 +1,11 @@ -sudo apt-get install apt-transport-https ca-certificates dirmngr +sudo apt-get install -y apt-transport-https ca-certificates dirmngr sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 8919F6BD2B48D754 -echo "deb https://packages.clickhouse.com/deb stable main/" | sudo tee \ +echo "deb https://packages.clickhouse.com/deb stable main" | sudo tee \ /etc/apt/sources.list.d/clickhouse.list sudo apt-get update sudo apt-get install -y clickhouse-server clickhouse-client sudo service clickhouse-server start -clickhouse-client # or "clickhouse-client --password" if you set up a password. +clickhouse-client # or "clickhouse-client --password" if you've set up a password. From a871036361ea8e57660ecd88f1da5dea29b5ebf4 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 8 Mar 2022 18:42:29 +0300 Subject: [PATCH 18/18] Fix `parallel_reading_from_replicas` with `clickhouse-bechmark` (#34751) * Use INITIAL_QUERY for clickhouse-benchmark Signed-off-by: Azat Khuzhin * Fix parallel_reading_from_replicas with clickhouse-bechmark Before it produces the following error: $ clickhouse-benchmark --stacktrace -i1 --query "select * from remote('127.1', default.data_mt) limit 10" --allow_experimental_parallel_reading_from_replicas=1 --max_parallel_replicas=3 Loaded 1 queries. Logical error: 'Coordinator for parallel reading from replicas is not initialized'. Aborted (core dumped) Since it uses the same code, i.e RemoteQueryExecutor -> MultiplexedConnections, which enables coordinator if it was requested from settings, but it should be done only for non-initial queries, i.e. when server send connection to another server. Signed-off-by: Azat Khuzhin * Fix 02226_parallel_reading_from_replicas_benchmark for older shellcheck By shellcheck 0.8 does not complains, while on CI shellcheck 0.7.0 and it does complains [1]: In 02226_parallel_reading_from_replicas_benchmark.sh line 17: --allow_experimental_parallel_reading_from_replicas=1 ^-- SC2191: The = here is literal. To assign by index, use ( [index]=value ) with no spaces. To keep as literal, quote it. 
Did you mean: "--allow_experimental_parallel_reading_from_replicas=1" [1]: https://s3.amazonaws.com/clickhouse-test-reports/34751/d883af711822faf294c876b017cbf745b1cda1b3/style_check__actions_/shellcheck_output.txt Signed-off-by: Azat Khuzhin --- programs/benchmark/Benchmark.cpp | 2 ++ src/Client/MultiplexedConnections.cpp | 7 ++++- src/QueryPipeline/RemoteQueryExecutor.cpp | 10 ++----- src/QueryPipeline/RemoteQueryExecutor.h | 8 ++++- ..._reading_from_replicas_benchmark.reference | 0 ...arallel_reading_from_replicas_benchmark.sh | 29 +++++++++++++++++++ 6 files changed, 46 insertions(+), 10 deletions(-) create mode 100644 tests/queries/0_stateless/02226_parallel_reading_from_replicas_benchmark.reference create mode 100755 tests/queries/0_stateless/02226_parallel_reading_from_replicas_benchmark.sh diff --git a/programs/benchmark/Benchmark.cpp b/programs/benchmark/Benchmark.cpp index 35ffb97b8e2..60e5ca92f77 100644 --- a/programs/benchmark/Benchmark.cpp +++ b/programs/benchmark/Benchmark.cpp @@ -435,6 +435,8 @@ private: Progress progress; executor.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); }); + executor.sendQuery(ClientInfo::QueryKind::INITIAL_QUERY); + ProfileInfo info; while (Block block = executor.read()) info.update(block); diff --git a/src/Client/MultiplexedConnections.cpp b/src/Client/MultiplexedConnections.cpp index d1873ac038d..31fbc609bdc 100644 --- a/src/Client/MultiplexedConnections.cpp +++ b/src/Client/MultiplexedConnections.cpp @@ -133,7 +133,12 @@ void MultiplexedConnections::sendQuery( modified_settings.group_by_two_level_threshold_bytes = 0; } - if (settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas) + bool parallel_reading_from_replicas = settings.max_parallel_replicas > 1 + && settings.allow_experimental_parallel_reading_from_replicas + /// To avoid trying to coordinate with clickhouse-benchmark, + /// since it uses the same code. + && client_info.query_kind != ClientInfo::QueryKind::INITIAL_QUERY; + if (parallel_reading_from_replicas) { client_info.collaborate_with_initiator = true; client_info.count_participating_replicas = replica_info.all_replicas_count; diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 142e56ceb25..d1275444b84 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -210,7 +210,7 @@ static Block adaptBlockStructure(const Block & block, const Block & header) return res; } -void RemoteQueryExecutor::sendQuery() +void RemoteQueryExecutor::sendQuery(ClientInfo::QueryKind query_kind) { if (sent_query) return; @@ -237,13 +237,7 @@ void RemoteQueryExecutor::sendQuery() auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); ClientInfo modified_client_info = context->getClientInfo(); - modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - /// Set initial_query_id to query_id for the clickhouse-benchmark. 
- /// - /// (since first query of clickhouse-benchmark will be issued as SECONDARY_QUERY, - /// due to it executes queries via RemoteBlockInputStream) - if (modified_client_info.initial_query_id.empty()) - modified_client_info.initial_query_id = query_id; + modified_client_info.query_kind = query_kind; if (CurrentThread::isInitialized()) { modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context; diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 655bd5603de..78bc9f611ab 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -83,7 +83,13 @@ public: ~RemoteQueryExecutor(); /// Create connection and send query, external tables and scalars. - void sendQuery(); + /// + /// @param query_kind - kind of query, usually it is SECONDARY_QUERY, + /// since this is the queries between servers + /// (for which this code was written in general). + /// But clickhouse-benchmark uses the same code, + /// and it should pass INITIAL_QUERY. + void sendQuery(ClientInfo::QueryKind query_kind = ClientInfo::QueryKind::SECONDARY_QUERY); /// Query is resent to a replica, the query itself can be modified. std::atomic resent_query { false }; diff --git a/tests/queries/0_stateless/02226_parallel_reading_from_replicas_benchmark.reference b/tests/queries/0_stateless/02226_parallel_reading_from_replicas_benchmark.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/02226_parallel_reading_from_replicas_benchmark.sh b/tests/queries/0_stateless/02226_parallel_reading_from_replicas_benchmark.sh new file mode 100755 index 00000000000..2a163746e20 --- /dev/null +++ b/tests/queries/0_stateless/02226_parallel_reading_from_replicas_benchmark.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -nm -q " +drop table if exists data_02226; +create table data_02226 (key Int) engine=MergeTree() order by key +as select * from numbers(1); +" + +# Regression for: +# +# Logical error: 'Coordinator for parallel reading from replicas is not initialized'. +opts=( + --allow_experimental_parallel_reading_from_replicas 1 + --max_parallel_replicas 3 + + --iterations 1 +) +$CLICKHOUSE_BENCHMARK --query "select * from remote('127.1', $CLICKHOUSE_DATABASE, data_02226)" "${opts[@]}" >& /dev/null +ret=$? + +$CLICKHOUSE_CLIENT -nm -q " +drop table data_02226; +" + +exit $ret