diff --git a/.gitmodules b/.gitmodules index 13ed2614f4d..ace36122e6e 100644 --- a/.gitmodules +++ b/.gitmodules @@ -158,7 +158,7 @@ url = https://github.com/openldap/openldap.git [submodule "contrib/AMQP-CPP"] path = contrib/AMQP-CPP - url = https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git + url = https://github.com/ClickHouse-Extras/AMQP-CPP.git [submodule "contrib/cassandra"] path = contrib/cassandra url = https://github.com/ClickHouse-Extras/cpp-driver.git diff --git a/contrib/AMQP-CPP b/contrib/AMQP-CPP index 1c08399ab0a..d63e1f01658 160000 --- a/contrib/AMQP-CPP +++ b/contrib/AMQP-CPP @@ -1 +1 @@ -Subproject commit 1c08399ab0ab9e4042ef8e2bbe9e208e5dcbc13b +Subproject commit d63e1f016582e9faaaf279aa24513087a07bc6e7 diff --git a/contrib/amqpcpp-cmake/CMakeLists.txt b/contrib/amqpcpp-cmake/CMakeLists.txt index ca79b84c523..4853983680e 100644 --- a/contrib/amqpcpp-cmake/CMakeLists.txt +++ b/contrib/amqpcpp-cmake/CMakeLists.txt @@ -16,6 +16,7 @@ set (SRCS ${LIBRARY_DIR}/src/flags.cpp ${LIBRARY_DIR}/src/linux_tcp/openssl.cpp ${LIBRARY_DIR}/src/linux_tcp/tcpconnection.cpp + ${LIBRARY_DIR}/src/inbuffer.cpp ${LIBRARY_DIR}/src/receivedframe.cpp ${LIBRARY_DIR}/src/table.cpp ${LIBRARY_DIR}/src/watchable.cpp diff --git a/docs/en/sql-reference/functions/string-search-functions.md b/docs/en/sql-reference/functions/string-search-functions.md index a625af14505..5f08417f349 100644 --- a/docs/en/sql-reference/functions/string-search-functions.md +++ b/docs/en/sql-reference/functions/string-search-functions.md @@ -360,6 +360,89 @@ Extracts a fragment of a string using a regular expression. If ‘haystack’ do Extracts all the fragments of a string using a regular expression. If ‘haystack’ doesn’t match the ‘pattern’ regex, an empty string is returned. Returns an array of strings consisting of all matches to the regex. In general, the behavior is the same as the ‘extract’ function (it takes the first subpattern, or the entire expression if there isn’t a subpattern). +## extractAllGroupsHorizontal {#extractallgroups-horizontal} + +Matches all groups of the `haystack` string using the `pattern` regular expression. Returns an array of arrays, where the first array includes all fragments matching the first group, the second array - matching the second group, etc. + +!!! note "Note" + `extractAllGroupsHorizontal` function is slower than [extractAllGroupsVertical](#extractallgroups-vertical). + +**Syntax** + +``` sql +extractAllGroupsHorizontal(haystack, pattern) +``` + +**Parameters** + +- `haystack` — Input string. Type: [String](../../sql-reference/data-types/string.md). +- `pattern` — Regular expression with [re2 syntax](https://github.com/google/re2/wiki/Syntax). Must contain groups, each group enclosed in parentheses. If `pattern` contains no groups, an exception is thrown. Type: [String](../../sql-reference/data-types/string.md). + +**Returned value** + +- Type: [Array](../../sql-reference/data-types/array.md). + +If `haystack` doesn’t match the `pattern` regex, an array of empty arrays is returned. 
+ +**Example** + +Query: + +``` sql +SELECT extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)') +``` + +Result: + +``` text +┌─extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐ +│ [['abc','def','ghi'],['111','222','333']] │ +└──────────────────────────────────────────────────────────────────────────────────────────┘ +``` + +**See also** +- [extractAllGroupsVertical](#extractallgroups-vertical) + +## extractAllGroupsVertical {#extractallgroups-vertical} + +Matches all groups of the `haystack` string using the `pattern` regular expression. Returns an array of arrays, where each array includes matching fragments from every group. Fragments are grouped in order of appearance in the `haystack`. + +**Syntax** + +``` sql +extractAllGroupsVertical(haystack, pattern) +``` + +**Parameters** + +- `haystack` — Input string. Type: [String](../../sql-reference/data-types/string.md). +- `pattern` — Regular expression with [re2 syntax](https://github.com/google/re2/wiki/Syntax). Must contain groups, each group enclosed in parentheses. If `pattern` contains no groups, an exception is thrown. Type: [String](../../sql-reference/data-types/string.md). + +**Returned value** + +- Type: [Array](../../sql-reference/data-types/array.md). + +If `haystack` doesn’t match the `pattern` regex, an empty array is returned. + +**Example** + +Query: + +``` sql +SELECT extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)') +``` + +Result: + +``` text +┌─extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐ +│ [['abc','111'],['def','222'],['ghi','333']] │ +└────────────────────────────────────────────────────────────────────────────────────────┘ +``` + +**See also** +- [extractAllGroupsHorizontal](#extractallgroups-horizontal) + ## like(haystack, pattern), haystack LIKE pattern operator {#function-like} Checks whether a string matches a simple regular expression. diff --git a/docs/ru/sql-reference/functions/string-search-functions.md b/docs/ru/sql-reference/functions/string-search-functions.md index de713031046..29dd67fd0eb 100644 --- a/docs/ru/sql-reference/functions/string-search-functions.md +++ b/docs/ru/sql-reference/functions/string-search-functions.md @@ -341,6 +341,89 @@ Result: Извлечение всех фрагментов строки по регулярному выражению. Если haystack не соответствует регулярному выражению pattern, то возвращается пустая строка. Возвращается массив строк, состоящий из всех соответствий регулярному выражению. В остальном, поведение аналогично функции extract (по прежнему, вынимается первый subpattern, или всё выражение, если subpattern-а нет). +## extractAllGroupsHorizontal {#extractallgroups-horizontal} + +Разбирает строку `haystack` на фрагменты, соответствующие группам регулярного выражения `pattern`. Возвращает массив массивов, где первый массив содержит все фрагменты, соответствующие первой группе регулярного выражения, второй массив - соответствующие второй группе, и т.д. + +!!! note "Замечание" + Функция `extractAllGroupsHorizontal` работает медленнее, чем функция [extractAllGroupsVertical](#extractallgroups-vertical). + +**Синтаксис** + +``` sql +extractAllGroupsHorizontal(haystack, pattern) +``` + +**Параметры** + +- `haystack` — строка для разбора. Тип: [String](../../sql-reference/data-types/string.md). +- `pattern` — регулярное выражение, построенное по синтаксическим правилам [re2](https://github.com/google/re2/wiki/Syntax). 
Выражение должно содержать группы, заключенные в круглые скобки. Если выражение не содержит групп, генерируется исключение. Тип: [String](../../sql-reference/data-types/string.md). + +**Возвращаемое значение** + +- Тип: [Array](../../sql-reference/data-types/array.md). + +Если в строке `haystack` нет групп, соответствующих регулярному выражению `pattern`, возвращается массив пустых массивов. + +**Пример** + +Запрос: + +``` sql +SELECT extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)') +``` + +Результат: + +``` text +┌─extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐ +│ [['abc','def','ghi'],['111','222','333']] │ +└──────────────────────────────────────────────────────────────────────────────────────────┘ +``` + +**См. также** +- функция [extractAllGroupsVertical](#extractallgroups-vertical) + +## extractAllGroupsVertical {#extractallgroups-vertical} + +Разбирает строку `haystack` на фрагменты, соответствующие группам регулярного выражения `pattern`. Возвращает массив массивов, где каждый массив содержит по одному фрагменту, соответствующему каждой группе регулярного выражения. Фрагменты группируются в массивы в соответствии с порядком появления в исходной строке. + +**Синтаксис** + +``` sql +extractAllGroupsVertical(haystack, pattern) +``` + +**Параметры** + +- `haystack` — строка для разбора. Тип: [String](../../sql-reference/data-types/string.md). +- `pattern` — регулярное выражение, построенное по синтаксическим правилам [re2](https://github.com/google/re2/wiki/Syntax). Выражение должно содержать группы, заключенные в круглые скобки. Если выражение не содержит групп, генерируется исключение. Тип: [String](../../sql-reference/data-types/string.md). + +**Возвращаемое значение** + +- Тип: [Array](../../sql-reference/data-types/array.md). + +Если в строке `haystack` нет групп, соответствующих регулярному выражению `pattern`, возвращается пустой массив. + +**Пример** + +Запрос: + +``` sql +SELECT extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)') +``` + +Результат: + +``` text +┌─extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐ +│ [['abc','111'],['def','222'],['ghi','333']] │ +└────────────────────────────────────────────────────────────────────────────────────────┘ +``` + +**См. также** +- функция [extractAllGroupsHorizontal](#extractallgroups-horizontal) + ## like(haystack, pattern), оператор haystack LIKE pattern {#function-like} Проверка строки на соответствие простому регулярному выражению. 
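For readers comparing the two documented layouts, here is a small standalone sketch (not part of this patch and not the ClickHouse implementation) that reproduces the horizontal/vertical distinction with plain re2. The simplified regex, container names, and `main` wrapper are illustrative assumptions only.

```cpp
#include <re2/re2.h>
#include <re2/stringpiece.h>

#include <iostream>
#include <string>
#include <vector>

int main()
{
    const std::string haystack = "abc=111, def=222, ghi=333";
    const RE2 pattern("(\\w+)=(\\w+)");   // simplified stand-in for the documented pattern

    re2::StringPiece input(haystack);
    std::string key;
    std::string value;

    // "Vertical" layout: one inner array per match, as extractAllGroupsVertical returns.
    std::vector<std::vector<std::string>> vertical;
    while (RE2::FindAndConsume(&input, pattern, &key, &value))
        vertical.push_back({key, value});

    // "Horizontal" layout: one inner array per capture group, as extractAllGroupsHorizontal returns.
    std::vector<std::vector<std::string>> horizontal(2);
    for (const auto & match : vertical)
        for (size_t group = 0; group < match.size(); ++group)
            horizontal[group].push_back(match[group]);

    for (const auto & group : horizontal)
    {
        for (const auto & fragment : group)
            std::cout << fragment << ' ';
        std::cout << '\n';
    }
    return 0;
}
```

Built against re2, this prints the fragments grouped per capture group, mirroring the `extractAllGroupsHorizontal` example output above.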
diff --git a/src/Disks/DiskCacheWrapper.cpp b/src/Disks/DiskCacheWrapper.cpp index c60f69920f4..7ce963380d4 100644 --- a/src/Disks/DiskCacheWrapper.cpp +++ b/src/Disks/DiskCacheWrapper.cpp @@ -194,6 +194,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode auto src_buffer = cache_disk->readFile(path, buf_size, estimated_size, aio_threshold, 0); auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode, estimated_size, aio_threshold); copyData(*src_buffer, *dst_buffer); + dst_buffer->finalize(); }, buf_size); } diff --git a/src/Disks/IDisk.cpp b/src/Disks/IDisk.cpp index 5ae96ca6c23..8f11d6549e9 100644 --- a/src/Disks/IDisk.cpp +++ b/src/Disks/IDisk.cpp @@ -27,6 +27,7 @@ void copyFile(IDisk & from_disk, const String & from_path, IDisk & to_disk, cons auto in = from_disk.readFile(from_path); auto out = to_disk.writeFile(to_path); copyData(*in, *out); + out->finalize(); } diff --git a/src/Disks/S3/registerDiskS3.cpp b/src/Disks/S3/registerDiskS3.cpp index 1c7a5e24282..862fd388476 100644 --- a/src/Disks/S3/registerDiskS3.cpp +++ b/src/Disks/S3/registerDiskS3.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -123,6 +124,9 @@ void registerDiskS3(DiskFactory & factory) if (proxy_config) cfg.perRequestConfiguration = [proxy_config](const auto & request) { return proxy_config->getConfiguration(request); }; + cfg.retryStrategy = std::make_shared( + config.getUInt(config_prefix + ".retry_attempts", 10)); + auto client = S3::ClientFactory::instance().create( cfg, uri.is_virtual_hosted_style, diff --git a/src/Interpreters/InJoinSubqueriesPreprocessor.cpp b/src/Interpreters/InJoinSubqueriesPreprocessor.cpp index 244693396ca..2e922393131 100644 --- a/src/Interpreters/InJoinSubqueriesPreprocessor.cpp +++ b/src/Interpreters/InJoinSubqueriesPreprocessor.cpp @@ -27,7 +27,9 @@ namespace StoragePtr tryGetTable(const ASTPtr & database_and_table, const Context & context) { - auto table_id = context.resolveStorageID(database_and_table); + auto table_id = context.tryResolveStorageID(database_and_table); + if (!table_id) + return {}; return DatabaseCatalog::instance().tryGetTable(table_id, context); } diff --git a/src/Processors/Executors/PipelineExecutor.cpp b/src/Processors/Executors/PipelineExecutor.cpp index 858f53cb047..271903add86 100644 --- a/src/Processors/Executors/PipelineExecutor.cpp +++ b/src/Processors/Executors/PipelineExecutor.cpp @@ -566,7 +566,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st } if (node->exception) - finish(); + cancel(); if (finished) break; diff --git a/src/Processors/Pipe.h b/src/Processors/Pipe.h index f674663154d..f5feca7a2db 100644 --- a/src/Processors/Pipe.h +++ b/src/Processors/Pipe.h @@ -101,7 +101,7 @@ public: void setQuota(const std::shared_ptr & quota); /// Do not allow to change the table while the processors of pipe are alive. - void addTableLock(const TableLockHolder & lock) { holder.table_locks.push_back(lock); } + void addTableLock(TableLockHolder lock) { holder.table_locks.emplace_back(std::move(lock)); } /// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible. 
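As an aside on the `addTableLock` signature change just above: the sketch below (with stand-in types, not the real `TableLockHolder`) shows the pass-by-value-and-move "sink argument" idiom it adopts — callers can move a temporary straight into the container, or pass a copy if they still need their lock.

```cpp
#include <memory>
#include <utility>
#include <vector>

struct TableLock {};                              // stand-in, not the ClickHouse type
using TableLockHolder = std::shared_ptr<TableLock>;

struct LockHolder
{
    // Take by value, then move into storage: at most one copy, zero for rvalue callers.
    void addTableLock(TableLockHolder lock) { table_locks.emplace_back(std::move(lock)); }
    std::vector<TableLockHolder> table_locks;
};

int main()
{
    LockHolder holder;
    TableLockHolder lock = std::make_shared<TableLock>();
    holder.addTableLock(lock);             // caller keeps its own reference
    holder.addTableLock(std::move(lock));  // or hands the reference over without an extra copy
    return 0;
}
```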
void addInterpreterContext(std::shared_ptr context) { holder.interpreter_context.emplace_back(std::move(context)); } void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); } diff --git a/src/Processors/QueryPipeline.h b/src/Processors/QueryPipeline.h index d91cfe89840..7bb2a4bd5c0 100644 --- a/src/Processors/QueryPipeline.h +++ b/src/Processors/QueryPipeline.h @@ -106,7 +106,7 @@ public: const Block & getHeader() const { return pipe.getHeader(); } - void addTableLock(const TableLockHolder & lock) { pipe.addTableLock(lock); } + void addTableLock(TableLockHolder lock) { pipe.addTableLock(std::move(lock)); } void addInterpreterContext(std::shared_ptr context) { pipe.addInterpreterContext(std::move(context)); } void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); } void addQueryPlan(std::unique_ptr plan) { pipe.addQueryPlan(std::move(plan)); } diff --git a/src/Processors/QueryPlan/ReadFromStorageStep.cpp b/src/Processors/QueryPlan/ReadFromStorageStep.cpp new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/Processors/QueryPlan/ReadFromStorageStep.h b/src/Processors/QueryPlan/ReadFromStorageStep.h new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index f9fb157942a..3da0e203f14 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -276,7 +276,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( ReadBufferFromString ttl_infos_buffer(ttl_infos_string); assertString("ttl format version: 1\n", ttl_infos_buffer); ttl_infos.read(ttl_infos_buffer); - reservation = data.reserveSpacePreferringTTLRules(sum_files_size, ttl_infos, std::time(nullptr), 0, true); + reservation = data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true); } else reservation = data.reserveSpace(sum_files_size); diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 486e444763d..40f12428561 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -88,6 +88,7 @@ void IMergeTreeDataPart::MinMaxIndex::store( out_hashing.next(); out_checksums.files[file_name].file_size = out_hashing.count(); out_checksums.files[file_name].file_hash = out_hashing.getHash(); + out->finalize(); } } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index f2b26a928c1..ef96c446287 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3037,28 +3037,31 @@ ReservationPtr MergeTreeData::tryReserveSpace(UInt64 expected_size, SpacePtr spa return space->reserve(expected_size); } -ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules(UInt64 expected_size, - const IMergeTreeDataPart::TTLInfos & ttl_infos, - time_t time_of_move, - size_t min_volume_index, - bool is_insert) const +ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules( + const StorageMetadataPtr & metadata_snapshot, + UInt64 expected_size, + const IMergeTreeDataPart::TTLInfos & ttl_infos, + time_t time_of_move, + size_t min_volume_index, + bool is_insert) const { expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size); - ReservationPtr reservation = tryReserveSpacePreferringTTLRules(expected_size, ttl_infos, time_of_move, min_volume_index, 
is_insert); + ReservationPtr reservation = tryReserveSpacePreferringTTLRules(metadata_snapshot, expected_size, ttl_infos, time_of_move, min_volume_index, is_insert); return checkAndReturnReservation(expected_size, std::move(reservation)); } -ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_size, - const IMergeTreeDataPart::TTLInfos & ttl_infos, - time_t time_of_move, - size_t min_volume_index, - bool is_insert) const +ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules( + const StorageMetadataPtr & metadata_snapshot, + UInt64 expected_size, + const IMergeTreeDataPart::TTLInfos & ttl_infos, + time_t time_of_move, + size_t min_volume_index, + bool is_insert) const { expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size); - auto metadata_snapshot = getInMemoryMetadataPtr(); ReservationPtr reservation; auto move_ttl_entry = selectTTLDescriptionForTTLInfos(metadata_snapshot->getMoveTTLs(), ttl_infos.moves_ttl, time_of_move, true); diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 1125eb32b66..5c18661dad1 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -632,6 +632,7 @@ public: /// Reserves space at least 1MB preferring best destination according to `ttl_infos`. ReservationPtr reserveSpacePreferringTTLRules( + const StorageMetadataPtr & metadata_snapshot, UInt64 expected_size, const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move, @@ -639,6 +640,7 @@ public: bool is_insert = false) const; ReservationPtr tryReserveSpacePreferringTTLRules( + const StorageMetadataPtr & metadata_snapshot, UInt64 expected_size, const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move, diff --git a/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp b/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp index 7d3c00aa19c..92c8a66e828 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp @@ -182,6 +182,10 @@ std::optional selectTTLDescriptionForTTLInfos(const TTLDescripti for (auto ttl_entry_it = descriptions.begin(); ttl_entry_it != descriptions.end(); ++ttl_entry_it) { auto ttl_info_it = ttl_info_map.find(ttl_entry_it->result_column); + + if (ttl_info_it == ttl_info_map.end()) + continue; + time_t ttl_time; if (use_max) @@ -190,8 +194,7 @@ std::optional selectTTLDescriptionForTTLInfos(const TTLDescripti ttl_time = ttl_info_it->second.min; /// Prefer TTL rule which went into action last. 
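The hunk below in `MergeTreeDataPartTTLInfo.cpp` exists because the old code read `ttl_info_it->second` before the `end()` check that only came later in the condition. A minimal sketch of the safe lookup pattern, with purely hypothetical names:

```cpp
#include <ctime>
#include <map>
#include <string>

// Check the iterator against end() *before* dereferencing it;
// reading ->second from map.end() is undefined behaviour.
std::time_t ttlTimeFor(const std::map<std::string, std::time_t> & ttl_by_column,
                       const std::string & column,
                       std::time_t fallback)
{
    const auto it = ttl_by_column.find(column);
    if (it == ttl_by_column.end())
        return fallback;   // analogous to the added `continue` for columns without TTL info
    return it->second;
}
```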
- if (ttl_info_it != ttl_info_map.end() - && ttl_time <= current_time + if (ttl_time <= current_time && best_ttl_time <= ttl_time) { best_entry_it = ttl_entry_it; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index c81894ee36d..0b22f1271e3 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -229,6 +229,8 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart: marks.next(); addToChecksums(checksums); + plain_file->finalize(); + marks_file->finalize(); if (sync) { plain_file->sync(); diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index 8295b881d87..c6b689da33a 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -17,8 +17,12 @@ namespace void MergeTreeDataPartWriterOnDisk::Stream::finalize() { compressed.next(); - plain_file->next(); + /// 'compressed_buf' doesn't call next() on underlying buffer ('plain_hashing'). We should do it manually. + plain_hashing.next(); marks.next(); + + plain_file->finalize(); + marks_file->finalize(); } void MergeTreeDataPartWriterOnDisk::Stream::sync() const @@ -331,6 +335,7 @@ void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization( index_stream->next(); checksums.files["primary.idx"].file_size = index_stream->count(); checksums.files["primary.idx"].file_hash = index_stream->getHash(); + index_file_stream->finalize(); if (sync) index_file_stream->sync(); index_stream = nullptr; diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 739aff31a06..e42bb786f46 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -237,7 +237,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false); NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames()); - ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, time(nullptr), 0, true); + ReservationPtr reservation = data.reserveSpacePreferringTTLRules(metadata_snapshot, expected_size, move_ttl_infos, time(nullptr), 0, true); VolumePtr volume = data.getStoragePolicy()->getVolume(0); auto new_data_part = data.createPart( diff --git a/src/Storages/MergeTree/MergeTreePartition.cpp b/src/Storages/MergeTree/MergeTreePartition.cpp index 4a846f63b7c..9b02b9f1fd8 100644 --- a/src/Storages/MergeTree/MergeTreePartition.cpp +++ b/src/Storages/MergeTree/MergeTreePartition.cpp @@ -156,6 +156,7 @@ void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr out_hashing.next(); checksums.files["partition.dat"].file_size = out_hashing.count(); checksums.files["partition.dat"].file_hash = out_hashing.getHash(); + out->finalize(); } void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row) diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index c91ed545ac5..1b40f9ab292 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -148,6 +148,7 @@ void 
MergedBlockOutputStream::finalizePartOnDisk( count_out_hashing.next(); checksums.files["count.txt"].file_size = count_out_hashing.count(); checksums.files["count.txt"].file_hash = count_out_hashing.getHash(); + count_out->finalize(); if (sync) count_out->sync(); } @@ -160,6 +161,7 @@ void MergedBlockOutputStream::finalizePartOnDisk( new_part->ttl_infos.write(out_hashing); checksums.files["ttl.txt"].file_size = out_hashing.count(); checksums.files["ttl.txt"].file_hash = out_hashing.getHash(); + out->finalize(); if (sync) out->sync(); } @@ -170,6 +172,7 @@ void MergedBlockOutputStream::finalizePartOnDisk( /// Write a file with a description of columns. auto out = volume->getDisk()->writeFile(part_path + "columns.txt", 4096); part_columns.writeText(*out); + out->finalize(); if (sync) out->sync(); } @@ -178,6 +181,7 @@ void MergedBlockOutputStream::finalizePartOnDisk( { auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096); DB::writeText(queryToString(default_codec->getFullCodecDesc()), *out); + out->finalize(); } else { @@ -189,6 +193,7 @@ void MergedBlockOutputStream::finalizePartOnDisk( /// Write file with checksums. auto out = volume->getDisk()->writeFile(part_path + "checksums.txt", 4096); checksums.write(*out); + out->finalize(); if (sync) out->sync(); } diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index 25e232dc4ad..f16df6983ed 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -21,81 +21,64 @@ namespace ErrorCodes class MemorySource : public SourceWithProgress { + using InitializerFunc = std::function; public: - /// We use range [first, last] which includes right border. /// Blocks are stored in std::list which may be appended in another thread. - /// We don't use synchronisation here, because elements in range [first, last] won't be modified. + /// We use pointer to the beginning of the list and its current size. + /// We don't need synchronisation in this reader, because while we hold SharedLock on storage, + /// only new elements can be added to the back of the list, so our iterators remain valid + MemorySource( Names column_names_, - BlocksList::iterator first_, + BlocksList::const_iterator first_, size_t num_blocks_, const StorageMemory & storage, - const StorageMetadataPtr & metadata_snapshot) + const StorageMetadataPtr & metadata_snapshot, + InitializerFunc initializer_func_ = [](BlocksList::const_iterator &, size_t &) {}) : SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals(), storage.getStorageID())) , column_names(std::move(column_names_)) , current_it(first_) , num_blocks(num_blocks_) + , initializer_func(std::move(initializer_func_)) { } - /// If called, will initialize the number of blocks at first read. - /// It allows to read data which was inserted into memory table AFTER Storage::read was called. - /// This hack is needed for global subqueries. 
- void delayInitialization(BlocksList * data_, std::mutex * mutex_) - { - data = data_; - mutex = mutex_; - } - String getName() const override { return "Memory"; } protected: Chunk generate() override { - if (data) + if (!postponed_init_done) { - std::lock_guard guard(*mutex); - current_it = data->begin(); - num_blocks = data->size(); - is_finished = num_blocks == 0; - - data = nullptr; - mutex = nullptr; + initializer_func(current_it, num_blocks); + postponed_init_done = true; } - if (is_finished) - { + if (current_block_idx == num_blocks) return {}; - } - else - { - const Block & src = *current_it; - Columns columns; - columns.reserve(column_names.size()); - /// Add only required columns to `res`. - for (const auto & name : column_names) - columns.emplace_back(src.getByName(name).column); + const Block & src = *current_it; + Columns columns; + columns.reserve(column_names.size()); - ++current_block_idx; + /// Add only required columns to `res`. + for (const auto & name : column_names) + columns.push_back(src.getByName(name).column); - if (current_block_idx == num_blocks) - is_finished = true; - else - ++current_it; + if (++current_block_idx < num_blocks) + ++current_it; - return Chunk(std::move(columns), src.rows()); - } + return Chunk(std::move(columns), src.rows()); } -private: - Names column_names; - BlocksList::iterator current_it; - size_t current_block_idx = 0; - size_t num_blocks; - bool is_finished = false; - BlocksList * data = nullptr; - std::mutex * mutex = nullptr; +private: + const Names column_names; + BlocksList::const_iterator current_it; + size_t num_blocks; + size_t current_block_idx = 0; + + bool postponed_init_done = false; + InitializerFunc initializer_func; }; @@ -113,9 +96,18 @@ public: void write(const Block & block) override { + const auto size_bytes_diff = block.allocatedBytes(); + const auto size_rows_diff = block.rows(); + metadata_snapshot->check(block, true); - std::lock_guard lock(storage.mutex); - storage.data.push_back(block); + { + std::lock_guard lock(storage.mutex); + storage.data.push_back(block); + + storage.total_size_bytes.fetch_add(size_bytes_diff, std::memory_order_relaxed); + storage.total_size_rows.fetch_add(size_rows_diff, std::memory_order_relaxed); + } + } private: StorageMemory & storage; @@ -144,8 +136,6 @@ Pipe StorageMemory::read( { metadata_snapshot->check(column_names, getVirtuals(), getStorageID()); - std::lock_guard lock(mutex); - if (delay_read_for_global_subqueries) { /// Note: for global subquery we use single source. @@ -156,11 +146,22 @@ Pipe StorageMemory::read( /// set for IN or hash table for JOIN, which can't be done concurrently. /// Since no other manipulation with data is done, multiple sources shouldn't give any profit. - auto source = std::make_shared(column_names, data.begin(), data.size(), *this, metadata_snapshot); - source->delayInitialization(&data, &mutex); - return Pipe(std::move(source)); + return Pipe( + std::make_shared( + column_names, data.end(), 0, *this, metadata_snapshot, + /// This hack is needed for global subqueries. 
+ /// It allows to set up this Source for read AFTER Storage::read() has been called and just before actual reading + [this](BlocksList::const_iterator & current_it, size_t & num_blocks) + { + std::lock_guard guard(mutex); + current_it = data.begin(); + num_blocks = data.size(); + } + )); } + std::lock_guard lock(mutex); + size_t size = data.size(); if (num_streams > size) @@ -168,7 +169,7 @@ Pipe StorageMemory::read( Pipes pipes; - BlocksList::iterator it = data.begin(); + BlocksList::const_iterator it = data.begin(); size_t offset = 0; for (size_t stream = 0; stream < num_streams; ++stream) @@ -201,31 +202,32 @@ void StorageMemory::drop() { std::lock_guard lock(mutex); data.clear(); + total_size_bytes.store(0, std::memory_order_relaxed); + total_size_rows.store(0, std::memory_order_relaxed); } + void StorageMemory::truncate( const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) { std::lock_guard lock(mutex); data.clear(); + total_size_bytes.store(0, std::memory_order_relaxed); + total_size_rows.store(0, std::memory_order_relaxed); } + std::optional StorageMemory::totalRows() const { - UInt64 rows = 0; - std::lock_guard lock(mutex); - for (const auto & buffer : data) - rows += buffer.rows(); - return rows; + /// All modifications of these counters are done under mutex which automatically guarantees synchronization/consistency + /// When run concurrently we are fine with any value: "before" or "after" + return total_size_rows.load(std::memory_order_relaxed); } + std::optional StorageMemory::totalBytes() const { - UInt64 bytes = 0; - std::lock_guard lock(mutex); - for (const auto & buffer : data) - bytes += buffer.allocatedBytes(); - return bytes; + return total_size_bytes.load(std::memory_order_relaxed); } diff --git a/src/Storages/StorageMemory.h b/src/Storages/StorageMemory.h index e67e3015028..6b525cd6dbb 100644 --- a/src/Storages/StorageMemory.h +++ b/src/Storages/StorageMemory.h @@ -1,5 +1,7 @@ #pragma once +#include +#include #include #include @@ -93,6 +95,9 @@ private: bool delay_read_for_global_subqueries = false; + std::atomic total_size_bytes = 0; + std::atomic total_size_rows = 0; + protected: StorageMemory(const StorageID & table_id_, ColumnsDescription columns_description_, ConstraintsDescription constraints_); }; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 6c0b082c6a9..5b9fe5f6dcf 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -300,7 +300,12 @@ struct CurrentlyMergingPartsTagger StorageMergeTree & storage; public: - CurrentlyMergingPartsTagger(FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_, bool is_mutation) + CurrentlyMergingPartsTagger( + FutureMergedMutatedPart & future_part_, + size_t total_size, + StorageMergeTree & storage_, + const StorageMetadataPtr & metadata_snapshot, + bool is_mutation) : future_part(future_part_), storage(storage_) { /// Assume mutex is already locked, because this method is called from mergeTask. 
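Returning briefly to the `StorageMemory` change earlier in this patch, here is a rough model of its counter scheme (class and member names are stand-ins, not the real storage): writers bump the atomics while holding the data mutex, while `totalRows()`-style readers only load the atomics and never take the lock.

```cpp
#include <atomic>
#include <cstdint>
#include <list>
#include <mutex>
#include <vector>

class MemoryStore
{
public:
    void append(std::vector<int> block)
    {
        const uint64_t rows = block.size();
        std::lock_guard<std::mutex> lock(mutex);   // all modifications happen under the mutex
        data.push_back(std::move(block));
        total_rows.fetch_add(rows, std::memory_order_relaxed);
    }

    // Lock-free read; any "before" or "after" value is acceptable for monitoring.
    uint64_t totalRows() const { return total_rows.load(std::memory_order_relaxed); }

private:
    mutable std::mutex mutex;
    std::list<std::vector<int>> data;
    std::atomic<uint64_t> total_rows{0};
};
```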
@@ -318,7 +323,7 @@ public: max_volume_index = std::max(max_volume_index, storage.getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk())); } - reserved_space = storage.tryReserveSpacePreferringTTLRules(total_size, ttl_infos, time(nullptr), max_volume_index); + reserved_space = storage.tryReserveSpacePreferringTTLRules(metadata_snapshot, total_size, ttl_infos, time(nullptr), max_volume_index); } if (!reserved_space) { @@ -729,7 +734,7 @@ bool StorageMergeTree::merge( return false; } - merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, false); + merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, metadata_snapshot, false); auto table_id = getStorageID(); merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part); } @@ -870,7 +875,7 @@ bool StorageMergeTree::tryMutatePart() future_part.name = part->getNewName(new_part_info); future_part.type = part->getType(); - tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true); + tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true); break; } } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ef06d27101b..0d67c238250 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -1416,11 +1416,11 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) ttl_infos.update(part_ptr->ttl_infos); max_volume_index = std::max(max_volume_index, getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk())); } - ReservationPtr reserved_space = reserveSpacePreferringTTLRules(estimated_space_for_merge, - ttl_infos, time(nullptr), max_volume_index); - auto table_lock = lockForShare(RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations); + StorageMetadataPtr metadata_snapshot = getInMemoryMetadataPtr(); + ReservationPtr reserved_space = reserveSpacePreferringTTLRules( + metadata_snapshot, estimated_space_for_merge, ttl_infos, time(nullptr), max_volume_index); FutureMergedMutatedPart future_merged_part(parts, entry.new_part_type); if (future_merged_part.name != entry.new_part_name) diff --git a/tests/ci/ci_config.json b/tests/ci/ci_config.json index 6810b54bd32..6cc7a4a398a 100644 --- a/tests/ci/ci_config.json +++ b/tests/ci/ci_config.json @@ -102,7 +102,7 @@ "with_coverage": false }, { - "compiler": "clang-10", + "compiler": "clang-11", "build-type": "", "sanitizer": "", "package-type": "binary", @@ -134,7 +134,7 @@ "with_coverage": false }, { - "compiler": "clang-10-darwin", + "compiler": "clang-11-darwin", "build-type": "", "sanitizer": "", "package-type": "binary", @@ -144,7 +144,7 @@ "with_coverage": false }, { - "compiler": "clang-10-aarch64", + "compiler": "clang-11-aarch64", "build-type": "", "sanitizer": "", "package-type": "binary", @@ -154,7 +154,7 @@ "with_coverage": false }, { - "compiler": "clang-10-freebsd", + "compiler": "clang-11-freebsd", "build-type": "", "sanitizer": "", "package-type": "binary", diff --git a/tests/integration/test_merge_tree_s3_failover/__init__.py b/tests/integration/test_merge_tree_s3_failover/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_merge_tree_s3_failover/configs/config.d/log_conf.xml 
b/tests/integration/test_merge_tree_s3_failover/configs/config.d/log_conf.xml new file mode 100644 index 00000000000..318a6bca95d --- /dev/null +++ b/tests/integration/test_merge_tree_s3_failover/configs/config.d/log_conf.xml @@ -0,0 +1,12 @@ + + 3 + + trace + /var/log/clickhouse-server/log.log + /var/log/clickhouse-server/log.err.log + 1000M + 10 + /var/log/clickhouse-server/stderr.log + /var/log/clickhouse-server/stdout.log + + diff --git a/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml b/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml new file mode 100644 index 00000000000..d4d53ab5efe --- /dev/null +++ b/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml @@ -0,0 +1,26 @@ + + + + + s3 + + http://resolver:8080/root/data/ + minio + minio123 + + true + + 0 + + + + + +
+        [storage policy definition: a policy whose single volume uses the "s3" disk; the surrounding XML tags were lost in extraction]
diff --git a/tests/integration/test_merge_tree_s3_failover/configs/config.d/users.xml b/tests/integration/test_merge_tree_s3_failover/configs/config.d/users.xml new file mode 100644 index 00000000000..797113053f4 --- /dev/null +++ b/tests/integration/test_merge_tree_s3_failover/configs/config.d/users.xml @@ -0,0 +1,5 @@ + + + + + diff --git a/tests/integration/test_merge_tree_s3_failover/configs/config.xml b/tests/integration/test_merge_tree_s3_failover/configs/config.xml new file mode 100644 index 00000000000..24b7344df3a --- /dev/null +++ b/tests/integration/test_merge_tree_s3_failover/configs/config.xml @@ -0,0 +1,20 @@ + + + 9000 + 127.0.0.1 + + + + true + none + + AcceptCertificateHandler + + + + + 500 + 5368709120 + ./clickhouse/ + users.xml + diff --git a/tests/integration/test_merge_tree_s3_failover/s3_endpoint/endpoint.py b/tests/integration/test_merge_tree_s3_failover/s3_endpoint/endpoint.py new file mode 100644 index 00000000000..e7614dd9536 --- /dev/null +++ b/tests/integration/test_merge_tree_s3_failover/s3_endpoint/endpoint.py @@ -0,0 +1,49 @@ +from bottle import request, route, run, response + + +# Endpoint can be configured to throw 500 error on N-th request attempt. +# In usual situation just redirects to original Minio server. + +# Dict to the number of request should be failed. +cache = {} + + +@route('/fail_request/<_request_number>') +def fail_request(_request_number): + request_number = int(_request_number) + if request_number > 0: + cache['request_number'] = request_number + else: + cache.pop('request_number', None) + return 'OK' + + +# Handle for MultipleObjectsDelete. +@route('/<_bucket>', ['POST']) +def delete(_bucket): + response.set_header("Location", "http://minio1:9001/" + _bucket + "?" + request.query_string) + response.status = 307 + return 'Redirected' + + +@route('/<_bucket>/<_path:path>', ['GET', 'POST', 'PUT', 'DELETE']) +def server(_bucket, _path): + if cache.get('request_number', None): + request_number = cache.pop('request_number') - 1 + if request_number > 0: + cache['request_number'] = request_number + else: + response.status = 500 + return 'Expected Error' + + response.set_header("Location", "http://minio1:9001/" + _bucket + '/' + _path) + response.status = 307 + return 'Redirected' + + +@route('/') +def ping(): + return 'OK' + + +run(host='0.0.0.0', port=8080) diff --git a/tests/integration/test_merge_tree_s3_failover/test.py b/tests/integration/test_merge_tree_s3_failover/test.py new file mode 100644 index 00000000000..59006e2e99a --- /dev/null +++ b/tests/integration/test_merge_tree_s3_failover/test.py @@ -0,0 +1,117 @@ +import logging +import os +import time + +import pytest + +from helpers.client import QueryRuntimeException +from helpers.cluster import ClickHouseCluster + +logging.getLogger().setLevel(logging.INFO) +logging.getLogger().addHandler(logging.StreamHandler()) + + +# Runs custom python-based S3 endpoint. 
+def run_endpoint(cluster): + logging.info("Starting custom S3 endpoint") + container_id = cluster.get_container_id('resolver') + current_dir = os.path.dirname(__file__) + cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_endpoint", "endpoint.py"), "endpoint.py") + cluster.exec_in_container(container_id, ["python", "endpoint.py"], detach=True) + + # Wait for S3 endpoint start + for attempt in range(10): + ping_response = cluster.exec_in_container(cluster.get_container_id('resolver'), + ["curl", "-s", "http://resolver:8080/"], nothrow=True) + if ping_response != 'OK': + if attempt == 9: + assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response) + else: + time.sleep(1) + else: + break + + logging.info("S3 endpoint started") + + +def fail_request(cluster, request): + response = cluster.exec_in_container(cluster.get_container_id('resolver'), + ["curl", "-s", "http://resolver:8080/fail_request/{}".format(request)]) + assert response == 'OK', 'Expected "OK", but got "{}"'.format(response) + + +@pytest.fixture(scope="module") +def cluster(): + try: + cluster = ClickHouseCluster(__file__) + cluster.add_instance("node", + main_configs=["configs/config.d/log_conf.xml", "configs/config.d/storage_conf.xml"], + with_minio=True) + logging.info("Starting cluster...") + cluster.start() + logging.info("Cluster started") + + run_endpoint(cluster) + + yield cluster + finally: + cluster.shutdown() + + +@pytest.fixture(autouse=True) +def drop_table(cluster): + yield + node = cluster.instances["node"] + node.query("DROP TABLE IF EXISTS s3_failover_test NO DELAY") + + +# S3 request will be failed for an appropriate part file write. +FILES_PER_PART_BASE = 5 # partition.dat, default_compression_codec.txt, count.txt, columns.txt, checksums.txt +FILES_PER_PART_WIDE = FILES_PER_PART_BASE + 1 + 1 + 3 * 2 # Primary index, MinMax, Mark and data file for column(s) +FILES_PER_PART_COMPACT = FILES_PER_PART_BASE + 1 + 1 + 2 + + +@pytest.mark.parametrize( + "min_bytes_for_wide_part,request_count", + [ + (0, FILES_PER_PART_WIDE), + (1024 * 1024, FILES_PER_PART_COMPACT) + ] +) +def test_write_failover(cluster, min_bytes_for_wide_part, request_count): + node = cluster.instances["node"] + + node.query( + """ + CREATE TABLE s3_failover_test ( + dt Date, + id Int64, + data String + ) ENGINE=MergeTree() + ORDER BY id + PARTITION BY dt + SETTINGS storage_policy='s3', min_bytes_for_wide_part={} + """ + .format(min_bytes_for_wide_part) + ) + + for request in range(request_count + 1): + # Fail N-th request to S3. + fail_request(cluster, request + 1) + + data = "('2020-03-01',0,'data'),('2020-03-01',1,'data')" + positive = request == request_count + try: + node.query("INSERT INTO s3_failover_test VALUES {}".format(data)) + + assert positive, "Insert query should be failed, request {}".format(request) + except QueryRuntimeException as e: + assert not positive, "Insert query shouldn't be failed, request {}".format(request) + assert str(e).find("Expected Error") != -1, "Unexpected error {}".format(str(e)) + + if positive: + # Disable request failing. 
+ fail_request(cluster, 0) + + assert node.query("CHECK TABLE s3_failover_test") == '1\n' + assert node.query("SELECT * FROM s3_failover_test FORMAT Values") == data diff --git a/tests/queries/0_stateless/01487_distributed_in_not_default_db.reference b/tests/queries/0_stateless/01487_distributed_in_not_default_db.reference new file mode 100644 index 00000000000..0d66ea1aee9 --- /dev/null +++ b/tests/queries/0_stateless/01487_distributed_in_not_default_db.reference @@ -0,0 +1,2 @@ +0 +1 diff --git a/tests/queries/0_stateless/01487_distributed_in_not_default_db.sql b/tests/queries/0_stateless/01487_distributed_in_not_default_db.sql new file mode 100644 index 00000000000..f6f7471711a --- /dev/null +++ b/tests/queries/0_stateless/01487_distributed_in_not_default_db.sql @@ -0,0 +1,36 @@ +CREATE DATABASE IF NOT EXISTS shard_0; +CREATE DATABASE IF NOT EXISTS shard_1; +CREATE DATABASE IF NOT EXISTS main_01487; +CREATE DATABASE IF NOT EXISTS test_01487; + +USE main_01487; + +DROP TABLE IF EXISTS shard_0.l; +DROP TABLE IF EXISTS shard_1.l; +DROP TABLE IF EXISTS d; +DROP TABLE IF EXISTS t; + +CREATE TABLE shard_0.l (value UInt8) ENGINE = MergeTree ORDER BY value; +CREATE TABLE shard_1.l (value UInt8) ENGINE = MergeTree ORDER BY value; +CREATE TABLE t (value UInt8) ENGINE = Memory; + +INSERT INTO shard_0.l VALUES (0); +INSERT INTO shard_1.l VALUES (1); +INSERT INTO t VALUES (0), (1), (2); + +CREATE TABLE d AS t ENGINE = Distributed(test_cluster_two_shards_different_databases, currentDatabase(), t); + +USE test_01487; +DROP DATABASE test_01487; + +SELECT * FROM main_01487.d WHERE value IN (SELECT l.value FROM l) ORDER BY value; + +USE main_01487; + +DROP TABLE IF EXISTS shard_0.l; +DROP TABLE IF EXISTS shard_1.l; +DROP TABLE IF EXISTS d; +DROP TABLE IF EXISTS t; + +DROP DATABASE shard_0; +DROP DATABASE shard_1; diff --git a/tests/queries/0_stateless/01514_distributed_cancel_query_on_error.reference b/tests/queries/0_stateless/01514_distributed_cancel_query_on_error.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01514_distributed_cancel_query_on_error.sh b/tests/queries/0_stateless/01514_distributed_cancel_query_on_error.sh new file mode 100755 index 00000000000..a24bb00fdf3 --- /dev/null +++ b/tests/queries/0_stateless/01514_distributed_cancel_query_on_error.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CURDIR"/../shell_config.sh + +# _shard_num: +# 1 on 127.2 +# 2 on 127.3 +# max_block_size to fail faster +# max_memory_usage/_shard_num/repeat() will allow failure on the first shard earlier. +opts=( + --max_memory_usage=3G + --max_block_size=50 + --max_threads=1 + --max_distributed_connections=2 +) +${CLICKHOUSE_CLIENT} "${opts[@]}" -q "SELECT groupArray(repeat('a', 1000*_shard_num)), number%100000 k from remote('127.{2,3}', system.numbers) GROUP BY k LIMIT 10e6" |& { + # the query should fail earlier on 127.3 and 127.2 should not even go to the memory limit exceeded error. + fgrep -q 'DB::Exception: Received from 127.3:9000. DB::Exception: Memory limit (for query) exceeded:' + # while if this will not correctly then it will got the exception from the 127.2:9000 and fail +}
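The `finish()` → `cancel()` change in `PipelineExecutor` is what `01514_distributed_cancel_query_on_error` exercises. Below is a very rough standalone model of the intended behaviour — plain threads and an atomic flag, nothing ClickHouse-specific: once one worker fails, a shared flag makes the others stop instead of running on and, for example, hitting their own memory limits.

```cpp
#include <atomic>
#include <stdexcept>
#include <thread>
#include <vector>

std::atomic<bool> cancelled{false};

void worker(int id)
{
    try
    {
        for (int step = 0; step < 1'000'000 && !cancelled.load(std::memory_order_relaxed); ++step)
        {
            if (id == 0 && step == 10)
                throw std::runtime_error("simulated failure on one executor thread");
        }
    }
    catch (...)
    {
        // Cancel the whole pipeline, not just finish this branch.
        cancelled.store(true, std::memory_order_relaxed);
    }
}

int main()
{
    std::vector<std::thread> pool;
    for (int i = 0; i < 4; ++i)
        pool.emplace_back(worker, i);
    for (auto & t : pool)
        t.join();
    return 0;
}
```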