Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-11-22 07:31:57 +00:00

Rewritten and improved sync distributed insert. [#CLICKHOUSE-3346]

Commit: 369f88f65d
Parent: f6a63c4d0c
@@ -28,6 +28,7 @@ void RemoteBlockOutputStream::writePrefix()
 * Sample block is needed to know, what structure is required for blocks to be passed to 'write' method.
 */

query_sent = true;
connection.sendQuery(query, "", QueryProcessingStage::Complete, settings, nullptr);

Connection::Packet packet = connection.receivePacket();

@@ -93,6 +94,15 @@ void RemoteBlockOutputStream::writeSuffix()
else
throw NetException("Unexpected packet from server (expected EndOfStream or Exception, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);

finished = true;
}

RemoteBlockOutputStream::~RemoteBlockOutputStream()
{
/// If interrupted in the middle of the loop of communication with the server, then interrupt the connection
if (query_sent && !finished)
connection.disconnect();
}

}

@@ -2,6 +2,7 @@

#include <Core/Block.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/Throttler.h>

namespace DB

@@ -33,11 +34,15 @@ public:
/// Send pre-serialized and possibly pre-compressed block of data, that will be read from 'input'.
void writePrepared(ReadBuffer & input, size_t size = 0);

~RemoteBlockOutputStream() override;

private:
Connection & connection;
String query;
const Settings * settings;
Block sample_block;
bool query_sent = false;
bool finished = false;
};

}

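The pattern the rest of this commit relies on when talking to a remote server is: construct a RemoteBlockOutputStream over an established Connection, send the prefix, write blocks, then send the suffix (see writeToShardSync and runWritingJob further down in this diff). A minimal sketch of that call sequence, assuming the caller already has a connected Connection, a prepared Block and Settings; the wrapper function and the query text are hypothetical and not part of the commit:

```cpp
/// Illustrative only - not part of this commit.
void sendOneBlock(Connection & connection, const Block & block, const Settings & settings)
{
    String query = "INSERT INTO t VALUES";              /// hypothetical insert query
    RemoteBlockOutputStream out(connection, query, &settings);

    out.writePrefix();   /// sends the query; the server replies with the sample block
    out.write(block);    /// the block structure must match the sample block
    out.writeSuffix();   /// waits for EndOfStream or rethrows the server-side exception
}
```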
@@ -210,26 +210,29 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
info.weight = weight;

if (address.is_local)
{
info.local_addresses.push_back(address);
info.per_replica_pools = {nullptr};
}
else
{
ConnectionPoolPtrs pools;
pools.push_back(std::make_shared<ConnectionPool>(
ConnectionPoolPtr pool = std::make_shared<ConnectionPool>(
settings.distributed_connections_pool_size,
address.host_name, address.port, address.resolved_address,
address.default_database, address.user, address.password,
ConnectionTimeouts::getTCPTimeouts(settings).getSaturated(settings.limits.max_execution_time),
"server", Protocol::Compression::Enable, Protocol::Encryption::Disable));
"server", Protocol::Compression::Enable, Protocol::Encryption::Disable);

info.pool = std::make_shared<ConnectionPoolWithFailover>(
std::move(pools), settings.load_balancing, settings.connections_with_failover_max_tries);
ConnectionPoolPtrs{pool}, settings.load_balancing, settings.connections_with_failover_max_tries);
info.per_replica_pools = {std::move(pool)};
}

if (weight)
slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());

shards_info.push_back(info);
addresses_with_failover.push_back(addresses);
shards_info.emplace_back(std::move(info));
addresses_with_failover.emplace_back(std::move(addresses));
}
else if (startsWith(key, "shard"))
{

@@ -282,34 +285,42 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se

Addresses shard_local_addresses;

ConnectionPoolPtrs replicas;
replicas.reserve(replica_addresses.size());
ConnectionPoolPtrs remote_replicas_pools;
ConnectionPoolPtrs all_replicas_pools;
remote_replicas_pools.reserve(replica_addresses.size());
all_replicas_pools.reserve(replica_addresses.size());

for (const auto & replica : replica_addresses)
{
if (replica.is_local)
{
shard_local_addresses.push_back(replica);
all_replicas_pools.emplace_back(nullptr);
}
else
{
replicas.emplace_back(std::make_shared<ConnectionPool>(
auto replica_pool = std::make_shared<ConnectionPool>(
settings.distributed_connections_pool_size,
replica.host_name, replica.port, replica.resolved_address,
replica.default_database, replica.user, replica.password,
ConnectionTimeouts::getTCPTimeouts(settings).getSaturated(settings.limits.max_execution_time),
"server", Protocol::Compression::Enable, Protocol::Encryption::Disable));
"server", Protocol::Compression::Enable, Protocol::Encryption::Disable);

remote_replicas_pools.emplace_back(replica_pool);
all_replicas_pools.emplace_back(replica_pool);
}
}

ConnectionPoolWithFailoverPtr shard_pool;
if (!replicas.empty())
if (!remote_replicas_pools.empty())
shard_pool = std::make_shared<ConnectionPoolWithFailover>(
std::move(replicas), settings.load_balancing, settings.connections_with_failover_max_tries);
std::move(remote_replicas_pools), settings.load_balancing, settings.connections_with_failover_max_tries);

if (weight)
slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());

shards_info.push_back({std::move(dir_name_for_internal_replication), current_shard_num, weight,
shard_local_addresses, shard_pool, internal_replication});
std::move(shard_local_addresses), std::move(shard_pool), std::move(all_replicas_pools), internal_replication});
}
else
throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);

@@ -337,31 +348,38 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String

addresses_with_failover.emplace_back(current);

ConnectionPoolPtrs replicas;
replicas.reserve(current.size());

Addresses shard_local_addresses;
ConnectionPoolPtrs all_replicas;
ConnectionPoolPtrs remote_replicas;
all_replicas.reserve(current.size());
remote_replicas.reserve(current.size());

for (const auto & replica : current)
{
if (replica.is_local && !treat_local_as_shared)
{
shard_local_addresses.push_back(replica);
all_replicas.emplace_back(nullptr);
}
else
{
replicas.emplace_back(std::make_shared<ConnectionPool>(
auto replica_pool = std::make_shared<ConnectionPool>(
settings.distributed_connections_pool_size,
replica.host_name, replica.port, replica.resolved_address,
replica.default_database, replica.user, replica.password,
ConnectionTimeouts::getHTTPTimeouts(settings).getSaturated(settings.limits.max_execution_time),
"server", Protocol::Compression::Enable, Protocol::Encryption::Disable));
"server", Protocol::Compression::Enable, Protocol::Encryption::Disable);
all_replicas.emplace_back(replica_pool);
remote_replicas.emplace_back(replica_pool);
}
}

ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
std::move(replicas), settings.load_balancing, settings.connections_with_failover_max_tries);
std::move(remote_replicas), settings.load_balancing, settings.connections_with_failover_max_tries);

slot_to_shard.insert(std::end(slot_to_shard), default_weight, shards_info.size());
shards_info.push_back({{}, current_shard_num, default_weight, std::move(shard_local_addresses), shard_pool, false});
shards_info.push_back({{}, current_shard_num, default_weight, std::move(shard_local_addresses), std::move(shard_pool),
std::move(all_replicas), false});
++current_shard_num;
}

@@ -99,6 +99,8 @@ public:
Addresses local_addresses;
/// nullptr if there are no remote addresses
ConnectionPoolWithFailoverPtr pool;
/// Connection pool for each replica, contains nullptr for local replicas
ConnectionPoolPtrs per_replica_pools;
bool has_internal_replication;
};

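The new per_replica_pools member is indexed the same way as the shard's address list and holds nullptr for local replicas (those are written to directly, with no network connection), which is why runWritingJob below checks the pointer before taking a connection from it. A rough sketch of how a caller would iterate it under those assumptions; the function name is made up for illustration:

```cpp
/// Illustrative only - not part of this commit.
void useRemoteReplicaPools(const Cluster & cluster, const Settings & settings)
{
    const auto & shards_info = cluster.getShardsInfo();
    const auto & shards_addresses = cluster.getShardsAddresses();

    for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
    {
        const auto & shard_info = shards_info[shard_index];
        const auto & addresses = shards_addresses[shard_index];

        for (size_t replica_index = 0; replica_index < addresses.size(); ++replica_index)
        {
            const ConnectionPoolPtr & pool = shard_info.per_replica_pools.at(replica_index);
            if (!pool)
                continue;   /// local replica: there is no remote connection pool

            ConnectionPool::Entry entry = pool->get(&settings);
            /// ... use `entry` to talk to addresses[replica_index] ...
        }
    }
}
```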
@@ -86,16 +86,104 @@ void DistributedBlockOutputStream::writeAsync(const Block & block)
}


ThreadPool::Job DistributedBlockOutputStream::createWritingJob(
WritingJobContext & context, const Block & block, const Cluster::Address & address, size_t shard_id, size_t job_id)
std::string DistributedBlockOutputStream::getCurrentStateDescription()
{
std::stringstream buffer;
const auto & addresses = cluster->getShardsAddresses();

buffer << "Insertion status:\n";
for (auto & shard_jobs : per_shard_jobs)
for (JobInfo & job : shard_jobs)
{
buffer << "Wrote " << job.blocks_written << " blocks and " << job.rows_written << " rows"
<< " on shard " << job.shard_index << " replica " << job.replica_index
<< ", " << addresses[job.shard_index][job.replica_index].readableString() << "\n";
}

return buffer.str();
}


void DistributedBlockOutputStream::initWritingJobs()
{
const auto & addresses_with_failovers = cluster->getShardsAddresses();
const auto & shards_info = cluster->getShardsInfo();

remote_jobs_count = 0;
local_jobs_count = 0;
per_shard_jobs.resize(shards_info.size());

for (size_t shard_index : ext::range(0, shards_info.size()))
{
const auto & shard_info = shards_info[shard_index];
auto & shard_jobs = per_shard_jobs[shard_index];

/// If hasInternalReplication, than prefer local replica
if (!shard_info.hasInternalReplication() || !shard_info.isLocal())
{
const auto & replicas = addresses_with_failovers[shard_index];

for (size_t replica_index : ext::range(0, replicas.size()))
{
if (!replicas[replica_index].is_local)
{
shard_jobs.emplace_back(shard_index, replica_index, false);
++remote_jobs_count;

if (shard_info.hasInternalReplication())
break;
}
}
}

if (shard_info.isLocal())
{
shard_jobs.emplace_back(shard_index, 0, true);
++local_jobs_count;
}
}
}


void DistributedBlockOutputStream::waitForJobs()
{
size_t jobs_count = remote_jobs_count + local_jobs_count;
auto cond = [this, jobs_count] { return finished_jobs_count >= jobs_count; };

if (insert_timeout)
{
bool were_jobs_finished;
{
std::unique_lock<std::mutex> lock(mutex);
were_jobs_finished = cond_var.wait_until(lock, deadline, cond);
}

pool->wait();

if (!were_jobs_finished)
{
ProfileEvents::increment(ProfileEvents::DistributedSyncInsertionTimeoutExceeded);
throw Exception("Synchronous distributed insert timeout exceeded.", ErrorCodes::TIMEOUT_EXCEEDED);
}
}
else
{
std::unique_lock<std::mutex> lock(mutex);
cond_var.wait(lock, cond);
pool->wait();
}
}


ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobInfo & job)
{
auto memory_tracker = current_memory_tracker;
return [this, memory_tracker, & context, & block, & address, shard_id, job_id]()
return [this, memory_tracker, &job]()
{
SCOPE_EXIT({
std::lock_guard<std::mutex> lock(context.mutex);
++context.finished_jobs_count;
context.cond_var.notify_one();
std::lock_guard<std::mutex> lock(mutex);
++finished_jobs_count;
cond_var.notify_one();
});

if (!current_memory_tracker)
@@ -104,183 +192,106 @@ ThreadPool::Job DistributedBlockOutputStream::createWritingJob(
setThreadName("DistrOutStrProc");
}

const auto & shard_info = cluster->getShardsInfo()[shard_id];
if (address.is_local)
const auto & shard_info = cluster->getShardsInfo()[job.shard_index];
const auto & addresses = cluster->getShardsAddresses();
Block & block = current_blocks.at(job.shard_index);

if (!job.is_local_job)
{
writeToLocal(block, shard_info.getLocalNodeCount());
context.done_local_jobs[job_id] = true;
if (!job.stream)
{
if (shard_info.hasInternalReplication())
{
/// Skip replica_index in case of internal replication
if (per_shard_jobs[job.shard_index].size() != 1)
throw Exception("There are several writing job for an automatically replicated shard", ErrorCodes::LOGICAL_ERROR);

/// TODO: it make sense to rewrite skip_unavailable_shards and max_parallel_replicas here
auto connections = shard_info.pool->getMany(&settings, PoolMode::GET_ONE);
if (connections.empty() || connections.front().isNull())
throw Exception("Expected exactly one connection for shard " + toString(job.shard_index), ErrorCodes::LOGICAL_ERROR);

job.connection_entry = std::move(connections.front());
}
else
{
const auto & replica = addresses.at(job.shard_index).at(job.replica_index);

const ConnectionPoolPtr & connection_pool = shard_info.per_replica_pools.at(job.replica_index);
if (!connection_pool)
throw Exception("Connection pool for replica " + replica.readableString() + " does not exist", ErrorCodes::LOGICAL_ERROR);

job.connection_entry = connection_pool->get(&settings);
if (job.connection_entry.isNull())
throw Exception("Got empty connection for replica" + replica.readableString(), ErrorCodes::LOGICAL_ERROR);
}

if (throttler)
job.connection_entry->setThrottler(throttler);

job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, query_string, &settings);
job.stream->writePrefix();
}

CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
job.stream->write(block);
}
else
{
writeToShardSync(block, shard_info.hasInternalReplication()
? shard_info.dir_name_for_internal_replication
: address.toStringFull());
context.done_remote_jobs[job_id] = true;
if (!job.stream)
{
/// Forward user settings
job.local_context = std::make_unique<Context>(storage.context);
job.local_context->setSettings(settings);

InterpreterInsertQuery interp(query_ast, *job.local_context);
job.stream = interp.execute().out;
job.stream->writePrefix();
}

size_t num_repetitions = shard_info.getLocalNodeCount();
for (size_t i = 0; i < num_repetitions; ++i)
job.stream->write(block);
}

++job.blocks_written;
job.rows_written += block.rows();
};
}


std::string DistributedBlockOutputStream::getCurrentStateDescription(const WritingJobContext & context)
{
const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
String description;
WriteBufferFromString buffer(description);

buffer << "Insertion status:\n";

auto writeDescription = [&buffer](const Cluster::Address & address, size_t shard_id, size_t blocks_wrote)
{
buffer << "Wrote " << blocks_wrote << " blocks on shard " << shard_id << " replica ";
buffer << address.toString() << '\n';
};

const auto addresses_with_failovers = cluster->getShardsAddresses();

size_t remote_job_id = 0;
size_t local_job_id = 0;
for (size_t shard_id : ext::range(0, shards_info.size()))
{
const auto & shard_info = shards_info[shard_id];
/// If hasInternalReplication, than prefer local replica
if (!shard_info.hasInternalReplication() || !shard_info.isLocal())
{
for (const auto & address : addresses_with_failovers[shard_id])
if (!address.is_local)
{
writeDescription(address, shard_id, blocks_inserted + (context.done_remote_jobs[remote_job_id] ? 1 : 0));
++remote_job_id;
if (shard_info.hasInternalReplication())
break;
}
}

if (shard_info.isLocal())
{
const auto & address = shard_info.local_addresses.front();
writeDescription(address, shard_id, blocks_inserted + (context.done_local_jobs[local_job_id] ? 1 : 0));
++local_job_id;
}
}

return description;
}


void DistributedBlockOutputStream::createWritingJobs(WritingJobContext & context, const Blocks & blocks)
{
const auto & addresses_with_failovers = cluster->getShardsAddresses();
const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();

size_t remote_job_id = 0;
size_t local_job_id = 0;
for (size_t shard_id : ext::range(0, blocks.size()))
{
const auto & shard_info = shards_info[shard_id];
/// If hasInternalReplication, than prefer local replica
if (!shard_info.hasInternalReplication() || !shard_info.isLocal())
{
for (const auto & address : addresses_with_failovers[shard_id])
if (!address.is_local)
{
pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, remote_job_id));
++remote_job_id;
if (shard_info.hasInternalReplication())
break;
}
}

if (shards_info[shard_id].isLocal())
{
const auto & address = shards_info[shard_id].local_addresses.front();
pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, local_job_id));
++local_job_id;
}
}
}


void DistributedBlockOutputStream::calculateJobsCount()
{
remote_jobs_count = 0;
local_jobs_count = 0;

const auto & addresses_with_failovers = cluster->getShardsAddresses();

const auto & shards_info = cluster->getShardsInfo();
for (size_t shard_id : ext::range(0, shards_info.size()))
{
const auto & shard_info = shards_info[shard_id];
/// If hasInternalReplication, than prefer local replica
if (!shard_info.hasInternalReplication() || !shard_info.isLocal())
{
for (const auto & address : addresses_with_failovers[shard_id])
if (!address.is_local)
{
++remote_jobs_count;
if (shard_info.hasInternalReplication())
break;
}
}

local_jobs_count += shard_info.isLocal() ? 1 : 0;
}
}


void DistributedBlockOutputStream::waitForUnfinishedJobs(WritingJobContext & context)
{
size_t jobs_count = remote_jobs_count + local_jobs_count;
auto cond = [& context, jobs_count] { return context.finished_jobs_count == jobs_count; };

if (insert_timeout)
{
bool were_jobs_finished;
{
std::unique_lock<std::mutex> lock(context.mutex);
were_jobs_finished = context.cond_var.wait_until(lock, deadline, cond);
}
if (!were_jobs_finished)
{
pool->wait();
ProfileEvents::increment(ProfileEvents::DistributedSyncInsertionTimeoutExceeded);
throw Exception("Timeout exceeded.", ErrorCodes::TIMEOUT_EXCEEDED);
}
}
else
{
std::unique_lock<std::mutex> lock(context.mutex);
context.cond_var.wait(lock, cond);
}
pool->wait();
}


void DistributedBlockOutputStream::writeSync(const Block & block)
{
if (!pool)
{
/// Deferred initialization. Only for sync insertion.
calculateJobsCount();
initWritingJobs();
pool.emplace(remote_jobs_count + local_jobs_count);
query_string = queryToString(query_ast);

if (!throttler && (settings.limits.max_network_bandwidth || settings.limits.max_network_bytes))
{
throttler = std::make_shared<Throttler>(settings.limits.max_network_bandwidth, settings.limits.max_network_bytes,
"Network bandwidth limit for a query exceeded.");
}
}

WritingJobContext context;
context.done_remote_jobs.assign(remote_jobs_count, false);
context.done_local_jobs.assign(local_jobs_count, false);
context.finished_jobs_count = 0;
const auto & shards_info = cluster->getShardsInfo();
current_blocks = shards_info.size() > 1 ? splitBlock(block) : Blocks({block});

const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
Blocks blocks = shards_info.size() > 1 ? splitBlock(block) : Blocks({block});
createWritingJobs(context, blocks);
/// Run jobs in parallel for each block and wait them
finished_jobs_count = 0;
for (size_t shard_index : ext::range(0, current_blocks.size()))
for (JobInfo & job : per_shard_jobs.at(shard_index))
pool->schedule(runWritingJob(job));

try
{
waitForUnfinishedJobs(context);
waitForJobs();
}
catch (Exception & exception)
{
exception.addMessage(getCurrentStateDescription(context));
exception.addMessage(getCurrentStateDescription());
throw;
}

@@ -288,6 +299,25 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
}


void DistributedBlockOutputStream::writeSuffix()
{
if (insert_sync && pool)
{
finished_jobs_count = 0;
for (auto & shard_jobs : per_shard_jobs)
for (JobInfo & job : shard_jobs)
{
if (job.stream)
pool->schedule([&job] () { job.stream->writeSuffix(); });
}

pool->wait();

LOG_DEBUG(&Logger::get("DistributedBlockOutputStream"), getCurrentStateDescription());
}
}


IColumn::Selector DistributedBlockOutputStream::createSelector(Block block)
{
storage.getShardingKeyExpr()->execute(block);
@@ -393,21 +423,10 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const siz

void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
{
std::unique_ptr<Context> local_context;
std::optional<InterpreterInsertQuery> interp;

/// Async insert does not support settings forwarding yet whereas sync one supports
if (insert_sync)
interp.emplace(query_ast, storage.context);
else
{
/// Overwrite global settings by user settings
local_context = std::make_unique<Context>(storage.context);
local_context->setSettings(settings);
interp.emplace(query_ast, *local_context);
}
InterpreterInsertQuery interp(query_ast, storage.context);

auto block_io = interp->execute();
auto block_io = interp.execute();
block_io.out->writePrefix();

for (size_t i = 0; i < repeats; ++i)
@@ -417,22 +436,6 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_
}


void DistributedBlockOutputStream::writeToShardSync(const Block & block, const std::string & connection_pool_name)
{
auto pool = storage.requireConnectionPool(connection_pool_name);
auto connection = pool->get();

const auto & query_string = queryToString(query_ast);
RemoteBlockOutputStream remote{*connection, query_string, &settings};

CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};

remote.writePrefix();
remote.write(block);
remote.writeSuffix();
}


void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
/** tmp directory is used to ensure atomicity of transactions

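The synchronization scheme introduced above is: writeSync() schedules one job per (shard, replica) on a ThreadPool, each job increments finished_jobs_count under the mutex and notifies cond_var, and waitForJobs() waits on that counter with a deadline derived from insert_timeout, throwing TIMEOUT_EXCEEDED when it fires. A self-contained sketch of the same pattern using only the standard library; the job count, deadline, and work body are illustrative, not values taken from the commit:

```cpp
// Illustrative only - stand-alone rendering of the waitForJobs()/runWritingJob() handshake.
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

int main()
{
    const size_t jobs_count = 4;                     // e.g. one job per shard/replica
    std::atomic<unsigned> finished_jobs_count{0};
    std::mutex mutex;
    std::condition_variable cond_var;

    std::vector<std::thread> workers;
    for (size_t i = 0; i < jobs_count; ++i)
        workers.emplace_back([&]
        {
            /// ... write one block to one shard or replica here ...
            std::lock_guard<std::mutex> lock(mutex);
            ++finished_jobs_count;
            cond_var.notify_one();
        });

    const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
    const auto cond = [&] { return finished_jobs_count >= jobs_count; };

    bool were_jobs_finished;
    {
        std::unique_lock<std::mutex> lock(mutex);
        were_jobs_finished = cond_var.wait_until(lock, deadline, cond);
    }

    for (auto & worker : workers)
        worker.join();

    if (!were_jobs_finished)
        throw std::runtime_error("Synchronous distributed insert timeout exceeded.");

    return 0;
}
```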
@@ -3,6 +3,7 @@
#include <Parsers/formatAST.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Core/Block.h>
#include <Common/Throttler.h>
#include <common/ThreadPool.h>
#include <atomic>
#include <memory>
@@ -39,37 +40,15 @@ public:

void writePrefix() override;

void writeSuffix() override;

private:
void writeAsync(const Block & block);

/// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.
void writeSync(const Block & block);

void calculateJobsCount();

struct WritingJobContext
{
/// Remote job per replica.
std::vector<char> done_remote_jobs;
/// Local job per shard.
std::vector<char> done_local_jobs;
std::atomic<unsigned> finished_jobs_count;
std::mutex mutex;
std::condition_variable cond_var;
};

ThreadPool::Job createWritingJob(WritingJobContext & context, const Block & block,
const Cluster::Address & address, size_t shard_id, size_t job_id);

void createWritingJobs(WritingJobContext & context, const Blocks & blocks);

void waitForUnfinishedJobs(WritingJobContext & context);

/// Returns the number of blocks was written for each cluster node. Uses during exception handling.
std::string getCurrentStateDescription(const WritingJobContext & context);

IColumn::Selector createSelector(Block block);


void writeAsync(const Block & block);

/// Split block between shards.
Blocks splitBlock(const Block & block);
@@ -82,21 +61,63 @@ private:

void writeToShard(const Block & block, const std::vector<std::string> & dir_names);

/// Performs synchronous insertion to remote node.
void writeToShardSync(const Block & block, const std::string & connection_pool_name);

/// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.
void writeSync(const Block & block);

void initWritingJobs();

struct JobInfo;
ThreadPool::Job runWritingJob(JobInfo & job);

void waitForJobs();

/// Returns the number of blocks was written for each cluster node. Uses during exception handling.
std::string getCurrentStateDescription();

private:
StorageDistributed & storage;
ASTPtr query_ast;
ClusterPtr cluster;
const Settings & settings;
bool insert_sync;
UInt64 insert_timeout;
size_t blocks_inserted = 0;

bool insert_sync;

/// Sync-related stuff
UInt64 insert_timeout;
std::chrono::steady_clock::time_point deadline;
size_t remote_jobs_count;
size_t local_jobs_count;
std::optional<ThreadPool> pool;
ThrottlerPtr throttler;
String query_string;

struct JobInfo
{
JobInfo() = default;
JobInfo(size_t shard_index, size_t replica_index, bool is_local_job)
: shard_index(shard_index), replica_index(replica_index), is_local_job(is_local_job) {}

size_t shard_index = 0;
size_t replica_index = 0;
bool is_local_job = false;

ConnectionPool::Entry connection_entry;
std::unique_ptr<Context> local_context;
BlockOutputStreamPtr stream;

UInt64 blocks_written = 0;
UInt64 rows_written = 0;
};

std::vector<std::list<JobInfo>> per_shard_jobs;
Blocks current_blocks;

size_t remote_jobs_count = 0;
size_t local_jobs_count = 0;

std::atomic<unsigned> finished_jobs_count{0};
std::mutex mutex;
std::condition_variable cond_var;
};

}

dbms/tests/integration/test_insert_into_distributed_sync_async/test.py (9 changes; Normal file → Executable file)
@@ -1,9 +1,12 @@
#!/usr/bin/env python2
import sys
import os
from contextlib import contextmanager
from helpers.network import PartitionManager
from helpers.test_tools import TSV

import pytest

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from helpers.network import PartitionManager
from helpers.test_tools import TSV
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException, QueryTimeoutExceedException