diff --git a/contrib/xxHash b/contrib/xxHash
index 3078dc6039f..bbb27a5efb8 160000
--- a/contrib/xxHash
+++ b/contrib/xxHash
@@ -1 +1 @@
-Subproject commit 3078dc6039f8c0bffcb1904f81cfe6b2c3209435
+Subproject commit bbb27a5efb85b92a0486cf361a8635715a53f6ba
diff --git a/contrib/xxHash-cmake/CMakeLists.txt b/contrib/xxHash-cmake/CMakeLists.txt
index 314094e9523..bd7192ae944 100644
--- a/contrib/xxHash-cmake/CMakeLists.txt
+++ b/contrib/xxHash-cmake/CMakeLists.txt
@@ -7,7 +7,7 @@ add_library(xxHash ${SRCS})
 target_include_directories(xxHash SYSTEM BEFORE INTERFACE "${LIBRARY_DIR}")
 
 # XXH_INLINE_ALL - Make all functions inline, with implementations being directly included within xxhash.h. Inlining functions is beneficial for speed on small keys.
-# https://github.com/Cyan4973/xxHash/tree/v0.8.1#build-modifiers
+# https://github.com/Cyan4973/xxHash/tree/v0.8.2#build-modifiers
 target_compile_definitions(xxHash PUBLIC XXH_INLINE_ALL)
 
 add_library(ch_contrib::xxHash ALIAS xxHash)
diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp
index 49b3bd606d1..1cbc9c49631 100644
--- a/src/Interpreters/InterpreterSystemQuery.cpp
+++ b/src/Interpreters/InterpreterSystemQuery.cpp
@@ -59,6 +59,7 @@
 #include
 #include
 #include
+#include <Parsers/ASTSetQuery.h>
 #include
 #include
 #include
@@ -1165,12 +1166,16 @@ void InterpreterSystemQuery::syncTransactionLog()
 }
 
 
-void InterpreterSystemQuery::flushDistributed(ASTSystemQuery &)
+void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
 {
     getContext()->checkAccess(AccessType::SYSTEM_FLUSH_DISTRIBUTED, table_id);
 
+    SettingsChanges settings_changes;
+    if (query.query_settings)
+        settings_changes = query.query_settings->as<ASTSetQuery>()->changes;
+
     if (auto * storage_distributed = dynamic_cast<StorageDistributed *>(DatabaseCatalog::instance().getTable(table_id, getContext()).get()))
-        storage_distributed->flushClusterNodesAllData(getContext());
+        storage_distributed->flushClusterNodesAllData(getContext(), settings_changes);
     else
         throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table {} is not distributed", table_id.getNameForLogs());
 }
diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp
index effc7207793..9215353e2b3 100644
--- a/src/Parsers/ASTSystemQuery.cpp
+++ b/src/Parsers/ASTSystemQuery.cpp
@@ -190,6 +190,21 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
         case Type::SYNC_REPLICA:
         case Type::WAIT_LOADING_PARTS:
         case Type::FLUSH_DISTRIBUTED:
+        {
+            if (table)
+            {
+                settings.ostr << ' ';
+                print_database_table();
+            }
+
+            if (query_settings)
+            {
+                settings.ostr << (settings.hilite ? hilite_keyword : "") << settings.nl_or_ws << "SETTINGS " << (settings.hilite ? hilite_none : "");
+                query_settings->formatImpl(settings, state, frame);
+            }
+
+            break;
+        }
         case Type::RELOAD_DICTIONARY:
         case Type::RELOAD_MODEL:
         case Type::RELOAD_FUNCTION:
diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h
index 70a9e27178d..18a804ebc45 100644
--- a/src/Parsers/ASTSystemQuery.h
+++ b/src/Parsers/ASTSystemQuery.h
@@ -109,6 +109,7 @@ public:
 
     ASTPtr database;
     ASTPtr table;
+    ASTPtr query_settings;
 
     String getDatabase() const;
     String getTable() const;
@@ -158,6 +159,7 @@ public:
 
         if (database) { res->database = database->clone(); res->children.push_back(res->database); }
         if (table) { res->table = table->clone(); res->children.push_back(res->table); }
+        if (query_settings) { res->query_settings = query_settings->clone(); res->children.push_back(res->query_settings); }
 
         return res;
     }
diff --git a/src/Parsers/ParserSystemQuery.cpp b/src/Parsers/ParserSystemQuery.cpp
index df168e74772..81f9332c730 100644
--- a/src/Parsers/ParserSystemQuery.cpp
+++ b/src/Parsers/ParserSystemQuery.cpp
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include <Parsers/ParserSetQuery.h>
 #include
 #include
 #include
@@ -328,6 +329,21 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
         }
 
         case Type::FLUSH_DISTRIBUTED:
+        {
+            if (!parseQueryWithOnClusterAndMaybeTable(res, pos, expected, /* require table = */ true, /* allow_string_literal = */ false))
+                return false;
+
+            ParserKeyword s_settings(Keyword::SETTINGS);
+            if (s_settings.ignore(pos, expected))
+            {
+                ParserSetQuery parser_settings(/* parse_only_internals_= */ true);
+                if (!parser_settings.parse(pos, res->query_settings, expected))
+                    return false;
+            }
+
+            break;
+        }
+
         case Type::RESTORE_REPLICA:
         {
             if (!parseQueryWithOnClusterAndMaybeTable(res, pos, expected, /* require table = */ true, /* allow_string_literal = */ false))
@@ -616,6 +632,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
         res->children.push_back(res->database);
     if (res->table)
         res->children.push_back(res->table);
+    if (res->query_settings)
+        res->children.push_back(res->query_settings);
 
     node = std::move(res);
     return true;
diff --git a/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp b/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp
index 97268cf1389..8d95e49de57 100644
--- a/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp
+++ b/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp
@@ -60,7 +60,7 @@ bool DistributedAsyncInsertBatch::isEnoughSize() const
         || (parent.min_batched_block_size_bytes && total_bytes >= parent.min_batched_block_size_bytes);
 }
 
-void DistributedAsyncInsertBatch::send()
+void DistributedAsyncInsertBatch::send(const SettingsChanges & settings_changes)
 {
     if (files.empty())
         return;
@@ -84,14 +84,14 @@ void DistributedAsyncInsertBatch::send()
     {
         try
         {
-            sendBatch();
+            sendBatch(settings_changes);
         }
         catch (const Exception & e)
         {
             if (split_batch_on_failure && files.size() > 1 && isSplittableErrorCode(e.code(), e.isRemoteException()))
             {
                 tryLogCurrentException(parent.log, "Trying to split batch due to");
-                sendSeparateFiles();
+                sendSeparateFiles(settings_changes);
             }
             else
                 throw;
@@ -201,7 +201,7 @@ void DistributedAsyncInsertBatch::readText(ReadBuffer & in)
     recovered = true;
 }
 
-void DistributedAsyncInsertBatch::sendBatch()
+void DistributedAsyncInsertBatch::sendBatch(const SettingsChanges & settings_changes)
 {
     std::unique_ptr<RemoteInserter> remote;
     bool compression_expected = false;
@@ -228,7 +228,10 @@ void DistributedAsyncInsertBatch::sendBatch()
 
         if (!remote)
         {
-            auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(distributed_header.insert_settings);
+            Settings insert_settings = distributed_header.insert_settings;
+            insert_settings.applyChanges(settings_changes);
+
+            auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
             connection = parent.pool->get(timeouts);
             compression_expected = connection->getCompression() == Protocol::Compression::Enable;
 
@@ -240,7 +243,7 @@ void DistributedAsyncInsertBatch::sendBatch()
             remote = std::make_unique<RemoteInserter>(*connection, timeouts,
                 distributed_header.insert_query,
-                distributed_header.insert_settings,
+                insert_settings,
                 distributed_header.client_info);
         }
         writeRemoteConvert(distributed_header, *remote, compression_expected, in, parent.log);
@@ -264,7 +267,7 @@ void DistributedAsyncInsertBatch::sendBatch()
     }
 }
 
-void DistributedAsyncInsertBatch::sendSeparateFiles()
+void DistributedAsyncInsertBatch::sendSeparateFiles(const SettingsChanges & settings_changes)
 {
     size_t broken_files = 0;
 
@@ -277,18 +280,21 @@ void DistributedAsyncInsertBatch::sendSeparateFiles()
             ReadBufferFromFile in(file);
             const auto & distributed_header = DistributedAsyncInsertHeader::read(in, parent.log);
 
+            Settings insert_settings = distributed_header.insert_settings;
+            insert_settings.applyChanges(settings_changes);
+
             // This function is called in a separated thread, so we set up the trace context from the file
             trace_context = distributed_header.createTracingContextHolder(
                 __PRETTY_FUNCTION__,
                 parent.storage.getContext()->getOpenTelemetrySpanLog());
 
-            auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(distributed_header.insert_settings);
+            auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
             auto connection = parent.pool->get(timeouts);
             bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
 
             RemoteInserter remote(*connection, timeouts,
                 distributed_header.insert_query,
-                distributed_header.insert_settings,
+                insert_settings,
                 distributed_header.client_info);
             writeRemoteConvert(distributed_header, remote, compression_expected, in, parent.log);
 
diff --git a/src/Storages/Distributed/DistributedAsyncInsertBatch.h b/src/Storages/Distributed/DistributedAsyncInsertBatch.h
index db96634d6f1..e37c3ae6134 100644
--- a/src/Storages/Distributed/DistributedAsyncInsertBatch.h
+++ b/src/Storages/Distributed/DistributedAsyncInsertBatch.h
@@ -9,6 +9,7 @@ namespace DB
 class DistributedAsyncInsertDirectoryQueue;
 class WriteBuffer;
 class ReadBuffer;
+class SettingsChanges;
 
 class DistributedAsyncInsertBatch
 {
@@ -16,7 +17,7 @@ public:
     explicit DistributedAsyncInsertBatch(DistributedAsyncInsertDirectoryQueue & parent_);
 
     bool isEnoughSize() const;
-    void send();
+    void send(const SettingsChanges & settings_changes);
 
     /// Write batch to current_batch.txt
     void serialize();
@@ -35,8 +36,8 @@ public:
 private:
     void writeText(WriteBuffer & out);
     void readText(ReadBuffer & in);
-    void sendBatch();
-    void sendSeparateFiles();
+    void sendBatch(const SettingsChanges & settings_changes);
+    void sendSeparateFiles(const SettingsChanges & settings_changes);
 
     DistributedAsyncInsertDirectoryQueue & parent;
 
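Taken together, the parser, AST, and interpreter changes above extend the statement to SYSTEM FLUSH DISTRIBUTED [db.]table [SETTINGS name = value, ...], with the clause carried through ASTSystemQuery::query_settings as a SettingsChanges list. A minimal sketch of the resulting syntax, assuming a Distributed table named dist (the table and setting choices here are illustrative, not taken from this diff):

    SYSTEM FLUSH DISTRIBUTED dist;
    SYSTEM FLUSH DISTRIBUTED dist SETTINGS max_memory_usage = 0;
    SYSTEM FLUSH DISTRIBUTED db.dist SETTINGS max_memory_usage = 0, max_execution_time = 300;
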
diff --git a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp
index 4e01cb2c6cf..7fed076713d 100644
--- a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp
+++ b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp
@@ -146,7 +146,7 @@ DistributedAsyncInsertDirectoryQueue::~DistributedAsyncInsertDirectoryQueue()
     }
 }
 
-void DistributedAsyncInsertDirectoryQueue::flushAllData()
+void DistributedAsyncInsertDirectoryQueue::flushAllData(const SettingsChanges & settings_changes)
 {
     if (pending_files.isFinished())
         return;
@@ -154,7 +154,7 @@ void DistributedAsyncInsertDirectoryQueue::flushAllData()
     std::lock_guard lock{mutex};
     if (!hasPendingFiles())
         return;
-    processFiles();
+    processFiles(settings_changes);
 }
 
 void DistributedAsyncInsertDirectoryQueue::shutdownAndDropAllData()
@@ -362,19 +362,19 @@ void DistributedAsyncInsertDirectoryQueue::initializeFilesFromDisk()
         status.broken_bytes_count = broken_bytes_count;
     }
 }
-void DistributedAsyncInsertDirectoryQueue::processFiles()
+void DistributedAsyncInsertDirectoryQueue::processFiles(const SettingsChanges & settings_changes)
 try
 {
     if (should_batch_inserts)
-        processFilesWithBatching();
+        processFilesWithBatching(settings_changes);
     else
     {
         /// Process unprocessed file.
         if (!current_file.empty())
-            processFile(current_file);
+            processFile(current_file, settings_changes);
 
         while (!pending_files.isFinished() && pending_files.tryPop(current_file))
-            processFile(current_file);
+            processFile(current_file, settings_changes);
     }
 
     std::lock_guard status_lock(status_mutex);
@@ -393,7 +393,7 @@ catch (...)
     throw;
 }
 
-void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path)
+void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path, const SettingsChanges & settings_changes)
 {
     OpenTelemetry::TracingContextHolderPtr thread_trace_context;
 
@@ -408,8 +408,11 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path)
             __PRETTY_FUNCTION__,
             storage.getContext()->getOpenTelemetrySpanLog());
 
-        auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(distributed_header.insert_settings);
-        auto connection = pool->get(timeouts, distributed_header.insert_settings);
+        Settings insert_settings = distributed_header.insert_settings;
+        insert_settings.applyChanges(settings_changes);
+
+        auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
+        auto connection = pool->get(timeouts, insert_settings);
         LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
             file_path,
             connection->getDescription(),
@@ -418,7 +421,7 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path)
         RemoteInserter remote{*connection,
             timeouts,
             distributed_header.insert_query,
-            distributed_header.insert_settings,
+            insert_settings,
             distributed_header.client_info};
         bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
         writeRemoteConvert(distributed_header, remote, compression_expected, in, log);
@@ -515,7 +518,7 @@ DistributedAsyncInsertDirectoryQueue::Status DistributedAsyncInsertDirectoryQueu
     return current_status;
 }
 
-void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching()
+void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching(const SettingsChanges & settings_changes)
 {
     /// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
     if (fs::exists(current_batch_file_path))
@@ -533,7 +536,7 @@ void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching()
         /// file was missing, then the batch is not complete and there is no
         /// point in trying to pretend that it will not break deduplication.
         if (batch.valid())
-            batch.send();
+            batch.send(settings_changes);
 
         auto dir_sync_guard = getDirectorySyncGuard(relative_path);
         fs::remove(current_batch_file_path);
@@ -615,14 +618,14 @@ void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching()
 
             if (batch.isEnoughSize())
             {
-                batch.send();
+                batch.send(settings_changes);
             }
         }
 
         for (auto & kv : header_to_batch)
         {
             DistributedAsyncInsertBatch & batch = kv.second;
-            batch.send();
+            batch.send(settings_changes);
         }
     }
     catch (...)
diff --git a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h
index a1b436bb9c8..4d6afe31d61 100644
--- a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h
+++ b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h
@@ -20,6 +20,7 @@ using DiskPtr = std::shared_ptr<IDisk>;
 class StorageDistributed;
 class ActionBlocker;
 class BackgroundSchedulePool;
+class SettingsChanges;
 
 class IProcessor;
 using ProcessorPtr = std::shared_ptr<IProcessor>;
@@ -59,7 +60,7 @@ public:
 
     void updatePath(const std::string & new_relative_path);
 
-    void flushAllData();
+    void flushAllData(const SettingsChanges & settings_changes);
 
     void shutdownAndDropAllData();
 
@@ -98,9 +99,9 @@ private:
     void addFile(const std::string & file_path);
     void initializeFilesFromDisk();
 
-    void processFiles();
-    void processFile(std::string & file_path);
-    void processFilesWithBatching();
+    void processFiles(const SettingsChanges & settings_changes = {});
+    void processFile(std::string & file_path, const SettingsChanges & settings_changes);
+    void processFilesWithBatching(const SettingsChanges & settings_changes);
 
     void markAsBroken(const std::string & file_path);
     void markAsSend(const std::string & file_path);
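In the directory queue and batch sender above, the overrides are applied on top of the per-INSERT settings recorded in each pending file's header (insert_settings.applyChanges(settings_changes)) before the connection and RemoteInserter are created, so a flush can relax limits that the original INSERT ran with. A hedged SQL sketch of that interaction, modeled on the test added at the end of this diff (table name illustrative):

    -- the INSERT's settings are recorded into the pending .bin file header
    INSERT INTO dist SELECT number % 10, number FROM numbers(1000000) SETTINGS max_memory_usage = '20Mi';
    -- replaying the file with its recorded settings can hit the recorded limit
    SYSTEM FLUSH DISTRIBUTED dist; -- may fail with MEMORY_LIMIT_EXCEEDED
    -- the override lifts the limit for this flush only
    SYSTEM FLUSH DISTRIBUTED dist SETTINGS max_memory_usage = 0;
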
diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp
index 6f578e38945..1ee7c6fc6a5 100644
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -1731,7 +1731,7 @@ void StorageDistributed::flushAndPrepareForShutdown()
 {
     try
     {
-        flushClusterNodesAllData(getContext());
+        flushClusterNodesAllDataImpl(getContext(), /* settings_changes= */ {}, getDistributedSettingsRef().flush_on_detach);
     }
     catch (...)
     {
@@ -1739,7 +1739,12 @@ void StorageDistributed::flushAndPrepareForShutdown()
     }
 }
 
-void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
+void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context, const SettingsChanges & settings_changes)
+{
+    flushClusterNodesAllDataImpl(local_context, settings_changes, /* flush= */ true);
+}
+
+void StorageDistributed::flushClusterNodesAllDataImpl(ContextPtr local_context, const SettingsChanges & settings_changes, bool flush)
 {
     /// Sync SYSTEM FLUSH DISTRIBUTED with TRUNCATE
     auto table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
@@ -1754,7 +1759,7 @@ void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
         directory_queues.push_back(node.second.directory_queue);
     }
 
-    if (getDistributedSettingsRef().flush_on_detach)
+    if (flush)
     {
         LOG_INFO(log, "Flushing pending INSERT blocks");
 
@@ -1764,9 +1769,9 @@ void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
 
         for (const auto & node : directory_queues)
         {
-            auto future = scheduleFromThreadPool<void>([node_to_flush = node]
+            auto future = scheduleFromThreadPool<void>([node_to_flush = node, &settings_changes]
             {
-                node_to_flush->flushAllData();
+                node_to_flush->flushAllData(settings_changes);
             }, pool, "DistFlush");
             futures.push_back(std::move(future));
         }
diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h
index 323646ab911..6709b1a2d8c 100644
--- a/src/Storages/StorageDistributed.h
+++ b/src/Storages/StorageDistributed.h
@@ -5,6 +5,7 @@
 #include
 #include
 #include
+#include <Common/SettingsChanges.h>
 #include
 #include
 #include
@@ -152,7 +153,7 @@ public:
     ClusterPtr getCluster() const;
 
     /// Used by InterpreterSystemQuery
-    void flushClusterNodesAllData(ContextPtr context);
+    void flushClusterNodesAllData(ContextPtr context, const SettingsChanges & settings_changes);
 
     size_t getShardCount() const;
 
@@ -165,6 +166,10 @@ private:
     const String & getShardingKeyColumnName() const { return sharding_key_column_name; }
     const String & getRelativeDataPath() const { return relative_data_path; }
 
+    /// @param flush - if true then do flush (DistributedAsyncInsertDirectoryQueue::flushAllData()),
+    /// otherwise only shutdown (DistributedAsyncInsertDirectoryQueue::shutdownWithoutFlush())
+    void flushClusterNodesAllDataImpl(ContextPtr context, const SettingsChanges & settings_changes, bool flush);
+
     /// create directory monitors for each existing subdirectory
     void initializeDirectoryQueuesForDisk(const DiskPtr & disk);
 
diff --git a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference
index a0689a0a090..f2b5b95e45c 100644
--- a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference
+++ b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference
@@ -3,8 +3,8 @@
 {"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const Block &, size_t)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
 1
 ===2===
-{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
-{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
+{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &, const SettingsChanges &)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
+{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &, const SettingsChanges &)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
 3
 2
 ===3===
diff --git a/tests/queries/0_stateless/03030_system_flush_distributed_settings.reference b/tests/queries/0_stateless/03030_system_flush_distributed_settings.reference
new file mode 100644
index 00000000000..5caff40c4a0
--- /dev/null
+++ b/tests/queries/0_stateless/03030_system_flush_distributed_settings.reference
@@ -0,0 +1 @@
+10000
diff --git a/tests/queries/0_stateless/03030_system_flush_distributed_settings.sql b/tests/queries/0_stateless/03030_system_flush_distributed_settings.sql
new file mode 100644
index 00000000000..da2a387e07c
--- /dev/null
+++ b/tests/queries/0_stateless/03030_system_flush_distributed_settings.sql
@@ -0,0 +1,19 @@
+drop table if exists ephemeral;
+drop table if exists dist_in;
+drop table if exists data;
+drop table if exists mv;
+drop table if exists dist_out;
+
+create table ephemeral (key Int, value Int) engine=Null();
+create table dist_in as ephemeral engine=Distributed(test_shard_localhost, currentDatabase(), ephemeral, key) settings background_insert_batch=1;
+create table data (key Int, uniq_values Int) engine=Memory();
+create materialized view mv to data as select key, uniqExact(value) uniq_values from ephemeral group by key;
+system stop distributed sends dist_in;
+create table dist_out as data engine=Distributed(test_shard_localhost, currentDatabase(), data);
+
+set prefer_localhost_replica=0;
+
+insert into dist_in select number/100, number from system.numbers limit 1e6 settings max_memory_usage='20Mi';
+system flush distributed dist_in; -- { serverError MEMORY_LIMIT_EXCEEDED }
+system flush distributed dist_in settings max_memory_usage=0;
+select count() from dist_out;
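Note that flushAndPrepareForShutdown() above keeps its previous behavior: it passes empty settings changes and maps the Distributed engine's flush_on_detach setting onto the new flush flag of flushClusterNodesAllDataImpl(). A hedged sketch of that engine-level knob (table names are illustrative, not from this diff):

    CREATE TABLE dist AS local
    ENGINE = Distributed(test_shard_localhost, currentDatabase(), local)
    SETTINGS flush_on_detach = 0;
    -- with flush_on_detach=0, DETACH leaves queued .bin files on disk instead of sending them
    DETACH TABLE dist;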