Skip unavaiable shards when writing to distributed tables

This commit is contained in:
Amos Bird 2021-04-15 22:31:23 +08:00
parent b67f40bae1
commit 096d76627e
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
3 changed files with 10 additions and 6 deletions

View File

@ -93,7 +93,8 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
const ASTPtr & query_ast_,
const ClusterPtr & cluster_,
bool insert_sync_,
UInt64 insert_timeout_)
UInt64 insert_timeout_,
StorageID main_table_)
: context(Context::createCopy(context_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
@ -102,6 +103,7 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
, cluster(cluster_)
, insert_sync(insert_sync_)
, insert_timeout(insert_timeout_)
, main_table(main_table_)
, log(&Poco::Logger::get("DistributedBlockOutputStream"))
{
const auto & settings = context->getSettingsRef();
@ -326,11 +328,11 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
throw Exception("There are several writing job for an automatically replicated shard", ErrorCodes::LOGICAL_ERROR);
/// TODO: it make sense to rewrite skip_unavailable_shards and max_parallel_replicas here
auto connections = shard_info.pool->getMany(timeouts, &settings, PoolMode::GET_ONE);
if (connections.empty() || connections.front().isNull())
auto results = shard_info.pool->getManyChecked(timeouts, &settings, PoolMode::GET_ONE, main_table.getQualifiedName());
if (results.empty() || results.front().entry.isNull())
throw Exception("Expected exactly one connection for shard " + toString(job.shard_index), ErrorCodes::LOGICAL_ERROR);
job.connection_entry = std::move(connections.front());
job.connection_entry = std::move(results.front().entry);
}
else
{

View File

@ -44,7 +44,8 @@ public:
const ASTPtr & query_ast_,
const ClusterPtr & cluster_,
bool insert_sync_,
UInt64 insert_timeout_);
UInt64 insert_timeout_,
StorageID main_table_);
Block getHeader() const override;
void write(const Block & block) override;
@ -96,6 +97,7 @@ private:
/// Sync-related stuff
UInt64 insert_timeout; // in seconds
StorageID main_table;
Stopwatch watch;
Stopwatch watch_current_block;
std::optional<ThreadPool> pool;

View File

@ -628,7 +628,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMeta
local_context, *this, metadata_snapshot,
createInsertToRemoteTableQuery(
remote_database, remote_table, metadata_snapshot->getSampleBlockNonMaterialized()),
cluster, insert_sync, timeout);
cluster, insert_sync, timeout, StorageID{remote_database, remote_table});
}