Add ability to override initial INSERT SETTINGS via SYSTEM FLUSH DISTRIBUTED

This is useful to overwrite settings in the initial query during force
flush with SYSTEM FLUSH DISTRIBUTED.

For example imagine that your server is out of queries
(max_max_concurrent_queries_for_all_users had been reached), but you
want to flush the distributed table anyway, after this patch you can
use something like this:

    SYSTEM FLUSH DISTRIBUTED dist SETTINGS max_concurrent_queries_for_all_users=1000000

And also fix flush_on_detach for SYSTEM FLUSH DISTRIBUTED, it should
ignore flush_on_detach.

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2024-03-24 17:19:14 +01:00
parent ede245e478
commit 35722be8ec
13 changed files with 121 additions and 40 deletions

View File

@ -59,6 +59,7 @@
#include <Storages/System/StorageSystemFilesystemCache.h>
#include <Parsers/ASTSystemQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Common/ThreadFuzzer.h>
#include <base/coverage.h>
@ -1165,12 +1166,16 @@ void InterpreterSystemQuery::syncTransactionLog()
}
void InterpreterSystemQuery::flushDistributed(ASTSystemQuery &)
void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
{
getContext()->checkAccess(AccessType::SYSTEM_FLUSH_DISTRIBUTED, table_id);
SettingsChanges settings_changes;
if (query.query_settings)
settings_changes = query.query_settings->as<ASTSetQuery>()->changes;
if (auto * storage_distributed = dynamic_cast<StorageDistributed *>(DatabaseCatalog::instance().getTable(table_id, getContext()).get()))
storage_distributed->flushClusterNodesAllData(getContext());
storage_distributed->flushClusterNodesAllData(getContext(), settings_changes);
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table {} is not distributed", table_id.getNameForLogs());
}

View File

@ -190,6 +190,21 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
case Type::SYNC_REPLICA:
case Type::WAIT_LOADING_PARTS:
case Type::FLUSH_DISTRIBUTED:
{
if (table)
{
settings.ostr << ' ';
print_database_table();
}
if (query_settings)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << settings.nl_or_ws << "SETTINGS " << (settings.hilite ? hilite_none : "");
query_settings->formatImpl(settings, state, frame);
}
break;
}
case Type::RELOAD_DICTIONARY:
case Type::RELOAD_MODEL:
case Type::RELOAD_FUNCTION:

View File

@ -109,6 +109,7 @@ public:
ASTPtr database;
ASTPtr table;
ASTPtr query_settings;
String getDatabase() const;
String getTable() const;
@ -158,6 +159,7 @@ public:
if (database) { res->database = database->clone(); res->children.push_back(res->database); }
if (table) { res->table = table->clone(); res->children.push_back(res->table); }
if (query_settings) { res->query_settings = query_settings->clone(); res->children.push_back(res->query_settings); }
return res;
}

View File

