Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-28 10:31:57 +00:00)
SYSTEM RESTORE REPLICA replica [ON CLUSTER cluster] (#13652)
* initial commit: add setting and stub
* typo
* added test stub
* fix
* wip merging new integration test and code proto
* adding steps interpreters
* adding firstly proposed solution (moving parts etc)
* added checking zookeeper path existence
* fixing the include
* fixing and sorting includes
* fixing outdated struct
* fix the name
* added ast ptr as level of indirection
* fix ref
* updating the changes
* working on test stub
* fix iterator -> reference
* revert rocksdb submodule update
* fixed show privileges test
* updated the test stub
* replaced rand() with thread_local_rng(), updated the tests updated the test fixed test config path test fix removed error messages fixed the test updated the test fixed string literal fixed literal typo: =
* fixed the empty replica error message
* updated the test and the code with logs
* updated the possible test cases, updated
* added the code/test milestone comments
* updated the test (added more testcases)
* replaced native assert with CH one
* individual replicas recursive delete fix
* updated the AS db.name AST
* two small logging fixes
* manually generated AST fixes
* Updated the test, added the possible algo change
* Some thoughts about optimizing the solution: ALTER MOVE PARTITION .. TO TABLE -> move to detached/ + ALTER ... ATTACH
* fix
* Removed the replica sync in test as it's invalid
* Some test tweaks
* tmp
* Rewrote the algo by using the executeQuery instead of hand-crafting the ASTPtr. Two questions still active.
* tr: logging active parts
* Extracted the parts moving algo into a separate helper function
* Fixed the test data and the queries slightly
* Replaced query to system.parts to direct invocation, started building the test that breaks on various parts.
* Added the case for tables when at least one replica is alive
* Updated the test to test replicas restoration by detaching/attaching
* Altered the test to check restoration without replica restart
* Added the tables swap in the start if the server failed last time
* Hotfix when only /replicas/replica... path was deleted
* Restore ZK paths while creating a replicated MergeTree table
* Updated the docs, fixed the algo for individual replicas restoration case
* Initial parts table storage fix, tests sync fix
* Reverted individual replica restoration to general algo
* Slightly optimised getDataParts
* Trying another solution with parts detaching
* Rewrote algo without any steps, added ON CLUSTER support
* Attaching parts from other replica on restoration
* Getting part checksums from ZK
* Removed ON CLUSTER, finished working solution
* Multiple small changes after review
* Fixing parallel test
* Supporting rewritten form on cluster
* Test fix
* Moar logging
* Using source replica as checksum provider
* improve test, remove some code from parser
* Trying solution with move to detached + forget
* Moving all parts (not only Committed) to detached
* Edited docs for RESTORE REPLICA
* Re-merging
* minor fixes

Co-authored-by: Alexander Tokmakov <avtokmakov@yandex-team.ru>
Parent: 3a2adfb602
Commit: 4c391f8e99
@@ -38,6 +38,7 @@ The list of available `SYSTEM` statements:
 - [START REPLICATION QUEUES](#query_language-system-start-replication-queues)
 - [SYNC REPLICA](#query_language-system-sync-replica)
 - [RESTART REPLICA](#query_language-system-restart-replica)
+- [RESTORE REPLICA](#query_language-system-restore-replica)
 - [RESTART REPLICAS](#query_language-system-restart-replicas)
 
 ## RELOAD EMBEDDED DICTIONARIES {#query_language-system-reload-emdedded-dictionaries}
@@ -290,13 +291,60 @@ After running this statement the `[db.]replicated_merge_tree_family_table_name`
 
 ### RESTART REPLICA {#query_language-system-restart-replica}
 
-Provides possibility to reinitialize Zookeeper sessions state for `ReplicatedMergeTree` table, will compare current state with Zookeeper as source of true and add tasks to Zookeeper queue if needed
+Provides possibility to reinitialize Zookeeper sessions state for `ReplicatedMergeTree` table, will compare current state with Zookeeper as source of true and add tasks to Zookeeper queue if needed.
 
-Initialization replication quene based on ZooKeeper date happens in the same way as `ATTACH TABLE` statement. For a short time the table will be unavailable for any operations.
+Initialization replication queue based on ZooKeeper date happens in the same way as `ATTACH TABLE` statement. For a short time the table will be unavailable for any operations.
 
 ``` sql
 SYSTEM RESTART REPLICA [db.]replicated_merge_tree_family_table_name
 ```
 
+### RESTORE REPLICA {#query_language-system-restore-replica}
+
+Restores a replica if data is [possibly] present but Zookeeper metadata is lost.
+
+Works only on readonly `ReplicatedMergeTree` tables.
+
+One may execute query after:
+
+- ZooKeeper root `/` loss.
+- Replicas path `/replicas` loss.
+- Individual replica path `/replicas/replica_name/` loss.
+
+Replica attaches locally found parts and sends info about them to Zookeeper.
+Parts present on replica before metadata loss are not re-fetched from other replicas if not being outdated
+(so replica restoration does not mean re-downloading all data over the network).
+
+Caveat: parts in all states are moved to `detached/` folder. Parts active before data loss (Committed) are attached.
+
+#### Syntax
+
+```sql
+SYSTEM RESTORE REPLICA [db.]replicated_merge_tree_family_table_name [ON CLUSTER cluster_name]
+```
+
+Alternative syntax:
+
+```sql
+SYSTEM RESTORE REPLICA [ON CLUSTER cluster_name] [db.]replicated_merge_tree_family_table_name
+```
+
+#### Example
+
+```sql
+-- Creating table on multiple servers
+
+CREATE TABLE test(n UInt32)
+ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/', '{replica}')
+ORDER BY n PARTITION BY n % 10;
+
+INSERT INTO test SELECT * FROM numbers(1000);
+
+-- zookeeper_delete_path("/clickhouse/tables/test", recursive=True) <- root loss.
+
+SYSTEM RESTART REPLICA test; -- Table will attach as readonly as metadata is missing.
+SYSTEM RESTORE REPLICA test; -- Need to execute on every replica, another way: RESTORE REPLICA test ON CLUSTER cluster
+```
+
 ### RESTART REPLICAS {#query_language-system-restart-replicas}
 
 Provides possibility to reinitialize Zookeeper sessions state for all `ReplicatedMergeTree` tables, will compare current state with Zookeeper as source of true and add tasks to Zookeeper queue if needed
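As a quick companion to the documentation hunk above, the readonly precondition can be checked from SQL before attempting the restore. This is a minimal sketch that relies only on the standard `system.replicas` table (nothing introduced by this commit); the database and table names are illustrative:

```sql
-- A replica that lost its ZooKeeper metadata comes up readonly after RESTART REPLICA.
SELECT database, table, is_readonly, replica_path
FROM system.replicas
WHERE table = 'test';

-- If is_readonly = 1 and the path is missing in ZooKeeper, the section above applies:
-- SYSTEM RESTORE REPLICA test;
```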
@@ -33,6 +33,7 @@ option (ENABLE_CLICKHOUSE_OBFUSCATOR "Table data obfuscator (convert real data t
 ${ENABLE_CLICKHOUSE_ALL})
 
 # https://clickhouse.tech/docs/en/operations/utilities/odbc-bridge/
+# TODO Also needs NANODBC.
 if (ENABLE_ODBC)
 option (ENABLE_CLICKHOUSE_ODBC_BRIDGE "HTTP-server working like a proxy to ODBC driver"
 ${ENABLE_CLICKHOUSE_ALL})
@@ -154,6 +154,7 @@ enum class AccessType
 M(SYSTEM_DROP_REPLICA, "DROP REPLICA", TABLE, SYSTEM) \
 M(SYSTEM_SYNC_REPLICA, "SYNC REPLICA", TABLE, SYSTEM) \
 M(SYSTEM_RESTART_REPLICA, "RESTART REPLICA", TABLE, SYSTEM) \
+M(SYSTEM_RESTORE_REPLICA, "RESTORE REPLICA", TABLE, SYSTEM) \
 M(SYSTEM_FLUSH_DISTRIBUTED, "FLUSH DISTRIBUTED", TABLE, SYSTEM_FLUSH) \
 M(SYSTEM_FLUSH_LOGS, "FLUSH LOGS", GLOBAL, SYSTEM_FLUSH) \
 M(SYSTEM_FLUSH, "", GROUP, SYSTEM) \
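The hunk above registers `SYSTEM_RESTORE_REPLICA` as a TABLE-level access type in the SYSTEM group, so the new privilege should be grantable alongside its neighbours. A hedged sketch, assuming the usual GRANT syntax for SYSTEM privileges (the user and table names are hypothetical):

```sql
-- Allow a maintenance user to restore a single table's replica metadata.
GRANT SYSTEM RESTORE REPLICA ON db.test TO maintenance_user;

-- Or grant it server-wide, the same way SYSTEM RESTART REPLICA is often granted.
GRANT SYSTEM RESTORE REPLICA ON *.* TO maintenance_user;
```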
@@ -554,6 +554,7 @@
 M(584, PROJECTION_NOT_USED) \
 M(585, CANNOT_PARSE_YAML) \
 M(586, CANNOT_CREATE_FILE) \
+M(587, CONCURRENT_ACCESS_NOT_SUPPORTED) \
 \
 M(998, POSTGRESQL_CONNECTION_FAILURE) \
 M(999, KEEPER_EXCEPTION) \
@@ -831,14 +831,17 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
 if (create.attach && !create.storage && !create.columns_list)
 {
 auto database = DatabaseCatalog::instance().getDatabase(database_name);
 
 if (database->getEngineName() == "Replicated")
 {
 auto guard = DatabaseCatalog::instance().getDDLGuard(database_name, create.table);
-if (typeid_cast<DatabaseReplicated *>(database.get()) && getContext()->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
+if (auto* ptr = typeid_cast<DatabaseReplicated *>(database.get());
+    ptr && getContext()->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
 {
 create.database = database_name;
 guard->releaseTableLock();
-return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
+return ptr->tryEnqueueReplicatedDDL(query_ptr, getContext());
 }
 }
 
@@ -926,11 +929,13 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
 if (need_add_to_database && database->getEngineName() == "Replicated")
 {
 auto guard = DatabaseCatalog::instance().getDDLGuard(create.database, create.table);
-if (typeid_cast<DatabaseReplicated *>(database.get()) && getContext()->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
+if (auto * ptr = typeid_cast<DatabaseReplicated *>(database.get());
+    ptr && getContext()->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
 {
 assertOrSetUUID(create, database);
 guard->releaseTableLock();
-return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
+return ptr->tryEnqueueReplicatedDDL(query_ptr, getContext());
 }
 }
 
@@ -992,8 +997,10 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
 }
 
 data_path = database->getTableDataPath(create);
 
 if (!create.attach && !data_path.empty() && fs::exists(fs::path{getContext()->getPath()} / data_path))
-    throw Exception(storage_already_exists_error_code, "Directory for {} data {} already exists", Poco::toLower(storage_name), String(data_path));
+    throw Exception(storage_already_exists_error_code,
+        "Directory for {} data {} already exists", Poco::toLower(storage_name), String(data_path));
 }
 else
 {
@@ -189,12 +189,11 @@ BlockIO InterpreterInsertQuery::execute()
 const auto & union_modes = select_query.list_of_modes;
 
 /// ASTSelectWithUnionQuery is not normalized now, so it may pass some queries which can be Trivial select queries
-is_trivial_insert_select
-    = std::all_of(
-        union_modes.begin(),
-        union_modes.end(),
-        [](const ASTSelectWithUnionQuery::Mode & mode) { return mode == ASTSelectWithUnionQuery::Mode::ALL; })
-    && std::all_of(selects.begin(), selects.end(), [](const ASTPtr & select) { return isTrivialSelect(select); });
+const auto mode_is_all = [](const auto & mode) { return mode == ASTSelectWithUnionQuery::Mode::ALL; };
+
+is_trivial_insert_select =
+    std::all_of(union_modes.begin(), union_modes.end(), std::move(mode_is_all))
+    && std::all_of(selects.begin(), selects.end(), isTrivialSelect);
 }
 
 if (is_trivial_insert_select)
@@ -43,11 +43,8 @@
 # include "config_core.h"
 #endif
 
 namespace DB
 {
 
 namespace ErrorCodes
 {
 extern const int LOGICAL_ERROR;
@@ -56,6 +53,7 @@ namespace ErrorCodes
 extern const int NOT_IMPLEMENTED;
 extern const int TIMEOUT_EXCEEDED;
 extern const int TABLE_WAS_NOT_DROPPED;
+extern const int NO_ZOOKEEPER;
 }
 
@@ -131,6 +129,8 @@ AccessType getRequiredAccessType(StorageActionBlockType action_type)
 throw Exception("Unknown action type: " + std::to_string(action_type), ErrorCodes::LOGICAL_ERROR);
 }
 
+constexpr std::string_view table_is_not_replicated = "Table {} is not replicated";
+
 }
 
 /// Implements SYSTEM [START|STOP] <something action from ActionLocks>
@@ -212,11 +212,16 @@ BlockIO InterpreterSystemQuery::execute()
 system_context->setSetting("profile", getContext()->getSystemProfileName());
 
 /// Make canonical query for simpler processing
-if (!query.table.empty())
+if (query.type == Type::RELOAD_DICTIONARY)
+{
+    if (!query.database.empty())
+        query.table = query.database + "." + query.table;
+}
+else if (!query.table.empty())
+{
     table_id = getContext()->resolveStorageID(StorageID(query.database, query.table), Context::ResolveOrdinary);
+}
 
-if (!query.target_dictionary.empty() && !query.database.empty())
-    query.target_dictionary = query.database + "." + query.target_dictionary;
 
 volume_ptr = {};
 if (!query.storage_policy.empty() && !query.volume.empty())
@@ -286,7 +291,7 @@ BlockIO InterpreterSystemQuery::execute()
 getContext()->checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY);
 
 auto & external_dictionaries_loader = system_context->getExternalDictionariesLoader();
-external_dictionaries_loader.reloadDictionary(query.target_dictionary, getContext());
+external_dictionaries_loader.reloadDictionary(query.table, getContext());
 
 ExternalDictionariesLoader::resetAll();
@@ -296,8 +301,8 @@ BlockIO InterpreterSystemQuery::execute()
 {
 getContext()->checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY);
 executeCommandsAndThrowIfError(
-    [&] () { system_context->getExternalDictionariesLoader().reloadAllTriedToLoad(); },
-    [&] () { system_context->getEmbeddedDictionaries().reload(); }
+    [&] { system_context->getExternalDictionariesLoader().reloadAllTriedToLoad(); },
+    [&] { system_context->getEmbeddedDictionaries().reload(); }
 );
 ExternalDictionariesLoader::resetAll();
 break;
@@ -392,8 +397,10 @@ BlockIO InterpreterSystemQuery::execute()
 break;
 case Type::RESTART_REPLICA:
 if (!tryRestartReplica(table_id, system_context))
-    throw Exception("There is no " + query.database + "." + query.table + " replicated table",
-        ErrorCodes::BAD_ARGUMENTS);
+    throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs());
+break;
+case Type::RESTORE_REPLICA:
+    restoreReplica();
 break;
 case Type::RESTART_DISK:
 restartDisk(query.disk);
|
|||||||
{
|
{
|
||||||
getContext()->checkAccess(AccessType::SYSTEM_FLUSH_LOGS);
|
getContext()->checkAccess(AccessType::SYSTEM_FLUSH_LOGS);
|
||||||
executeCommandsAndThrowIfError(
|
executeCommandsAndThrowIfError(
|
||||||
[&] () { if (auto query_log = getContext()->getQueryLog()) query_log->flush(true); },
|
[&] { if (auto query_log = getContext()->getQueryLog()) query_log->flush(true); },
|
||||||
[&] () { if (auto part_log = getContext()->getPartLog("")) part_log->flush(true); },
|
[&] { if (auto part_log = getContext()->getPartLog("")) part_log->flush(true); },
|
||||||
[&] () { if (auto query_thread_log = getContext()->getQueryThreadLog()) query_thread_log->flush(true); },
|
[&] { if (auto query_thread_log = getContext()->getQueryThreadLog()) query_thread_log->flush(true); },
|
||||||
[&] () { if (auto trace_log = getContext()->getTraceLog()) trace_log->flush(true); },
|
[&] { if (auto trace_log = getContext()->getTraceLog()) trace_log->flush(true); },
|
||||||
[&] () { if (auto text_log = getContext()->getTextLog()) text_log->flush(true); },
|
[&] { if (auto text_log = getContext()->getTextLog()) text_log->flush(true); },
|
||||||
[&] () { if (auto metric_log = getContext()->getMetricLog()) metric_log->flush(true); },
|
[&] { if (auto metric_log = getContext()->getMetricLog()) metric_log->flush(true); },
|
||||||
[&] () { if (auto asynchronous_metric_log = getContext()->getAsynchronousMetricLog()) asynchronous_metric_log->flush(true); },
|
[&] { if (auto asynchronous_metric_log = getContext()->getAsynchronousMetricLog()) asynchronous_metric_log->flush(true); },
|
||||||
[&] () { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); }
|
[&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); }
|
||||||
);
|
);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -423,12 +430,51 @@ BlockIO InterpreterSystemQuery::execute()
 return BlockIO();
 }
 
+void InterpreterSystemQuery::restoreReplica()
+{
+    getContext()->checkAccess(AccessType::SYSTEM_RESTORE_REPLICA, table_id);
+
+    const zkutil::ZooKeeperPtr& zookeeper = getContext()->getZooKeeper();
+
+    if (zookeeper->expired())
+        throw Exception(ErrorCodes::NO_ZOOKEEPER,
+            "Cannot restore table metadata because ZooKeeper session has expired");
+
+    const StoragePtr table_ptr = DatabaseCatalog::instance().getTable(table_id, getContext());
+
+    auto * const table_replicated_ptr = dynamic_cast<StorageReplicatedMergeTree *>(table_ptr.get());
+
+    if (table_replicated_ptr == nullptr)
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs());
+
+    auto & table_replicated = *table_replicated_ptr;
+
+    StorageReplicatedMergeTree::Status status;
+    table_replicated.getStatus(status);
+
+    if (!status.is_readonly)
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica must be readonly");
+
+    const String replica_name = table_replicated.getReplicaName();
+    const String& zk_root_path = status.zookeeper_path;
+
+    if (String replica_path = zk_root_path + "replicas/" + replica_name; zookeeper->exists(replica_path))
+        throw Exception(ErrorCodes::BAD_ARGUMENTS,
+            "Replica path is present at {} -- nothing to restore. "
+            "If you are sure that metadata it lost and replica path contain some garbage, "
+            "then use SYSTEM DROP REPLICA query first.", replica_path);
+
+    table_replicated.restoreMetadataInZooKeeper();
+}
+
 StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica, ContextMutablePtr system_context, bool need_ddl_guard)
 {
 getContext()->checkAccess(AccessType::SYSTEM_RESTART_REPLICA, replica);
 
-auto table_ddl_guard = need_ddl_guard ? DatabaseCatalog::instance().getDDLGuard(replica.getDatabaseName(), replica.getTableName()) : nullptr;
+auto table_ddl_guard = need_ddl_guard
+    ? DatabaseCatalog::instance().getDDLGuard(replica.getDatabaseName(), replica.getTableName())
+    : nullptr;
 
 auto [database, table] = DatabaseCatalog::instance().tryGetDatabaseAndTable(replica, getContext());
 ASTPtr create_ast;
 
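The interpreter added above refuses to run while the replica path still exists in ZooKeeper and points the user at `SYSTEM DROP REPLICA`. A hedged sketch of that recovery order, following the error message rather than prescribing it (the names are illustrative, and DROP REPLICA has its own safety checks around local replicas):

```sql
-- Only when RESTORE REPLICA reports "Replica path is present ... nothing to restore":
SYSTEM DROP REPLICA 'r1' FROM TABLE db.test;  -- remove the stale replica metadata first
SYSTEM RESTART REPLICA db.test;               -- re-attach; without metadata the table is readonly
SYSTEM RESTORE REPLICA db.test;               -- recreate the ZooKeeper metadata from local parts
```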
@@ -475,28 +521,23 @@ void InterpreterSystemQuery::restartReplicas(ContextMutablePtr system_context)
 auto & catalog = DatabaseCatalog::instance();
 
 for (auto & elem : catalog.getDatabases())
-{
-    DatabasePtr & database = elem.second;
-    for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
-    {
-        if (auto table = iterator->table())
-        {
-            if (dynamic_cast<const StorageReplicatedMergeTree *>(table.get()))
-                replica_names.emplace_back(StorageID{iterator->databaseName(), iterator->name()});
-        }
-    }
-}
+    for (auto it = elem.second->getTablesIterator(getContext()); it->isValid(); it->next())
+        if (dynamic_cast<const StorageReplicatedMergeTree *>(it->table().get()))
+            replica_names.emplace_back(it->databaseName(), it->name());
 
 if (replica_names.empty())
     return;
 
 TableGuards guards;
 
 for (const auto & name : replica_names)
     guards.emplace(UniqueTableName{name.database_name, name.table_name}, nullptr);
 
 for (auto & guard : guards)
     guard.second = catalog.getDDLGuard(guard.first.database_name, guard.first.table_name);
 
 ThreadPool pool(std::min(size_t(getNumberOfPhysicalCPUCores()), replica_names.size()));
 
 for (auto & replica : replica_names)
 {
 LOG_TRACE(log, "Restarting replica on {}", replica.getNameForLogs());
|
|||||||
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
|
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
|
||||||
|
|
||||||
if (!dropReplicaImpl(query, table))
|
if (!dropReplicaImpl(query, table))
|
||||||
throw Exception("Table " + table_id.getNameForLogs() + " is not replicated", ErrorCodes::BAD_ARGUMENTS);
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs());
|
||||||
}
|
}
|
||||||
else if (!query.database.empty())
|
else if (!query.database.empty())
|
||||||
{
|
{
|
||||||
@@ -628,7 +669,7 @@ void InterpreterSystemQuery::syncReplica(ASTSystemQuery &)
 LOG_TRACE(log, "SYNC REPLICA {}: OK", table_id.getNameForLogs());
 }
 else
-    throw Exception("Table " + table_id.getNameForLogs() + " is not replicated", ErrorCodes::BAD_ARGUMENTS);
+    throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs());
 }
 
 void InterpreterSystemQuery::flushDistributed(ASTSystemQuery &)
@@ -659,6 +700,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
 const auto & query = query_ptr->as<const ASTSystemQuery &>();
 using Type = ASTSystemQuery::Type;
 AccessRightsElements required_access;
 
 switch (query.type)
 {
 case Type::SHUTDOWN: [[fallthrough]];
@@ -770,6 +812,11 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
 required_access.emplace_back(AccessType::SYSTEM_DROP_REPLICA, query.database, query.table);
 break;
 }
+case Type::RESTORE_REPLICA:
+{
+    required_access.emplace_back(AccessType::SYSTEM_RESTORE_REPLICA, query.database, query.table);
+    break;
+}
 case Type::SYNC_REPLICA:
 {
 required_access.emplace_back(AccessType::SYSTEM_SYNC_REPLICA, query.database, query.table);
@@ -49,6 +49,9 @@ private:
 
 void restartReplicas(ContextMutablePtr system_context);
 void syncReplica(ASTSystemQuery & query);
+
+void restoreReplica();
+
 void dropReplica(ASTSystemQuery & query);
 bool dropReplicaImpl(ASTSystemQuery & query, const StoragePtr & table);
 void flushDistributed(ASTSystemQuery & query);
@@ -102,12 +102,10 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
 
 /// The current database in a distributed query need to be replaced with either
 /// the local current database or a shard's default database.
-bool need_replace_current_database
-    = (std::find_if(
+bool need_replace_current_database = std::any_of(
     query_requires_access.begin(),
     query_requires_access.end(),
-    [](const AccessRightsElement & elem) { return elem.isEmptyDatabase(); })
-    != query_requires_access.end());
+    [](const AccessRightsElement & elem) { return elem.isEmptyDatabase(); });
 
 bool use_local_default_database = false;
 const String & current_database = context->getCurrentDatabase();
@@ -44,6 +44,8 @@ const char * ASTSystemQuery::typeToString(Type type)
 return "RESTART REPLICAS";
 case Type::RESTART_REPLICA:
 return "RESTART REPLICA";
+case Type::RESTORE_REPLICA:
+    return "RESTORE REPLICA";
 case Type::DROP_REPLICA:
 return "DROP REPLICA";
 case Type::SYNC_REPLICA:
@@ -119,18 +121,6 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
 << (settings.hilite ? hilite_none : "");
 };
 
-auto print_database_dictionary = [&]
-{
-    settings.ostr << " ";
-    if (!database.empty())
-    {
-        settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(database)
-            << (settings.hilite ? hilite_none : "") << ".";
-    }
-    settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(target_dictionary)
-        << (settings.hilite ? hilite_none : "");
-};
-
 auto print_drop_replica = [&]
 {
 settings.ostr << " " << quoteString(replica);
|
|||||||
else if (!volume.empty())
|
else if (!volume.empty())
|
||||||
print_on_volume();
|
print_on_volume();
|
||||||
}
|
}
|
||||||
else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA || type == Type::FLUSH_DISTRIBUTED)
|
else if ( type == Type::RESTART_REPLICA
|
||||||
|
|| type == Type::RESTORE_REPLICA
|
||||||
|
|| type == Type::SYNC_REPLICA
|
||||||
|
|| type == Type::FLUSH_DISTRIBUTED
|
||||||
|
|| type == Type::RELOAD_DICTIONARY)
|
||||||
{
|
{
|
||||||
print_database_table();
|
print_database_table();
|
||||||
}
|
}
|
||||||
else if (type == Type::RELOAD_DICTIONARY)
|
|
||||||
{
|
|
||||||
print_database_dictionary();
|
|
||||||
}
|
|
||||||
else if (type == Type::DROP_REPLICA)
|
else if (type == Type::DROP_REPLICA)
|
||||||
{
|
{
|
||||||
print_drop_replica();
|
print_drop_replica();
|
||||||
|
@ -32,6 +32,7 @@ public:
|
|||||||
START_LISTEN_QUERIES,
|
START_LISTEN_QUERIES,
|
||||||
RESTART_REPLICAS,
|
RESTART_REPLICAS,
|
||||||
RESTART_REPLICA,
|
RESTART_REPLICA,
|
||||||
|
RESTORE_REPLICA,
|
||||||
DROP_REPLICA,
|
DROP_REPLICA,
|
||||||
SYNC_REPLICA,
|
SYNC_REPLICA,
|
||||||
RELOAD_DICTIONARY,
|
RELOAD_DICTIONARY,
|
||||||
@@ -65,7 +66,6 @@ public:
 
 Type type = Type::UNKNOWN;
 
-String target_dictionary;
 String target_model;
 String database;
 String table;
@@ -133,7 +133,7 @@ ASTPtr SystemQuery::convertToOld() const
 {
 auto table = std::static_pointer_cast<ASTTableIdentifier>(get(TABLE)->convertToOld());
 query->database = table->getDatabaseName();
-query->target_dictionary = table->shortName();
+query->table = table->getTableId().table_name;
 }
 break;
 case QueryType::REPLICATED_SENDS:
@@ -95,21 +95,18 @@ bool ParserRenameQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
 
 ASTRenameQuery::Elements elements;
 
-auto ignore_delim = [&]()
-{
-    return exchange ? s_and.ignore(pos) : s_to.ignore(pos);
-};
+const auto ignore_delim = [&] { return exchange ? s_and.ignore(pos) : s_to.ignore(pos); };
 
 while (true)
 {
 if (!elements.empty() && !s_comma.ignore(pos))
     break;
 
-elements.push_back(ASTRenameQuery::Element());
+ASTRenameQuery::Element& ref = elements.emplace_back();
 
-if (!parseDatabaseAndTable(elements.back().from, pos, expected)
+if (!parseDatabaseAndTable(ref.from, pos, expected)
     || !ignore_delim()
-    || !parseDatabaseAndTable(elements.back().to, pos, expected))
+    || !parseDatabaseAndTable(ref.to, pos, expected))
     return false;
 }
 
@@ -15,6 +15,47 @@ namespace ErrorCodes
 namespace DB
 {
 
+static bool parseQueryWithOnClusterAndMaybeTable(std::shared_ptr<ASTSystemQuery> & res, IParser::Pos & pos,
+    Expected & expected, bool require_table, bool allow_string_literal)
+{
+    /// Better form for user: SYSTEM <ACTION> table ON CLUSTER cluster
+    /// Query rewritten form + form while executing on cluster: SYSTEM <ACTION> ON CLUSTER cluster table
+    /// Need to support both
+    String cluster;
+    bool parsed_on_cluster = false;
+
+    if (ParserKeyword{"ON"}.ignore(pos, expected))
+    {
+        if (!ASTQueryWithOnCluster::parse(pos, cluster, expected))
+            return false;
+        parsed_on_cluster = true;
+    }
+
+    bool parsed_table = false;
+    if (allow_string_literal)
+    {
+        ASTPtr ast;
+        if (ParserStringLiteral{}.parse(pos, ast, expected))
+        {
+            res->database = {};
+            res->table = ast->as<ASTLiteral &>().value.safeGet<String>();
+            parsed_table = true;
+        }
+    }
+
+    if (!parsed_table)
+        parsed_table = parseDatabaseAndTableName(pos, expected, res->database, res->table);
+
+    if (!parsed_table && require_table)
+        return false;
+
+    if (!parsed_on_cluster && ParserKeyword{"ON"}.ignore(pos, expected))
+        if (!ASTQueryWithOnCluster::parse(pos, cluster, expected))
+            return false;
+
+    res->cluster = cluster;
+    return true;
+}
+
 bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
 {
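The helper above accepts `ON CLUSTER` both before and after the table name, matching its comment about the user-facing form versus the rewritten on-cluster form. Both spellings below should therefore parse to the same query (cluster and table names are illustrative):

```sql
-- User-facing form: table first, then ON CLUSTER.
SYSTEM RESTORE REPLICA db.test ON CLUSTER my_cluster;

-- Rewritten form used while executing on the cluster: ON CLUSTER first.
SYSTEM RESTORE REPLICA ON CLUSTER my_cluster db.test;
```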
@@ -43,17 +84,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
 {
 case Type::RELOAD_DICTIONARY:
 {
-    String cluster_str;
-    if (ParserKeyword{"ON"}.ignore(pos, expected))
-    {
-        if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
-            return false;
-    }
-    res->cluster = cluster_str;
-    ASTPtr ast;
-    if (ParserStringLiteral{}.parse(pos, ast, expected))
-        res->target_dictionary = ast->as<ASTLiteral &>().value.safeGet<String>();
-    else if (!parseDatabaseAndTableName(pos, expected, res->database, res->target_dictionary))
+    if (!parseQueryWithOnClusterAndMaybeTable(res, pos, expected, /* require table = */ true, /* allow_string_literal = */ true))
         return false;
     break;
 }
@@ -145,24 +176,21 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
 break;
 }
 
-case Type::STOP_DISTRIBUTED_SENDS:
-case Type::START_DISTRIBUTED_SENDS:
-case Type::FLUSH_DISTRIBUTED:
-{
-    String cluster_str;
-    if (ParserKeyword{"ON"}.ignore(pos, expected))
-    {
-        if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
-            return false;
-    }
-    res->cluster = cluster_str;
-    if (!parseDatabaseAndTableName(pos, expected, res->database, res->table))
-    {
 /// FLUSH DISTRIBUTED requires table
 /// START/STOP DISTRIBUTED SENDS does not require table
-        if (res->type == Type::FLUSH_DISTRIBUTED)
+case Type::STOP_DISTRIBUTED_SENDS:
+case Type::START_DISTRIBUTED_SENDS:
+{
+    if (!parseQueryWithOnClusterAndMaybeTable(res, pos, expected, /* require table = */ false, /* allow_string_literal = */ false))
         return false;
-    }
+    break;
 }
 
+case Type::FLUSH_DISTRIBUTED:
+case Type::RESTORE_REPLICA:
+{
+    if (!parseQueryWithOnClusterAndMaybeTable(res, pos, expected, /* require table = */ true, /* allow_string_literal = */ false))
+        return false;
 break;
 }
 
@@ -190,9 +190,10 @@ protected:
 /// Initially reserved virtual column name may be shadowed by real column.
 bool isVirtualColumn(const String & column_name, const StorageMetadataPtr & metadata_snapshot) const;
 
 private:
 
 StorageID storage_id;
 
 mutable std::mutex id_mutex;
 
 /// Multiversion storage metadata. Allows to read/write storage metadata
@@ -223,6 +223,12 @@ public:
 DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor
 };
 
+static constexpr auto all_part_states =
+{
+    State::Temporary, State::PreCommitted, State::Committed, State::Outdated, State::Deleting,
+    State::DeleteOnDestroy
+};
+
 using TTLInfo = MergeTreeDataPartTTLInfo;
 using TTLInfos = MergeTreeDataPartTTLInfos;
 
@@ -2156,8 +2156,7 @@ bool MergeTreeData::renameTempPartAndReplace(
 
 LOG_TRACE(log, "Renaming temporary part {} to {}.", part->relative_path, part_name);
 
-auto it_duplicate = data_parts_by_info.find(part_info);
-if (it_duplicate != data_parts_by_info.end())
+if (auto it_duplicate = data_parts_by_info.find(part_info); it_duplicate != data_parts_by_info.end())
 {
 String message = "Part " + (*it_duplicate)->getNameWithState() + " already exists";
 
@@ -402,6 +402,7 @@ public:
 
 /// Returns a copy of the list so that the caller shouldn't worry about locks.
 DataParts getDataParts(const DataPartStates & affordable_states) const;
 
 /// Returns sorted list of the parts with specified states
 /// out_states will contain snapshot of each part state
 DataPartsVector getDataPartsVector(
@@ -262,8 +262,8 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
 {
 log_entry.type = StorageReplicatedMergeTree::LogEntry::ATTACH_PART;
 
-/// We don't need to involve ZooKeeper to obtain the checksums as by the time we get
-/// the MutableDataPartPtr here, we already have the data thus being able to
+/// We don't need to involve ZooKeeper to obtain checksums as by the time we get
+/// MutableDataPartPtr here, we already have the data thus being able to
 /// calculate the checksums.
 log_entry.part_checksum = part->checksums.getTotalChecksumHex();
 }
@@ -384,6 +384,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
 
 MergeTreeData::Transaction transaction(storage); /// If you can not add a part to ZK, we'll remove it back from the working set.
 bool renamed = false;
 
 try
 {
 renamed = storage.renameTempPartAndAdd(part, nullptr, &transaction);
@@ -394,6 +395,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
 && e.code() != ErrorCodes::PART_IS_TEMPORARILY_LOCKED)
 throw;
 }
 
 if (!renamed)
 {
 if (is_already_existing_part)
@@ -257,7 +257,7 @@ If you use the Replicated version of engines, see https://clickhouse.tech/docs/e
 
 static StoragePtr create(const StorageFactory::Arguments & args)
 {
-/** [Replicated][|Summing|Collapsing|Aggregating|Replacing|Graphite]MergeTree (2 * 7 combinations) engines
+/** [Replicated][|Summing|VersionedCollapsing|Collapsing|Aggregating|Replacing|Graphite]MergeTree (2 * 7 combinations) engines
 * The argument for the engine should be:
 * - (for Replicated) The path to the table in ZooKeeper
 * - (for Replicated) Replica name in ZooKeeper
@@ -1,5 +1,6 @@
 #include <Core/Defines.h>
 
+#include "Common/hex.h"
 #include <Common/Macros.h>
 #include <Common/StringUtils/StringUtils.h>
 #include <Common/ThreadPool.h>
@@ -63,10 +64,13 @@
 #include <common/scope_guard.h>
 #include <common/scope_guard_safe.h>
 
+#include <algorithm>
 #include <ctime>
+#include <filesystem>
+#include <iterator>
+#include <numeric>
 #include <thread>
 #include <future>
-#include <filesystem>
 
 #include <boost/algorithm/string/join.hpp>
 
@@ -135,6 +139,7 @@ namespace ErrorCodes
 extern const int INTERSERVER_SCHEME_DOESNT_MATCH;
 extern const int DUPLICATE_DATA_PART;
 extern const int BAD_ARGUMENTS;
+extern const int CONCURRENT_ACCESS_NOT_SUPPORTED;
 }
 
 namespace ActionLocks
@@ -153,10 +158,6 @@ static const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
 static const auto MUTATIONS_FINALIZING_SLEEP_MS = 1 * 1000;
 static const auto MUTATIONS_FINALIZING_IDLE_SLEEP_MS = 5 * 1000;
 
-std::atomic_uint StorageReplicatedMergeTree::total_fetches {0};
-
 void StorageReplicatedMergeTree::setZooKeeper()
 {
 /// Every ReplicatedMergeTree table is using only one ZooKeeper session.
@@ -376,7 +377,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
 
 if (attach && !current_zookeeper->exists(zookeeper_path + "/metadata"))
 {
-    LOG_WARNING(log, "No metadata in ZooKeeper: table will be in readonly mode.");
+    LOG_WARNING(log, "No metadata in ZooKeeper for {}: table will be in readonly mode.", zookeeper_path);
     is_readonly = true;
     has_metadata_in_zookeeper = false;
     return;
@@ -384,10 +385,20 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
 
 auto metadata_snapshot = getInMemoryMetadataPtr();
 
+/// May it be ZK lost not the whole root, so the upper check passed, but only the /replicas/replica
+/// folder.
+if (attach && !current_zookeeper->exists(replica_path))
+{
+    LOG_WARNING(log, "No metadata in ZooKeeper for {}: table will be in readonly mode", replica_path);
+    is_readonly = true;
+    has_metadata_in_zookeeper = false;
+    return;
+}
+
 if (!attach)
 {
 if (!getDataParts().empty())
-    throw Exception("Data directory for table already containing data parts"
+    throw Exception("Data directory for table already contains data parts"
         " - probably it was unclean DROP table or manual intervention."
         " You must either clear directory by hand or use ATTACH TABLE"
         " instead of CREATE TABLE if you need to use that parts.", ErrorCodes::INCORRECT_DATA);
|
|||||||
{
|
{
|
||||||
/// In old tables this node may missing or be empty
|
/// In old tables this node may missing or be empty
|
||||||
String replica_metadata;
|
String replica_metadata;
|
||||||
bool replica_metadata_exists = current_zookeeper->tryGet(replica_path + "/metadata", replica_metadata);
|
const bool replica_metadata_exists = current_zookeeper->tryGet(replica_path + "/metadata", replica_metadata);
|
||||||
|
|
||||||
if (!replica_metadata_exists || replica_metadata.empty())
|
if (!replica_metadata_exists || replica_metadata.empty())
|
||||||
{
|
{
|
||||||
/// We have to check shared node granularity before we create ours.
|
/// We have to check shared node granularity before we create ours.
|
||||||
other_replicas_fixed_granularity = checkFixedGranualrityInZookeeper();
|
other_replicas_fixed_granularity = checkFixedGranualrityInZookeeper();
|
||||||
|
|
||||||
ReplicatedMergeTreeTableMetadata current_metadata(*this, metadata_snapshot);
|
ReplicatedMergeTreeTableMetadata current_metadata(*this, metadata_snapshot);
|
||||||
current_zookeeper->createOrUpdate(replica_path + "/metadata", current_metadata.toString(), zkutil::CreateMode::Persistent);
|
|
||||||
|
current_zookeeper->createOrUpdate(replica_path + "/metadata", current_metadata.toString(),
|
||||||
|
zkutil::CreateMode::Persistent);
|
||||||
}
|
}
|
||||||
|
|
||||||
checkTableStructure(replica_path, metadata_snapshot);
|
checkTableStructure(replica_path, metadata_snapshot);
|
||||||
@ -460,8 +475,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
|||||||
current_zookeeper->get(zookeeper_path + "/metadata", &metadata_stat);
|
current_zookeeper->get(zookeeper_path + "/metadata", &metadata_stat);
|
||||||
metadata_version = metadata_stat.version;
|
metadata_version = metadata_stat.version;
|
||||||
}
|
}
|
||||||
/// Temporary directories contain untinalized results of Merges or Fetches (after forced restart)
|
/// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart),
|
||||||
/// and don't allow to reinitialize them, so delete each of them immediately
|
/// don't allow to reinitialize them, delete each of them immediately.
|
||||||
clearOldTemporaryDirectories(0);
|
clearOldTemporaryDirectories(0);
|
||||||
clearOldWriteAheadLogs();
|
clearOldWriteAheadLogs();
|
||||||
}
|
}
|
||||||
@@ -727,12 +742,13 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
 String replicas_value;
 
 if (!zookeeper->tryGet(zookeeper_path + "/replicas", replicas_value, &replicas_stat))
-    throw Exception(fmt::format("Cannot create a replica of the table {}, because the last replica of the table was dropped right now",
-        zookeeper_path), ErrorCodes::ALL_REPLICAS_LOST);
+    throw Exception(ErrorCodes::ALL_REPLICAS_LOST,
+        "Cannot create a replica of the table {}, because the last replica of the table was dropped right now",
+        zookeeper_path);
 
 /// It is not the first replica, we will mark it as "lost", to immediately repair (clone) from existing replica.
 /// By the way, it's possible that the replica will be first, if all previous replicas were removed concurrently.
-String is_lost_value = replicas_stat.numChildren ? "1" : "0";
+const String is_lost_value = replicas_stat.numChildren ? "1" : "0";
 
 Coordination::Requests ops;
 ops.emplace_back(zkutil::makeCreateRequest(replica_path, "",
@@ -761,20 +777,17 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
 
 Coordination::Responses responses;
 code = zookeeper->tryMulti(ops, responses);
-if (code == Coordination::Error::ZNODEEXISTS)
-{
-    throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
-}
-else if (code == Coordination::Error::ZBADVERSION)
+switch (code)
 {
+    case Coordination::Error::ZNODEEXISTS:
+        throw Exception(ErrorCodes::REPLICA_IS_ALREADY_EXIST, "Replica {} already exists", replica_path);
+    case Coordination::Error::ZBADVERSION:
     LOG_ERROR(log, "Retrying createReplica(), because some other replicas were created at the same time");
-}
-else if (code == Coordination::Error::ZNONODE)
-{
-    throw Exception("Table " + zookeeper_path + " was suddenly removed.", ErrorCodes::ALL_REPLICAS_LOST);
-}
-else
-{
+        break;
+    case Coordination::Error::ZNONODE:
+        throw Exception(ErrorCodes::ALL_REPLICAS_LOST, "Table {} was suddenly removed", zookeeper_path);
+    default:
     zkutil::KeeperMultiException::check(code, ops, responses);
 }
 } while (code == Coordination::Error::ZBADVERSION);
@@ -1123,6 +1136,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
 size_t unexpected_parts_nonnew = 0;
 UInt64 unexpected_parts_nonnew_rows = 0;
 UInt64 unexpected_parts_rows = 0;
 
 for (const auto & part : unexpected_parts)
 {
 if (part->info.level > 0)
@@ -1134,20 +1148,17 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
 unexpected_parts_rows += part->rows_count;
 }
 
-/// Additional helpful statistics
-auto get_blocks_count_in_data_part = [&] (const String & part_name) -> UInt64
+const UInt64 parts_to_fetch_blocks = std::accumulate(parts_to_fetch.cbegin(), parts_to_fetch.cend(), 0,
+    [&](UInt64 acc, const String& part_name)
 {
 MergeTreePartInfo part_info;
 
 if (MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
-    return part_info.getBlocksCount();
+    return acc + part_info.getBlocksCount();
 
 LOG_ERROR(log, "Unexpected part name: {}", part_name);
-return 0;
-};
+return acc;
+});
 
-UInt64 parts_to_fetch_blocks = 0;
-for (const String & name : parts_to_fetch)
-    parts_to_fetch_blocks += get_blocks_count_in_data_part(name);
 
 /** We can automatically synchronize data,
 * if the ratio of the total number of errors to the total number of parts (minimum - on the local filesystem or in ZK)
@@ -1499,7 +1510,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
     {
         if (MutableDataPartPtr part = attachPartHelperFoundValidPart(entry); part)
         {
-            LOG_TRACE(log, "Found valid part to attach from local data, preparing the transaction");
+            LOG_TRACE(log, "Found valid local part for {}, preparing the transaction", part->name);

             Transaction transaction(*this);

@@ -1512,7 +1523,9 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
             return true;
         }

-        LOG_TRACE(log, "Didn't find part with the correct checksums, will fetch it from other replica");
+        LOG_TRACE(log, "Didn't find valid local part for {} ({}), will fetch it from other replica",
+            entry.new_part_name,
+            entry.actual_new_part_name);
     }

     if (is_get_or_attach && entry.source_replica == replica_name)
@@ -2732,6 +2745,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
     /// Remove local parts if source replica does not have them, because such parts will never be fetched by other replicas.
     Strings local_parts_in_zk = zookeeper->getChildren(fs::path(replica_path) / "parts");
     Strings parts_to_remove_from_zk;
+
     for (const auto & part : local_parts_in_zk)
     {
         if (active_parts_set.getContainingPart(part).empty())
@@ -2740,10 +2754,13 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
             LOG_WARNING(log, "Source replica does not have part {}. Removing it from ZooKeeper.", part);
         }
     }
+
     tryRemovePartsFromZooKeeperWithRetries(parts_to_remove_from_zk);

     auto local_active_parts = getDataParts();
+
     DataPartsVector parts_to_remove_from_working_set;
+
     for (const auto & part : local_active_parts)
     {
         if (active_parts_set.getContainingPart(part->name).empty())
@@ -2756,6 +2773,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
     if (getSettings()->detach_old_local_parts_when_cloning_replica)
     {
         auto metadata_snapshot = getInMemoryMetadataPtr();
+
         for (const auto & part : parts_to_remove_from_working_set)
         {
             LOG_INFO(log, "Detaching {}", part->relative_path);
@@ -2768,7 +2786,35 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
     for (const String & name : active_parts)
     {
         LogEntry log_entry;
-        log_entry.type = LogEntry::GET_PART;
+
+        if (!are_restoring_replica)
+            log_entry.type = LogEntry::GET_PART;
+        else
+        {
+            LOG_DEBUG(log, "Obtaining checksum for path {}", name);
+
+            // The part we want to fetch is probably present in detached/ folder.
+            // However, we need to get part's checksum to check if it's not corrupt.
+            log_entry.type = LogEntry::ATTACH_PART;
+
+            MinimalisticDataPartChecksums desired_checksums;
+
+            const fs::path part_path = fs::path(source_path) / "parts" / name;
+
+            const String part_znode = zookeeper->get(part_path);
+
+            if (!part_znode.empty())
+                desired_checksums = ReplicatedMergeTreePartHeader::fromString(part_znode).getChecksums();
+            else
+            {
+                String desired_checksums_str = zookeeper->get(part_path / "checksums");
+                desired_checksums = MinimalisticDataPartChecksums::deserializeFrom(desired_checksums_str);
+            }
+
+            const auto [lo, hi] = desired_checksums.hash_of_all_files;
+            log_entry.part_checksum = getHexUIntUppercase(hi) + getHexUIntUppercase(lo);
+        }
+
         log_entry.source_replica = "";
         log_entry.new_part_name = name;
         log_entry.create_time = tryGetPartCreateTime(zookeeper, source_path, name);
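Side note for reviewers: the checksum read here from the source replica's `parts/<name>` znode (or its `checksums` child) should correspond to the per-part hash that a healthy replica reports through `system.parts`. A hedged inspection query, assuming the `hash_of_all_files` column is available in the server version at hand (it is not part of this commit):

```sql
-- Review aid only: eyeball the per-part hash that ATTACH_PART will verify.
SELECT name, hash_of_all_files
FROM system.parts
WHERE table = 'test' AND active
ORDER BY name;
```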
@@ -2868,6 +2914,7 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke
     Coordination::Stat is_lost_stat;
     bool is_new_replica = true;
     String res;
+
     if (zookeeper->tryGet(fs::path(replica_path) / "is_lost", res, &is_lost_stat))
     {
         if (res == "0")
@@ -3968,6 +4015,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
         MinimalisticDataPartChecksums desired_checksums;
         String part_path = fs::path(source_replica_path) / "parts" / part_name;
         String part_znode = zookeeper->get(part_path);
+
         if (!part_znode.empty())
             desired_checksums = ReplicatedMergeTreePartHeader::fromString(part_znode).getChecksums();
         else
@@ -5030,6 +5078,59 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const St
     return true;
 }

+void StorageReplicatedMergeTree::restoreMetadataInZooKeeper()
+{
+    LOG_INFO(log, "Restoring replica metadata");
+
+    if (!is_readonly || has_metadata_in_zookeeper)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "It's a bug: replica is not readonly");
+
+    if (are_restoring_replica.exchange(true))
+        throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Replica restoration in progress");
+
+    auto metadata_snapshot = getInMemoryMetadataPtr();
+
+    const DataPartsVector all_parts = getDataPartsVector(IMergeTreeDataPart::all_part_states);
+    Strings active_parts_names;
+
+    /// Why all parts (not only Committed) are moved to detached/:
+    /// After ZK metadata restoration ZK resets sequential counters (including block number counters), so one may
+    /// potentially encounter a situation that a part we want to attach already exists.
+    for (const auto & part : all_parts)
+    {
+        if (part->getState() == DataPartState::Committed)
+            active_parts_names.push_back(part->name);
+
+        forgetPartAndMoveToDetached(part);
+    }
+
+    LOG_INFO(log, "Moved all parts to detached/");
+
+    const bool is_first_replica = createTableIfNotExists(metadata_snapshot);
+
+    LOG_INFO(log, "Created initial ZK nodes, replica is first: {}", is_first_replica);
+
+    if (!is_first_replica)
+        createReplica(metadata_snapshot);
+
+    createNewZooKeeperNodes();
+
+    LOG_INFO(log, "Created ZK nodes for table");
+
+    is_readonly = false;
+    has_metadata_in_zookeeper = true;
+
+    if (is_first_replica)
+        for (const String& part_name : active_parts_names)
+            attachPartition(std::make_shared<ASTLiteral>(part_name), metadata_snapshot, true, getContext());
+
+    LOG_INFO(log, "Attached all partitions, starting table");
+
+    startup();
+
+    are_restoring_replica.store(false);
+}
+
 void StorageReplicatedMergeTree::dropPartNoWaitNoThrow(const String & part_name)
 {
     assertNotReadonly();
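For orientation while reading this hunk: the call sequence that `restoreMetadataInZooKeeper()` expects is the one exercised by the integration test added later in this commit. A minimal sketch (the table name `test` and cluster name `test_cluster` are the test's own, not a general recommendation):

```sql
-- The replica must already be readonly (its ZooKeeper metadata is gone) and restarted first.
SYSTEM RESTART REPLICA test;
SYSTEM RESTORE REPLICA test;

-- Or restore every replica of the table at once:
SYSTEM RESTORE REPLICA test ON CLUSTER test_cluster;
```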
@@ -6938,8 +7039,10 @@ bool StorageReplicatedMergeTree::dropAllPartsInPartition(
     zookeeper.get(alter_partition_version_path, &alter_partition_version_stat);

     MergeTreePartInfo drop_range_info;
-    /// It prevent other replicas from assigning merges which intersect locked block number.
+
+    /// It would prevent other replicas from assigning merges which intersect locked block number.
     std::optional<EphemeralLockInZooKeeper> delimiting_block_lock;
+
     if (!getFakePartCoveringAllPartsInPartition(partition_id, drop_range_info, delimiting_block_lock))
     {
         LOG_INFO(log, "Will not drop partition {}, it is empty.", partition_id);
@@ -6960,23 +7063,31 @@ bool StorageReplicatedMergeTree::dropAllPartsInPartition(
     entry.create_time = time(nullptr);

     Coordination::Requests ops;
-    ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
+
+    ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(),
+        zkutil::CreateMode::PersistentSequential));

     /// Check and update version to avoid race with REPLACE_RANGE.
     /// Otherwise new parts covered by drop_range_info may appear after execution of current DROP_RANGE entry
     /// as a result of execution of concurrently created REPLACE_RANGE entry.
     ops.emplace_back(zkutil::makeCheckRequest(alter_partition_version_path, alter_partition_version_stat.version));
     ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", -1));
+
     /// Just update version, because merges assignment relies on it
     ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
     delimiting_block_lock->getUnlockOps(ops);
+
     if (auto txn = query_context->getZooKeeperMetadataTransaction())
         txn->moveOpsTo(ops);
+
     Coordination::Responses responses;
     Coordination::Error code = zookeeper.tryMulti(ops, responses);
+
     if (code == Coordination::Error::ZOK)
         delimiting_block_lock->assumeUnlocked();
     else if (code == Coordination::Error::ZBADVERSION)
-        throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot assign ALTER PARTITION, because another ALTER PARTITION query was concurrently executed");
+        throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER,
+            "Cannot assign ALTER PARTITION because another ALTER PARTITION query was concurrently executed");
     else
         zkutil::KeeperMultiException::check(code, ops, responses);

@@ -35,7 +35,7 @@
 namespace DB
 {

-/** The engine that uses the merge tree (see MergeTreeData) and replicated through ZooKeeper.
+/** The engine that uses the merge tree (see MergeTreeData) and is replicated through ZooKeeper.
   *
   * ZooKeeper is used for the following things:
   * - the structure of the table (/metadata, /columns)
@@ -57,6 +57,7 @@ namespace DB
   * Log - a sequence of entries (LogEntry) about what to do.
   * Each entry is one of:
   * - normal data insertion (GET),
+  * - data insertion with a possible attach from local data (ATTACH),
   * - merge (MERGE),
   * - delete the partition (DROP).
   *
@@ -65,10 +66,8 @@ namespace DB
   * Despite the name of the "queue", execution can be reordered, if necessary (shouldExecuteLogEntry, executeLogEntry).
   * In addition, the records in the queue can be generated independently (not from the log), in the following cases:
   * - when creating a new replica, actions are put on GET from other replicas (createReplica);
-  * - if the part is corrupt (removePartAndEnqueueFetch) or absent during the check (at start - checkParts, while running - searchForMissingPart),
-  *   actions are put on GET from other replicas;
-  *
-  * TODO Update the GET part after rewriting the code (search locally).
+  * - if the part is corrupt (removePartAndEnqueueFetch) or absent during the check
+  *   (at start - checkParts, while running - searchForMissingPart), actions are put on GET from other replicas;
   *
   * The replica to which INSERT was made in the queue will also have an entry of the GET of this data.
   * Such an entry is considered to be executed as soon as the queue handler sees it.
@@ -240,6 +239,13 @@ public:
     /// Get best replica having this partition on S3
     String getSharedDataReplica(const IMergeTreeDataPart & part) const;

+    inline String getReplicaName() const { return replica_name; }
+
+    /// Restores table metadata if ZooKeeper lost it.
+    /// Used only on restarted readonly replicas (not checked). All active (Committed) parts are moved to detached/
+    /// folder and attached. Parts in all other states are just moved to detached/ folder.
+    void restoreMetadataInZooKeeper();
+
     /// Get throttler for replicated fetches
     ThrottlerPtr getFetchesThrottler() const
     {
@@ -253,6 +259,8 @@ public:
     }

 private:
+    std::atomic_bool are_restoring_replica {false};
+
     /// Get a sequential consistent view of current parts.
     ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;

@@ -332,7 +340,7 @@ private:
     Poco::Event partial_shutdown_event {false};     /// Poco::Event::EVENT_MANUALRESET

     /// Limiting parallel fetches per node
-    static std::atomic_uint total_fetches;
+    static inline std::atomic_uint total_fetches {0};

     /// Limiting parallel fetches per one table
     std::atomic_uint current_table_fetches {0};
@@ -389,7 +397,8 @@ private:
      */
     bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);

-    /** Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas.
+    /**
+     * Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas.
      */
     void createReplica(const StorageMetadataPtr & metadata_snapshot);

|
@ -16,11 +16,10 @@ def fill_node(node):
|
|||||||
'''.format(replica=node.name))
|
'''.format(replica=node.name))
|
||||||
|
|
||||||
cluster = ClickHouseCluster(__file__)
|
cluster = ClickHouseCluster(__file__)
|
||||||
configs =["configs/remote_servers.xml"]
|
|
||||||
|
|
||||||
node_1 = cluster.add_instance('replica1', with_zookeeper=True, main_configs=configs)
|
node_1 = cluster.add_instance('replica1', with_zookeeper=True)
|
||||||
node_2 = cluster.add_instance('replica2', with_zookeeper=True, main_configs=configs)
|
node_2 = cluster.add_instance('replica2', with_zookeeper=True)
|
||||||
node_3 = cluster.add_instance('replica3', with_zookeeper=True, main_configs=configs)
|
node_3 = cluster.add_instance('replica3', with_zookeeper=True)
|
||||||
|
|
||||||
@pytest.fixture(scope="module")
|
@pytest.fixture(scope="module")
|
||||||
def start_cluster():
|
def start_cluster():
|
||||||
|
@ -151,7 +151,7 @@ def test_grant_all_on_table():
|
|||||||
instance.query("GRANT ALL ON test.table TO A WITH GRANT OPTION")
|
instance.query("GRANT ALL ON test.table TO A WITH GRANT OPTION")
|
||||||
instance.query("GRANT ALL ON test.table TO B", user='A')
|
instance.query("GRANT ALL ON test.table TO B", user='A')
|
||||||
assert instance.query(
|
assert instance.query(
|
||||||
"SHOW GRANTS FOR B") == "GRANT SHOW TABLES, SHOW COLUMNS, SHOW DICTIONARIES, SELECT, INSERT, ALTER, CREATE TABLE, CREATE VIEW, CREATE DICTIONARY, DROP TABLE, DROP VIEW, DROP DICTIONARY, TRUNCATE, OPTIMIZE, SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, SYSTEM MOVES, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, SYSTEM RESTART REPLICA, SYSTEM FLUSH DISTRIBUTED, dictGet ON test.table TO B\n"
|
"SHOW GRANTS FOR B") == "GRANT SHOW TABLES, SHOW COLUMNS, SHOW DICTIONARIES, SELECT, INSERT, ALTER, CREATE TABLE, CREATE VIEW, CREATE DICTIONARY, DROP TABLE, DROP VIEW, DROP DICTIONARY, TRUNCATE, OPTIMIZE, SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, SYSTEM MOVES, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, SYSTEM RESTART REPLICA, SYSTEM RESTORE REPLICA, SYSTEM FLUSH DISTRIBUTED, dictGet ON test.table TO B\n"
|
||||||
instance.query("REVOKE ALL ON test.table FROM B", user='A')
|
instance.query("REVOKE ALL ON test.table FROM B", user='A')
|
||||||
assert instance.query("SHOW GRANTS FOR B") == ""
|
assert instance.query("SHOW GRANTS FOR B") == ""
|
||||||
|
|
||||||
|
tests/integration/test_restore_replica/__init__.py (new file, empty)
@@ -4,15 +4,15 @@
         <shard>
             <internal_replication>true</internal_replication>
             <replica>
-                <host>node_1_1</host>
+                <host>replica1</host>
                 <port>9000</port>
             </replica>
             <replica>
-                <host>node_1_2</host>
+                <host>replica2</host>
                 <port>9000</port>
             </replica>
             <replica>
-                <host>node_1_3</host>
+                <host>replica3</host>
                 <port>9000</port>
             </replica>
         </shard>
tests/integration/test_restore_replica/test.py (new file, 156 lines)
@@ -0,0 +1,156 @@
+import time
+import pytest
+
+from helpers.cluster import ClickHouseCluster
+from helpers.cluster import ClickHouseKiller
+from helpers.test_tools import assert_eq_with_retry
+from helpers.network import PartitionManager
+
+def fill_nodes(nodes):
+    for node in nodes:
+        node.query(
+        '''
+            CREATE TABLE test(n UInt32)
+            ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/', '{replica}')
+            ORDER BY n PARTITION BY n % 10;
+        '''.format(replica=node.name))
+
+cluster = ClickHouseCluster(__file__)
+configs =["configs/remote_servers.xml"]
+
+node_1 = cluster.add_instance('replica1', with_zookeeper=True, main_configs=configs)
+node_2 = cluster.add_instance('replica2', with_zookeeper=True, main_configs=configs)
+node_3 = cluster.add_instance('replica3', with_zookeeper=True, main_configs=configs)
+nodes = [node_1, node_2, node_3]
+
+def fill_table():
+    node_1.query("TRUNCATE TABLE test")
+
+    for node in nodes:
+        node.query("SYSTEM SYNC REPLICA test")
+
+    check_data(0, 0)
+
+    # it will create multiple parts in each partition and probably cause merges
+    node_1.query("INSERT INTO test SELECT number + 0 FROM numbers(200)")
+    node_1.query("INSERT INTO test SELECT number + 200 FROM numbers(200)")
+    node_1.query("INSERT INTO test SELECT number + 400 FROM numbers(200)")
+    node_1.query("INSERT INTO test SELECT number + 600 FROM numbers(200)")
+    node_1.query("INSERT INTO test SELECT number + 800 FROM numbers(200)")
+    check_data(499500, 1000)
+
+@pytest.fixture(scope="module")
+def start_cluster():
+    try:
+        cluster.start()
+        fill_nodes(nodes)
+        yield cluster
+
+    except Exception as ex:
+        print(ex)
+
+    finally:
+        cluster.shutdown()
+
+def check_data(_sum: int, count: int) -> None:
+    res = "{}\t{}\n".format(_sum, count)
+    assert_eq_with_retry(node_1, "SELECT sum(n), count() FROM test", res)
+    assert_eq_with_retry(node_2, "SELECT sum(n), count() FROM test", res)
+    assert_eq_with_retry(node_3, "SELECT sum(n), count() FROM test", res)
+
+def check_after_restoration():
+    check_data(1999000, 2000)
+
+    for node in nodes:
+        node.query_and_get_error("SYSTEM RESTORE REPLICA test")
+
+def test_restore_replica_invalid_tables(start_cluster):
+    print("Checking the invocation on non-existent and non-replicated tables")
+    node_1.query_and_get_error("SYSTEM RESTORE REPLICA i_dont_exist_42")
+    node_1.query_and_get_error("SYSTEM RESTORE REPLICA no_db.i_dont_exist_42")
+    node_1.query_and_get_error("SYSTEM RESTORE REPLICA system.numbers")
+
+def test_restore_replica_sequential(start_cluster):
+    zk = cluster.get_kazoo_client('zoo1')
+    fill_table()
+
+    print("Deleting root ZK path metadata")
+    zk.delete("/clickhouse/tables/test", recursive=True)
+    assert zk.exists("/clickhouse/tables/test") is None
+
+    node_1.query("SYSTEM RESTART REPLICA test")
+    node_1.query_and_get_error("INSERT INTO test SELECT number AS num FROM numbers(1000,2000) WHERE num % 2 = 0")
+
+    print("Restoring replica1")
+
+    node_1.query("SYSTEM RESTORE REPLICA test")
+    assert zk.exists("/clickhouse/tables/test")
+    check_data(499500, 1000)
+
+    node_1.query("INSERT INTO test SELECT number + 1000 FROM numbers(1000)")
+
+    print("Restoring other replicas")
+
+    node_2.query("SYSTEM RESTART REPLICA test")
+    node_2.query("SYSTEM RESTORE REPLICA test")
+
+    node_3.query("SYSTEM RESTART REPLICA test")
+    node_3.query("SYSTEM RESTORE REPLICA test")
+
+    node_2.query("SYSTEM SYNC REPLICA test")
+    node_3.query("SYSTEM SYNC REPLICA test")
+
+    check_after_restoration()
+
+def test_restore_replica_parallel(start_cluster):
+    zk = cluster.get_kazoo_client('zoo1')
+    fill_table()
+
+    print("Deleting root ZK path metadata")
+    zk.delete("/clickhouse/tables/test", recursive=True)
+    assert zk.exists("/clickhouse/tables/test") is None
+
+    node_1.query("SYSTEM RESTART REPLICA test")
+    node_1.query_and_get_error("INSERT INTO test SELECT number AS num FROM numbers(1000,2000) WHERE num % 2 = 0")
+
+    print("Restoring replicas in parallel")
+
+    node_2.query("SYSTEM RESTART REPLICA test")
+    node_3.query("SYSTEM RESTART REPLICA test")
+
+    node_1.query("SYSTEM RESTORE REPLICA test ON CLUSTER test_cluster")
+
+    assert zk.exists("/clickhouse/tables/test")
+    check_data(499500, 1000)
+
+    node_1.query("INSERT INTO test SELECT number + 1000 FROM numbers(1000)")
+
+    check_after_restoration()
+
+def test_restore_replica_alive_replicas(start_cluster):
+    zk = cluster.get_kazoo_client('zoo1')
+    fill_table()
+
+    print("Deleting replica2 path, trying to restore replica1")
+    zk.delete("/clickhouse/tables/test/replicas/replica2", recursive=True)
+    assert zk.exists("/clickhouse/tables/test/replicas/replica2") is None
+    node_1.query_and_get_error("SYSTEM RESTORE REPLICA test")
+
+    print("Deleting replica1 path, trying to restore replica1")
+    zk.delete("/clickhouse/tables/test/replicas/replica1", recursive=True)
+    assert zk.exists("/clickhouse/tables/test/replicas/replica1") is None
+
+    node_1.query("SYSTEM RESTART REPLICA test")
+    node_1.query("SYSTEM RESTORE REPLICA test")
+
+    node_2.query("SYSTEM RESTART REPLICA test")
+    node_2.query("SYSTEM RESTORE REPLICA test")
+
+    check_data(499500, 1000)
+
+    node_1.query("INSERT INTO test SELECT number + 1000 FROM numbers(1000)")
+
+    node_2.query("SYSTEM SYNC REPLICA test")
+    node_3.query("SYSTEM SYNC REPLICA test")
+
+    check_after_restoration()
@@ -103,6 +103,7 @@ SYSTEM REPLICATION QUEUES ['SYSTEM STOP REPLICATION QUEUES','SYSTEM START REPLIC
 SYSTEM DROP REPLICA ['DROP REPLICA'] TABLE SYSTEM
 SYSTEM SYNC REPLICA ['SYNC REPLICA'] TABLE SYSTEM
 SYSTEM RESTART REPLICA ['RESTART REPLICA'] TABLE SYSTEM
+SYSTEM RESTORE REPLICA ['RESTORE REPLICA'] TABLE SYSTEM
 SYSTEM FLUSH DISTRIBUTED ['FLUSH DISTRIBUTED'] TABLE SYSTEM FLUSH
 SYSTEM FLUSH LOGS ['FLUSH LOGS'] GLOBAL SYSTEM FLUSH
 SYSTEM FLUSH [] \N SYSTEM
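The row added above declares the new privilege as TABLE-scoped under SYSTEM, with `RESTORE REPLICA` as its alias. A hedged illustration of how it would be granted (user and table names are placeholders, not taken from this commit):

```sql
GRANT SYSTEM RESTORE REPLICA ON db.table TO backup_admin;
-- The alias listed in the reference file should be equivalent:
GRANT RESTORE REPLICA ON db.table TO backup_admin;
```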