Merge pull request #23152 from amosbird/distributedwriteha

Skip unavailable replicas when writing to distributed tables
Maksim Kita 2021-04-28 22:46:50 +03:00 committed by GitHub
commit 1f1a443798
5 changed files with 43 additions and 6 deletions
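In effect, a synchronous INSERT into a Distributed table over an internally replicated shard no longer has to fail when one of the shard's replicas is down or cannot serve the target table: the connection for the write is now selected with a per-table check, and unusable replicas are skipped. Below is a rough reproduction of the scenario this enables, matching the table names and settings of the integration test added at the bottom of this commit; the node1/node2 hosts and the use of the third-party clickhouse-driver package are assumptions made purely for illustration.

from datetime import date
from clickhouse_driver import Client  # third-party client, used here only for illustration

# In the test layout only node2 has the underlying `single_replicated` table,
# so the write through the Distributed table must skip node1's replica of the shard.
client = Client("node1")
client.execute(
    "INSERT INTO distributed_one_replica (date, id) VALUES",
    [(date(2000, 1, 1), 1)],
    settings={"insert_distributed_sync": 1, "prefer_localhost_replica": 0},
)

# The row should end up on node2 via its ReplicatedMergeTree table.
print(Client("node2").execute("SELECT count() FROM single_replicated"))  # expected: [(1,)]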

View File

@@ -93,7 +93,8 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
 const ASTPtr & query_ast_,
 const ClusterPtr & cluster_,
 bool insert_sync_,
-UInt64 insert_timeout_)
+UInt64 insert_timeout_,
+StorageID main_table_)
 : context(Context::createCopy(context_))
 , storage(storage_)
 , metadata_snapshot(metadata_snapshot_)
@@ -102,6 +103,7 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
 , cluster(cluster_)
 , insert_sync(insert_sync_)
 , insert_timeout(insert_timeout_)
+, main_table(main_table_)
 , log(&Poco::Logger::get("DistributedBlockOutputStream"))
 {
 const auto & settings = context->getSettingsRef();
@@ -326,11 +328,11 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
 throw Exception("There are several writing job for an automatically replicated shard", ErrorCodes::LOGICAL_ERROR);
 /// TODO: it make sense to rewrite skip_unavailable_shards and max_parallel_replicas here
-auto connections = shard_info.pool->getMany(timeouts, &settings, PoolMode::GET_ONE);
-if (connections.empty() || connections.front().isNull())
+auto results = shard_info.pool->getManyChecked(timeouts, &settings, PoolMode::GET_ONE, main_table.getQualifiedName());
+if (results.empty() || results.front().entry.isNull())
 throw Exception("Expected exactly one connection for shard " + toString(job.shard_index), ErrorCodes::LOGICAL_ERROR);
-job.connection_entry = std::move(connections.front());
+job.connection_entry = std::move(results.front().entry);
 }
 else
 {
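The substantive change above is the switch from getMany to getManyChecked with the qualified name of the table being written: the pool can then rule out replicas that are unreachable or cannot currently serve that table, instead of handing back the first entry and letting the INSERT fail. The following self-contained sketch shows the selection idea in plain Python; the Replica fields and the delay threshold are made up for illustration and this is not ClickHouse's pool logic.

from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Replica:
    name: str
    reachable: bool       # a connection can be established at all
    has_table: bool       # the target table exists on this replica
    delay_seconds: int    # replication delay of that table on this replica

def pick_replica_for_insert(replicas: List[Replica], max_delay: int = 300) -> Optional[Replica]:
    """Return the first replica usable for the write, or None if none qualifies."""
    for replica in replicas:
        if replica.reachable and replica.has_table and replica.delay_seconds <= max_delay:
            return replica
    return None

if __name__ == "__main__":
    shard = [
        Replica("node1", reachable=True, has_table=False, delay_seconds=0),  # skipped
        Replica("node2", reachable=True, has_table=True, delay_seconds=2),   # chosen
    ]
    target = pick_replica_for_insert(shard)
    assert target is not None and target.name == "node2"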

View File

@@ -44,7 +44,8 @@ public:
 const ASTPtr & query_ast_,
 const ClusterPtr & cluster_,
 bool insert_sync_,
-UInt64 insert_timeout_);
+UInt64 insert_timeout_,
+StorageID main_table_);
 Block getHeader() const override;
 void write(const Block & block) override;
@@ -96,6 +97,7 @@ private:
 /// Sync-related stuff
 UInt64 insert_timeout; // in seconds
+StorageID main_table;
 Stopwatch watch;
 Stopwatch watch_current_block;
 std::optional<ThreadPool> pool;

View File

@@ -628,7 +628,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMeta
 local_context, *this, metadata_snapshot,
 createInsertToRemoteTableQuery(
 remote_database, remote_table, metadata_snapshot->getSampleBlockNonMaterialized()),
-cluster, insert_sync, timeout);
+cluster, insert_sync, timeout, StorageID{remote_database, remote_table});
 }

View File

@@ -28,6 +28,19 @@
 </replica>
 </shard>
 </shard_with_local_replica>
+<shard_with_local_replica_internal_replication>
+<shard>
+<internal_replication>true</internal_replication>
+<replica>
+<host>node1</host>
+<port>9000</port>
+</replica>
+<replica>
+<host>node2</host>
+<port>9000</port>
+</replica>
+</shard>
+</shard_with_local_replica_internal_replication>
 <shard_with_low_cardinality>
 <shard>
 <replica>
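This cluster section gives the new test a shard with two replicas (node1 and node2) and internal_replication enabled: the Distributed engine is then expected to write each block to a single replica and let ReplicatedMergeTree propagate it within the shard, which is what makes it safe to simply skip a replica that cannot take the write. A rough sketch of that routing decision follows (illustrative Python, not the engine's code).

def insert_targets(replicas, internal_replication):
    """Replicas of one shard that a Distributed INSERT block is sent to."""
    if internal_replication:
        # One usable replica is enough; the replicated table fans the data out.
        usable = [r for r in replicas if r.get("usable", True)]
        return usable[:1]
    # Without internal replication the Distributed table writes to every replica itself.
    return list(replicas)

if __name__ == "__main__":
    shard = [{"host": "node1", "usable": False}, {"host": "node2", "usable": True}]
    assert insert_targets(shard, internal_replication=True) == [{"host": "node2", "usable": True}]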

View File

@@ -75,6 +75,18 @@ CREATE TABLE table_function (n UInt8, s String) ENGINE = MergeTree() ORDER BY n'
 node2.query('''
 CREATE TABLE table_function (n UInt8, s String) ENGINE = MergeTree() ORDER BY n''')
+node1.query('''
+CREATE TABLE distributed_one_replica (date Date, id UInt32) ENGINE = Distributed('shard_with_local_replica_internal_replication', 'default', 'single_replicated')
+''')
+node2.query('''
+CREATE TABLE distributed_one_replica (date Date, id UInt32) ENGINE = Distributed('shard_with_local_replica_internal_replication', 'default', 'single_replicated')
+''')
+node2.query('''
+CREATE TABLE single_replicated(date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/single_replicated', 'node2', date, id, 8192)
+''')
 yield cluster
 finally:
@@ -162,6 +174,14 @@ def test_inserts_local(started_cluster):
 assert instance.query("SELECT count(*) FROM local").strip() == '1'
+def test_inserts_single_replica(started_cluster):
+node1.query(
+"INSERT INTO distributed_one_replica VALUES ('2000-01-01', 1)",
+settings={"insert_distributed_sync": "1", "prefer_localhost_replica": "0"},
+)
+assert node2.query("SELECT count(*) FROM single_replicated").strip() == '1'
 def test_prefer_localhost_replica(started_cluster):
 test_query = "SELECT * FROM distributed ORDER BY id"