From cf0d8be8aa1cb8bce1810962e7f44ee357a9a7a6 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Mon, 19 Apr 2021 17:45:46 +0300 Subject: [PATCH 01/30] Add uniqTHetaSketch in performance test --- tests/performance/uniq.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/performance/uniq.xml b/tests/performance/uniq.xml index b4e73733769..f688f1d5a9d 100644 --- a/tests/performance/uniq.xml +++ b/tests/performance/uniq.xml @@ -46,6 +46,7 @@ uniqUpTo(10) uniqUpTo(25) uniqUpTo(100) + uniqThetaSketch From d3a9d6633fdcec1174287a8439ad73de5cb237c9 Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Tue, 18 May 2021 11:34:13 +0300 Subject: [PATCH 02/30] Rename uniqThetaSketch to uniqTheta --- tests/performance/uniq.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/performance/uniq.xml b/tests/performance/uniq.xml index f688f1d5a9d..b373bccd938 100644 --- a/tests/performance/uniq.xml +++ b/tests/performance/uniq.xml @@ -46,7 +46,7 @@ uniqUpTo(10) uniqUpTo(25) uniqUpTo(100) - uniqThetaSketch + uniqTheta From 4b945321368ac2749b825ab8559b0c5a476938b5 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Tue, 18 May 2021 17:34:11 +0300 Subject: [PATCH 03/30] Adjust query with SearchPhrase --- tests/performance/uniq.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/performance/uniq.xml b/tests/performance/uniq.xml index b373bccd938..52dfefb902b 100644 --- a/tests/performance/uniq.xml +++ b/tests/performance/uniq.xml @@ -23,7 +23,6 @@ --> SearchEngineID RegionID - SearchPhrase ClientIP @@ -52,4 +51,5 @@ SELECT {key} AS k, {func}(UserID) FROM hits_100m_single GROUP BY k FORMAT Null + SELECT SearchPhrase AS k, uniqTheta(UserID) FROM (SELECT SearchPhrase, UserID FROM hits_100m_single LIMIT 20000000) GROUP BY k From b93d59e9310939ef1a7cbc2e9ed821f5015467ac Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Thu, 20 May 2021 11:13:27 +0300 Subject: [PATCH 04/30] Try to limit all queries to see the changes --- tests/performance/uniq.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/performance/uniq.xml b/tests/performance/uniq.xml index 52dfefb902b..378a7ee5193 100644 --- a/tests/performance/uniq.xml +++ b/tests/performance/uniq.xml @@ -24,6 +24,7 @@ SearchEngineID RegionID ClientIP + SearchPhrase @@ -50,6 +51,5 @@ - SELECT {key} AS k, {func}(UserID) FROM hits_100m_single GROUP BY k FORMAT Null - SELECT SearchPhrase AS k, uniqTheta(UserID) FROM (SELECT SearchPhrase, UserID FROM hits_100m_single LIMIT 20000000) GROUP BY k + SELECT {key} AS k, {func}(UserID) FROM (SELECT {key}, UserID FROM hits_100m_single LIMIT 20000000) GROUP BY k From e832296768da5eff3bf0a7916d1a9a17f78a9603 Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Thu, 20 May 2021 11:14:24 +0300 Subject: [PATCH 05/30] Reorder values --- tests/performance/uniq.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/performance/uniq.xml b/tests/performance/uniq.xml index 378a7ee5193..766742f43cd 100644 --- a/tests/performance/uniq.xml +++ b/tests/performance/uniq.xml @@ -23,8 +23,8 @@ --> SearchEngineID RegionID - ClientIP SearchPhrase + ClientIP From 3d01028d192ba7534d8f7f5801bcb4c07751c383 Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Fri, 28 May 2021 14:20:39 +0300 Subject: [PATCH 06/30] Use hits_10m_single instead of hits_100m_single --- tests/performance/uniq.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/performance/uniq.xml b/tests/performance/uniq.xml index acd84d75788..f6f7ac01c65 100644 --- a/tests/performance/uniq.xml +++ b/tests/performance/uniq.xml @@ -1,6 +1,6 @@ - hits_100m_single + hits_10m_single 30000000000 @@ -58,5 +58,5 @@ - SELECT {key} AS k, {func}(UserID) FROM (SELECT {key}, UserID FROM hits_100m_single LIMIT 20000000) GROUP BY k + SELECT {key} AS k, {func}(UserID) FROM hits_10m_single GROUP BY k FORMAT Null From dde9ce522338450df9e969eac40c177f37e6c95a Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Fri, 11 Jun 2021 15:22:35 +0300 Subject: [PATCH 07/30] Use hits_10m_single only for uniqTheta --- tests/performance/uniq.xml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/performance/uniq.xml b/tests/performance/uniq.xml index f6f7ac01c65..e8f3aed62fe 100644 --- a/tests/performance/uniq.xml +++ b/tests/performance/uniq.xml @@ -1,5 +1,6 @@ + hits_100m_single hits_10m_single 30000000000 @@ -53,10 +54,10 @@ uniqUpTo(10) uniqUpTo(25) uniqUpTo(100) - uniqTheta - SELECT {key} AS k, {func}(UserID) FROM hits_10m_single GROUP BY k FORMAT Null + SELECT {key} AS k, {func}(UserID) FROM hits_100m_single GROUP BY k FORMAT Null + SELECT {key} AS k, uniqTheta(UserID) FROM hits_10m_single GROUP BY k FORMAT Null From 0f9fc33a4e917c2925228ce5a3b66eafd9b042f8 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 21 Jun 2021 11:27:10 +0000 Subject: [PATCH 08/30] Fix postgres arrays --- .../fetchPostgreSQLTableStructure.cpp | 83 ++++++++++++++----- .../test_storage_postgresql/test.py | 16 ++++ 2 files changed, 80 insertions(+), 19 deletions(-) diff --git a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp index a310315dcc8..ff3e4008af0 100644 --- a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp +++ b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp @@ -25,14 +25,19 @@ namespace ErrorCodes } -static DataTypePtr convertPostgreSQLDataType(String & type, bool is_nullable, uint16_t dimensions) +static DataTypePtr convertPostgreSQLDataType(String & type, bool is_nullable, uint16_t dimensions, const std::function & recheck_array) { DataTypePtr res; + bool is_array = false; /// Get rid of trailing '[]' for arrays - if (dimensions) + if (type.ends_with("[]")) + { + is_array = true; + while (type.ends_with("[]")) type.resize(type.size() - 2); + } if (type == "smallint") res = std::make_shared(); @@ -88,8 +93,24 @@ static DataTypePtr convertPostgreSQLDataType(String & type, bool is_nullable, ui res = std::make_shared(); if (is_nullable) res = std::make_shared(res); - while (dimensions--) - res = std::make_shared(res); + + if (is_array) + { + /// In some cases att_ndims does not return correct number of dimensions + /// (it might return incorrect 0 number, for example, when a postgres table is created via 'as select * from table_with_arrays'). + /// So recheck all arrays separately afterwards. (Cannot check here on the same connection because another query is in execution). + if (!dimensions) + { + /// Return 1d array type and recheck all arrays dims with array_ndims + res = std::make_shared(res); + recheck_array(); + } + else + { + while (dimensions--) + res = std::make_shared(res); + } + } return res; } @@ -98,7 +119,7 @@ static DataTypePtr convertPostgreSQLDataType(String & type, bool is_nullable, ui std::shared_ptr fetchPostgreSQLTableStructure( postgres::ConnectionHolderPtr connection_holder, const String & postgres_table_name, bool use_nulls) { - auto columns = NamesAndTypesList(); + auto columns = NamesAndTypes(); if (postgres_table_name.find('\'') != std::string::npos || postgres_table_name.find('\\') != std::string::npos) @@ -115,22 +136,46 @@ std::shared_ptr fetchPostgreSQLTableStructure( "AND NOT attisdropped AND attnum > 0", postgres_table_name); try { - pqxx::read_transaction tx(connection_holder->get()); - auto stream{pqxx::stream_from::query(tx, query)}; - - std::tuple row; - while (stream >> row) + std::set recheck_arrays_indexes; { - columns.push_back(NameAndTypePair( - std::get<0>(row), - convertPostgreSQLDataType( - std::get<1>(row), - use_nulls && (std::get<2>(row) == "f"), /// 'f' means that postgres `not_null` is false, i.e. value is nullable - std::get<3>(row)))); + pqxx::read_transaction tx(connection_holder->get()); + auto stream{pqxx::stream_from::query(tx, query)}; + + std::tuple row; + size_t i = 0; + auto recheck_array = [&]() { recheck_arrays_indexes.insert(i); }; + while (stream >> row) + { + auto data_type = convertPostgreSQLDataType(std::get<1>(row), + use_nulls && (std::get<2>(row) == "f"), /// 'f' means that postgres `not_null` is false, i.e. value is nullable + std::get<3>(row), + recheck_array); + columns.push_back(NameAndTypePair(std::get<0>(row), data_type)); + ++i; + } + stream.complete(); + tx.commit(); + } + + for (auto & i : recheck_arrays_indexes) + { + const auto & name_and_type = columns[i]; + + pqxx::nontransaction tx(connection_holder->get()); + /// All rows must contain the same number of dimensions, so limit 1 is ok. If number of dimensions in all rows is not the same - + /// such arrays are not able to be used as ClickHouse Array at all. + pqxx::result result{tx.exec(fmt::format("SELECT array_ndims({}) FROM {} LIMIT 1", name_and_type.name, postgres_table_name))}; + auto dimensions = result[0][0].as(); + + /// It is always 1d array if it is in recheck. + DataTypePtr type = assert_cast(name_and_type.type.get())->getNestedType(); + while (dimensions--) + type = std::make_shared(type); + + columns[i] = NameAndTypePair(name_and_type.name, type); } - stream.complete(); - tx.commit(); } + catch (const pqxx::undefined_table &) { throw Exception(fmt::format( @@ -146,7 +191,7 @@ std::shared_ptr fetchPostgreSQLTableStructure( if (columns.empty()) return nullptr; - return std::make_shared(columns); + return std::make_shared(NamesAndTypesList(columns.begin(), columns.end())); } } diff --git a/tests/integration/test_storage_postgresql/test.py b/tests/integration/test_storage_postgresql/test.py index f81033822c8..05c7ba9365d 100644 --- a/tests/integration/test_storage_postgresql/test.py +++ b/tests/integration/test_storage_postgresql/test.py @@ -308,6 +308,22 @@ def test_postgres_distributed(started_cluster): assert(result == 'host2\nhost4\n' or result == 'host3\nhost4\n') +def test_postgres_ndim(started_cluster): + conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True) + cursor = conn.cursor() + cursor.execute('CREATE TABLE arr1 (a Integer[])') + cursor.execute("INSERT INTO arr1 SELECT '{{1}, {2}}'") + + # The point is in creating a table via 'as select *', in postgres att_ndim will not be correct in this case. + cursor.execute('CREATE TABLE arr2 AS SELECT * FROM arr1') + cursor.execute("SELECT attndims AS dims FROM pg_attribute WHERE attrelid = 'arr2'::regclass; ") + result = cursor.fetchall()[0] + assert(int(result[0]) == 0) + + result = node1.query('''SELECT toTypeName(a) FROM postgresql('postgres1:5432', 'clickhouse', 'arr2', 'postgres', 'mysecretpassword')''') + assert(result.strip() == "Array(Array(Nullable(Int32)))") + + if __name__ == '__main__': cluster.start() input("Cluster created, press any key to destroy...") From 77a3a1416deb9b54981087d659ba720d94789119 Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Mon, 21 Jun 2021 17:26:14 +0300 Subject: [PATCH 09/30] Update fetchPostgreSQLTableStructure.cpp --- src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp index ff3e4008af0..ea86fe94e8e 100644 --- a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp +++ b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp @@ -157,7 +157,7 @@ std::shared_ptr fetchPostgreSQLTableStructure( tx.commit(); } - for (auto & i : recheck_arrays_indexes) + for (const auto & i : recheck_arrays_indexes) { const auto & name_and_type = columns[i]; From a0209178cc198fd06ff5814d9f4d11e0775aa387 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Mon, 3 May 2021 10:52:45 +0300 Subject: [PATCH 10/30] Add ability to split distributed batch on failures Add distributed_directory_monitor_split_batch_on_failure setting (OFF by default), that will split the batch and send files one by one in case of retriable errors. v2: more error codes --- docs/en/operations/settings/settings.md | 21 +++ src/Common/ErrorCodes.cpp | 1 + src/Core/Settings.h | 1 + src/Storages/Distributed/DirectoryMonitor.cpp | 122 ++++++++++++++---- src/Storages/Distributed/DirectoryMonitor.h | 1 + .../__init__.py | 0 .../configs/overrides_1.xml | 15 +++ .../configs/overrides_2.xml | 15 +++ .../configs/remote_servers.xml | 18 +++ .../test.py | 60 +++++++++ 10 files changed, 227 insertions(+), 27 deletions(-) create mode 100644 tests/integration/test_distributed_directory_monitor_split_batch_on_failure/__init__.py create mode 100644 tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_1.xml create mode 100644 tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_2.xml create mode 100644 tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/remote_servers.xml create mode 100644 tests/integration/test_distributed_directory_monitor_split_batch_on_failure/test.py diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index 2bde3b03048..29a6948567d 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -1802,6 +1802,27 @@ Possible values: Default value: 0. +## distributed_directory_monitor_split_batch_on_failure {#distributed_directory_monitor_split_batch_on_failure} + +Enables/disables splitting batches on failures. + +Sometimes sending particular batch to the remote shard may fail, because of some complex pipeline after (i.e. `MATERIALIZED VIEW` with `GROUP BY`) due to `Memory limit exceeded` or similar errors. In this case, retrying will not help (and this will stuck distributed sends for the table) but sending files from that batch one by one may succeed INSERT. + +So installing this setting to `1` will disable batching for such batches (i.e. temporary disables `distributed_directory_monitor_batch_inserts` for failed batches). + +Possible values: + +- 1 — Enabled. +- 0 — Disabled. + +Default value: 0. + +!!! note "Note" + This setting also affects broken batches (that may appears because of abnormal server (machine) termination and no `fsync_after_insert`/`fsync_directories` for [Distributed](../../engines/table-engines/special/distributed.md) table engine). + +!!! warning "Warning" + You should not rely on automatic batch splitting, since this may hurt performance. + ## os_thread_priority {#setting-os-thread-priority} Sets the priority ([nice](https://en.wikipedia.org/wiki/Nice_(Unix))) for threads that execute queries. The OS scheduler considers this priority when choosing the next thread to run on each available CPU core. diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index be26997d8ff..2b3df9ea96a 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -555,6 +555,7 @@ M(585, CANNOT_PARSE_YAML) \ M(586, CANNOT_CREATE_FILE) \ M(587, CONCURRENT_ACCESS_NOT_SUPPORTED) \ + M(588, DISTRIBUTED_BROKEN_BATCH_INFO) \ \ M(998, POSTGRESQL_CONNECTION_FAILURE) \ M(999, KEEPER_EXCEPTION) \ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 84e7500b064..e660dc7caea 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -90,6 +90,7 @@ class IColumn; M(Milliseconds, distributed_directory_monitor_max_sleep_time_ms, 30000, "Maximum sleep time for StorageDistributed DirectoryMonitors, it limits exponential growth too.", 0) \ \ M(Bool, distributed_directory_monitor_batch_inserts, false, "Should StorageDistributed DirectoryMonitors try to batch individual inserts into bigger ones.", 0) \ + M(Bool, distributed_directory_monitor_split_batch_on_failure, false, "Should StorageDistributed DirectoryMonitors try to split batch into smaller in case of failures.", 0) \ \ M(Bool, optimize_move_to_prewhere, true, "Allows disabling WHERE to PREWHERE optimization in SELECT queries from MergeTree.", 0) \ \ diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index e8835132f8f..15a097a5ab9 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -52,6 +52,14 @@ namespace ErrorCodes extern const int ATTEMPT_TO_READ_AFTER_EOF; extern const int EMPTY_DATA_PASSED; extern const int INCORRECT_FILE_NAME; + extern const int MEMORY_LIMIT_EXCEEDED; + extern const int DISTRIBUTED_BROKEN_BATCH_INFO; + extern const int TOO_MANY_PARTS; + extern const int TOO_MANY_BYTES; + extern const int TOO_MANY_ROWS_OR_BYTES; + extern const int TOO_MANY_PARTITIONS; + extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES; + extern const int ARGUMENT_OUT_OF_BOUND; } @@ -203,9 +211,25 @@ namespace || code == ErrorCodes::CANNOT_READ_ALL_DATA || code == ErrorCodes::UNKNOWN_CODEC || code == ErrorCodes::CANNOT_DECOMPRESS + || code == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO || (!remote_error && code == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF); } + /// Can the batch be split and send files from batch one-by-one instead? + bool isSplittableErrorCode(int code) + { + return code == ErrorCodes::MEMORY_LIMIT_EXCEEDED + /// FunctionRange::max_elements and similar + || code == ErrorCodes::ARGUMENT_OUT_OF_BOUND + || code == ErrorCodes::TOO_MANY_PARTS + || code == ErrorCodes::TOO_MANY_BYTES + || code == ErrorCodes::TOO_MANY_ROWS_OR_BYTES + || code == ErrorCodes::TOO_MANY_PARTITIONS + || code == ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES + || code == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO + ; + } + SyncGuardPtr getDirectorySyncGuard(bool dir_fsync, const DiskPtr & disk, const String & path) { if (dir_fsync) @@ -295,6 +319,7 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor( , relative_path(relative_path_) , path(fs::path(disk->getPath()) / relative_path / "") , should_batch_inserts(storage.getContext()->getSettingsRef().distributed_directory_monitor_batch_inserts) + , split_batch_on_failure(storage.getContext()->getSettingsRef().distributed_directory_monitor_split_batch_on_failure) , dir_fsync(storage.getDistributedSettingsRef().fsync_directories) , min_batched_block_size_rows(storage.getContext()->getSettingsRef().min_insert_block_size_rows) , min_batched_block_size_bytes(storage.getContext()->getSettingsRef().min_insert_block_size_bytes) @@ -618,6 +643,7 @@ struct StorageDistributedDirectoryMonitor::Batch StorageDistributedDirectoryMonitor & parent; const std::map & file_index_to_path; + bool split_batch_on_failure = true; bool fsync = false; bool dir_fsync = false; @@ -626,6 +652,7 @@ struct StorageDistributedDirectoryMonitor::Batch const std::map & file_index_to_path_) : parent(parent_) , file_index_to_path(file_index_to_path_) + , split_batch_on_failure(parent.split_batch_on_failure) , fsync(parent.storage.getDistributedSettingsRef().fsync_after_insert) , dir_fsync(parent.dir_fsync) {} @@ -681,35 +708,20 @@ struct StorageDistributedDirectoryMonitor::Batch bool batch_broken = false; try { - std::unique_ptr remote; - - for (UInt64 file_idx : file_indices) + try { - auto file_path = file_index_to_path.find(file_idx); - if (file_path == file_index_to_path.end()) - { - LOG_ERROR(parent.log, "Failed to send batch: file with index {} is absent", file_idx); - batch_broken = true; - break; - } - - ReadBufferFromFile in(file_path->second); - const auto & distributed_header = readDistributedHeader(in, parent.log); - - if (!remote) - { - remote = std::make_unique(*connection, timeouts, - distributed_header.insert_query, - distributed_header.insert_settings, - distributed_header.client_info); - remote->writePrefix(); - } - bool compression_expected = connection->getCompression() == Protocol::Compression::Enable; - writeRemoteConvert(distributed_header, *remote, compression_expected, in, parent.log); + sendBatch(*connection, timeouts); + } + catch (const Exception & e) + { + if (split_batch_on_failure && isSplittableErrorCode(e.code())) + { + tryLogCurrentException(parent.log, "Trying to split batch due to"); + sendSeparateFiles(*connection, timeouts); + } + else + throw; } - - if (remote) - remote->writeSuffix(); } catch (Exception & e) { @@ -773,6 +785,62 @@ struct StorageDistributedDirectoryMonitor::Batch } recovered = true; } + +private: + void sendBatch(Connection & connection, const ConnectionTimeouts & timeouts) + { + std::unique_ptr remote; + + for (UInt64 file_idx : file_indices) + { + auto file_path = file_index_to_path.find(file_idx); + if (file_path == file_index_to_path.end()) + throw Exception(ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO, + "Failed to send batch: file with index {} is absent", file_idx); + + ReadBufferFromFile in(file_path->second); + const auto & distributed_header = readDistributedHeader(in, parent.log); + + if (!remote) + { + remote = std::make_unique(connection, timeouts, + distributed_header.insert_query, + distributed_header.insert_settings, + distributed_header.client_info); + remote->writePrefix(); + } + bool compression_expected = connection.getCompression() == Protocol::Compression::Enable; + writeRemoteConvert(distributed_header, *remote, compression_expected, in, parent.log); + } + + if (remote) + remote->writeSuffix(); + } + + void sendSeparateFiles(Connection & connection, const ConnectionTimeouts & timeouts) + { + for (UInt64 file_idx : file_indices) + { + auto file_path = file_index_to_path.find(file_idx); + if (file_path == file_index_to_path.end()) + { + LOG_ERROR(parent.log, "Failed to send one file from batch: file with index {} is absent", file_idx); + continue; + } + + ReadBufferFromFile in(file_path->second); + const auto & distributed_header = readDistributedHeader(in, parent.log); + + RemoteBlockOutputStream remote(connection, timeouts, + distributed_header.insert_query, + distributed_header.insert_settings, + distributed_header.client_info); + remote.writePrefix(); + bool compression_expected = connection.getCompression() == Protocol::Compression::Enable; + writeRemoteConvert(distributed_header, remote, compression_expected, in, parent.log); + remote.writeSuffix(); + } + } }; class DirectoryMonitorBlockInputStream : public IBlockInputStream diff --git a/src/Storages/Distributed/DirectoryMonitor.h b/src/Storages/Distributed/DirectoryMonitor.h index ab9b8592294..c04c49f3b9b 100644 --- a/src/Storages/Distributed/DirectoryMonitor.h +++ b/src/Storages/Distributed/DirectoryMonitor.h @@ -92,6 +92,7 @@ private: std::string path; const bool should_batch_inserts = false; + const bool split_batch_on_failure = true; const bool dir_fsync = false; const size_t min_batched_block_size_rows = 0; const size_t min_batched_block_size_bytes = 0; diff --git a/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/__init__.py b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_1.xml b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_1.xml new file mode 100644 index 00000000000..4e4ccf75323 --- /dev/null +++ b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_1.xml @@ -0,0 +1,15 @@ + + + + + 0 + + 1 + + 1 + + 86400 + 86400 + + + diff --git a/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_2.xml b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_2.xml new file mode 100644 index 00000000000..d7c69c4a9ac --- /dev/null +++ b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_2.xml @@ -0,0 +1,15 @@ + + + + + 0 + + 1 + + 0 + + 86400 + 86400 + + + diff --git a/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/remote_servers.xml b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/remote_servers.xml new file mode 100644 index 00000000000..ebce4697529 --- /dev/null +++ b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/remote_servers.xml @@ -0,0 +1,18 @@ + + + + + + node1 + 9000 + + + + + node2 + 9000 + + + + + diff --git a/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/test.py b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/test.py new file mode 100644 index 00000000000..9cbf8771ee5 --- /dev/null +++ b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/test.py @@ -0,0 +1,60 @@ +import pytest +from helpers.client import QueryRuntimeException +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +# node1 -- distributed_directory_monitor_split_batch_on_failure=on +node1 = cluster.add_instance('node1', + main_configs=['configs/remote_servers.xml'], + user_configs=['configs/overrides_1.xml'], +) +# node2 -- distributed_directory_monitor_split_batch_on_failure=off +node2 = cluster.add_instance('node2', + main_configs=['configs/remote_servers.xml'], + user_configs=['configs/overrides_2.xml'], +) + +@pytest.fixture(scope='module') +def started_cluster(): + try: + cluster.start() + + for _, node in cluster.instances.items(): + node.query(""" + create table null_ (key Int, value Int) engine=Null(); + create table dist as null_ engine=Distributed(test_cluster, currentDatabase(), null_, key); + create table data (key Int, uniq_values Int) engine=Memory(); + create materialized view mv to data as select key, uniqExact(value) uniq_values from null_ group by key; + system stop distributed sends dist; + + create table dist_data as data engine=Distributed(test_cluster, currentDatabase(), data); + """) + + yield cluster + finally: + cluster.shutdown() + +def test_distributed_directory_monitor_split_batch_on_failure_OFF(started_cluster): + for i in range(0, 100): + limit = 100e3 + node2.query(f'insert into dist select number/100, number from system.numbers limit {limit} offset {limit*i}', settings={ + # max_memory_usage is the limit for the batch on the remote node + # (local query should not be affected since 30MB is enough for 100K rows) + 'max_memory_usage': '30Mi', + }) + # "Received from" is mandatory, since the exception should be thrown on the remote node. + with pytest.raises(QueryRuntimeException, match=r'DB::Exception: Received from.*Memory limit \(for query\) exceeded: .*while pushing to view default\.mv'): + node2.query('system flush distributed dist') + assert int(node2.query('select count() from dist_data')) == 0 + +def test_distributed_directory_monitor_split_batch_on_failure_ON(started_cluster): + for i in range(0, 100): + limit = 100e3 + node1.query(f'insert into dist select number/100, number from system.numbers limit {limit} offset {limit*i}', settings={ + # max_memory_usage is the limit for the batch on the remote node + # (local query should not be affected since 30MB is enough for 100K rows) + 'max_memory_usage': '30Mi', + }) + node1.query('system flush distributed dist') + assert int(node1.query('select count() from dist_data')) == 100000 From 3bd53c68f98e6b4c4fb75275289a55b9b072f864 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Mon, 3 May 2021 10:52:51 +0300 Subject: [PATCH 11/30] Try to split the batch in case of broken batch too Broken batches may be because of abnormal server shutdown (and lack of fsync), and ignoring the whole batch is not great in this case, so apply the same split logic here too. v2: rename exception v3: catch missing exception v4: fix marking the file as broken multiple times (fixes test_insert_distributed_async_send with setting enabled) --- src/Common/ErrorCodes.cpp | 1 + src/Storages/Distributed/DirectoryMonitor.cpp | 48 ++++++++++++++----- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 2b3df9ea96a..f4ceef2896a 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -556,6 +556,7 @@ M(586, CANNOT_CREATE_FILE) \ M(587, CONCURRENT_ACCESS_NOT_SUPPORTED) \ M(588, DISTRIBUTED_BROKEN_BATCH_INFO) \ + M(589, DISTRIBUTED_BROKEN_BATCH_FILES) \ \ M(998, POSTGRESQL_CONNECTION_FAILURE) \ M(999, KEEPER_EXCEPTION) \ diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index 15a097a5ab9..b40b73f45cd 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -54,6 +54,7 @@ namespace ErrorCodes extern const int INCORRECT_FILE_NAME; extern const int MEMORY_LIMIT_EXCEEDED; extern const int DISTRIBUTED_BROKEN_BATCH_INFO; + extern const int DISTRIBUTED_BROKEN_BATCH_FILES; extern const int TOO_MANY_PARTS; extern const int TOO_MANY_BYTES; extern const int TOO_MANY_ROWS_OR_BYTES; @@ -212,11 +213,12 @@ namespace || code == ErrorCodes::UNKNOWN_CODEC || code == ErrorCodes::CANNOT_DECOMPRESS || code == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO + || code == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_FILES || (!remote_error && code == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF); } /// Can the batch be split and send files from batch one-by-one instead? - bool isSplittableErrorCode(int code) + bool isSplittableErrorCode(int code, bool remote) { return code == ErrorCodes::MEMORY_LIMIT_EXCEEDED /// FunctionRange::max_elements and similar @@ -227,6 +229,7 @@ namespace || code == ErrorCodes::TOO_MANY_PARTITIONS || code == ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES || code == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO + || isFileBrokenErrorCode(code, remote) ; } @@ -706,6 +709,7 @@ struct StorageDistributedDirectoryMonitor::Batch auto connection = parent.pool->get(timeouts); bool batch_broken = false; + bool batch_marked_as_broken = false; try { try @@ -714,7 +718,7 @@ struct StorageDistributedDirectoryMonitor::Batch } catch (const Exception & e) { - if (split_batch_on_failure && isSplittableErrorCode(e.code())) + if (split_batch_on_failure && isSplittableErrorCode(e.code(), e.isRemoteException())) { tryLogCurrentException(parent.log, "Trying to split batch due to"); sendSeparateFiles(*connection, timeouts); @@ -729,6 +733,8 @@ struct StorageDistributedDirectoryMonitor::Batch { tryLogCurrentException(parent.log, "Failed to send batch due to"); batch_broken = true; + if (!e.isRemoteException() && e.code() == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_FILES) + batch_marked_as_broken = true; } else { @@ -749,7 +755,7 @@ struct StorageDistributedDirectoryMonitor::Batch for (UInt64 file_index : file_indices) parent.markAsSend(file_index_to_path.at(file_index)); } - else + else if (!batch_marked_as_broken) { LOG_ERROR(parent.log, "Marking a batch of {} files as broken.", file_indices.size()); @@ -819,27 +825,43 @@ private: void sendSeparateFiles(Connection & connection, const ConnectionTimeouts & timeouts) { + size_t broken_files = 0; + for (UInt64 file_idx : file_indices) { auto file_path = file_index_to_path.find(file_idx); if (file_path == file_index_to_path.end()) { LOG_ERROR(parent.log, "Failed to send one file from batch: file with index {} is absent", file_idx); + ++broken_files; continue; } - ReadBufferFromFile in(file_path->second); - const auto & distributed_header = readDistributedHeader(in, parent.log); + try + { + ReadBufferFromFile in(file_path->second); + const auto & distributed_header = readDistributedHeader(in, parent.log); - RemoteBlockOutputStream remote(connection, timeouts, - distributed_header.insert_query, - distributed_header.insert_settings, - distributed_header.client_info); - remote.writePrefix(); - bool compression_expected = connection.getCompression() == Protocol::Compression::Enable; - writeRemoteConvert(distributed_header, remote, compression_expected, in, parent.log); - remote.writeSuffix(); + RemoteBlockOutputStream remote(connection, timeouts, + distributed_header.insert_query, + distributed_header.insert_settings, + distributed_header.client_info); + remote.writePrefix(); + bool compression_expected = connection.getCompression() == Protocol::Compression::Enable; + writeRemoteConvert(distributed_header, remote, compression_expected, in, parent.log); + remote.writeSuffix(); + } + catch (Exception & e) + { + e.addMessage(fmt::format("While sending {}", file_path->second)); + parent.maybeMarkAsBroken(file_path->second, e); + ++broken_files; + } } + + if (broken_files) + throw Exception(ErrorCodes::DISTRIBUTED_BROKEN_BATCH_FILES, + "Failed to send {} files", broken_files); } }; From 353a770a870b5604f6c15e07270f4789f705fedb Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 4 May 2021 08:59:09 +0300 Subject: [PATCH 12/30] Extend test_insert_distributed_async_send for distributed_directory_monitor_split_batch_on_failure --- .../configs/remote_servers_split.xml | 32 ++++++++++++ .../configs/users.d/split.xml | 7 +++ .../test.py | 50 ++++++++++++++----- 3 files changed, 77 insertions(+), 12 deletions(-) create mode 100644 tests/integration/test_insert_distributed_async_send/configs/remote_servers_split.xml create mode 100644 tests/integration/test_insert_distributed_async_send/configs/users.d/split.xml diff --git a/tests/integration/test_insert_distributed_async_send/configs/remote_servers_split.xml b/tests/integration/test_insert_distributed_async_send/configs/remote_servers_split.xml new file mode 100644 index 00000000000..e2757bbc18c --- /dev/null +++ b/tests/integration/test_insert_distributed_async_send/configs/remote_servers_split.xml @@ -0,0 +1,32 @@ + + + + + false + + n3 + 9000 + + + n4 + 9000 + + + + + + + n3 + 9000 + + + + + n4 + 9000 + + + + + + diff --git a/tests/integration/test_insert_distributed_async_send/configs/users.d/split.xml b/tests/integration/test_insert_distributed_async_send/configs/users.d/split.xml new file mode 100644 index 00000000000..bf826629685 --- /dev/null +++ b/tests/integration/test_insert_distributed_async_send/configs/users.d/split.xml @@ -0,0 +1,7 @@ + + + + 1 + + + diff --git a/tests/integration/test_insert_distributed_async_send/test.py b/tests/integration/test_insert_distributed_async_send/test.py index b469da4e2e1..a9bf9801f4c 100644 --- a/tests/integration/test_insert_distributed_async_send/test.py +++ b/tests/integration/test_insert_distributed_async_send/test.py @@ -17,11 +17,29 @@ n1 = cluster.add_instance('n1', main_configs=['configs/remote_servers.xml'], use # n2 -- distributed_directory_monitor_batch_inserts=0 n2 = cluster.add_instance('n2', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users.d/no_batch.xml']) +# n3 -- distributed_directory_monitor_batch_inserts=1/distributed_directory_monitor_split_batch_on_failure=1 +n3 = cluster.add_instance('n3', main_configs=['configs/remote_servers_split.xml'], user_configs=[ + 'configs/users.d/batch.xml', + 'configs/users.d/split.xml', +]) +# n4 -- distributed_directory_monitor_batch_inserts=0/distributed_directory_monitor_split_batch_on_failure=1 +n4 = cluster.add_instance('n4', main_configs=['configs/remote_servers_split.xml'], user_configs=[ + 'configs/users.d/no_batch.xml', + 'configs/users.d/split.xml', +]) + batch_params = pytest.mark.parametrize('batch', [ (1), (0), ]) +batch_and_split_params = pytest.mark.parametrize('batch,split', [ + (1, 0), + (0, 0), + (1, 1), + (0, 1), +]) + @pytest.fixture(scope='module', autouse=True) def start_cluster(): try: @@ -62,15 +80,19 @@ def insert_data(node): assert size > 1<<16 return size -def get_node(batch): +def get_node(batch, split=None): + if split: + if batch: + return n3 + return n4 if batch: return n1 return n2 -def bootstrap(batch): +def bootstrap(batch, split=None): drop_tables() create_tables('insert_distributed_async_send_cluster_two_replicas') - return insert_data(get_node(batch)) + return insert_data(get_node(batch, split)) def get_path_to_dist_batch(file='2.bin'): # There are: @@ -80,8 +102,8 @@ def get_path_to_dist_batch(file='2.bin'): # @return the file for the n2 shard return f'/var/lib/clickhouse/data/default/dist/shard1_replica2/{file}' -def check_dist_after_corruption(truncate, batch): - node = get_node(batch) +def check_dist_after_corruption(truncate, batch, split=None): + node = get_node(batch, split) if batch: # In batch mode errors are ignored @@ -102,8 +124,12 @@ def check_dist_after_corruption(truncate, batch): broken = get_path_to_dist_batch('broken') node.exec_in_container(['bash', '-c', f'ls {broken}/2.bin']) - assert int(n1.query('SELECT count() FROM data')) == 10000 - assert int(n2.query('SELECT count() FROM data')) == 0 + if split: + assert int(n3.query('SELECT count() FROM data')) == 10000 + assert int(n4.query('SELECT count() FROM data')) == 0 + else: + assert int(n1.query('SELECT count() FROM data')) == 10000 + assert int(n2.query('SELECT count() FROM data')) == 0 @batch_params @@ -114,17 +140,17 @@ def test_insert_distributed_async_send_success(batch): assert int(n1.query('SELECT count() FROM data')) == 10000 assert int(n2.query('SELECT count() FROM data')) == 10000 -@batch_params -def test_insert_distributed_async_send_truncated_1(batch): - size = bootstrap(batch) +@batch_and_split_params +def test_insert_distributed_async_send_truncated_1(batch, split): + size = bootstrap(batch, split) path = get_path_to_dist_batch() - node = get_node(batch) + node = get_node(batch, split) new_size = size - 10 # we cannot use truncate, due to hardlinks node.exec_in_container(['bash', '-c', f'mv {path} /tmp/bin && head -c {new_size} /tmp/bin > {path}']) - check_dist_after_corruption(True, batch) + check_dist_after_corruption(True, batch, split) @batch_params def test_insert_distributed_async_send_truncated_2(batch): From 50d9d3efd5d0275556838719cc52d8e0147ed245 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 22 Jun 2021 10:07:22 +0300 Subject: [PATCH 13/30] Remove only symlinks during force_restore_data of Atomic engine --- src/Databases/DatabaseAtomic.cpp | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/Databases/DatabaseAtomic.cpp b/src/Databases/DatabaseAtomic.cpp index 6b8c470861d..4b9d84de282 100644 --- a/src/Databases/DatabaseAtomic.cpp +++ b/src/Databases/DatabaseAtomic.cpp @@ -25,6 +25,7 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; extern const int FILE_ALREADY_EXISTS; extern const int INCORRECT_QUERY; + extern const int ABORTED; } class AtomicDatabaseTablesSnapshotIterator final : public DatabaseTablesSnapshotIterator @@ -420,7 +421,18 @@ void DatabaseAtomic::loadStoredObjects(ContextMutablePtr local_context, bool has { /// Recreate symlinks to table data dirs in case of force restore, because some of them may be broken if (has_force_restore_data_flag) - fs::remove_all(path_to_table_symlinks); + { + for (const auto & table_path : fs::directory_iterator(path_to_table_symlinks)) + { + if (!fs::is_symlink(table_path)) + { + throw Exception(ErrorCodes::ABORTED, + "'{}' is not a symlink. Atomic database should contains only symlinks.", std::string(table_path.path())); + } + + fs::remove(table_path); + } + } DatabaseOrdinary::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach); From 843902631254f99a648891a587be771602f9de43 Mon Sep 17 00:00:00 2001 From: yuchuansun Date: Thu, 24 Jun 2021 10:34:28 +0800 Subject: [PATCH 14/30] Update set.md --- docs/zh/engines/table-engines/special/set.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/engines/table-engines/special/set.md b/docs/zh/engines/table-engines/special/set.md index 71271b0d7ca..a4fd0d85bd1 100644 --- a/docs/zh/engines/table-engines/special/set.md +++ b/docs/zh/engines/table-engines/special/set.md @@ -1,4 +1,4 @@ -# 设置 {#set} +# 集合 {#set} 始终存在于 RAM 中的数据集。它适用于IN运算符的右侧(请参见 «IN运算符» 部分)。 From a616ae88618eb050f2e8df3435fceddfd773a250 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 24 Jun 2021 10:07:31 +0300 Subject: [PATCH 15/30] Improve startup time of Distributed engine. - create directory monitors in parallel (this also includes rmdir in case of directory is empty, since even if the directory is empty it may take some time to remove it, due to waiting for journal or if the directory is large, i.e. it had lots of files before, since remember ext4 does not truncate the directory size on each unlink [1]) - initialize increment in parallel too (since it does readdir()) [1]: https://lore.kernel.org/linux-ext4/930A5754-5CE6-4567-8CF0-62447C97825C@dilger.ca/ --- .../DistributedBlockOutputStream.cpp | 2 +- src/Storages/StorageDistributed.cpp | 65 +++++++++++++++---- src/Storages/StorageDistributed.h | 2 +- 3 files changed, 54 insertions(+), 15 deletions(-) diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index 9b13198812b..9a50cec5986 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -752,7 +752,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms; for (const auto & dir_name : dir_names) { - auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name); + auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name, /* startup= */ false); directory_monitor.addAndSchedule(file_size, sleep_ms.totalMilliseconds()); } } diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 8507198a7f6..d43fd1532a1 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -800,12 +800,31 @@ void StorageDistributed::startup() if (!storage_policy) return; - for (const DiskPtr & disk : data_volume->getDisks()) - createDirectoryMonitors(disk); + const auto & disks = data_volume->getDisks(); + ThreadPool pool(disks.size()); - for (const String & path : getDataPaths()) + for (const DiskPtr & disk : disks) + { + pool.scheduleOrThrowOnError([&]() + { + createDirectoryMonitors(disk); + }); + } + pool.wait(); + + const auto & paths = getDataPaths(); + std::vector last_increment(paths.size()); + for (size_t i = 0; i < paths.size(); ++i) + { + pool.scheduleOrThrowOnError([&, i]() + { + last_increment[i] = getMaximumFileNumber(paths[i]); + }); + } + pool.wait(); + + for (const auto inc : last_increment) { - UInt64 inc = getMaximumFileNumber(path); if (inc > file_names_increment.value) file_names_increment.value.store(inc); } @@ -907,30 +926,50 @@ void StorageDistributed::createDirectoryMonitors(const DiskPtr & disk) } else { - requireDirectoryMonitor(disk, dir_path.filename().string()); + requireDirectoryMonitor(disk, dir_path.filename().string(), /* startup= */ true); } } } } -StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const DiskPtr & disk, const std::string & name) +StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const DiskPtr & disk, const std::string & name, bool startup) { const std::string & disk_path = disk->getPath(); const std::string key(disk_path + name); - std::lock_guard lock(cluster_nodes_mutex); - auto & node_data = cluster_nodes_data[key]; - if (!node_data.directory_monitor) + auto create_node_data = [&]() { - node_data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this); - node_data.directory_monitor = std::make_unique( + ClusterNodeData data; + data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this); + data.directory_monitor = std::make_unique( *this, disk, relative_data_path + name, - node_data.connection_pool, + data.connection_pool, monitors_blocker, getContext()->getDistributedSchedulePool()); + return data; + }; + + /// In case of startup the lock can be acquired later. + if (startup) + { + auto tmp_node_data = create_node_data(); + std::lock_guard lock(cluster_nodes_mutex); + auto & node_data = cluster_nodes_data[key]; + assert(!node_data.directory_monitor); + node_data = std::move(tmp_node_data); + return *node_data.directory_monitor; + } + else + { + std::lock_guard lock(cluster_nodes_mutex); + auto & node_data = cluster_nodes_data[key]; + if (!node_data.directory_monitor) + { + node_data = create_node_data(); + } + return *node_data.directory_monitor; } - return *node_data.directory_monitor; } std::vector StorageDistributed::getDirectoryMonitorsStatuses() const diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index c734b0f777e..c63abbc6aa4 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -160,7 +160,7 @@ private: /// create directory monitors for each existing subdirectory void createDirectoryMonitors(const DiskPtr & disk); /// ensure directory monitor thread and connectoin pool creation by disk and subdirectory name - StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const DiskPtr & disk, const std::string & name); + StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const DiskPtr & disk, const std::string & name, bool startup); /// Return list of metrics for all created monitors /// (note that monitors are created lazily, i.e. until at least one INSERT executed) From 9788b0e38a9a6f93f3f618fe4ce4684bc33f00bf Mon Sep 17 00:00:00 2001 From: Denis Glazachev Date: Sat, 26 Jun 2021 17:39:02 +0400 Subject: [PATCH 16/30] Fix locating objcopy in macOS Rework clickhouse_embed_binaries() to compile asm files properly and avoid duplicate symbols when linking in macOS --- CMakeLists.txt | 21 +++++++++-- cmake/embed_binary.cmake | 61 +++++++++++++------------------- docs/en/development/build-osx.md | 2 +- 3 files changed, 44 insertions(+), 40 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 9cf8188cc8e..d23e5f540d3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -184,10 +184,27 @@ endif () set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -rdynamic") find_program (OBJCOPY_PATH NAMES "llvm-objcopy" "llvm-objcopy-12" "llvm-objcopy-11" "llvm-objcopy-10" "llvm-objcopy-9" "llvm-objcopy-8" "objcopy") + +if (NOT OBJCOPY_PATH AND OS_DARWIN) + find_program (BREW_PATH NAMES "brew") + if (BREW_PATH) + execute_process (COMMAND ${BREW_PATH} --prefix llvm ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE LLVM_PREFIX) + if (LLVM_PREFIX) + find_program (OBJCOPY_PATH NAMES "llvm-objcopy" PATHS "${LLVM_PREFIX}/bin" NO_DEFAULT_PATH) + endif () + if (NOT OBJCOPY_PATH) + execute_process (COMMAND ${BREW_PATH} --prefix binutils ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE BINUTILS_PREFIX) + if (BINUTILS_PREFIX) + find_program (OBJCOPY_PATH NAMES "objcopy" PATHS "${BINUTILS_PREFIX}/bin" NO_DEFAULT_PATH) + endif () + endif () + endif () +endif () + if (OBJCOPY_PATH) - message(STATUS "Using objcopy: ${OBJCOPY_PATH}.") + message (STATUS "Using objcopy: ${OBJCOPY_PATH}") else () - message(FATAL_ERROR "Cannot find objcopy.") + message (FATAL_ERROR "Cannot find objcopy.") endif () if (OS_DARWIN) diff --git a/cmake/embed_binary.cmake b/cmake/embed_binary.cmake index e132c590520..a87c63d714e 100644 --- a/cmake/embed_binary.cmake +++ b/cmake/embed_binary.cmake @@ -33,51 +33,38 @@ macro(clickhouse_embed_binaries) message(FATAL_ERROR "The list of binary resources to embed may not be empty") endif() - # If cross-compiling, ensure we use the toolchain file and target the actual target architecture - if (CMAKE_CROSSCOMPILING) - set(CROSS_COMPILE_FLAGS --target=${CMAKE_C_COMPILER_TARGET}) - - # FIXME: find a way to properly pass all cross-compile flags to custom command in CMake - if (CMAKE_SYSTEM_NAME STREQUAL "Darwin") - list(APPEND CROSS_COMPILE_FLAGS -isysroot ${CMAKE_OSX_SYSROOT} -mmacosx-version-min=${CMAKE_OSX_DEPLOYMENT_TARGET}) - else () - list(APPEND CROSS_COMPILE_FLAGS -isysroot ${CMAKE_SYSROOT}) - endif () - else() - set(CROSS_COMPILE_FLAGS "") - endif() + add_library("${EMBED_TARGET}" STATIC) + set_target_properties("${EMBED_TARGET}" PROPERTIES LINKER_LANGUAGE C) set(EMBED_TEMPLATE_FILE "${PROJECT_SOURCE_DIR}/programs/embed_binary.S.in") - set(RESOURCE_OBJS) - foreach(RESOURCE_FILE ${EMBED_RESOURCES}) - set(RESOURCE_OBJ "${RESOURCE_FILE}.o") - list(APPEND RESOURCE_OBJS "${RESOURCE_OBJ}") - # Normalize the name of the resource + foreach(RESOURCE_FILE ${EMBED_RESOURCES}) + set(ASSEMBLY_FILE_NAME "${RESOURCE_FILE}.S") set(BINARY_FILE_NAME "${RESOURCE_FILE}") + + # Normalize the name of the resource. string(REGEX REPLACE "[\./-]" "_" SYMBOL_NAME "${RESOURCE_FILE}") # - must be last in regex string(REPLACE "+" "_PLUS_" SYMBOL_NAME "${SYMBOL_NAME}") - set(ASSEMBLY_FILE_NAME "${RESOURCE_FILE}.S") - # Put the configured assembly file in the output directory. - # This is so we can clean it up as usual, and we CD to the - # source directory before compiling, so that the assembly - # `.incbin` directive can find the file. + # Generate the configured assembly file in the output directory. configure_file("${EMBED_TEMPLATE_FILE}" "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" @ONLY) - # Generate the output object file by compiling the assembly, in the directory of - # the sources so that the resource file may also be found - add_custom_command( - OUTPUT ${RESOURCE_OBJ} - COMMAND cd "${EMBED_RESOURCE_DIR}" && - ${CMAKE_C_COMPILER} "${CROSS_COMPILE_FLAGS}" -c -o - "${CMAKE_CURRENT_BINARY_DIR}/${RESOURCE_OBJ}" - "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" - COMMAND_EXPAND_LISTS - ) - set_source_files_properties("${RESOURCE_OBJ}" PROPERTIES EXTERNAL_OBJECT true GENERATED true) - endforeach() + # If cross-compiling, ensure we use the toolchain file and target the actual target architecture. + if(CMAKE_CROSSCOMPILING) + set_property(SOURCE "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" APPEND PROPERTY COMPILE_FLAGS "--target=${CMAKE_C_COMPILER_TARGET}") - add_library("${EMBED_TARGET}" STATIC ${RESOURCE_OBJS}) - set_target_properties("${EMBED_TARGET}" PROPERTIES LINKER_LANGUAGE C) + # FIXME: find a way to properly pass all cross-compile flags. + if(OS_DARWIN) + set_property(SOURCE "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" APPEND PROPERTY COMPILE_FLAGS "-isysroot ${CMAKE_OSX_SYSROOT}") + set_property(SOURCE "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" APPEND PROPERTY COMPILE_FLAGS "-mmacosx-version-min=${CMAKE_OSX_DEPLOYMENT_TARGET}") + else() + set_property(SOURCE "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" APPEND PROPERTY COMPILE_FLAGS "-isysroot ${CMAKE_SYSROOT}") + endif() + endif() + + # Set the include directory for relative paths specified for `.incbin` directive. + set_property(SOURCE "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" APPEND PROPERTY INCLUDE_DIRECTORIES "${EMBED_RESOURCE_DIR}") + + target_sources("${EMBED_TARGET}" PRIVATE "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}") + endforeach() endmacro() diff --git a/docs/en/development/build-osx.md b/docs/en/development/build-osx.md index a862bdeb299..687e0179e07 100644 --- a/docs/en/development/build-osx.md +++ b/docs/en/development/build-osx.md @@ -33,7 +33,7 @@ Reboot. ``` bash brew update -brew install cmake ninja libtool gettext llvm gcc +brew install cmake ninja libtool gettext llvm gcc binutils ``` ## Checkout ClickHouse Sources {#checkout-clickhouse-sources} From 4da1b8154ae16a67893fd699670c3eeb30be0eb9 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Sat, 26 Jun 2021 22:15:57 +0800 Subject: [PATCH 17/30] Fix data race in getClusters() --- src/Interpreters/Context.cpp | 14 +++++++------- src/Interpreters/Context.h | 2 +- src/Storages/System/StorageSystemClusters.cpp | 2 +- .../System/StorageSystemDDLWorkerQueue.cpp | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index dcddeef2811..899550bffec 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -394,7 +394,7 @@ struct ContextSharedPart /// Clusters for distributed tables /// Initialized on demand (on distributed storages initialization) since Settings should be initialized - std::unique_ptr clusters; + std::shared_ptr clusters; ConfigurationPtr clusters_config; /// Stores updated configs mutable std::mutex clusters_mutex; /// Guards clusters and clusters_config @@ -1882,7 +1882,7 @@ std::optional Context::getTCPPortSecure() const std::shared_ptr Context::getCluster(const std::string & cluster_name) const { - auto res = getClusters().getCluster(cluster_name); + auto res = getClusters()->getCluster(cluster_name); if (res) return res; @@ -1896,7 +1896,7 @@ std::shared_ptr Context::getCluster(const std::string & cluster_name) c std::shared_ptr Context::tryGetCluster(const std::string & cluster_name) const { - return getClusters().getCluster(cluster_name); + return getClusters()->getCluster(cluster_name); } @@ -1911,7 +1911,7 @@ void Context::reloadClusterConfig() const } const auto & config = cluster_config ? *cluster_config : getConfigRef(); - auto new_clusters = std::make_unique(config, settings); + auto new_clusters = std::make_shared(config, settings); { std::lock_guard lock(shared->clusters_mutex); @@ -1927,16 +1927,16 @@ void Context::reloadClusterConfig() const } -Clusters & Context::getClusters() const +std::shared_ptr Context::getClusters() const { std::lock_guard lock(shared->clusters_mutex); if (!shared->clusters) { const auto & config = shared->clusters_config ? *shared->clusters_config : getConfigRef(); - shared->clusters = std::make_unique(config, settings); + shared->clusters = std::make_shared(config, settings); } - return *shared->clusters; + return shared->clusters; } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index c673eb0d408..7990bd7420b 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -676,7 +676,7 @@ public: void setDDLWorker(std::unique_ptr ddl_worker); DDLWorker & getDDLWorker() const; - Clusters & getClusters() const; + std::shared_ptr getClusters() const; std::shared_ptr getCluster(const std::string & cluster_name) const; std::shared_ptr tryGetCluster(const std::string & cluster_name) const; void setClustersConfig(const ConfigurationPtr & config, const String & config_name = "remote_servers"); diff --git a/src/Storages/System/StorageSystemClusters.cpp b/src/Storages/System/StorageSystemClusters.cpp index 8a3227aafdb..1f5def6d6b4 100644 --- a/src/Storages/System/StorageSystemClusters.cpp +++ b/src/Storages/System/StorageSystemClusters.cpp @@ -31,7 +31,7 @@ NamesAndTypesList StorageSystemClusters::getNamesAndTypes() void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const { - for (const auto & name_and_cluster : context->getClusters().getContainer()) + for (const auto & name_and_cluster : context->getClusters()->getContainer()) writeCluster(res_columns, name_and_cluster); const auto databases = DatabaseCatalog::instance().getDatabases(); diff --git a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp index 98b15bfa6e2..5b9ed938e22 100644 --- a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp +++ b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp @@ -130,8 +130,8 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, Context if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE) zk_exception_code = code; - const auto & clusters = context->getClusters(); - for (const auto & name_and_cluster : clusters.getContainer()) + const auto clusters = context->getClusters(); + for (const auto & name_and_cluster : clusters->getContainer()) { const ClusterPtr & cluster = name_and_cluster.second; const auto & shards_info = cluster->getShardsInfo(); From 7b3996c603888b1999750613cf7cc49829893d98 Mon Sep 17 00:00:00 2001 From: Denis Glazachev Date: Sat, 26 Jun 2021 21:52:37 +0400 Subject: [PATCH 18/30] Remove manual flag adjusting - cross compilations should be configured automatically --- cmake/embed_binary.cmake | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/cmake/embed_binary.cmake b/cmake/embed_binary.cmake index a87c63d714e..d15962c05d4 100644 --- a/cmake/embed_binary.cmake +++ b/cmake/embed_binary.cmake @@ -49,19 +49,6 @@ macro(clickhouse_embed_binaries) # Generate the configured assembly file in the output directory. configure_file("${EMBED_TEMPLATE_FILE}" "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" @ONLY) - # If cross-compiling, ensure we use the toolchain file and target the actual target architecture. - if(CMAKE_CROSSCOMPILING) - set_property(SOURCE "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" APPEND PROPERTY COMPILE_FLAGS "--target=${CMAKE_C_COMPILER_TARGET}") - - # FIXME: find a way to properly pass all cross-compile flags. - if(OS_DARWIN) - set_property(SOURCE "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" APPEND PROPERTY COMPILE_FLAGS "-isysroot ${CMAKE_OSX_SYSROOT}") - set_property(SOURCE "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" APPEND PROPERTY COMPILE_FLAGS "-mmacosx-version-min=${CMAKE_OSX_DEPLOYMENT_TARGET}") - else() - set_property(SOURCE "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" APPEND PROPERTY COMPILE_FLAGS "-isysroot ${CMAKE_SYSROOT}") - endif() - endif() - # Set the include directory for relative paths specified for `.incbin` directive. set_property(SOURCE "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" APPEND PROPERTY INCLUDE_DIRECTORIES "${EMBED_RESOURCE_DIR}") From e9e941f6efc410a7c6e3728b8eb01c4de21bd9bb Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Sat, 26 Jun 2021 22:23:14 +0000 Subject: [PATCH 19/30] fix --- tests/queries/0_stateless/arcadia_skip_list.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/queries/0_stateless/arcadia_skip_list.txt b/tests/queries/0_stateless/arcadia_skip_list.txt index 2b03f32dccc..9bd0163b58a 100644 --- a/tests/queries/0_stateless/arcadia_skip_list.txt +++ b/tests/queries/0_stateless/arcadia_skip_list.txt @@ -239,3 +239,4 @@ 01870_modulo_partition_key 01880_remote_ipv6 01882_check_max_parts_to_merge_at_once +01914_exchange_dictionaries From 1e55b9376ae7951861a79350d81d9a8e8d79c495 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 27 Jun 2021 02:48:11 +0300 Subject: [PATCH 20/30] Silent ANTLR parser test (non production) --- tests/queries/skip_list.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json index fcfd5192ce9..ab988d3e543 100644 --- a/tests/queries/skip_list.json +++ b/tests/queries/skip_list.json @@ -519,7 +519,8 @@ "01924_argmax_bitmap_state", "01913_replace_dictionary", "01914_exchange_dictionaries", - "01915_create_or_replace_dictionary" + "01915_create_or_replace_dictionary", + "01913_names_of_tuple_literal" ], "parallel": [ From 930a67da13e0b1db9f65c58a7d5ee6bfc123052f Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 27 Jun 2021 02:49:25 +0300 Subject: [PATCH 21/30] Fix the annoying Arcadia --- tests/queries/0_stateless/arcadia_skip_list.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/queries/0_stateless/arcadia_skip_list.txt b/tests/queries/0_stateless/arcadia_skip_list.txt index 82d054a223b..afd11cb5a7d 100644 --- a/tests/queries/0_stateless/arcadia_skip_list.txt +++ b/tests/queries/0_stateless/arcadia_skip_list.txt @@ -249,3 +249,5 @@ 01824_prefer_global_in_and_join 01576_alias_column_rewrite 01924_argmax_bitmap_state +01914_exchange_dictionaries +01923_different_expression_name_alias From ba67097c0f67c7d3c58fa50de7e7664b86d122a7 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 27 Jun 2021 02:54:22 +0300 Subject: [PATCH 22/30] Fix test in database Ordinary --- tests/queries/skip_list.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json index ab988d3e543..f010efcf916 100644 --- a/tests/queries/skip_list.json +++ b/tests/queries/skip_list.json @@ -112,7 +112,8 @@ "00738_lock_for_inner_table", "01153_attach_mv_uuid", /// Sometimes cannot lock file most likely due to concurrent or adjacent tests, but we don't care how it works in Ordinary database. - "rocksdb" + "rocksdb", + "01914_exchange_dictionaries" /// Requires Atomic database ], "database-replicated": [ /// Unclassified From 686bf75f78345cfa2d560581c65e5a97e7e1b605 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 27 Jun 2021 03:11:48 +0300 Subject: [PATCH 23/30] This performance test does not run in CI - remove --- tests/performance/nyc_taxi.xml | 13 ------------- 1 file changed, 13 deletions(-) delete mode 100644 tests/performance/nyc_taxi.xml diff --git a/tests/performance/nyc_taxi.xml b/tests/performance/nyc_taxi.xml deleted file mode 100644 index b8d9621e3eb..00000000000 --- a/tests/performance/nyc_taxi.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - trips_mergetree - - - SELECT cab_type, count(*) FROM trips_mergetree GROUP BY cab_type - SELECT passenger_count, avg(total_amount) FROM trips_mergetree GROUP BY passenger_count - SELECT passenger_count, toYear(pickup_date) AS year, count(*) FROM trips_mergetree GROUP BY passenger_count, year - SELECT passenger_count, toYear(pickup_date) AS year, round(trip_distance) AS distance, count(*) FROM trips_mergetree GROUP BY passenger_count, year, distance ORDER BY year, count(*) DESC - From ebc2fbfd63471c62f26bb83784cfa21d8243aba4 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 27 Jun 2021 03:14:42 +0300 Subject: [PATCH 24/30] Performance test: be more generous --- docker/test/performance-comparison/report.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/test/performance-comparison/report.py b/docker/test/performance-comparison/report.py index 13b18cda326..35e1008e0d7 100755 --- a/docker/test/performance-comparison/report.py +++ b/docker/test/performance-comparison/report.py @@ -561,7 +561,7 @@ if args.report == 'main': # Don't show mildly unstable queries, only the very unstable ones we # treat as errors. if very_unstable_queries: - if very_unstable_queries > 3: + if very_unstable_queries > 5: error_tests += very_unstable_queries status = 'failure' message_array.append(str(very_unstable_queries) + ' unstable') From 4b994fc3c75334380838bd3457136d7e743ae7c4 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 27 Jun 2021 15:38:08 +0300 Subject: [PATCH 25/30] Change error code in LIVE VIEW --- src/Storages/LiveView/StorageLiveView.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Storages/LiveView/StorageLiveView.cpp b/src/Storages/LiveView/StorageLiveView.cpp index 19c3992276f..e1da02c5243 100644 --- a/src/Storages/LiveView/StorageLiveView.cpp +++ b/src/Storages/LiveView/StorageLiveView.cpp @@ -49,7 +49,6 @@ namespace DB namespace ErrorCodes { - extern const int LOGICAL_ERROR; extern const int INCORRECT_QUERY; extern const int TABLE_WAS_NOT_DROPPED; extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW; @@ -84,7 +83,7 @@ static StorageID extractDependentTable(ASTPtr & query, ContextPtr context, const if (!ast_select) throw Exception("Logical error while creating StorageLiveView." " Could not retrieve table name from select query.", - DB::ErrorCodes::LOGICAL_ERROR); + DB::ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW); if (ast_select->list_of_selects->children.size() != 1) throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW); From 4dbd659d64218733234a0ce98424fab5c8a4b9f9 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 27 Jun 2021 15:40:07 +0300 Subject: [PATCH 26/30] Change error code in LIVE VIEW --- src/Storages/LiveView/StorageLiveView.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Storages/LiveView/StorageLiveView.cpp b/src/Storages/LiveView/StorageLiveView.cpp index e1da02c5243..f54abda6d7f 100644 --- a/src/Storages/LiveView/StorageLiveView.cpp +++ b/src/Storages/LiveView/StorageLiveView.cpp @@ -81,8 +81,7 @@ static StorageID extractDependentTable(ASTPtr & query, ContextPtr context, const { auto * ast_select = subquery->as(); if (!ast_select) - throw Exception("Logical error while creating StorageLiveView." - " Could not retrieve table name from select query.", + throw Exception("LIVE VIEWs are only supported for queries from tables, but there is no table name in select query.", DB::ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW); if (ast_select->list_of_selects->children.size() != 1) throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW); From 1129b85f3c443b655fc7eb1beb6cb46de52f60e1 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Sun, 27 Jun 2021 15:40:19 +0300 Subject: [PATCH 27/30] Create the external_table_functions_use_nulls setting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Задокументировал настройку external_table_functions_use_nulls. --- docs/en/operations/settings/settings.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index 08cf9daeb28..c584776a3b2 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -3145,4 +3145,19 @@ SETTINGS index_granularity = 8192 │ └────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ ``` +## external_table_functions_use_nulls {#external-table-functions-use-nulls} + +Defines how [mysql](../../sql-reference/table-functions/mysql.md), [postgresql](../../sql-reference/table-functions/postgresql.md) and [odbc](../../sql-reference/table-functions/odbc.md)] table functions use Nullable columns. + +Possible values: + +- 0 — The table function explicitly uses Nullable columns. +- 1 — The table function implicitly uses Nullable columns. + +Default value: `1`. + +**Usage** + +If the setting is set to `0`, the table function does not make Nullable columns and insert default values instead of NULL. This is also applicable for NULL values inside arrays. + [Original article](https://clickhouse.tech/docs/en/operations/settings/settings/) From a21577925692de75b08eea22c911947e31a1dc62 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Sun, 27 Jun 2021 16:04:35 +0300 Subject: [PATCH 28/30] Translate to Russian MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Перевел на русский язык. --- docs/en/operations/settings/settings.md | 2 -- docs/ru/operations/settings/settings.md | 15 ++++++++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index c584776a3b2..d5ba2d5a653 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -3159,5 +3159,3 @@ Default value: `1`. **Usage** If the setting is set to `0`, the table function does not make Nullable columns and insert default values instead of NULL. This is also applicable for NULL values inside arrays. - -[Original article](https://clickhouse.tech/docs/en/operations/settings/settings/) diff --git a/docs/ru/operations/settings/settings.md b/docs/ru/operations/settings/settings.md index be3695badc5..06d6f2b8b61 100644 --- a/docs/ru/operations/settings/settings.md +++ b/docs/ru/operations/settings/settings.md @@ -3023,4 +3023,17 @@ SETTINGS index_granularity = 8192 │ └────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ ``` -[Оригинальная статья](https://clickhouse.tech/docs/ru/operations/settings/settings/) +## external_table_functions_use_nulls {#external-table-functions-use-nulls} + +Определяет, как табличные функции [mysql](../../sql-reference/table-functions/mysql.md), [postgresql](../../sql-reference/table-functions/postgresql.md) и [odbc](../../sql-reference/table-functions/odbc.md)] используют Nullable столбцы. + +Возможные значения: + +- 0 — табличная функция явно использует Nullable столбцы. +- 1 — табличная функция неявно использует Nullable столбцы. + +Значение по умолчанию: `1`. + +**Использование** + +Если установлено значение `0`, то табличная функция не делает Nullable столбцы, а вместо NULL выставляет значения по умолчанию для скалярного типа. Это также применимо для значений NULL внутри массивов. From f6e67d3dc1ed06df521dbd5ff1838d5ee836dbe0 Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Sun, 27 Jun 2021 18:22:34 +0300 Subject: [PATCH 29/30] Update StorageDistributed.cpp --- src/Storages/StorageDistributed.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index d43fd1532a1..15b1421c635 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -801,6 +801,8 @@ void StorageDistributed::startup() return; const auto & disks = data_volume->getDisks(); + + /// Make initialization for large number of disks parallel. ThreadPool pool(disks.size()); for (const DiskPtr & disk : disks) From 534d33fb9d9b9b7499fb5bd79374e74f4a8d4d75 Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Sun, 27 Jun 2021 20:21:40 +0300 Subject: [PATCH 30/30] Update settings.md --- docs/en/operations/settings/settings.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index d5ba2d5a653..056682a824e 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -3158,4 +3158,4 @@ Default value: `1`. **Usage** -If the setting is set to `0`, the table function does not make Nullable columns and insert default values instead of NULL. This is also applicable for NULL values inside arrays. +If the setting is set to `0`, the table function does not make Nullable columns and inserts default values instead of NULL. This is also applicable for NULL values inside arrays.