From 0d39968233453b77971c06d9894b5381e2f79378 Mon Sep 17 00:00:00 2001 From: tai Date: Mon, 8 Apr 2019 18:04:26 +0800 Subject: [PATCH 1/2] Support settings for async inserts (#4852) Settings are not hashed, because hashing them is of little value. Also, I don't know how to write a test case for this. --- dbms/src/Core/Defines.h | 4 ++ dbms/src/Core/Settings.cpp | 1 - .../Storages/Distributed/DirectoryMonitor.cpp | 39 +++++++++++++++---- .../Storages/Distributed/DirectoryMonitor.h | 4 ++ .../DistributedBlockOutputStream.cpp | 22 ++++++----- .../DistributedBlockOutputStream.h | 6 +-- dbms/src/Storages/StorageDistributed.cpp | 3 +- 7 files changed, 56 insertions(+), 23 deletions(-) diff --git a/dbms/src/Core/Defines.h b/dbms/src/Core/Defines.h index 0a3b384797d..d0738875947 100644 --- a/dbms/src/Core/Defines.h +++ b/dbms/src/Core/Defines.h @@ -123,3 +123,7 @@ #else #define OPTIMIZE(x) #endif + +/// This number is only used for distributed version compatibility. +/// It could be any magic number. 
+#define DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER 0xCAFECABE diff --git a/dbms/src/Core/Settings.cpp b/dbms/src/Core/Settings.cpp index c09ff38a2d8..bed05855e99 100644 --- a/dbms/src/Core/Settings.cpp +++ b/dbms/src/Core/Settings.cpp @@ -109,5 +109,4 @@ void Settings::addProgramOptions(boost::program_options::options_description & o Settings::getDescriptionByIndex(index).data))); } } - } diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp index 3cd51d0bf98..ea2365cfd2b 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp @@ -220,10 +220,11 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa ReadBufferFromFile in{file_path}; + Settings insert_settings; std::string insert_query; - readStringBinary(insert_query, in); + readQueryAndSettings(in, insert_settings, insert_query); - RemoteBlockOutputStream remote{*connection, insert_query}; + RemoteBlockOutputStream remote{*connection, insert_query, &insert_settings}; remote.writePrefix(); remote.writePrepared(in); @@ -240,20 +241,39 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa LOG_TRACE(log, "Finished processing `" << file_path << '`'); } +void StorageDistributedDirectoryMonitor::readQueryAndSettings( + ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const +{ + UInt64 magic_number_or_query_size; + + readVarUInt(magic_number_or_query_size, in); + + if (magic_number_or_query_size == UInt64(DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER)) + { + insert_settings.deserialize(in); + readVarUInt(magic_number_or_query_size, in); + } + insert_query.resize(magic_number_or_query_size); + in.readStrict(insert_query.data(), magic_number_or_query_size); +} + struct StorageDistributedDirectoryMonitor::BatchHeader { + Settings settings; String query; Block sample_block; - BatchHeader(String query_, Block sample_block_) - : 
query(std::move(query_)) + BatchHeader(Settings settings_, String query_, Block sample_block_) + : settings(std::move(settings_)) + , query(std::move(query_)) , sample_block(std::move(sample_block_)) { } bool operator==(const BatchHeader & other) const { - return query == other.query && blocksHaveEqualStructure(sample_block, other.sample_block); + return settings == other.settings && query == other.query && + blocksHaveEqualStructure(sample_block, other.sample_block); } struct Hash @@ -320,6 +340,7 @@ struct StorageDistributedDirectoryMonitor::Batch bool batch_broken = false; try { + Settings insert_settings; String insert_query; std::unique_ptr<RemoteBlockOutputStream> remote; bool first = true; @@ -335,12 +356,12 @@ struct StorageDistributedDirectoryMonitor::Batch } ReadBufferFromFile in(file_path->second); - readStringBinary(insert_query, in); /// NOTE: all files must have the same insert_query + parent.readQueryAndSettings(in, insert_settings, insert_query); if (first) { first = false; - remote = std::make_unique<RemoteBlockOutputStream>(*connection, insert_query); + remote = std::make_unique<RemoteBlockOutputStream>(*connection, insert_query, &insert_settings); remote->writePrefix(); } @@ -436,11 +457,13 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map size_t total_rows = 0; size_t total_bytes = 0; Block sample_block; + Settings insert_settings; String insert_query; try { /// Determine metadata of the current file and check if it is not broken. 
ReadBufferFromFile in{file_path}; + insert_settings.deserialize(in); readStringBinary(insert_query, in); CompressedReadBuffer decompressing_in(in); @@ -468,7 +491,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map throw; } - BatchHeader batch_header(std::move(insert_query), std::move(sample_block)); + BatchHeader batch_header(std::move(insert_settings), std::move(insert_query), std::move(sample_block)); Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second; batch.file_indices.push_back(file_idx); diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.h b/dbms/src/Storages/Distributed/DirectoryMonitor.h index d7858d3af40..2c95947355d 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.h +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.h @@ -7,6 +7,7 @@ #include #include #include +#include namespace DB @@ -57,6 +58,9 @@ private: std::condition_variable cond; Logger * log; ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this}; + + /// Read the insert query and insert settings in a backward-compatible way. 
+ void readQueryAndSettings(ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const; }; } diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp index 7f47a76a068..836ba20a644 100644 --- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -59,10 +59,10 @@ namespace ErrorCodes DistributedBlockOutputStream::DistributedBlockOutputStream( - StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_, - const Settings & settings_, bool insert_sync_, UInt64 insert_timeout_) - : storage(storage), query_ast(query_ast), query_string(queryToString(query_ast)), - cluster(cluster_), settings(settings_), insert_sync(insert_sync_), + const Context & context_, StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_, + bool insert_sync_, UInt64 insert_timeout_) + : context(context_), storage(storage), query_ast(query_ast), query_string(queryToString(query_ast)), + cluster(cluster_), insert_sync(insert_sync_), insert_timeout(insert_timeout_), log(&Logger::get("DistributedBlockOutputStream")) { } @@ -249,7 +249,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp throw Exception("There are several writing job for an automatically replicated shard", ErrorCodes::LOGICAL_ERROR); /// TODO: it make sense to rewrite skip_unavailable_shards and max_parallel_replicas here - auto connections = shard_info.pool->getMany(&settings, PoolMode::GET_ONE); + auto connections = shard_info.pool->getMany(&context.getSettingsRef(), PoolMode::GET_ONE); if (connections.empty() || connections.front().isNull()) throw Exception("Expected exactly one connection for shard " + toString(job.shard_index), ErrorCodes::LOGICAL_ERROR); @@ -263,7 +263,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp if 
(!connection_pool) throw Exception("Connection pool for replica " + replica.readableString() + " does not exist", ErrorCodes::LOGICAL_ERROR); - job.connection_entry = connection_pool->get(&settings); + job.connection_entry = connection_pool->get(&context.getSettingsRef()); if (job.connection_entry.isNull()) throw Exception("Got empty connection for replica" + replica.readableString(), ErrorCodes::LOGICAL_ERROR); } @@ -271,7 +271,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp if (throttler) job.connection_entry->setThrottler(throttler); - job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, query_string, &settings); + job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, query_string, &context.getSettingsRef()); job.stream->writePrefix(); } @@ -283,8 +283,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp if (!job.stream) { /// Forward user settings - job.local_context = std::make_unique<Context>(storage.global_context); - job.local_context->setSettings(settings); + job.local_context = std::make_unique<Context>(context); InterpreterInsertQuery interp(query_ast, *job.local_context); job.stream = interp.execute().out; @@ -304,6 +303,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp void DistributedBlockOutputStream::writeSync(const Block & block) { + const Settings & settings = context.getSettingsRef(); const auto & shards_info = cluster->getShardsInfo(); size_t num_shards = shards_info.size(); @@ -504,7 +504,7 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const siz void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats) { /// Async insert does not support settings forwarding yet whereas sync one supports - InterpreterInsertQuery interp(query_ast, storage.global_context); + InterpreterInsertQuery interp(query_ast, context); auto block_io = interp.execute(); block_io.out->writePrefix(); @@ -553,6 +553,8 @@ void 
DistributedBlockOutputStream::writeToShard(const Block & block, const std:: CompressedWriteBuffer compress{out}; NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()}; + writeVarUInt(UInt64(DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER), out); + context.getSettingsRef().serialize(out); writeStringBinary(query_string, out); stream.writePrefix(); diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h index de802a09483..f71585b8026 100644 --- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h +++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h @@ -35,8 +35,8 @@ class StorageDistributed; class DistributedBlockOutputStream : public IBlockOutputStream { public: - DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_, - const Settings & settings_, bool insert_sync_, UInt64 insert_timeout_); + DistributedBlockOutputStream(const Context & context_, StorageDistributed & storage, const ASTPtr & query_ast, + const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_); Block getHeader() const override; void write(const Block & block) override; @@ -78,11 +78,11 @@ private: std::string getCurrentStateDescription(); private: + const Context & context; StorageDistributed & storage; ASTPtr query_ast; String query_string; ClusterPtr cluster; - const Settings & settings; size_t inserted_blocks = 0; size_t inserted_rows = 0; diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index aa140903496..174c5bea72a 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -333,7 +333,8 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Context & c /// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster return std::make_shared<DistributedBlockOutputStream>( - *this, 
createInsertToRemoteTableQuery(remote_database, remote_table, getSampleBlock()), cluster, settings, insert_sync, timeout); + context, *this, createInsertToRemoteTableQuery(remote_database, remote_table, getSampleBlock()), cluster, + insert_sync, timeout); } From b394a79af39dcca1bafcbdf226f484fc9abc06b7 Mon Sep 17 00:00:00 2001 From: tai Date: Mon, 29 Apr 2019 14:50:06 +0800 Subject: [PATCH 2/2] try fix batch async insert settings for Distributed --- dbms/src/Storages/Distributed/DirectoryMonitor.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp index ea2365cfd2b..2500b519b23 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp @@ -463,8 +463,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map { /// Determine metadata of the current file and check if it is not broken. ReadBufferFromFile in{file_path}; - insert_settings.deserialize(in); - readStringBinary(insert_query, in); + readQueryAndSettings(in, insert_settings, insert_query); CompressedReadBuffer decompressing_in(in); NativeBlockInputStream block_in(decompressing_in, ClickHouseRevision::get());