@ -4,6 +4,7 @@
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ParserSetQuery.h>
#include <Parsers/parseDatabaseAndTableName.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
@ -328,6 +329,21 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
}
case Type::FLUSH_DISTRIBUTED:
{
if (!parseQueryWithOnClusterAndMaybeTable(res, pos, expected, /* require table = */ true, /* allow_string_literal = */ false))
return false;
ParserKeyword s_settings(Keyword::SETTINGS);
if (s_settings.ignore(pos, expected))
{
ParserSetQuery parser_settings(/* parse_only_internals_= */ true);
if (!parser_settings.parse(pos, res->query_settings, expected))
return false;
}
break;
}
case Type::RESTORE_REPLICA:
{
if (!parseQueryWithOnClusterAndMaybeTable(res, pos, expected, /* require table = */ true, /* allow_string_literal = */ false))
@ -616,6 +632,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
res->children.push_back(res->database);
if (res->table)
res->children.push_back(res->table);
if (res->query_settings)
res->children.push_back(res->query_settings);
node = std::move(res);
return true;

View File

@ -60,7 +60,7 @@ bool DistributedAsyncInsertBatch::isEnoughSize() const
|| (parent.min_batched_block_size_bytes && total_bytes >= parent.min_batched_block_size_bytes);
}
void DistributedAsyncInsertBatch::send()
void DistributedAsyncInsertBatch::send(const SettingsChanges & settings_changes)
{
if (files.empty())
return;
@ -84,14 +84,14 @@ void DistributedAsyncInsertBatch::send()
{
try
{
sendBatch();
sendBatch(settings_changes);
}
catch (const Exception & e)
{
if (split_batch_on_failure && files.size() > 1 && isSplittableErrorCode(e.code(), e.isRemoteException()))
{
tryLogCurrentException(parent.log, "Trying to split batch due to");
sendSeparateFiles();
sendSeparateFiles(settings_changes);
}
else
throw;
@ -201,7 +201,7 @@ void DistributedAsyncInsertBatch::readText(ReadBuffer & in)
recovered = true;
}
void DistributedAsyncInsertBatch::sendBatch()
void DistributedAsyncInsertBatch::sendBatch(const SettingsChanges & settings_changes)
{
std::unique_ptr<RemoteInserter> remote;
bool compression_expected = false;
@ -228,7 +228,10 @@ void DistributedAsyncInsertBatch::sendBatch()
if (!remote)
{
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(distributed_header.insert_settings);
Settings insert_settings = distributed_header.insert_settings;
insert_settings.applyChanges(settings_changes);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
connection = parent.pool->get(timeouts);
compression_expected = connection->getCompression() == Protocol::Compression::Enable;
@ -240,7 +243,7 @@ void DistributedAsyncInsertBatch::sendBatch()
remote = std::make_unique<RemoteInserter>(*connection, timeouts,
distributed_header.insert_query,
distributed_header.insert_settings,
insert_settings,
distributed_header.client_info);
}
writeRemoteConvert(distributed_header, *remote, compression_expected, in, parent.log);
@ -264,7 +267,7 @@ void DistributedAsyncInsertBatch::sendBatch()
}
}
void DistributedAsyncInsertBatch::sendSeparateFiles()
void DistributedAsyncInsertBatch::sendSeparateFiles(const SettingsChanges & settings_changes)
{
size_t broken_files = 0;
@ -277,18 +280,21 @@ void DistributedAsyncInsertBatch::sendSeparateFiles()
ReadBufferFromFile in(file);
const auto & distributed_header = DistributedAsyncInsertHeader::read(in, parent.log);
Settings insert_settings = distributed_header.insert_settings;
insert_settings.applyChanges(settings_changes);
// This function is called in a separated thread, so we set up the trace context from the file
trace_context = distributed_header.createTracingContextHolder(
__PRETTY_FUNCTION__,
parent.storage.getContext()->getOpenTelemetrySpanLog());
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(distributed_header.insert_settings);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto connection = parent.pool->get(timeouts);
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
RemoteInserter remote(*connection, timeouts,
distributed_header.insert_query,
distributed_header.insert_settings,
insert_settings,
distributed_header.client_info);
writeRemoteConvert(distributed_header, remote, compression_expected, in, parent.log);

View File

@ -9,6 +9,7 @@ namespace DB
class DistributedAsyncInsertDirectoryQueue;
class WriteBuffer;
class ReadBuffer;
class SettingsChanges;
class DistributedAsyncInsertBatch
{
@ -16,7 +17,7 @@ public:
explicit DistributedAsyncInsertBatch(DistributedAsyncInsertDirectoryQueue & parent_);
bool isEnoughSize() const;
void send();
void send(const SettingsChanges & settings_changes);
/// Write batch to current_batch.txt
void serialize();
@ -35,8 +36,8 @@ public:
private:
void writeText(WriteBuffer & out);
void readText(ReadBuffer & in);
void sendBatch();
void sendSeparateFiles();
void sendBatch(const SettingsChanges & settings_changes);
void sendSeparateFiles(const SettingsChanges & settings_changes);
DistributedAsyncInsertDirectoryQueue & parent;

View File

@ -146,7 +146,7 @@ DistributedAsyncInsertDirectoryQueue::~DistributedAsyncInsertDirectoryQueue()
}
}
void DistributedAsyncInsertDirectoryQueue::flushAllData()
void DistributedAsyncInsertDirectoryQueue::flushAllData(const SettingsChanges & settings_changes)
{
if (pending_files.isFinished())
return;
@ -154,7 +154,7 @@ void DistributedAsyncInsertDirectoryQueue::flushAllData()
std::lock_guard lock{mutex};
if (!hasPendingFiles())
return;
processFiles();
processFiles(settings_changes);
}
void DistributedAsyncInsertDirectoryQueue::shutdownAndDropAllData()
@ -362,19 +362,19 @@ void DistributedAsyncInsertDirectoryQueue::initializeFilesFromDisk()
status.broken_bytes_count = broken_bytes_count;
}
}
void DistributedAsyncInsertDirectoryQueue::processFiles()
void DistributedAsyncInsertDirectoryQueue::processFiles(const SettingsChanges & settings_changes)
try
{
if (should_batch_inserts)
processFilesWithBatching();
processFilesWithBatching(settings_changes);
else
{
/// Process unprocessed file.
if (!current_file.empty())
processFile(current_file);
processFile(current_file, settings_changes);
while (!pending_files.isFinished() && pending_files.tryPop(current_file))
processFile(current_file);
processFile(current_file, settings_changes);
}
std::lock_guard status_lock(status_mutex);
@ -393,7 +393,7 @@ catch (...)
throw;
}
void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path)
void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path, const SettingsChanges & settings_changes)
{
OpenTelemetry::TracingContextHolderPtr thread_trace_context;
@ -408,8 +408,11 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path)
__PRETTY_FUNCTION__,
storage.getContext()->getOpenTelemetrySpanLog());
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(distributed_header.insert_settings);
auto connection = pool->get(timeouts, distributed_header.insert_settings);
Settings insert_settings = distributed_header.insert_settings;
insert_settings.applyChanges(settings_changes);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto connection = pool->get(timeouts, insert_settings);
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
file_path,
connection->getDescription(),
@ -418,7 +421,7 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path)
RemoteInserter remote{*connection, timeouts,
distributed_header.insert_query,
distributed_header.insert_settings,
insert_settings,
distributed_header.client_info};
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
writeRemoteConvert(distributed_header, remote, compression_expected, in, log);
@ -515,7 +518,7 @@ DistributedAsyncInsertDirectoryQueue::Status DistributedAsyncInsertDirectoryQueu
return current_status;
}
void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching()
void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching(const SettingsChanges & settings_changes)
{
/// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
if (fs::exists(current_batch_file_path))
@ -533,7 +536,7 @@ void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching()
/// file was missing, then the batch is not complete and there is no
/// point in trying to pretend that it will not break deduplication.
if (batch.valid())
batch.send();
batch.send(settings_changes);
auto dir_sync_guard = getDirectorySyncGuard(relative_path);
fs::remove(current_batch_file_path);
@ -615,14 +618,14 @@ void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching()
if (batch.isEnoughSize())
{
batch.send();
batch.send(settings_changes);
}
}
for (auto & kv : header_to_batch)
{
DistributedAsyncInsertBatch & batch = kv.second;
batch.send();
batch.send(settings_changes);
}
}
catch (...)

