Merge branch 'master' into geodistance-double-precision

Alexey Milovidov 2024-03-25 02:48:27 +01:00
commit 8bf3da136f
15 changed files with 123 additions and 42 deletions

contrib/xxHash vendored

@@ -1 +1 @@
-Subproject commit 3078dc6039f8c0bffcb1904f81cfe6b2c3209435
+Subproject commit bbb27a5efb85b92a0486cf361a8635715a53f6ba

contrib/xxHash-cmake/CMakeLists.txt

@@ -7,7 +7,7 @@ add_library(xxHash ${SRCS})
 target_include_directories(xxHash SYSTEM BEFORE INTERFACE "${LIBRARY_DIR}")

 # XXH_INLINE_ALL - Make all functions inline, with implementations being directly included within xxhash.h. Inlining functions is beneficial for speed on small keys.
-# https://github.com/Cyan4973/xxHash/tree/v0.8.1#build-modifiers
+# https://github.com/Cyan4973/xxHash/tree/v0.8.2#build-modifiers
 target_compile_definitions(xxHash PUBLIC XXH_INLINE_ALL)

 add_library(ch_contrib::xxHash ALIAS xxHash)

src/Interpreters/InterpreterSystemQuery.cpp

@@ -59,6 +59,7 @@
 #include <Storages/System/StorageSystemFilesystemCache.h>
 #include <Parsers/ASTSystemQuery.h>
 #include <Parsers/ASTCreateQuery.h>
+#include <Parsers/ASTSetQuery.h>
 #include <Processors/Sources/SourceFromSingleChunk.h>
 #include <Common/ThreadFuzzer.h>
 #include <base/coverage.h>
@@ -1165,12 +1166,16 @@ void InterpreterSystemQuery::syncTransactionLog()
 }

-void InterpreterSystemQuery::flushDistributed(ASTSystemQuery &)
+void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
 {
     getContext()->checkAccess(AccessType::SYSTEM_FLUSH_DISTRIBUTED, table_id);

+    SettingsChanges settings_changes;
+    if (query.query_settings)
+        settings_changes = query.query_settings->as<ASTSetQuery>()->changes;
+
     if (auto * storage_distributed = dynamic_cast<StorageDistributed *>(DatabaseCatalog::instance().getTable(table_id, getContext()).get()))
-        storage_distributed->flushClusterNodesAllData(getContext());
+        storage_distributed->flushClusterNodesAllData(getContext(), settings_changes);
     else
         throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table {} is not distributed", table_id.getNameForLogs());
 }
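With the interpreter change above, SYSTEM FLUSH DISTRIBUTED accepts a SETTINGS clause: the changes are pulled out of the AST and handed down to the flush machinery. A minimal usage sketch (the table name dist_in is illustrative; the same pattern appears in the new test at the end of this commit):

    -- Flush pending async INSERT blocks, overriding the settings that were
    -- recorded when the INSERTs were originally queued:
    SYSTEM FLUSH DISTRIBUTED dist_in SETTINGS max_memory_usage = 0;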

src/Parsers/ASTSystemQuery.cpp

@@ -190,6 +190,21 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
         case Type::SYNC_REPLICA:
         case Type::WAIT_LOADING_PARTS:
         case Type::FLUSH_DISTRIBUTED:
+        {
+            if (table)
+            {
+                settings.ostr << ' ';
+                print_database_table();
+            }
+
+            if (query_settings)
+            {
+                settings.ostr << (settings.hilite ? hilite_keyword : "") << settings.nl_or_ws << "SETTINGS " << (settings.hilite ? hilite_none : "");
+                query_settings->formatImpl(settings, state, frame);
+            }
+            break;
+        }
         case Type::RELOAD_DICTIONARY:
         case Type::RELOAD_MODEL:
         case Type::RELOAD_FUNCTION:

src/Parsers/ASTSystemQuery.h

@@ -109,6 +109,7 @@ public:
     ASTPtr database;
     ASTPtr table;
+    ASTPtr query_settings;

     String getDatabase() const;
     String getTable() const;
@@ -158,6 +159,7 @@ public:
         if (database) { res->database = database->clone(); res->children.push_back(res->database); }
         if (table) { res->table = table->clone(); res->children.push_back(res->table); }
+        if (query_settings) { res->query_settings = query_settings->clone(); res->children.push_back(res->query_settings); }

         return res;
     }

src/Parsers/ParserSystemQuery.cpp

@@ -4,6 +4,7 @@
 #include <Parsers/ExpressionElementParsers.h>
 #include <Parsers/ASTIdentifier.h>
 #include <Parsers/ASTLiteral.h>
+#include <Parsers/ParserSetQuery.h>
 #include <Parsers/parseDatabaseAndTableName.h>
 #include <IO/ReadBufferFromString.h>
 #include <IO/ReadHelpers.h>
@@ -328,6 +329,21 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
         }
         case Type::FLUSH_DISTRIBUTED:
+        {
+            if (!parseQueryWithOnClusterAndMaybeTable(res, pos, expected, /* require table = */ true, /* allow_string_literal = */ false))
+                return false;
+
+            ParserKeyword s_settings(Keyword::SETTINGS);
+            if (s_settings.ignore(pos, expected))
+            {
+                ParserSetQuery parser_settings(/* parse_only_internals_= */ true);
+                if (!parser_settings.parse(pos, res->query_settings, expected))
+                    return false;
+            }
+            break;
+        }
         case Type::RESTORE_REPLICA:
         {
             if (!parseQueryWithOnClusterAndMaybeTable(res, pos, expected, /* require table = */ true, /* allow_string_literal = */ false))
@@ -616,6 +632,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
         res->children.push_back(res->database);
     if (res->table)
         res->children.push_back(res->table);
+    if (res->query_settings)
+        res->children.push_back(res->query_settings);

     node = std::move(res);
     return true;
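The parser change accepts an optional SETTINGS list after the table name, reusing ParserSetQuery the same way other SETTINGS clauses do. Statements this grammar now covers (table and setting names are illustrative):

    SYSTEM FLUSH DISTRIBUTED db.dist;
    SYSTEM FLUSH DISTRIBUTED db.dist SETTINGS max_memory_usage = 0;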

src/Storages/Distributed/DistributedAsyncInsertBatch.cpp

@@ -60,7 +60,7 @@ bool DistributedAsyncInsertBatch::isEnoughSize() const
         || (parent.min_batched_block_size_bytes && total_bytes >= parent.min_batched_block_size_bytes);
 }

-void DistributedAsyncInsertBatch::send()
+void DistributedAsyncInsertBatch::send(const SettingsChanges & settings_changes)
 {
     if (files.empty())
         return;
@@ -84,14 +84,14 @@ void DistributedAsyncInsertBatch::send()
     {
         try
         {
-            sendBatch();
+            sendBatch(settings_changes);
         }
         catch (const Exception & e)
         {
             if (split_batch_on_failure && files.size() > 1 && isSplittableErrorCode(e.code(), e.isRemoteException()))
             {
                 tryLogCurrentException(parent.log, "Trying to split batch due to");
-                sendSeparateFiles();
+                sendSeparateFiles(settings_changes);
             }
             else
                 throw;
@@ -201,7 +201,7 @@ void DistributedAsyncInsertBatch::readText(ReadBuffer & in)
     recovered = true;
 }

-void DistributedAsyncInsertBatch::sendBatch()
+void DistributedAsyncInsertBatch::sendBatch(const SettingsChanges & settings_changes)
 {
     std::unique_ptr<RemoteInserter> remote;
     bool compression_expected = false;
@@ -228,7 +228,10 @@ void DistributedAsyncInsertBatch::sendBatch()
         if (!remote)
         {
-            auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(distributed_header.insert_settings);
+            Settings insert_settings = distributed_header.insert_settings;
+            insert_settings.applyChanges(settings_changes);
+
+            auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
             connection = parent.pool->get(timeouts);
             compression_expected = connection->getCompression() == Protocol::Compression::Enable;
@@ -240,7 +243,7 @@ void DistributedAsyncInsertBatch::sendBatch()
             remote = std::make_unique<RemoteInserter>(*connection, timeouts,
                 distributed_header.insert_query,
-                distributed_header.insert_settings,
+                insert_settings,
                 distributed_header.client_info);
         }
         writeRemoteConvert(distributed_header, *remote, compression_expected, in, parent.log);
@@ -264,7 +267,7 @@ void DistributedAsyncInsertBatch::sendBatch()
     }
 }

-void DistributedAsyncInsertBatch::sendSeparateFiles()
+void DistributedAsyncInsertBatch::sendSeparateFiles(const SettingsChanges & settings_changes)
 {
     size_t broken_files = 0;
@@ -277,18 +280,21 @@ void DistributedAsyncInsertBatch::sendSeparateFiles()
             ReadBufferFromFile in(file);
             const auto & distributed_header = DistributedAsyncInsertHeader::read(in, parent.log);

+            Settings insert_settings = distributed_header.insert_settings;
+            insert_settings.applyChanges(settings_changes);
+
             // This function is called in a separated thread, so we set up the trace context from the file
             trace_context = distributed_header.createTracingContextHolder(
                 __PRETTY_FUNCTION__,
                 parent.storage.getContext()->getOpenTelemetrySpanLog());

-            auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(distributed_header.insert_settings);
+            auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
             auto connection = parent.pool->get(timeouts);
             bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;

             RemoteInserter remote(*connection, timeouts,
                 distributed_header.insert_query,
-                distributed_header.insert_settings,
+                insert_settings,
                 distributed_header.client_info);

             writeRemoteConvert(distributed_header, remote, compression_expected, in, parent.log);
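Both send paths follow the same pattern: the settings recorded in the file header are copied into insert_settings, then applyChanges() lays the query-supplied changes on top, so a SETTINGS clause on SYSTEM FLUSH DISTRIBUTED overrides whatever the original INSERT captured. In SQL terms (values borrowed from the new test at the end of this commit):

    insert into dist_in select ... settings max_memory_usage='20Mi'; -- '20Mi' is recorded with the queued data
    system flush distributed dist_in settings max_memory_usage=0;    -- 0 overrides '20Mi' while flushing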

src/Storages/Distributed/DistributedAsyncInsertBatch.h

@@ -9,6 +9,7 @@ namespace DB
 class DistributedAsyncInsertDirectoryQueue;
 class WriteBuffer;
 class ReadBuffer;
+class SettingsChanges;

 class DistributedAsyncInsertBatch
 {
@@ -16,7 +17,7 @@ public:
     explicit DistributedAsyncInsertBatch(DistributedAsyncInsertDirectoryQueue & parent_);

     bool isEnoughSize() const;
-    void send();
+    void send(const SettingsChanges & settings_changes);

     /// Write batch to current_batch.txt
     void serialize();
@@ -35,8 +36,8 @@ public:
 private:
     void writeText(WriteBuffer & out);
     void readText(ReadBuffer & in);
-    void sendBatch();
-    void sendSeparateFiles();
+    void sendBatch(const SettingsChanges & settings_changes);
+    void sendSeparateFiles(const SettingsChanges & settings_changes);

     DistributedAsyncInsertDirectoryQueue & parent;

src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp

@@ -146,7 +146,7 @@ DistributedAsyncInsertDirectoryQueue::~DistributedAsyncInsertDirectoryQueue()
     }
 }

-void DistributedAsyncInsertDirectoryQueue::flushAllData()
+void DistributedAsyncInsertDirectoryQueue::flushAllData(const SettingsChanges & settings_changes)
 {
     if (pending_files.isFinished())
         return;
@@ -154,7 +154,7 @@ void DistributedAsyncInsertDirectoryQueue::flushAllData()
     std::lock_guard lock{mutex};
     if (!hasPendingFiles())
         return;
-    processFiles();
+    processFiles(settings_changes);
 }

 void DistributedAsyncInsertDirectoryQueue::shutdownAndDropAllData()
@@ -362,19 +362,19 @@ void DistributedAsyncInsertDirectoryQueue::initializeFilesFromDisk()
         status.broken_bytes_count = broken_bytes_count;
     }
 }

-void DistributedAsyncInsertDirectoryQueue::processFiles()
+void DistributedAsyncInsertDirectoryQueue::processFiles(const SettingsChanges & settings_changes)
 try
 {
     if (should_batch_inserts)
-        processFilesWithBatching();
+        processFilesWithBatching(settings_changes);
     else
     {
         /// Process unprocessed file.
         if (!current_file.empty())
-            processFile(current_file);
+            processFile(current_file, settings_changes);

         while (!pending_files.isFinished() && pending_files.tryPop(current_file))
-            processFile(current_file);
+            processFile(current_file, settings_changes);
     }

     std::lock_guard status_lock(status_mutex);
@@ -393,7 +393,7 @@ catch (...)
     throw;
 }

-void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path)
+void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path, const SettingsChanges & settings_changes)
 {
     OpenTelemetry::TracingContextHolderPtr thread_trace_context;
@@ -408,8 +408,11 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path)
             __PRETTY_FUNCTION__,
             storage.getContext()->getOpenTelemetrySpanLog());

-        auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(distributed_header.insert_settings);
-        auto connection = pool->get(timeouts, distributed_header.insert_settings);
+        Settings insert_settings = distributed_header.insert_settings;
+        insert_settings.applyChanges(settings_changes);
+
+        auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
+        auto connection = pool->get(timeouts, insert_settings);
         LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
             file_path,
             connection->getDescription(),
@@ -418,7 +421,7 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path)
         RemoteInserter remote{*connection, timeouts,
             distributed_header.insert_query,
-            distributed_header.insert_settings,
+            insert_settings,
             distributed_header.client_info};
         bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
         writeRemoteConvert(distributed_header, remote, compression_expected, in, log);
@@ -515,7 +518,7 @@ DistributedAsyncInsertDirectoryQueue::Status DistributedAsyncInsertDirectoryQueue::getStatus()
     return current_status;
 }

-void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching()
+void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching(const SettingsChanges & settings_changes)
 {
     /// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
     if (fs::exists(current_batch_file_path))
@@ -533,7 +536,7 @@ void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching()
         /// file was missing, then the batch is not complete and there is no
         /// point in trying to pretend that it will not break deduplication.
         if (batch.valid())
-            batch.send();
+            batch.send(settings_changes);

         auto dir_sync_guard = getDirectorySyncGuard(relative_path);
         fs::remove(current_batch_file_path);
@@ -615,14 +618,14 @@ void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching()
             if (batch.isEnoughSize())
             {
-                batch.send();
+                batch.send(settings_changes);
             }
         }

         for (auto & kv : header_to_batch)
         {
             DistributedAsyncInsertBatch & batch = kv.second;
-            batch.send();
+            batch.send(settings_changes);
         }
     }
     catch (...)

src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h

@@ -20,6 +20,7 @@ using DiskPtr = std::shared_ptr<IDisk>;
 class StorageDistributed;
 class ActionBlocker;
 class BackgroundSchedulePool;
+class SettingsChanges;

 class IProcessor;
 using ProcessorPtr = std::shared_ptr<IProcessor>;
@@ -59,7 +60,7 @@ public:
     void updatePath(const std::string & new_relative_path);

-    void flushAllData();
+    void flushAllData(const SettingsChanges & settings_changes);

     void shutdownAndDropAllData();
@@ -98,9 +99,9 @@ private:
     void addFile(const std::string & file_path);
     void initializeFilesFromDisk();
-    void processFiles();
-    void processFile(std::string & file_path);
-    void processFilesWithBatching();
+    void processFiles(const SettingsChanges & settings_changes = {});
+    void processFile(std::string & file_path, const SettingsChanges & settings_changes);
+    void processFilesWithBatching(const SettingsChanges & settings_changes);
     void markAsBroken(const std::string & file_path);
     void markAsSend(const std::string & file_path);

src/Storages/StorageDistributed.cpp

@@ -1731,7 +1731,7 @@ void StorageDistributed::flushAndPrepareForShutdown()
 {
     try
     {
-        flushClusterNodesAllData(getContext());
+        flushClusterNodesAllDataImpl(getContext(), /* settings_changes= */ {}, getDistributedSettingsRef().flush_on_detach);
     }
     catch (...)
     {
@@ -1739,7 +1739,12 @@ void StorageDistributed::flushAndPrepareForShutdown()
     }
 }

-void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
+void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context, const SettingsChanges & settings_changes)
+{
+    flushClusterNodesAllDataImpl(local_context, settings_changes, /* flush= */ true);
+}
+
+void StorageDistributed::flushClusterNodesAllDataImpl(ContextPtr local_context, const SettingsChanges & settings_changes, bool flush)
 {
     /// Sync SYSTEM FLUSH DISTRIBUTED with TRUNCATE
     auto table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
@@ -1754,7 +1759,7 @@ void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
             directory_queues.push_back(node.second.directory_queue);
     }

-    if (getDistributedSettingsRef().flush_on_detach)
+    if (flush)
     {
         LOG_INFO(log, "Flushing pending INSERT blocks");

@@ -1764,9 +1769,9 @@ void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
         for (const auto & node : directory_queues)
         {
-            auto future = scheduleFromThreadPool<void>([node_to_flush = node]
+            auto future = scheduleFromThreadPool<void>([node_to_flush = node, &settings_changes]
             {
-                node_to_flush->flushAllData();
+                node_to_flush->flushAllData(settings_changes);
             }, pool, "DistFlush");
             futures.push_back(std::move(future));
         }
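Splitting out flushClusterNodesAllDataImpl() keeps the two callers distinct: flushAndPrepareForShutdown() still honors the flush_on_detach Distributed setting, while an explicit SYSTEM FLUSH DISTRIBUTED always passes flush = true. An illustrative consequence, assuming a table created with flush_on_detach = 0:

    detach table dist;             -- shutdown path: pending blocks are not flushed
    -- but while the table is attached:
    system flush distributed dist; -- always flushes, regardless of flush_on_detach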

src/Storages/StorageDistributed.h

@@ -5,6 +5,7 @@
 #include <Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h>
 #include <Storages/Distributed/DistributedSettings.h>
 #include <Storages/getStructureOfRemoteTable.h>
+#include <Common/SettingsChanges.h>
 #include <Common/SimpleIncrement.h>
 #include <Client/ConnectionPool.h>
 #include <Client/ConnectionPoolWithFailover.h>
@@ -152,7 +153,7 @@ public:
     ClusterPtr getCluster() const;

     /// Used by InterpreterSystemQuery
-    void flushClusterNodesAllData(ContextPtr context);
+    void flushClusterNodesAllData(ContextPtr context, const SettingsChanges & settings_changes);

     size_t getShardCount() const;
@@ -165,6 +166,10 @@ private:
     const String & getShardingKeyColumnName() const { return sharding_key_column_name; }
     const String & getRelativeDataPath() const { return relative_data_path; }

+    /// @param flush - if true then do flush (DistributedAsyncInsertDirectoryQueue::flushAllData()),
+    /// otherwise only shutdown (DistributedAsyncInsertDirectoryQueue::shutdownWithoutFlush())
+    void flushClusterNodesAllDataImpl(ContextPtr context, const SettingsChanges & settings_changes, bool flush);
+
     /// create directory monitors for each existing subdirectory
     void initializeDirectoryQueuesForDisk(const DiskPtr & disk);

tests/queries/0_stateless/..._opentelemetry_insert_on_distributed_table.reference

@@ -3,8 +3,8 @@
 {"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const Block &, size_t)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
 1
 ===2===
-{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
-{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
+{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &, const SettingsChanges &)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
+{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &, const SettingsChanges &)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
 3
 2
 ===3===

tests/queries/0_stateless/... (new test file)

@@ -0,0 +1,19 @@
+drop table if exists ephemeral;
+drop table if exists dist_in;
+drop table if exists data;
+drop table if exists mv;
+drop table if exists dist_out;
+
+create table ephemeral (key Int, value Int) engine=Null();
+create table dist_in as ephemeral engine=Distributed(test_shard_localhost, currentDatabase(), ephemeral, key) settings background_insert_batch=1;
+create table data (key Int, uniq_values Int) engine=Memory();
+create materialized view mv to data as select key, uniqExact(value) uniq_values from ephemeral group by key;
+system stop distributed sends dist_in;
+
+create table dist_out as data engine=Distributed(test_shard_localhost, currentDatabase(), data);
+set prefer_localhost_replica=0;
+
+insert into dist_in select number/100, number from system.numbers limit 1e6 settings max_memory_usage='20Mi';
+system flush distributed dist_in; -- { serverError MEMORY_LIMIT_EXCEEDED }
+system flush distributed dist_in settings max_memory_usage=0;
+select count() from dist_out;