Merge pull request #24805 from ClickHouse/fix_database_replicated_enqueue_race

Fix race on enqueue query in Replicated database
tavplubix 2021-06-02 13:44:19 +03:00 committed by GitHub
commit b0fd75f20a
9 changed files with 85 additions and 35 deletions

View File

@@ -22,6 +22,7 @@
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/formatAST.h>
#include <Common/Macros.h>
@@ -273,19 +274,11 @@ bool DatabaseReplicated::createDatabaseNodesInZooKeeper(const zkutil::ZooKeeperP
__builtin_unreachable();
}
void DatabaseReplicated::createEmptyLogEntry(Coordination::Requests & ops, const ZooKeeperPtr & current_zookeeper)
void DatabaseReplicated::createEmptyLogEntry(const ZooKeeperPtr & current_zookeeper)
{
/// On replica creation add empty entry to log. Can be used to trigger some actions on other replicas (e.g. update cluster info).
DDLLogEntry entry{};
String query_path_prefix = zookeeper_path + "/log/query-";
String counter_prefix = zookeeper_path + "/counter/cnt-";
String counter_path = current_zookeeper->create(counter_prefix, "", zkutil::CreateMode::EphemeralSequential);
String query_path = query_path_prefix + counter_path.substr(counter_prefix.size());
ops.emplace_back(zkutil::makeCreateRequest(query_path, entry.toString(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(query_path + "/committed", getFullReplicaName(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(counter_path, -1));
DatabaseReplicatedDDLWorker::enqueueQueryImpl(current_zookeeper, entry, this, true);
}
void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPtr & current_zookeeper)
@@ -296,8 +289,8 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent));
createEmptyLogEntry(ops, current_zookeeper);
current_zookeeper->multi(ops);
createEmptyLogEntry(current_zookeeper);
}
void DatabaseReplicated::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach)
@@ -325,10 +318,25 @@ void DatabaseReplicated::checkQueryValid(const ASTPtr & query, ContextPtr query_
if (!replicated_table || !create->storage->engine->arguments)
return;
ASTs & args = create->storage->engine->arguments->children;
ASTs & args_ref = create->storage->engine->arguments->children;
ASTs args = args_ref;
if (args.size() < 2)
return;
/// It can be a constant expression. Try to evaluate it, ignore exception if we cannot.
bool has_expression_argument = args_ref[0]->as<ASTFunction>() || args_ref[1]->as<ASTFunction>();
if (has_expression_argument)
{
try
{
args[0] = evaluateConstantExpressionAsLiteral(args_ref[0]->clone(), query_context);
args[1] = evaluateConstantExpressionAsLiteral(args_ref[1]->clone(), query_context);
}
catch (...)
{
}
}
ASTLiteral * arg1 = args[0]->as<ASTLiteral>();
ASTLiteral * arg2 = args[1]->as<ASTLiteral>();
if (!arg1 || !arg2 || arg1->value.getType() != Field::Types::String || arg2->value.getType() != Field::Types::String)
@@ -356,12 +364,12 @@ void DatabaseReplicated::checkQueryValid(const ASTPtr & query, ContextPtr query_
if (maybe_shard_macros && maybe_replica_macros)
return;
if (enable_functional_tests_helper)
if (enable_functional_tests_helper && !has_expression_argument)
{
if (maybe_path.empty() || maybe_path.back() != '/')
maybe_path += '/';
arg1->value = maybe_path + "auto_{shard}";
arg2->value = maybe_replica + "auto_{replica}";
args_ref[0]->as<ASTLiteral>()->value = maybe_path + "auto_{shard}";
args_ref[1]->as<ASTLiteral>()->value = maybe_replica + "auto_{replica}";
return;
}
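
For illustration only, not part of this commit: the try-and-evaluate pattern above can be read as a small helper. The name tryFoldEngineArg is hypothetical; the calls it uses (IAST::as<ASTFunction>(), evaluateConstantExpressionAsLiteral) are the same ones that appear in the hunk above.

#include <Interpreters/Context_fwd.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>

/// Hypothetical helper (sketch): fold an engine argument into a literal when it is a
/// constant expression such as a concatenation with currentDatabase(); return nullptr
/// when it cannot be evaluated, mirroring the try/catch in checkQueryValid above.
static DB::ASTPtr tryFoldEngineArg(const DB::ASTPtr & arg, DB::ContextPtr query_context)
{
    if (!arg->as<DB::ASTFunction>())
        return arg; /// Already a literal or identifier; nothing to evaluate.
    try
    {
        return DB::evaluateConstantExpressionAsLiteral(arg->clone(), query_context);
    }
    catch (...)
    {
        return nullptr; /// Not a constant expression; caller skips the literal-based checks.
    }
}
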
@@ -659,10 +667,8 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node
void DatabaseReplicated::drop(ContextPtr context_)
{
auto current_zookeeper = getZooKeeper();
Coordination::Requests ops;
ops.emplace_back(zkutil::makeSetRequest(replica_path, DROPPED_MARK, -1));
createEmptyLogEntry(ops, current_zookeeper);
current_zookeeper->multi(ops);
current_zookeeper->set(replica_path, DROPPED_MARK, -1);
createEmptyLogEntry(current_zookeeper);
DatabaseAtomic::drop(context_);

View File

@@ -78,7 +78,7 @@ private:
ClusterPtr getClusterImpl() const;
void setCluster(ClusterPtr && new_cluster);
void createEmptyLogEntry(Coordination::Requests & ops, const ZooKeeperPtr & current_zookeeper);
void createEmptyLogEntry(const ZooKeeperPtr & current_zookeeper);
String zookeeper_path;
String shard_name;

View File

@@ -1,6 +1,7 @@
#include <Databases/DatabaseReplicatedWorker.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/DDLTask.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <filesystem>
namespace fs = std::filesystem;
@@ -72,25 +73,63 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry)
{
auto zookeeper = getAndSetZooKeeper();
const String query_path_prefix = queue_dir + "/query-";
return enqueueQueryImpl(zookeeper, entry, database);
}
String DatabaseReplicatedDDLWorker::enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry,
DatabaseReplicated * const database, bool committed)
{
const String query_path_prefix = database->zookeeper_path + "/log/query-";
/// We cannot create a sequential node and its ephemeral child in a single transaction, so allocate the sequential number another way
String counter_prefix = database->zookeeper_path + "/counter/cnt-";
String counter_path = zookeeper->create(counter_prefix, "", zkutil::CreateMode::EphemeralSequential);
String counter_lock_path = database->zookeeper_path + "/counter_lock";
String counter_path;
size_t iters = 1000;
while (--iters)
{
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(counter_lock_path, database->getFullReplicaName(), zkutil::CreateMode::Ephemeral));
ops.emplace_back(zkutil::makeCreateRequest(counter_prefix, "", zkutil::CreateMode::EphemeralSequential));
Coordination::Responses res;
Coordination::Error code = zookeeper->tryMulti(ops, res);
if (code == Coordination::Error::ZOK)
{
counter_path = dynamic_cast<const Coordination::CreateResponse &>(*res.back()).path_created;
break;
}
else if (code != Coordination::Error::ZNODEEXISTS)
zkutil::KeeperMultiException::check(code, ops, res);
}
if (iters == 0)
throw Exception(ErrorCodes::UNFINISHED,
"Cannot enqueue query, because some replica are trying to enqueue another query. "
"It may happen on high queries rate or, in rare cases, after connection loss. Client should retry.");
String node_path = query_path_prefix + counter_path.substr(counter_prefix.size());
/// Now create task in queue
Coordination::Requests ops;
/// Query is not committed yet, but we have to write it into log to avoid reordering
ops.emplace_back(zkutil::makeCreateRequest(node_path, entry.toString(), zkutil::CreateMode::Persistent));
/// '/try' will be replaced with '/committed' or will be removed due to expired session or other error
ops.emplace_back(zkutil::makeCreateRequest(node_path + "/try", database->getFullReplicaName(), zkutil::CreateMode::Ephemeral));
if (committed)
ops.emplace_back(zkutil::makeCreateRequest(node_path + "/committed", database->getFullReplicaName(), zkutil::CreateMode::Persistent));
else
ops.emplace_back(zkutil::makeCreateRequest(node_path + "/try", database->getFullReplicaName(), zkutil::CreateMode::Ephemeral));
/// We don't need it anymore
ops.emplace_back(zkutil::makeRemoveRequest(counter_path, -1));
/// Unlock counters
ops.emplace_back(zkutil::makeRemoveRequest(counter_lock_path, -1));
/// Create status dirs
ops.emplace_back(zkutil::makeCreateRequest(node_path + "/active", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(node_path + "/finished", "", zkutil::CreateMode::Persistent));
zookeeper->multi(ops);
return node_path;
}
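
As a reading aid, here is a self-contained sketch of the counter-allocation step of enqueueQueryImpl above. It assumes the zkutil/Coordination API used in this diff; the function name allocateCounterLocked and the plain std::runtime_error at the end are illustrative, not ClickHouse code (the real code throws Exception(ErrorCodes::UNFINISHED, ...) and removes both counter_path and counter_lock in the same multi-request that publishes the log entry).

#include <string>
#include <stdexcept>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>

/// Sketch: allocate a sequential counter node while holding an ephemeral lock, so two
/// replicas cannot interleave counter allocation and log-entry creation.
static std::string allocateCounterLocked(const zkutil::ZooKeeperPtr & zookeeper,
                                         const std::string & zookeeper_path,
                                         const std::string & replica_name)
{
    const std::string counter_prefix = zookeeper_path + "/counter/cnt-";
    const std::string counter_lock_path = zookeeper_path + "/counter_lock";

    for (size_t iters = 1000; iters != 0; --iters)
    {
        Coordination::Requests ops;
        /// Ephemeral lock: it disappears with the owner's session, so it cannot leak forever.
        ops.emplace_back(zkutil::makeCreateRequest(counter_lock_path, replica_name, zkutil::CreateMode::Ephemeral));
        /// The counter node is created atomically together with the lock.
        ops.emplace_back(zkutil::makeCreateRequest(counter_prefix, "", zkutil::CreateMode::EphemeralSequential));

        Coordination::Responses res;
        Coordination::Error code = zookeeper->tryMulti(ops, res);
        if (code == Coordination::Error::ZOK)
        {
            /// The lock stays held; the caller releases it (and removes the counter node)
            /// in the same multi-request that creates the log entry.
            return dynamic_cast<const Coordination::CreateResponse &>(*res.back()).path_created;
        }
        if (code != Coordination::Error::ZNODEEXISTS)
            zkutil::KeeperMultiException::check(code, ops, res); /// Unexpected error: rethrow.
        /// ZNODEEXISTS: another replica holds counter_lock; retry.
    }
    throw std::runtime_error("Could not acquire counter_lock: another replica keeps enqueueing queries");
}
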

View File

@@ -29,6 +29,9 @@ public:
void shutdown() override;
static String enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry,
DatabaseReplicated * const database, bool committed = false);
private:
bool initializeMainThread() override;
void initializeReplication();

View File

@@ -7,12 +7,12 @@ DROP TABLE IF EXISTS alter_compression_codec2;
CREATE TABLE alter_compression_codec1 (
somedate Date CODEC(LZ4),
id UInt64 CODEC(NONE)
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_00910/'||currentDatabase()||'alter_compression_codecs', '1') PARTITION BY somedate ORDER BY id;
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_00910/'||currentDatabase()||'alter_compression_codecs/{shard}', '1_{replica}') PARTITION BY somedate ORDER BY id;
CREATE TABLE alter_compression_codec2 (
somedate Date CODEC(LZ4),
id UInt64 CODEC(NONE)
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_00910/'||currentDatabase()||'alter_compression_codecs', '2') PARTITION BY somedate ORDER BY id;
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_00910/'||currentDatabase()||'alter_compression_codecs/{shard}', '2_{replica}') PARTITION BY somedate ORDER BY id;
INSERT INTO alter_compression_codec1 VALUES('2018-01-01', 1);
INSERT INTO alter_compression_codec1 VALUES('2018-01-01', 2);

View File

@@ -6,8 +6,8 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
declare -A engines
engines[0]="MergeTree"
engines[1]="ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/src', toString(randConstant()))"
engines[2]="ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/src_' || toString(randConstant()), 'single_replica')"
engines[1]="ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}/src', '{replica}_' || toString(randConstant()))"
engines[2]="ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}/src_' || toString(randConstant()), '{replica}')"
for ((i=0; i<16; i++)) do
$CLICKHOUSE_CLIENT -q "CREATE TABLE dst_$i (p UInt64, k UInt64, v UInt64)

View File

@@ -10,8 +10,8 @@
3 3
4 4
0
CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n ),\n PROJECTION pp\n (\n SELECT \n x,\n count()\n GROUP BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01710_projection_fetch_default\', \'2\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192
CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n ),\n PROJECTION pp\n (\n SELECT \n x,\n count()\n GROUP BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/{shard}/01710_projection_fetch_default\', \'2_{replica}\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192
2
CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n ),\n PROJECTION pp\n (\n SELECT \n x,\n count()\n GROUP BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01710_projection_fetch_default\', \'2\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192
CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n ),\n PROJECTION pp\n (\n SELECT \n x,\n count()\n GROUP BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01710_projection_fetch_default\', \'2\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192
CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01710_projection_fetch_default\', \'2\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192
CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n ),\n PROJECTION pp\n (\n SELECT \n x,\n count()\n GROUP BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/{shard}/01710_projection_fetch_default\', \'2_{replica}\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192
CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n ),\n PROJECTION pp\n (\n SELECT \n x,\n count()\n GROUP BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/{shard}/01710_projection_fetch_default\', \'2_{replica}\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192
CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/{shard}/01710_projection_fetch_default\', \'2_{replica}\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192

View File

@@ -1,9 +1,9 @@
drop table if exists tp_1;
drop table if exists tp_2;
create table tp_1 (x Int32, y Int32, projection p (select x, y order by x)) engine = ReplicatedMergeTree('/clickhouse/tables/01710_projection_fetch_' || currentDatabase(), '1') order by y settings min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32;
create table tp_1 (x Int32, y Int32, projection p (select x, y order by x)) engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/01710_projection_fetch_' || currentDatabase(), '1_{replica}') order by y settings min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32;
create table tp_2 (x Int32, y Int32, projection p (select x, y order by x)) engine = ReplicatedMergeTree('/clickhouse/tables/01710_projection_fetch_' || currentDatabase(), '2') order by y settings min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32;
create table tp_2 (x Int32, y Int32, projection p (select x, y order by x)) engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/01710_projection_fetch_' || currentDatabase(), '2_{replica}') order by y settings min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32;
insert into tp_1 select number, number from numbers(3);

View File

@@ -109,6 +109,7 @@
"01153_attach_mv_uuid"
],
"database-replicated": [
/// Unclassified
"memory_tracking",
"memory_usage",
"live_view",
@@ -167,8 +168,9 @@
/// Does not support renaming of multiple tables in single query
"00634_rename_view",
"00140_rename",
"01783_http_chunk_size",
"01710_projection_fetch"
/// Requires investigation
"00953_zookeeper_suetin_deduplication_bug",
"01783_http_chunk_size"
],
"polymorphic-parts": [
"01508_partition_pruning_long", /// bug, shoud be fixed