diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 5d3c156f665..b3e5fc67151 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -273,19 +274,11 @@ bool DatabaseReplicated::createDatabaseNodesInZooKeeper(const zkutil::ZooKeeperP __builtin_unreachable(); } -void DatabaseReplicated::createEmptyLogEntry(Coordination::Requests & ops, const ZooKeeperPtr & current_zookeeper) +void DatabaseReplicated::createEmptyLogEntry(const ZooKeeperPtr & current_zookeeper) { /// On replica creation add empty entry to log. Can be used to trigger some actions on other replicas (e.g. update cluster info). DDLLogEntry entry{}; - - String query_path_prefix = zookeeper_path + "/log/query-"; - String counter_prefix = zookeeper_path + "/counter/cnt-"; - String counter_path = current_zookeeper->create(counter_prefix, "", zkutil::CreateMode::EphemeralSequential); - String query_path = query_path_prefix + counter_path.substr(counter_prefix.size()); - - ops.emplace_back(zkutil::makeCreateRequest(query_path, entry.toString(), zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(query_path + "/committed", getFullReplicaName(), zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeRemoveRequest(counter_path, -1)); + DatabaseReplicatedDDLWorker::enqueueQueryImpl(current_zookeeper, entry, this, true); } void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPtr & current_zookeeper) @@ -296,8 +289,8 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt Coordination::Requests ops; ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent)); - createEmptyLogEntry(ops, current_zookeeper); current_zookeeper->multi(ops); + createEmptyLogEntry(current_zookeeper); } void DatabaseReplicated::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach) @@ -325,10 +318,25 @@ void DatabaseReplicated::checkQueryValid(const ASTPtr & query, ContextPtr query_ if (!replicated_table || !create->storage->engine->arguments) return; - ASTs & args = create->storage->engine->arguments->children; + ASTs & args_ref = create->storage->engine->arguments->children; + ASTs args = args_ref; if (args.size() < 2) return; + /// It can be a constant expression. Try to evaluate it, ignore exception if we cannot. + bool has_expression_argument = args_ref[0]->as() || args_ref[1]->as(); + if (has_expression_argument) + { + try + { + args[0] = evaluateConstantExpressionAsLiteral(args_ref[0]->clone(), query_context); + args[1] = evaluateConstantExpressionAsLiteral(args_ref[1]->clone(), query_context); + } + catch (...) + { + } + } + ASTLiteral * arg1 = args[0]->as(); ASTLiteral * arg2 = args[1]->as(); if (!arg1 || !arg2 || arg1->value.getType() != Field::Types::String || arg2->value.getType() != Field::Types::String) @@ -356,12 +364,12 @@ void DatabaseReplicated::checkQueryValid(const ASTPtr & query, ContextPtr query_ if (maybe_shard_macros && maybe_replica_macros) return; - if (enable_functional_tests_helper) + if (enable_functional_tests_helper && !has_expression_argument) { if (maybe_path.empty() || maybe_path.back() != '/') maybe_path += '/'; - arg1->value = maybe_path + "auto_{shard}"; - arg2->value = maybe_replica + "auto_{replica}"; + args_ref[0]->as()->value = maybe_path + "auto_{shard}"; + args_ref[1]->as()->value = maybe_replica + "auto_{replica}"; return; } @@ -659,10 +667,8 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node void DatabaseReplicated::drop(ContextPtr context_) { auto current_zookeeper = getZooKeeper(); - Coordination::Requests ops; - ops.emplace_back(zkutil::makeSetRequest(replica_path, DROPPED_MARK, -1)); - createEmptyLogEntry(ops, current_zookeeper); - current_zookeeper->multi(ops); + current_zookeeper->set(replica_path, DROPPED_MARK, -1); + createEmptyLogEntry(current_zookeeper); DatabaseAtomic::drop(context_); diff --git a/src/Databases/DatabaseReplicated.h b/src/Databases/DatabaseReplicated.h index b1fe0e58b0e..41b1bf13e5f 100644 --- a/src/Databases/DatabaseReplicated.h +++ b/src/Databases/DatabaseReplicated.h @@ -78,7 +78,7 @@ private: ClusterPtr getClusterImpl() const; void setCluster(ClusterPtr && new_cluster); - void createEmptyLogEntry(Coordination::Requests & ops, const ZooKeeperPtr & current_zookeeper); + void createEmptyLogEntry(const ZooKeeperPtr & current_zookeeper); String zookeeper_path; String shard_name; diff --git a/src/Databases/DatabaseReplicatedWorker.cpp b/src/Databases/DatabaseReplicatedWorker.cpp index 760300d6750..eb7e65e1b70 100644 --- a/src/Databases/DatabaseReplicatedWorker.cpp +++ b/src/Databases/DatabaseReplicatedWorker.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include namespace fs = std::filesystem; @@ -72,25 +73,63 @@ void DatabaseReplicatedDDLWorker::initializeReplication() String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry) { auto zookeeper = getAndSetZooKeeper(); - const String query_path_prefix = queue_dir + "/query-"; + return enqueueQueryImpl(zookeeper, entry, database); +} + +String DatabaseReplicatedDDLWorker::enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry, + DatabaseReplicated * const database, bool committed) +{ + const String query_path_prefix = database->zookeeper_path + "/log/query-"; /// We cannot create sequential node and it's ephemeral child in a single transaction, so allocate sequential number another way String counter_prefix = database->zookeeper_path + "/counter/cnt-"; - String counter_path = zookeeper->create(counter_prefix, "", zkutil::CreateMode::EphemeralSequential); + String counter_lock_path = database->zookeeper_path + "/counter_lock"; + + String counter_path; + size_t iters = 1000; + while (--iters) + { + Coordination::Requests ops; + ops.emplace_back(zkutil::makeCreateRequest(counter_lock_path, database->getFullReplicaName(), zkutil::CreateMode::Ephemeral)); + ops.emplace_back(zkutil::makeCreateRequest(counter_prefix, "", zkutil::CreateMode::EphemeralSequential)); + Coordination::Responses res; + + Coordination::Error code = zookeeper->tryMulti(ops, res); + if (code == Coordination::Error::ZOK) + { + counter_path = dynamic_cast(*res.back()).path_created; + break; + } + else if (code != Coordination::Error::ZNODEEXISTS) + zkutil::KeeperMultiException::check(code, ops, res); + } + + if (iters == 0) + throw Exception(ErrorCodes::UNFINISHED, + "Cannot enqueue query, because some replica are trying to enqueue another query. " + "It may happen on high queries rate or, in rare cases, after connection loss. Client should retry."); + String node_path = query_path_prefix + counter_path.substr(counter_prefix.size()); + /// Now create task in queue Coordination::Requests ops; /// Query is not committed yet, but we have to write it into log to avoid reordering ops.emplace_back(zkutil::makeCreateRequest(node_path, entry.toString(), zkutil::CreateMode::Persistent)); /// '/try' will be replaced with '/committed' or will be removed due to expired session or other error - ops.emplace_back(zkutil::makeCreateRequest(node_path + "/try", database->getFullReplicaName(), zkutil::CreateMode::Ephemeral)); + if (committed) + ops.emplace_back(zkutil::makeCreateRequest(node_path + "/committed", database->getFullReplicaName(), zkutil::CreateMode::Persistent)); + else + ops.emplace_back(zkutil::makeCreateRequest(node_path + "/try", database->getFullReplicaName(), zkutil::CreateMode::Ephemeral)); /// We don't need it anymore ops.emplace_back(zkutil::makeRemoveRequest(counter_path, -1)); + /// Unlock counters + ops.emplace_back(zkutil::makeRemoveRequest(counter_lock_path, -1)); /// Create status dirs ops.emplace_back(zkutil::makeCreateRequest(node_path + "/active", "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(node_path + "/finished", "", zkutil::CreateMode::Persistent)); zookeeper->multi(ops); + return node_path; } diff --git a/src/Databases/DatabaseReplicatedWorker.h b/src/Databases/DatabaseReplicatedWorker.h index 16ad100b81a..4020906f9b2 100644 --- a/src/Databases/DatabaseReplicatedWorker.h +++ b/src/Databases/DatabaseReplicatedWorker.h @@ -29,6 +29,9 @@ public: void shutdown() override; + static String enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry, + DatabaseReplicated * const database, bool committed = false); + private: bool initializeMainThread() override; void initializeReplication(); diff --git a/tests/queries/0_stateless/00910_zookeeper_test_alter_compression_codecs_long.sql b/tests/queries/0_stateless/00910_zookeeper_test_alter_compression_codecs_long.sql index 085a79485fb..548f26eadd0 100644 --- a/tests/queries/0_stateless/00910_zookeeper_test_alter_compression_codecs_long.sql +++ b/tests/queries/0_stateless/00910_zookeeper_test_alter_compression_codecs_long.sql @@ -7,12 +7,12 @@ DROP TABLE IF EXISTS alter_compression_codec2; CREATE TABLE alter_compression_codec1 ( somedate Date CODEC(LZ4), id UInt64 CODEC(NONE) -) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_00910/'||currentDatabase()||'alter_compression_codecs', '1') PARTITION BY somedate ORDER BY id; +) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_00910/'||currentDatabase()||'alter_compression_codecs/{shard}', '1_{replica}') PARTITION BY somedate ORDER BY id; CREATE TABLE alter_compression_codec2 ( somedate Date CODEC(LZ4), id UInt64 CODEC(NONE) -) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_00910/'||currentDatabase()||'alter_compression_codecs', '2') PARTITION BY somedate ORDER BY id; +) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_00910/'||currentDatabase()||'alter_compression_codecs/{shard}', '2_{replica}') PARTITION BY somedate ORDER BY id; INSERT INTO alter_compression_codec1 VALUES('2018-01-01', 1); INSERT INTO alter_compression_codec1 VALUES('2018-01-01', 2); diff --git a/tests/queries/0_stateless/01154_move_partition_long.sh b/tests/queries/0_stateless/01154_move_partition_long.sh index f666cc929cc..66ebbacee42 100755 --- a/tests/queries/0_stateless/01154_move_partition_long.sh +++ b/tests/queries/0_stateless/01154_move_partition_long.sh @@ -6,8 +6,8 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) declare -A engines engines[0]="MergeTree" -engines[1]="ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/src', toString(randConstant()))" -engines[2]="ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/src_' || toString(randConstant()), 'single_replica')" +engines[1]="ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}/src', '{replica}_' || toString(randConstant()))" +engines[2]="ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}/src_' || toString(randConstant()), '{replica}')" for ((i=0; i<16; i++)) do $CLICKHOUSE_CLIENT -q "CREATE TABLE dst_$i (p UInt64, k UInt64, v UInt64) diff --git a/tests/queries/0_stateless/01710_projection_fetch.reference b/tests/queries/0_stateless/01710_projection_fetch.reference index 54e5bff80a9..abce5410b26 100644 --- a/tests/queries/0_stateless/01710_projection_fetch.reference +++ b/tests/queries/0_stateless/01710_projection_fetch.reference @@ -10,8 +10,8 @@ 3 3 4 4 0 -CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n ),\n PROJECTION pp\n (\n SELECT \n x,\n count()\n GROUP BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01710_projection_fetch_default\', \'2\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192 +CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n ),\n PROJECTION pp\n (\n SELECT \n x,\n count()\n GROUP BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/{shard}/01710_projection_fetch_default\', \'2_{replica}\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192 2 -CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n ),\n PROJECTION pp\n (\n SELECT \n x,\n count()\n GROUP BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01710_projection_fetch_default\', \'2\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192 -CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n ),\n PROJECTION pp\n (\n SELECT \n x,\n count()\n GROUP BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01710_projection_fetch_default\', \'2\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192 -CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01710_projection_fetch_default\', \'2\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192 +CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n ),\n PROJECTION pp\n (\n SELECT \n x,\n count()\n GROUP BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/{shard}/01710_projection_fetch_default\', \'2_{replica}\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192 +CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n ),\n PROJECTION pp\n (\n SELECT \n x,\n count()\n GROUP BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/{shard}/01710_projection_fetch_default\', \'2_{replica}\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192 +CREATE TABLE default.tp_2\n(\n `x` Int32,\n `y` Int32,\n PROJECTION p\n (\n SELECT \n x,\n y\n ORDER BY x\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/{shard}/01710_projection_fetch_default\', \'2_{replica}\')\nORDER BY y\nSETTINGS min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32, index_granularity = 8192 diff --git a/tests/queries/0_stateless/01710_projection_fetch.sql b/tests/queries/0_stateless/01710_projection_fetch.sql index 06790317808..7e4f6cc1d9a 100644 --- a/tests/queries/0_stateless/01710_projection_fetch.sql +++ b/tests/queries/0_stateless/01710_projection_fetch.sql @@ -1,9 +1,9 @@ drop table if exists tp_1; drop table if exists tp_2; -create table tp_1 (x Int32, y Int32, projection p (select x, y order by x)) engine = ReplicatedMergeTree('/clickhouse/tables/01710_projection_fetch_' || currentDatabase(), '1') order by y settings min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32; +create table tp_1 (x Int32, y Int32, projection p (select x, y order by x)) engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/01710_projection_fetch_' || currentDatabase(), '1_{replica}') order by y settings min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32; -create table tp_2 (x Int32, y Int32, projection p (select x, y order by x)) engine = ReplicatedMergeTree('/clickhouse/tables/01710_projection_fetch_' || currentDatabase(), '2') order by y settings min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32; +create table tp_2 (x Int32, y Int32, projection p (select x, y order by x)) engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/01710_projection_fetch_' || currentDatabase(), '2_{replica}') order by y settings min_rows_for_compact_part = 2, min_rows_for_wide_part = 4, min_bytes_for_compact_part = 16, min_bytes_for_wide_part = 32; insert into tp_1 select number, number from numbers(3); diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json index 6a278316387..a9a41b1ac1f 100644 --- a/tests/queries/skip_list.json +++ b/tests/queries/skip_list.json @@ -109,6 +109,7 @@ "01153_attach_mv_uuid" ], "database-replicated": [ + /// Unclassified "memory_tracking", "memory_usage", "live_view", @@ -167,8 +168,9 @@ /// Does not support renaming of multiple tables in single query "00634_rename_view", "00140_rename", - "01783_http_chunk_size", - "01710_projection_fetch" + /// Requires investigation + "00953_zookeeper_suetin_deduplication_bug", + "01783_http_chunk_size" ], "polymorphic-parts": [ "01508_partition_pruning_long", /// bug, shoud be fixed