View File

@ -20,6 +20,7 @@ using DiskPtr = std::shared_ptr<IDisk>;
class StorageDistributed;
class ActionBlocker;
class BackgroundSchedulePool;
class SettingsChanges;
class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;
@ -59,7 +60,7 @@ public:
void updatePath(const std::string & new_relative_path);
void flushAllData();
void flushAllData(const SettingsChanges & settings_changes);
void shutdownAndDropAllData();
@ -98,9 +99,9 @@ private:
void addFile(const std::string & file_path);
void initializeFilesFromDisk();
void processFiles();
void processFile(std::string & file_path);
void processFilesWithBatching();
void processFiles(const SettingsChanges & settings_changes = {});
void processFile(std::string & file_path, const SettingsChanges & settings_changes);
void processFilesWithBatching(const SettingsChanges & settings_changes);
void markAsBroken(const std::string & file_path);
void markAsSend(const std::string & file_path);

View File

@ -1731,7 +1731,7 @@ void StorageDistributed::flushAndPrepareForShutdown()
{
try
{
flushClusterNodesAllData(getContext());
flushClusterNodesAllDataImpl(getContext(), /* settings_changes= */ {}, getDistributedSettingsRef().flush_on_detach);
}
catch (...)
{
@ -1739,7 +1739,12 @@ void StorageDistributed::flushAndPrepareForShutdown()
}
}
void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context, const SettingsChanges & settings_changes)
{
flushClusterNodesAllDataImpl(local_context, settings_changes, /* flush= */ true);
}
void StorageDistributed::flushClusterNodesAllDataImpl(ContextPtr local_context, const SettingsChanges & settings_changes, bool flush)
{
/// Sync SYSTEM FLUSH DISTRIBUTED with TRUNCATE
auto table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
@ -1754,7 +1759,7 @@ void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
directory_queues.push_back(node.second.directory_queue);
}
if (getDistributedSettingsRef().flush_on_detach)
if (flush)
{
LOG_INFO(log, "Flushing pending INSERT blocks");
@ -1764,9 +1769,9 @@ void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
for (const auto & node : directory_queues)
{
auto future = scheduleFromThreadPool<void>([node_to_flush = node]
auto future = scheduleFromThreadPool<void>([node_to_flush = node, &settings_changes]
{
node_to_flush->flushAllData();
node_to_flush->flushAllData(settings_changes);
}, pool, "DistFlush");
futures.push_back(std::move(future));
}

View File

@ -5,6 +5,7 @@
#include <Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h>
#include <Storages/Distributed/DistributedSettings.h>
#include <Storages/getStructureOfRemoteTable.h>
#include <Common/SettingsChanges.h>
#include <Common/SimpleIncrement.h>
#include <Client/ConnectionPool.h>
#include <Client/ConnectionPoolWithFailover.h>
@ -152,7 +153,7 @@ public:
ClusterPtr getCluster() const;
/// Used by InterpreterSystemQuery
void flushClusterNodesAllData(ContextPtr context);
void flushClusterNodesAllData(ContextPtr context, const SettingsChanges & settings_changes);
size_t getShardCount() const;
@ -165,6 +166,10 @@ private:
const String & getShardingKeyColumnName() const { return sharding_key_column_name; }
const String & getRelativeDataPath() const { return relative_data_path; }
/// @param flush - if true the do flush (DistributedAsyncInsertDirectoryQueue::flushAllData()),
/// otherwise only shutdown (DistributedAsyncInsertDirectoryQueue::shutdownWithoutFlush())
void flushClusterNodesAllDataImpl(ContextPtr context, const SettingsChanges & settings_changes, bool flush);
/// create directory monitors for each existing subdirectory
void initializeDirectoryQueuesForDisk(const DiskPtr & disk);

View File

@ -3,8 +3,8 @@
{"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const Block &, size_t)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
1
===2===
{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &, const SettingsChanges &)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &, const SettingsChanges &)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
3
2
===3===

View File

@ -0,0 +1,19 @@
drop table if exists ephemeral;
drop table if exists dist_in;
drop table if exists data;
drop table if exists mv;
drop table if exists dist_out;
create table ephemeral (key Int, value Int) engine=Null();
create table dist_in as ephemeral engine=Distributed(test_shard_localhost, currentDatabase(), ephemeral, key) settings background_insert_batch=1;
create table data (key Int, uniq_values Int) engine=Memory();
create materialized view mv to data as select key, uniqExact(value) uniq_values from ephemeral group by key;
system stop distributed sends dist_in;
create table dist_out as data engine=Distributed(test_shard_localhost, currentDatabase(), data);
set prefer_localhost_replica=0;
insert into dist_in select number/100, number from system.numbers limit 1e6 settings max_memory_usage='20Mi';
system flush distributed dist_in; -- { serverError MEMORY_LIMIT_EXCEEDED }
system flush distributed dist_in settings max_memory_usage=0;
select count() from dist_out;