Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-24 00:22:29 +00:00)

Merge pull request #11592 from ClickHouse/replicated-merge-tree-create-drop-race-garbage

Fix race conditions in CREATE/DROP of different replicas of ReplicatedMergeTree

Commit: 89df9915bb
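The race being fixed here is between CREATE and DROP of replicas that share one ZooKeeper path on different servers: one replica could start deleting the shared subtree while another server was still registering itself, leaving garbage znodes behind or making the concurrent CREATE fail. The patch makes both sides transactional: createTableIfNotExists() now retries in a loop, recognises leftovers of an interrupted DROP by a "dropped" marker node and cleans them up, and creates the table metadata together with the first replica in a single tryMulti() transaction; drop() of the last replica removes "replicas" first (so no new replica can attach) and sets the "dropped" marker before deleting the rest. A rough way to exercise the scenario from outside is sketched below; this is a hypothetical illustration rather than part of the PR, it assumes a local server with a ZooKeeper section configured, and the table and path names are invented:

#!/usr/bin/env bash
# Two "replicas" of one ReplicatedMergeTree table (same ZooKeeper path) are
# dropped and recreated in a tight loop from two concurrent shells.
create_drop_loop()
{
    local replica="$1"
    for _ in $(seq 1 20); do
        clickhouse-client -q "DROP TABLE IF EXISTS race_test_${replica}"
        clickhouse-client -q "CREATE TABLE race_test_${replica} (a UInt8)
            ENGINE = ReplicatedMergeTree('/clickhouse/tables/race_test', 'r_${replica}')
            ORDER BY tuple()"
    done
}

create_drop_loop 1 &
create_drop_loop 2 &
wait

The functional test added at the end of this diff drives essentially the same workload through the shared test harness, filtering out the messages that the new code is now expected to print when it loses such a race.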
@@ -122,7 +122,12 @@ String getObjectDefinitionFromCreateQuery(const ASTPtr & query)
     return statement_stream.str();
 }

-DatabaseOnDisk::DatabaseOnDisk(const String & name, const String & metadata_path_, const String & data_path_, const String & logger, const Context & context)
+DatabaseOnDisk::DatabaseOnDisk(
+    const String & name,
+    const String & metadata_path_,
+    const String & data_path_,
+    const String & logger,
+    const Context & context)
     : DatabaseWithOwnTablesBase(name, logger, context)
     , metadata_path(metadata_path_)
     , data_path(data_path_)
@@ -154,7 +159,6 @@ void DatabaseOnDisk::createTable(
     /// A race condition would be possible if a table with the same name is simultaneously created using CREATE and using ATTACH.
     /// But there is protection from it - see using DDLGuard in InterpreterCreateQuery.

-
     if (isDictionaryExist(table_name))
         throw Exception("Dictionary " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " already exists.",
             ErrorCodes::DICTIONARY_ALREADY_EXISTS);
@@ -152,7 +152,8 @@ MergeTreeData::MergeTreeData(

     if (metadata.sample_by_ast != nullptr)
     {
-        StorageMetadataKeyField candidate_sampling_key = StorageMetadataKeyField::getKeyFromAST(metadata.sample_by_ast, getColumns(), global_context);
+        StorageMetadataKeyField candidate_sampling_key = StorageMetadataKeyField::getKeyFromAST(
+            metadata.sample_by_ast, getColumns(), global_context);

         const auto & pk_sample_block = getPrimaryKey().sample_block;
         if (!pk_sample_block.has(candidate_sampling_key.column_names[0]) && !attach
@@ -1304,6 +1305,24 @@ void MergeTreeData::dropAllData()
     LOG_TRACE(log, "dropAllData: done.");
 }

+void MergeTreeData::dropIfEmpty()
+{
+    LOG_TRACE(log, "dropIfEmpty");
+
+    auto lock = lockParts();
+
+    if (!data_parts_by_info.empty())
+        return;
+
+    for (const auto & [path, disk] : getRelativeDataPathsWithDisks())
+    {
+        /// Non recursive, exception is thrown if there are more files.
+        disk->remove(path + "format_version.txt");
+        disk->remove(path + "detached");
+        disk->remove(path);
+    }
+}
+
 namespace
 {

@@ -485,6 +485,9 @@ public:
     /// Deletes the data directory and flushes the uncompressed blocks cache and the marks cache.
     void dropAllData();

+    /// Drop data directories if they are empty. It is safe to call this method if table creation was unsuccessful.
+    void dropIfEmpty();
+
     /// Moves the entire data directory.
     /// Flushes the uncompressed blocks cache and the marks cache.
     /// Must be called with locked lockStructureForAlter().
@@ -194,15 +194,19 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
     zookeeper_path = "/" + zookeeper_path;
     replica_path = zookeeper_path + "/replicas/" + replica_name;

-    queue_updating_task = global_context.getSchedulePool().createTask(getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::queueUpdatingTask)", [this]{ queueUpdatingTask(); });
+    queue_updating_task = global_context.getSchedulePool().createTask(
+        getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::queueUpdatingTask)", [this]{ queueUpdatingTask(); });

-    mutations_updating_task = global_context.getSchedulePool().createTask(getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mutationsUpdatingTask)", [this]{ mutationsUpdatingTask(); });
+    mutations_updating_task = global_context.getSchedulePool().createTask(
+        getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mutationsUpdatingTask)", [this]{ mutationsUpdatingTask(); });

-    merge_selecting_task = global_context.getSchedulePool().createTask(getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mergeSelectingTask)", [this] { mergeSelectingTask(); });
+    merge_selecting_task = global_context.getSchedulePool().createTask(
+        getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mergeSelectingTask)", [this] { mergeSelectingTask(); });
     /// Will be activated if we win leader election.
     merge_selecting_task->deactivate();

-    mutations_finalizing_task = global_context.getSchedulePool().createTask(getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mutationsFinalizingTask)", [this] { mutationsFinalizingTask(); });
+    mutations_finalizing_task = global_context.getSchedulePool().createTask(
+        getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mutationsFinalizingTask)", [this] { mutationsFinalizingTask(); });

     if (global_context.hasZooKeeper())
         current_zookeeper = global_context.getZooKeeper();
@@ -248,20 +252,30 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
         if (!getDataParts().empty())
             throw Exception("Data directory for table already containing data parts - probably it was unclean DROP table or manual intervention. You must either clear directory by hand or use ATTACH TABLE instead of CREATE TABLE if you need to use that parts.", ErrorCodes::INCORRECT_DATA);

-        createTableIfNotExists();
+        try
+        {
+            bool is_first_replica = createTableIfNotExists();

-        /// We have to check granularity on other replicas. If it's fixed we
-        /// must create our new replica with fixed granularity and store this
-        /// information in /replica/metadata.
-        other_replicas_fixed_granularity = checkFixedGranualrityInZookeeper();
+            /// We have to check granularity on other replicas. If it's fixed we
+            /// must create our new replica with fixed granularity and store this
+            /// information in /replica/metadata.
+            other_replicas_fixed_granularity = checkFixedGranualrityInZookeeper();

-        checkTableStructure(zookeeper_path);
+            checkTableStructure(zookeeper_path);

-        Coordination::Stat metadata_stat;
-        current_zookeeper->get(zookeeper_path + "/metadata", &metadata_stat);
-        metadata_version = metadata_stat.version;
+            Coordination::Stat metadata_stat;
+            current_zookeeper->get(zookeeper_path + "/metadata", &metadata_stat);
+            metadata_version = metadata_stat.version;

-        createReplica();
+            if (!is_first_replica)
+                createReplica();
+        }
+        catch (...)
+        {
+            /// If replica was not created, rollback creation of data directory.
+            dropIfEmpty();
+            throw;
+        }
     }
     else
     {
@@ -403,46 +417,306 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
 }


-void StorageReplicatedMergeTree::createTableIfNotExists()
+bool StorageReplicatedMergeTree::createTableIfNotExists()
 {
     auto zookeeper = getZooKeeper();
+    zookeeper->createAncestors(zookeeper_path);
+
+    for (size_t i = 0; i < 1000; ++i)
+    {
+        /// Invariant: "replicas" does not exist if there is no table or if there are leftovers from incompletely dropped table.
+        if (zookeeper->exists(zookeeper_path + "/replicas"))
+        {
+            LOG_DEBUG(log, "This table {} is already created, will add new replica", zookeeper_path);
+            return false;
+        }
+
+        /// There are leftovers from incompletely dropped table.
+        if (zookeeper->exists(zookeeper_path + "/dropped"))
+        {
+            /// This condition may happen when the previous drop attempt was not completed
+            /// or when table is dropped by another replica right now.
+            /// This is Ok because another replica is definitely going to drop the table.
+
+            LOG_WARNING(log, "Removing leftovers from table {} (this might take several minutes)", zookeeper_path);
+
+            Strings children;
+            int32_t code = zookeeper->tryGetChildren(zookeeper_path, children);
+            if (code == Coordination::ZNONODE)
+            {
+                LOG_WARNING(log, "Table {} is already finished removing by another replica right now", replica_path);
+            }
+            else
+            {
+                for (const auto & child : children)
+                    if (child != "dropped")
+                        zookeeper->tryRemoveRecursive(zookeeper_path + "/" + child);
+
+                Coordination::Requests ops;
+                Coordination::Responses responses;
+                ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/dropped", -1));
+                ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path, -1));
+                code = zookeeper->tryMulti(ops, responses);
+
+                if (code == Coordination::ZNONODE)
+                {
+                    LOG_WARNING(log, "Table {} is already finished removing by another replica right now", replica_path);
+                }
+                else if (code == Coordination::ZNOTEMPTY)
+                {
+                    throw Exception(fmt::format(
+                        "The old table was not completely removed from ZooKeeper, {} still exists and may contain some garbage. But it should never happen according to the logic of operations (it's a bug).", zookeeper_path), ErrorCodes::LOGICAL_ERROR);
+                }
+                else if (code != Coordination::ZOK)
+                {
+                    /// It is still possible that ZooKeeper session is expired or server is killed in the middle of the delete operation.
+                    zkutil::KeeperMultiException::check(code, ops, responses);
+                }
+                else
+                {
+                    LOG_WARNING(log, "The leftovers from table {} was successfully removed from ZooKeeper", zookeeper_path);
+                }
+            }
+        }
+
+        LOG_DEBUG(log, "Creating table {}", zookeeper_path);
+
+        /// We write metadata of table so that the replicas can check table parameters with them.
+        String metadata = ReplicatedMergeTreeTableMetadata(*this).toString();
+
+        Coordination::Requests ops;
+        ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "", zkutil::CreateMode::Persistent));
+
+        /// Check that the table is not being dropped right now.
+        ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/dropped", "", zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/dropped", -1));
+
+        ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/metadata", metadata,
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/columns", getColumns().toString(),
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log", "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/blocks", "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/block_numbers", "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/nonincrement_block_numbers", "",
+            zkutil::CreateMode::Persistent)); /// /nonincrement_block_numbers dir is unused, but is created nonetheless for backwards compatibility.
+        ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/leader_election", "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/temp", "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name,
+            zkutil::CreateMode::Persistent));
+
+        /// And create first replica atomically. See also "createReplica" method that is used to create not the first replicas.
+
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path, "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/host", "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_pointer", "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/queue", "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/parts", "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/flags", "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/is_lost", "0",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata", metadata,
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", getColumns().toString(),
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version),
+            zkutil::CreateMode::Persistent));
+
+        Coordination::Responses responses;
+        auto code = zookeeper->tryMulti(ops, responses);
+        if (code == Coordination::ZNODEEXISTS)
+        {
+            LOG_WARNING(log, "It looks like the table {} was created by another server at the same moment, will retry", zookeeper_path);
+            continue;
+        }
+        else if (code != Coordination::ZOK)
+        {
+            zkutil::KeeperMultiException::check(code, ops, responses);
+        }
+
+        return true;
+    }
+
+    throw Exception("Cannot create table, because it is created concurrently every time or because of logical error", ErrorCodes::LOGICAL_ERROR);
+}
+
+void StorageReplicatedMergeTree::createReplica()
+{
+    auto zookeeper = getZooKeeper();

-    if (zookeeper->exists(zookeeper_path))
-        return;
+    LOG_DEBUG(log, "Creating replica {}", replica_path);

-    LOG_DEBUG(log, "Creating table {}", zookeeper_path);
+    int32_t code;

-    zookeeper->createAncestors(zookeeper_path);
+    do
+    {
+        Coordination::Stat replicas_stat;
+        String replicas_value;

-    /// We write metadata of table so that the replicas can check table parameters with them.
-    String metadata = ReplicatedMergeTreeTableMetadata(*this).toString();
+        code = zookeeper->tryGet(zookeeper_path + "/replicas", replicas_value, &replicas_stat);
+        if (code == Coordination::ZNONODE)
+            throw Exception(fmt::format("Cannot create a replica of the table {}, because the last replica of the table was dropped right now",
+                zookeeper_path), ErrorCodes::ALL_REPLICAS_LOST);

-    Coordination::Requests ops;
-    ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "",
-        zkutil::CreateMode::Persistent));
-    ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/metadata", metadata,
-        zkutil::CreateMode::Persistent));
-    ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/columns", getColumns().toString(),
-        zkutil::CreateMode::Persistent));
-    ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log", "",
-        zkutil::CreateMode::Persistent));
-    ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/blocks", "",
-        zkutil::CreateMode::Persistent));
-    ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/block_numbers", "",
-        zkutil::CreateMode::Persistent));
-    ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/nonincrement_block_numbers", "",
-        zkutil::CreateMode::Persistent)); /// /nonincrement_block_numbers dir is unused, but is created nonetheless for backwards compatibility.
-    ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/leader_election", "",
-        zkutil::CreateMode::Persistent));
-    ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/temp", "",
-        zkutil::CreateMode::Persistent));
-    ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/replicas", "",
-        zkutil::CreateMode::Persistent));
+        /// It is not the first replica, we will mark it as "lost", to immediately repair (clone) from existing replica.
+        /// By the way, it's possible that the replica will be first, if all previous replicas were removed concurrently.
+        String is_lost_value = replicas_stat.numChildren ? "1" : "0";

-    Coordination::Responses responses;
-    auto code = zookeeper->tryMulti(ops, responses);
-    if (code && code != Coordination::ZNODEEXISTS)
-        throw Coordination::Exception(code);
+        Coordination::Requests ops;
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path, "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/host", "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_pointer", "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/queue", "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/parts", "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/flags", "",
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/is_lost", is_lost_value,
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata", ReplicatedMergeTreeTableMetadata(*this).toString(),
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", getColumns().toString(),
+            zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version),
+            zkutil::CreateMode::Persistent));
+
+        /// Check version of /replicas to see if there are any replicas created at the same moment of time.
+        ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, replicas_stat.version));
+
+        Coordination::Responses responses;
+        code = zookeeper->tryMulti(ops, responses);
+        if (code == Coordination::ZNODEEXISTS)
+        {
+            throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
+        }
+        else if (code == Coordination::ZBADVERSION)
+        {
+            LOG_ERROR(log, "Retrying createReplica(), because some other replicas were created at the same time");
+        }
+        else if (code == Coordination::ZNONODE)
+        {
+            throw Exception("Table " + zookeeper_path + " was suddenly removed.", ErrorCodes::ALL_REPLICAS_LOST);
+        }
+        else
+        {
+            zkutil::KeeperMultiException::check(code, ops, responses);
+        }
+    } while (code == Coordination::ZBADVERSION);
+}
+
+void StorageReplicatedMergeTree::drop()
+{
+    {
+        auto zookeeper = tryGetZooKeeper();
+
+        if (is_readonly || !zookeeper)
+            throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
+
+        shutdown();
+
+        if (zookeeper->expired())
+            throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
+
+        LOG_INFO(log, "Removing replica {}", replica_path);
+        replica_is_active_node = nullptr;
+        /// It may left some garbage if replica_path subtree are concurently modified
+        zookeeper->tryRemoveRecursive(replica_path);
+        if (zookeeper->exists(replica_path))
+            LOG_ERROR(log, "Replica was not completely removed from ZooKeeper, {} still exists and may contain some garbage.", replica_path);
+
+        /// Check that `zookeeper_path` exists: it could have been deleted by another replica after execution of previous line.
+        Strings replicas;
+        if (Coordination::ZOK == zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) && replicas.empty())
+        {
+            LOG_INFO(log, "{} is the last replica, will remove table", replica_path);
+
+            /** At this moment, another replica can be created and we cannot remove the table.
+              * Try to remove /replicas node first. If we successfully removed it,
+              * it guarantees that we are the only replica that proceed to remove the table
+              * and no new replicas can be created after that moment (it requires the existence of /replicas node).
+              * and table cannot be recreated with new /replicas node on another servers while we are removing data,
+              * because table creation is executed in single transaction that will conflict with remaining nodes.
+              */
+
+            Coordination::Requests ops;
+            Coordination::Responses responses;
+            ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/replicas", -1));
+            ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/dropped", "", zkutil::CreateMode::Persistent));
+            int32_t code = zookeeper->tryMulti(ops, responses);
+
+            if (code == Coordination::ZNONODE || code == Coordination::ZNODEEXISTS)
+            {
+                LOG_WARNING(log, "Table {} is already started to be removing by another replica right now", replica_path);
+            }
+            else if (code == Coordination::ZNOTEMPTY)
+            {
+                LOG_WARNING(log, "Another replica was suddenly created, will keep the table {}", replica_path);
+            }
+            else if (code != Coordination::ZOK)
+            {
+                zkutil::KeeperMultiException::check(code, ops, responses);
+            }
+            else
+            {
+                LOG_INFO(log, "Removing table {} (this might take several minutes)", zookeeper_path);
+
+                Strings children;
+                code = zookeeper->tryGetChildren(zookeeper_path, children);
+                if (code == Coordination::ZNONODE)
+                {
+                    LOG_WARNING(log, "Table {} is already finished removing by another replica right now", replica_path);
+                }
+                else
+                {
+                    for (const auto & child : children)
+                        if (child != "dropped")
+                            zookeeper->tryRemoveRecursive(zookeeper_path + "/" + child);
+
+                    ops.clear();
+                    responses.clear();
+                    ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/dropped", -1));
+                    ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path, -1));
+                    code = zookeeper->tryMulti(ops, responses);
+
+                    if (code == Coordination::ZNONODE)
+                    {
+                        LOG_WARNING(log, "Table {} is already finished removing by another replica right now", replica_path);
+                    }
+                    else if (code == Coordination::ZNOTEMPTY)
+                    {
+                        LOG_ERROR(log, "Table was not completely removed from ZooKeeper, {} still exists and may contain some garbage.",
+                            zookeeper_path);
+                    }
+                    else if (code != Coordination::ZOK)
+                    {
+                        /// It is still possible that ZooKeeper session is expired or server is killed in the middle of the delete operation.
+                        zkutil::KeeperMultiException::check(code, ops, responses);
+                    }
+                    else
+                    {
+                        LOG_INFO(log, "Table {} was successfully removed from ZooKeeper", zookeeper_path);
+                    }
+                }
+            }
+        }
+    }
+
+    dropAllData();
+}

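The znodes that the new createTableIfNotExists() and createReplica() put under the table path (metadata, columns, log, blocks, block_numbers, nonincrement_block_numbers, leader_election, temp, replicas, plus per-replica host, log_pointer, queue, parts, flags, is_lost, metadata_version) can be inspected through the system.zookeeper table, and the transient "dropped" marker written by drop() shows up there too while a removal is in progress. A hypothetical check, assuming a replicated table exists at the given (invented) path:

# List the table-level and replica-level znodes; adjust the path to a real table.
clickhouse-client -q "SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/race_test'"
clickhouse-client -q "SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/race_test/replicas/r_1'"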
@@ -542,48 +816,6 @@ static time_t tryGetPartCreateTime(zkutil::ZooKeeperPtr & zookeeper, const Strin
 }


-void StorageReplicatedMergeTree::createReplica()
-{
-    auto zookeeper = getZooKeeper();
-
-    LOG_DEBUG(log, "Creating replica {}", replica_path);
-
-    int32_t code;
-
-    do
-    {
-        Coordination::Stat replicas_stat;
-        String last_added_replica = zookeeper->get(zookeeper_path + "/replicas", &replicas_stat);
-
-        /// If it is not the first replica, we will mark it as "lost", to immediately repair (clone) from existing replica.
-        String is_lost_value = last_added_replica.empty() ? "0" : "1";
-
-        Coordination::Requests ops;
-        Coordination::Responses responses;
-        ops.emplace_back(zkutil::makeCreateRequest(replica_path, "", zkutil::CreateMode::Persistent));
-        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/host", "", zkutil::CreateMode::Persistent));
-        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_pointer", "", zkutil::CreateMode::Persistent));
-        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/queue", "", zkutil::CreateMode::Persistent));
-        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/parts", "", zkutil::CreateMode::Persistent));
-        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/flags", "", zkutil::CreateMode::Persistent));
-        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/is_lost", is_lost_value, zkutil::CreateMode::Persistent));
-        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata", ReplicatedMergeTreeTableMetadata(*this).toString(), zkutil::CreateMode::Persistent));
-        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", getColumns().toString(), zkutil::CreateMode::Persistent));
-        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version), zkutil::CreateMode::Persistent));
-        /// Check version of /replicas to see if there are any replicas created at the same moment of time.
-        ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, replicas_stat.version));
-
-        code = zookeeper->tryMulti(ops, responses);
-        if (code == Coordination::Error::ZNODEEXISTS)
-            throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
-        else if (code == Coordination::Error::ZBADVERSION)
-            LOG_ERROR(log, "Retrying createReplica(), because some other replicas were created at the same time");
-        else
-            zkutil::KeeperMultiException::check(code, ops, responses);
-    } while (code == Coordination::Error::ZBADVERSION);
-}
-
-
 void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
 {
     auto zookeeper = getZooKeeper();
@@ -3699,41 +3931,6 @@ void StorageReplicatedMergeTree::checkPartitionCanBeDropped(const ASTPtr & parti
 }


-void StorageReplicatedMergeTree::drop()
-{
-    {
-        auto zookeeper = tryGetZooKeeper();
-
-        if (is_readonly || !zookeeper)
-            throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
-
-        shutdown();
-
-        if (zookeeper->expired())
-            throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
-
-        LOG_INFO(log, "Removing replica {}", replica_path);
-        replica_is_active_node = nullptr;
-        /// It may left some garbage if replica_path subtree are concurently modified
-        zookeeper->tryRemoveRecursive(replica_path);
-        if (zookeeper->exists(replica_path))
-            LOG_ERROR(log, "Replica was not completely removed from ZooKeeper, {} still exists and may contain some garbage.", replica_path);
-
-        /// Check that `zookeeper_path` exists: it could have been deleted by another replica after execution of previous line.
-        Strings replicas;
-        if (zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) == Coordination::ZOK && replicas.empty())
-        {
-            LOG_INFO(log, "Removing table {} (this might take several minutes)", zookeeper_path);
-            zookeeper->tryRemoveRecursive(zookeeper_path);
-            if (zookeeper->exists(zookeeper_path))
-                LOG_ERROR(log, "Table was not completely removed from ZooKeeper, {} still exists and may contain some garbage.", zookeeper_path);
-        }
-    }
-
-    dropAllData();
-}
-
-
 void StorageReplicatedMergeTree::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
 {
     MergeTreeData::rename(new_path_to_table_data, new_table_id);
@@ -291,9 +291,10 @@ private:
     template <class Func>
     void foreachCommittedParts(const Func & func) const;

-    /** Creates the minimum set of nodes in ZooKeeper.
+    /** Creates the minimum set of nodes in ZooKeeper and create first replica.
+      * Returns true if was created, false if exists.
       */
-    void createTableIfNotExists();
+    bool createTableIfNotExists();

     /** Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas.
       */
tests/queries/0_stateless/01305_replica_create_drop_zookeeper.sh (new executable file, 28 lines)
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. $CURDIR/../shell_config.sh
+
+set -e
+
+function thread()
+{
+    while true; do
+        $CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS test_table_$1;
+            CREATE TABLE test_table_$1 (a UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/alter_table', 'r_$1') ORDER BY tuple();" 2>&1 |
+            grep -vP '(^$)|(^Received exception from server)|(^\d+\. )|because the last replica of the table was dropped right now|is already started to be removing by another replica right now|is already finished removing by another replica right now|Removing leftovers from table|Another replica was suddenly created|was successfully removed from ZooKeeper|was created by another server at the same moment|was suddenly removed'
+    done
+}
+
+
+# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
+export -f thread;
+
+TIMEOUT=10
+
+timeout $TIMEOUT bash -c 'thread 1' &
+timeout $TIMEOUT bash -c 'thread 2' &
+
+wait
+
+for i in {1,2}; do $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test_table_$i"; done
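The test loops two clients for ten seconds, each dropping and recreating its own replica of the same ZooKeeper path, and suppresses the messages that the new code legitimately prints when it loses a race. As a rough guide it can also be run by hand from a ClickHouse checkout against a local server with ZooKeeper configured (a hypothetical invocation; the script takes $CLICKHOUSE_CLIENT and connection settings from tests/queries/shell_config.sh):

# Hypothetical manual run; CI normally drives this through the functional test harness.
bash tests/queries/0_stateless/01305_replica_create_drop_zookeeper.sh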