Merge pull request #4936 from TCeason/feature/support_settings_for_async

Support settings for async distributed inserts (#4852)
alexey-milovidov 2019-05-09 23:59:29 +03:00 committed by GitHub
commit 8ef7f3589a
7 changed files with 56 additions and 24 deletions

View File

@@ -123,3 +123,7 @@
 #else
 #define OPTIMIZE(x)
 #endif
+
+/// This number is used only for backward compatibility of the distributed send format.
+/// It could be any magic number.
+#define DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER 0xCAFECABE
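The compatibility trick here is that the old on-disk format of a queued insert starts with the varint-encoded length of the insert query, while the new format starts with this varint-encoded magic. Since no realistic query is ~3.4 GB long, the first varint read disambiguates the two. Below is a minimal, self-contained sketch of the idea in plain C++17 with standard streams — it is not ClickHouse's ReadBuffer/WriteBuffer API, only the varint coding mirrors readVarUInt/writeVarUInt:

```cpp
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>

static const uint64_t MAGIC = 0xCAFECABE;  /// ~3.4 billion: not a plausible query length

/// LEB128-style varint, the same scheme as ClickHouse's writeVarUInt/readVarUInt.
void writeVarUInt(uint64_t x, std::ostream & out)
{
    while (x >= 0x80)
    {
        out.put(static_cast<char>(x | 0x80));
        x >>= 7;
    }
    out.put(static_cast<char>(x));
}

uint64_t readVarUInt(std::istream & in)
{
    uint64_t x = 0;
    for (unsigned shift = 0;; shift += 7)
    {
        auto byte = static_cast<uint8_t>(in.get());
        x |= uint64_t(byte & 0x7F) << shift;
        if (!(byte & 0x80))
            return x;
    }
}

void writeString(const std::string & s, std::ostream & out)
{
    writeVarUInt(s.size(), out);
    out.write(s.data(), static_cast<std::streamsize>(s.size()));
}

/// New format: [magic][settings][query]. The settings blob stands in for Settings::serialize().
void writeNewFormat(const std::string & settings, const std::string & query, std::ostream & out)
{
    writeVarUInt(MAGIC, out);
    writeString(settings, out);
    writeString(query, out);
}

/// Mirrors readQueryAndSettings below: the first varint is either the magic or the query size.
void readAnyFormat(std::istream & in, std::string & settings, std::string & query)
{
    uint64_t magic_or_size = readVarUInt(in);
    if (magic_or_size == MAGIC)
    {
        settings.resize(readVarUInt(in));
        in.read(settings.data(), static_cast<std::streamsize>(settings.size()));
        magic_or_size = readVarUInt(in);
    }
    query.resize(magic_or_size);
    in.read(query.data(), static_cast<std::streamsize>(query.size()));
}

int main()
{
    std::stringstream file;
    writeNewFormat("max_threads=4", "INSERT INTO t VALUES", file);

    std::string settings, query;
    readAnyFormat(file, settings, query);
    assert(settings == "max_threads=4" && query == "INSERT INTO t VALUES");
}
```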

View File

@@ -109,5 +109,4 @@ void Settings::addProgramOptions(boost::program_options::options_description & o
         Settings::getDescription(index).data)));
     }
 }
 }
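The queued files in the next hunk embed settings via Settings::serialize()/deserialize(). As a point of reference, the wire format this sketch assumes (matching the native-protocol settings format of that era) writes only the settings that were explicitly changed, as name/value pairs terminated by an empty name; the stand-in types below are not ClickHouse code, and real values are serialized in a type-dependent binary form rather than as strings:

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <sstream>
#include <string>

/// Stand-ins for writeStringBinary/readStringBinary (single size byte for brevity).
void writeStringBinary(const std::string & s, std::ostream & out)
{
    out.put(static_cast<char>(s.size()));
    out.write(s.data(), static_cast<std::streamsize>(s.size()));
}

std::string readStringBinary(std::istream & in)
{
    std::string s(static_cast<size_t>(static_cast<uint8_t>(in.get())), '\0');
    in.read(s.data(), static_cast<std::streamsize>(s.size()));
    return s;
}

/// Only settings that were explicitly changed are written; an empty name ends the list.
void serialize(const std::map<std::string, std::string> & changed, std::ostream & out)
{
    for (const auto & [name, value] : changed)
    {
        writeStringBinary(name, out);
        writeStringBinary(value, out);
    }
    writeStringBinary("", out);
}

std::map<std::string, std::string> deserialize(std::istream & in)
{
    std::map<std::string, std::string> changed;
    while (true)
    {
        std::string name = readStringBinary(in);
        if (name.empty())  /// terminator
            return changed;
        changed[name] = readStringBinary(in);
    }
}

int main()
{
    std::stringstream buf;
    serialize({{"max_threads", "4"}}, buf);
    auto settings = deserialize(buf);
    return settings["max_threads"] == "4" ? 0 : 1;
}
```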

View File

@@ -220,10 +220,11 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
     ReadBufferFromFile in{file_path};

+    Settings insert_settings;
     std::string insert_query;
-    readStringBinary(insert_query, in);
+    readQueryAndSettings(in, insert_settings, insert_query);

-    RemoteBlockOutputStream remote{*connection, insert_query};
+    RemoteBlockOutputStream remote{*connection, insert_query, &insert_settings};

     remote.writePrefix();
     remote.writePrepared(in);
@@ -240,20 +241,39 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
     LOG_TRACE(log, "Finished processing `" << file_path << '`');
 }

+void StorageDistributedDirectoryMonitor::readQueryAndSettings(
+    ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const
+{
+    UInt64 magic_number_or_query_size;
+    readVarUInt(magic_number_or_query_size, in);
+
+    if (magic_number_or_query_size == UInt64(DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER))
+    {
+        insert_settings.deserialize(in);
+        readVarUInt(magic_number_or_query_size, in);
+    }
+
+    insert_query.resize(magic_number_or_query_size);
+    in.readStrict(insert_query.data(), magic_number_or_query_size);
+}
+
 struct StorageDistributedDirectoryMonitor::BatchHeader
 {
+    Settings settings;
     String query;
     Block sample_block;

-    BatchHeader(String query_, Block sample_block_)
-        : query(std::move(query_))
+    BatchHeader(Settings settings_, String query_, Block sample_block_)
+        : settings(std::move(settings_))
+        , query(std::move(query_))
         , sample_block(std::move(sample_block_))
     {
     }

     bool operator==(const BatchHeader & other) const
     {
-        return query == other.query && blocksHaveEqualStructure(sample_block, other.sample_block);
+        return settings == other.settings && query == other.query &&
+               blocksHaveEqualStructure(sample_block, other.sample_block);
     }

     struct Hash
@@ -320,6 +340,7 @@ struct StorageDistributedDirectoryMonitor::Batch
         bool batch_broken = false;
         try
         {
+            Settings insert_settings;
            String insert_query;
            std::unique_ptr<RemoteBlockOutputStream> remote;
            bool first = true;
@@ -335,12 +356,12 @@ struct StorageDistributedDirectoryMonitor::Batch
                }

                ReadBufferFromFile in(file_path->second);
-                readStringBinary(insert_query, in); /// NOTE: all files must have the same insert_query
+                parent.readQueryAndSettings(in, insert_settings, insert_query);

                if (first)
                {
                    first = false;
-                    remote = std::make_unique<RemoteBlockOutputStream>(*connection, insert_query);
+                    remote = std::make_unique<RemoteBlockOutputStream>(*connection, insert_query, &insert_settings);
                    remote->writePrefix();
                }
@@ -436,12 +457,13 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
        size_t total_rows = 0;
        size_t total_bytes = 0;
        Block sample_block;
+        Settings insert_settings;
        String insert_query;
        try
        {
            /// Determine metadata of the current file and check if it is not broken.
            ReadBufferFromFile in{file_path};
-            readStringBinary(insert_query, in);
+            readQueryAndSettings(in, insert_settings, insert_query);

            CompressedReadBuffer decompressing_in(in);
            NativeBlockInputStream block_in(decompressing_in, ClickHouseRevision::get());
@@ -468,7 +490,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
            throw;
        }

-        BatchHeader batch_header(std::move(insert_query), std::move(sample_block));
+        BatchHeader batch_header(std::move(insert_settings), std::move(insert_query), std::move(sample_block));
        Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second;
        batch.file_indices.push_back(file_idx);
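Including the settings in BatchHeader matters for correctness, not just bookkeeping: a batch is flushed through a single RemoteBlockOutputStream, which transmits the settings once for the whole remote INSERT, so files written with different settings must never land in the same batch. A simplified, hypothetical illustration of that grouping key (stand-in types, not the actual BatchHeader/Hash code):

```cpp
#include <map>
#include <string>
#include <tuple>
#include <vector>

/// Files may share a batch only if settings, query, and block structure all match.
struct BatchKey
{
    std::string settings;   /// stand-in for the serialized Settings
    std::string query;
    std::string structure;  /// stand-in for the sample block's structure

    bool operator<(const BatchKey & other) const
    {
        return std::tie(settings, query, structure)
             < std::tie(other.settings, other.query, other.structure);
    }
};

/// header_to_batch analogue: one pending batch per distinct key.
std::map<BatchKey, std::vector<std::string>> header_to_batch;

void addFileToBatch(const BatchKey & key, const std::string & file_path)
{
    header_to_batch[key].push_back(file_path);
}

int main()
{
    addFileToBatch({"max_threads=4", "INSERT INTO t", "a UInt64"}, "1.bin");
    addFileToBatch({"max_threads=8", "INSERT INTO t", "a UInt64"}, "2.bin");
    return header_to_batch.size() == 2 ? 0 : 1;  /// different settings => different batches
}
```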

View File

@@ -7,6 +7,7 @@
 #include <thread>
 #include <mutex>
 #include <condition_variable>
+#include <IO/ReadBufferFromFile.h>

 namespace DB
@@ -57,6 +58,9 @@ private:
     std::condition_variable cond;
     Logger * log;
     ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this};
+
+    /// Reads the insert query and settings; stays compatible with files written by older servers.
+    void readQueryAndSettings(ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const;
 };

 }

View File

@@ -59,10 +59,10 @@ namespace ErrorCodes
 DistributedBlockOutputStream::DistributedBlockOutputStream(
-    StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_,
-    const Settings & settings_, bool insert_sync_, UInt64 insert_timeout_)
-    : storage(storage), query_ast(query_ast), query_string(queryToString(query_ast)),
-    cluster(cluster_), settings(settings_), insert_sync(insert_sync_),
+    const Context & context_, StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_,
+    bool insert_sync_, UInt64 insert_timeout_)
+    : context(context_), storage(storage), query_ast(query_ast), query_string(queryToString(query_ast)),
+    cluster(cluster_), insert_sync(insert_sync_),
     insert_timeout(insert_timeout_), log(&Logger::get("DistributedBlockOutputStream"))
 {
 }
@@ -249,7 +249,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
                throw Exception("There are several writing job for an automatically replicated shard", ErrorCodes::LOGICAL_ERROR);

            /// TODO: it makes sense to rewrite skip_unavailable_shards and max_parallel_replicas here
-            auto connections = shard_info.pool->getMany(&settings, PoolMode::GET_ONE);
+            auto connections = shard_info.pool->getMany(&context.getSettingsRef(), PoolMode::GET_ONE);
            if (connections.empty() || connections.front().isNull())
                throw Exception("Expected exactly one connection for shard " + toString(job.shard_index), ErrorCodes::LOGICAL_ERROR);
@@ -263,7 +263,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
            if (!connection_pool)
                throw Exception("Connection pool for replica " + replica.readableString() + " does not exist", ErrorCodes::LOGICAL_ERROR);

-            job.connection_entry = connection_pool->get(&settings);
+            job.connection_entry = connection_pool->get(&context.getSettingsRef());
            if (job.connection_entry.isNull())
                throw Exception("Got empty connection for replica" + replica.readableString(), ErrorCodes::LOGICAL_ERROR);
        }
@@ -271,7 +271,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
            if (throttler)
                job.connection_entry->setThrottler(throttler);

-            job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, query_string, &settings);
+            job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, query_string, &context.getSettingsRef());
            job.stream->writePrefix();
        }
@@ -283,8 +283,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
            if (!job.stream)
            {
                /// Forward user settings
-                job.local_context = std::make_unique<Context>(storage.global_context);
-                job.local_context->setSettings(settings);
+                job.local_context = std::make_unique<Context>(context);

                InterpreterInsertQuery interp(query_ast, *job.local_context);
                job.stream = interp.execute().out;
@@ -304,6 +303,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
 void DistributedBlockOutputStream::writeSync(const Block & block)
 {
+    const Settings & settings = context.getSettingsRef();
     const auto & shards_info = cluster->getShardsInfo();
     size_t num_shards = shards_info.size();
@@ -504,7 +504,7 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const siz
 void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
 {
-    /// Async insert does not support settings forwarding yet whereas sync one supports
-    InterpreterInsertQuery interp(query_ast, storage.global_context);
+    InterpreterInsertQuery interp(query_ast, context);

     auto block_io = interp.execute();
     block_io.out->writePrefix();
@@ -553,6 +553,8 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
        CompressedWriteBuffer compress{out};
        NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()};

+        writeVarUInt(UInt64(DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER), out);
+        context.getSettingsRef().serialize(out);
        writeStringBinary(query_string, out);

        stream.writePrefix();
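The recurring pattern in this file: the stream no longer captures a Settings reference at construction; it keeps the INSERT query's Context and pulls settings from it wherever they are consumed, so the same values reach remote connections, the on-disk file header written above, and local-shard inserts. A minimal sketch of that pattern with hypothetical stand-in types (not the real Context/Settings API):

```cpp
/// Toy stand-ins; the real Settings and Context are far richer.
struct Settings
{
    int max_threads = 1;
};

struct Context
{
    Settings settings;
    const Settings & getSettingsRef() const { return settings; }
};

class DistributedOutputStreamSketch
{
public:
    explicit DistributedOutputStreamSketch(const Context & context_) : context(context_) {}

    void writeRemote()
    {
        /// Every consumer reads the settings through the context, so whatever
        /// the INSERT query's context carries (including per-query SETTINGS)
        /// is what gets forwarded.
        const Settings & settings = context.getSettingsRef();
        (void)settings;
    }

    void writeLocal()
    {
        /// For local shards, copying the query context (rather than the
        /// server-wide global context) forwards the user's settings too.
        Context local_context = context;
        (void)local_context;
    }

private:
    const Context & context;
};

int main()
{
    Context query_context;
    query_context.settings.max_threads = 4;  /// e.g. set via "INSERT ... SETTINGS max_threads = 4"
    DistributedOutputStreamSketch stream(query_context);
    stream.writeRemote();
    stream.writeLocal();
}
```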

View File

@@ -35,8 +35,8 @@ class StorageDistributed;
 class DistributedBlockOutputStream : public IBlockOutputStream
 {
 public:
-    DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_,
-        const Settings & settings_, bool insert_sync_, UInt64 insert_timeout_);
+    DistributedBlockOutputStream(const Context & context_, StorageDistributed & storage, const ASTPtr & query_ast,
+        const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_);

     Block getHeader() const override;
     void write(const Block & block) override;
@@ -78,11 +78,11 @@ private:
     std::string getCurrentStateDescription();

 private:
+    const Context & context;
     StorageDistributed & storage;
     ASTPtr query_ast;
     String query_string;
     ClusterPtr cluster;
-    const Settings & settings;
     size_t inserted_blocks = 0;
     size_t inserted_rows = 0;

View File

@@ -333,7 +333,8 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Context & c
     /// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
     return std::make_shared<DistributedBlockOutputStream>(
-        *this, createInsertToRemoteTableQuery(remote_database, remote_table, getSampleBlock()), cluster, settings, insert_sync, timeout);
+        context, *this, createInsertToRemoteTableQuery(remote_database, remote_table, getSampleBlock()), cluster,
+        insert_sync, timeout);
 }