fix race between drop and create
This commit is contained in: parent a4d9688775, commit e712fbecb2
@@ -147,6 +147,7 @@ void DatabaseCatalog::initializeAndLoadTemporaryDatabase()
     unused_dir_hide_timeout_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_hide_timeout_sec", unused_dir_hide_timeout_sec);
     unused_dir_rm_timeout_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_rm_timeout_sec", unused_dir_rm_timeout_sec);
     unused_dir_cleanup_period_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_cleanup_period_sec", unused_dir_cleanup_period_sec);
+    drop_error_cooldown_sec = getContext()->getConfigRef().getInt64("database_catalog_drop_error_cooldown_sec", drop_error_cooldown_sec);
 
     auto db_for_temporary_and_external_tables = std::make_shared<DatabaseMemory>(TEMPORARY_DATABASE, getContext());
     attachDatabase(TEMPORARY_DATABASE, db_for_temporary_and_external_tables);
@@ -278,7 +278,6 @@ private:
     bool maybeRemoveDirectory(const String & disk_name, const DiskPtr & disk, const String & unused_dir);
 
     static constexpr size_t reschedule_time_ms = 100;
-    static constexpr time_t drop_error_cooldown_sec = 5;
 
     mutable std::mutex databases_mutex;
 
@@ -325,6 +324,9 @@ private:
     time_t unused_dir_rm_timeout_sec = default_unused_dir_rm_timeout_sec;
     static constexpr time_t default_unused_dir_cleanup_period_sec = 24 * 60 * 60; /// 1 day
     time_t unused_dir_cleanup_period_sec = default_unused_dir_cleanup_period_sec;
+
+    static constexpr time_t default_drop_error_cooldown_sec = 5;
+    time_t drop_error_cooldown_sec = default_drop_error_cooldown_sec;
 };
 
 /// This class is useful when creating a table or database.
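For reference, the keys read above come straight from the server configuration, so the new cooldown can be overridden without recompiling. A minimal sketch of such an override, assuming the <clickhouse> root element used by the test config later in this commit; the values and the comment on the cooldown's meaning are illustrative assumptions rather than something stated in the diff:

<clickhouse>
    <!-- Assumed meaning: cooldown in seconds before a failed table drop is retried; the in-code default above is 5. -->
    <database_catalog_drop_error_cooldown_sec>5</database_catalog_drop_error_cooldown_sec>
    <!-- The unused-directory knobs are read the same way; 86400 mirrors the 1-day default above. -->
    <database_catalog_unused_dir_cleanup_period_sec>86400</database_catalog_unused_dir_cleanup_period_sec>
</clickhouse>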
@@ -903,17 +903,16 @@ void StorageReplicatedMergeTree::drop()
     /// in this case, has_metadata_in_zookeeper = false, and we also permit to drop the table.
 
     bool maybe_has_metadata_in_zookeeper = !has_metadata_in_zookeeper.has_value() || *has_metadata_in_zookeeper;
+    zkutil::ZooKeeperPtr zookeeper;
     if (maybe_has_metadata_in_zookeeper)
     {
         /// Table can be shut down, restarting thread is not active
         /// and calling StorageReplicatedMergeTree::getZooKeeper()/getAuxiliaryZooKeeper() won't suffice.
-        zkutil::ZooKeeperPtr zookeeper = getZooKeeperIfTableShutDown();
+        zookeeper = getZooKeeperIfTableShutDown();
 
         /// If probably there is metadata in ZooKeeper, we don't allow to drop the table.
         if (!zookeeper)
             throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Can't drop readonly replicated table (need to drop data in ZooKeeper as well)");
-
-        dropReplica(zookeeper, zookeeper_path, replica_name, log, getSettings());
     }
 
     /// Wait for loading of all outdated parts because
@@ -927,10 +926,13 @@ void StorageReplicatedMergeTree::drop()
     }
 
     dropAllData();
+
+    if (maybe_has_metadata_in_zookeeper)
+        dropReplica(zookeeper, zookeeper_path, replica_name, log, getSettings(), &has_metadata_in_zookeeper);
 }
 
 void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
-    Poco::Logger * logger, MergeTreeSettingsPtr table_settings)
+    Poco::Logger * logger, MergeTreeSettingsPtr table_settings, std::optional<bool> * has_metadata_out)
 {
     if (zookeeper->expired())
         throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Table was not dropped because ZooKeeper session has expired.");
@@ -988,12 +990,16 @@ void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, con
             Coordination::errorMessage(code), remote_replica_path);
 
         /// And finally remove everything else recursively
-        zookeeper->tryRemoveRecursive(remote_replica_path);
-    }
+        /// It may left some garbage if replica_path subtree is concurrently modified
+        zookeeper->tryRemoveChildrenRecursive(remote_replica_path);
 
-    /// It may left some garbage if replica_path subtree are concurrently modified
-    if (zookeeper->exists(remote_replica_path))
-        LOG_ERROR(logger, "Replica was not completely removed from ZooKeeper, {} still exists and may contain some garbage.", remote_replica_path);
+        /// Update has_metadata_in_zookeeper to avoid retries. Otherwise we can accidentally remove metadata of a new table on retries
+        if (has_metadata_out)
+            *has_metadata_out = false;
+
+        if (zookeeper->tryRemove(remote_replica_path) != Coordination::Error::ZOK)
+            LOG_ERROR(logger, "Replica was not completely removed from ZooKeeper, {} still exists and may contain some garbage.", remote_replica_path);
+    }
 
     /// Check that `zookeeper_path` exists: it could have been deleted by another replica after execution of previous line.
     Strings replicas;
@@ -8152,6 +8158,12 @@ StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part, co
     auto shared_id = getTableSharedID();
     if (shared_id == toString(UUIDHelpers::Nil))
     {
+        if (zookeeper->exists(zookeeper_path))
+        {
+            LOG_WARNING(log, "Not removing shared data for part {} because replica does not have metadata in ZooKeeper, "
+                             "but table path exist and other replicas may exist. It may leave some garbage on S3", part.name);
+            return std::make_pair(false, NameSet{});
+        }
         LOG_TRACE(log, "Part {} blobs can be removed, because table {} completely dropped", part.name, getStorageID().getNameForLogs());
         return std::make_pair(true, NameSet{});
     }
@@ -8177,9 +8189,18 @@ StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part, co
         return std::make_pair(true, NameSet{});
     }
 
-    /// If table was completely dropped (no meta in zookeeper) we can safely remove parts
     if (has_metadata_in_zookeeper.has_value() && !has_metadata_in_zookeeper)
+    {
+        if (zookeeper->exists(zookeeper_path))
+        {
+            LOG_WARNING(log, "Not removing shared data for part {} because replica does not have metadata in ZooKeeper, "
+                             "but table path exist and other replicas may exist. It may leave some garbage on S3", part.name);
+            return std::make_pair(false, NameSet{});
+        }
+
+        /// If table was completely dropped (no meta in zookeeper) we can safely remove parts
         return std::make_pair(true, NameSet{});
+    }
 
     /// We remove parts during table shutdown. If exception happen, restarting thread will be already turned
     /// off and nobody will reconnect our zookeeper connection. In this case we use zookeeper connection from
@@ -228,7 +228,7 @@ public:
     /** Remove a specific replica from zookeeper.
      */
     static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
-        Poco::Logger * logger, MergeTreeSettingsPtr table_settings = nullptr);
+        Poco::Logger * logger, MergeTreeSettingsPtr table_settings = nullptr, std::optional<bool> * has_metadata_out = nullptr);
 
     /// Removes table from ZooKeeper after the last replica was dropped
     static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path,
@@ -3263,7 +3263,7 @@ class ClickHouseInstance:
         sleep_time=0.5,
         check_callback=lambda x: True,
     ):
-        logging.debug(f"Executing query {sql} on {self.name}")
+        #logging.debug(f"Executing query {sql} on {self.name}")
         result = None
         for i in range(retry_count):
             try:
@@ -3282,7 +3282,7 @@ class ClickHouseInstance:
                     return result
                 time.sleep(sleep_time)
             except Exception as ex:
-                logging.debug("Retry {} got exception {}".format(i + 1, ex))
+                #logging.debug("Retry {} got exception {}".format(i + 1, ex))
                 time.sleep(sleep_time)
 
         if result is not None:
@@ -53,4 +53,6 @@
         <shard>0</shard>
     </macros>
 
+    <database_atomic_delay_before_drop_table_sec>3</database_atomic_delay_before_drop_table_sec>
+    <database_atomic_drop_error_cooldown_sec>0</database_atomic_drop_error_cooldown_sec>
 </clickhouse>
@@ -1,9 +1,11 @@
 import logging
 import random
 import string
+import time
 
 import pytest
 from helpers.cluster import ClickHouseCluster
+from helpers.network import PartitionManager
 
 logging.getLogger().setLevel(logging.INFO)
 logging.getLogger().addHandler(logging.StreamHandler())
@@ -127,3 +129,57 @@ def test_insert_select_replicated(cluster, min_rows_for_wide_part, files_per_par
     assert len(
         list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
     ) == (3 * FILES_OVERHEAD) + (files_per_part * 3)
+
+
+def test_drop_table(cluster):
+    node = list(cluster.instances.values())[0]
+    node2 = list(cluster.instances.values())[1]
+    node.query(
+        "create table test_drop_table (n int) engine=ReplicatedMergeTree('/test/drop_table', '1') order by n partition by n % 99 settings storage_policy='s3'"
+    )
+    node2.query(
+        "create table test_drop_table (n int) engine=ReplicatedMergeTree('/test/drop_table', '2') order by n partition by n % 99 settings storage_policy='s3'"
+    )
+    node.query("insert into test_drop_table select * from numbers(1000)")
+    node2.query("system sync replica test_drop_table")
+
+    with PartitionManager() as pm:
+        pm._add_rule(
+            {
+                "probability": 0.01,
+                "destination": node.ip_address,
+                "source_port": 2181,
+                "action": "REJECT --reject-with tcp-reset",
+            }
+        )
+        pm._add_rule(
+            {
+                "probability": 0.01,
+                "source": node.ip_address,
+                "destination_port": 2181,
+                "action": "REJECT --reject-with tcp-reset",
+            }
+        )
+        node.query("drop table test_drop_table")
+        for i in range(0, 100):
+            node.query_and_get_answer_with_error(
+                "create table if not exists test_drop_table (n int) "
+                "engine=ReplicatedMergeTree('/test/drop_table', '1') "
+                "order by n partition by n % 99 settings storage_policy='s3'"
+            )
+            time.sleep(0.2)
+
+    replicas = node.query_with_retry(
+        "select name from system.zookeeper where path='/test/drop_table/replicas'"
+    )
+    if "1" in replicas and "test_drop_table" not in node.query("show tables"):
+        node2.query("system drop replica '1' from table test_drop_table")
+    node.query(
+        "create table test_drop_table (n int) engine=ReplicatedMergeTree('/test/drop_table', '1') "
+        "order by n partition by n % 99 settings storage_policy='s3'"
+    )
+    node.query("system sync replica test_drop_table", settings={"receive_timeout": 60})
+    node2.query("drop table test_drop_table")
+    assert "1000\t499500\n" == node.query(
+        "select count(n), sum(n) from test_drop_table"
+    )