Add retries to create

Antonio Andelic 2024-08-01 10:30:50 +02:00
parent c2df527a32
commit 7db4065898
7 changed files with 275 additions and 149 deletions
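
The change wraps the Keeper operations done while creating the table in a retry loop with exponential backoff, driven by the keeper_max_retries, keeper_retry_initial_backoff_ms and keeper_retry_max_backoff_ms settings that appear below. A rough standalone sketch of that retry-with-backoff idea (not the actual ZooKeeperRetriesControl; every name here is illustrative):

    #include <algorithm>
    #include <chrono>
    #include <cstdint>
    #include <exception>
    #include <iostream>
    #include <stdexcept>
    #include <thread>

    // Run func until it succeeds, sleeping between attempts with a doubling,
    // capped backoff. The callable is told whether it runs as a retry, which the
    // real code uses to treat ZNODEEXISTS after a timeout as its own earlier success.
    template <typename Func>
    void retryLoop(uint64_t max_retries, uint64_t initial_backoff_ms, uint64_t max_backoff_ms, Func && func)
    {
        uint64_t backoff_ms = initial_backoff_ms;
        for (uint64_t attempt = 0;; ++attempt)
        {
            try
            {
                func(/*is_retry=*/ attempt > 0);
                return;
            }
            catch (const std::exception & e)
            {
                if (attempt >= max_retries)
                    throw;
                std::cerr << "Attempt " << attempt << " failed (" << e.what() << "), retrying\n";
                std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms));
                backoff_ms = std::min(backoff_ms * 2, max_backoff_ms);
            }
        }
    }

    int main()
    {
        int calls = 0;
        // Simulated flaky Keeper call: fails twice, then succeeds.
        retryLoop(5, 100, 10000, [&](bool is_retry)
        {
            ++calls;
            if (calls < 3)
                throw std::runtime_error("simulated Keeper timeout");
            std::cout << "succeeded on attempt " << calls << (is_retry ? " (retry)" : "") << "\n";
        });
    }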

View File

@@ -408,104 +408,192 @@ StorageKeeperMap::StorageKeeperMap(
if (attach)
{
checkTable<false>();
checkTable<false>(context_);
return;
}
auto client = getClient();
const auto & settings = context_->getSettingsRef();
ZooKeeperRetriesControl zk_retry{
getName(),
getLogger(getName()),
ZooKeeperRetriesInfo{settings.keeper_max_retries, settings.keeper_retry_initial_backoff_ms, settings.keeper_retry_max_backoff_ms},
context_->getProcessListElement()};
if (zk_root_path != "/" && !client->exists(zk_root_path))
{
LOG_TRACE(log, "Creating root path {}", zk_root_path);
client->createAncestors(zk_root_path);
client->createIfNotExists(zk_root_path, "");
}
zk_retry.retryLoop(
[&]
{
auto client = getClient();
if (zk_root_path != "/" && !client->exists(zk_root_path))
{
LOG_TRACE(log, "Creating root path {}", zk_root_path);
client->createAncestors(zk_root_path);
client->createIfNotExists(zk_root_path, "");
}
});
std::shared_ptr<zkutil::EphemeralNodeHolder> metadata_drop_lock;
int32_t drop_lock_version = -1;
for (size_t i = 0; i < 1000; ++i)
{
std::string stored_metadata_string;
auto exists = client->tryGet(zk_metadata_path, stored_metadata_string);
if (exists)
{
// this requires same name for columns
// maybe we can do a smarter comparison for columns and primary key expression
if (stored_metadata_string != metadata_string)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Path {} is already used but the stored table definition doesn't match. Stored metadata: {}",
zk_root_path,
stored_metadata_string);
auto code = client->tryCreate(zk_table_path, "", zkutil::CreateMode::Persistent);
/// A table on the same Keeper path already exists, we just appended our table id to subscribe as a new replica
/// We still don't know if the table matches the expected metadata so table_is_valid is not changed
/// It will be checked lazily on the first operation
if (code == Coordination::Error::ZOK)
return;
if (code != Coordination::Error::ZNONODE)
throw zkutil::KeeperException(code, "Failed to create table on path {} because a table with same UUID already exists", zk_root_path);
/// ZNONODE means we dropped zk_tables_path but didn't finish drop completely
}
if (client->exists(zk_dropped_path))
{
LOG_INFO(log, "Removing leftover nodes");
auto code = client->tryCreate(zk_dropped_lock_path, "", zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNONODE)
bool success = false;
zk_retry.retryLoop(
[&]
{
LOG_INFO(log, "Someone else removed leftover nodes");
}
else if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "Someone else is removing leftover nodes");
continue;
}
else if (code != Coordination::Error::ZOK)
{
throw Coordination::Exception::fromPath(code, zk_dropped_lock_path);
}
else
{
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(zk_dropped_lock_path, *client);
if (!dropTable(client, metadata_drop_lock))
continue;
}
}
auto client = getClient();
std::string stored_metadata_string;
auto exists = client->tryGet(zk_metadata_path, stored_metadata_string);
Coordination::Requests create_requests
{
zkutil::makeCreateRequest(zk_metadata_path, metadata_string, zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest(zk_data_path, metadata_string, zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest(zk_tables_path, "", zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest(zk_table_path, "", zkutil::CreateMode::Persistent),
};
if (exists)
{
// this requires same name for columns
// maybe we can do a smarter comparison for columns and primary key expression
if (stored_metadata_string != metadata_string)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Path {} is already used but the stored table definition doesn't match. Stored metadata: {}",
zk_root_path,
stored_metadata_string);
Coordination::Responses create_responses;
auto code = client->tryMulti(create_requests, create_responses);
if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "It looks like a table on path {} was created by another server at the same moment, will retry", zk_root_path);
continue;
}
else if (code != Coordination::Error::ZOK)
{
zkutil::KeeperMultiException::check(code, create_requests, create_responses);
}
auto code = client->tryCreate(zk_table_path, "", zkutil::CreateMode::Persistent);
/// A table on the same Keeper path already exists, we just appended our table id to subscribe as a new replica
/// We still don't know if the table matches the expected metadata so table_is_valid is not changed
/// It will be checked lazily on the first operation
if (code == Coordination::Error::ZOK)
{
success = true;
return;
}
table_status = TableStatus::VALID;
/// we are the first table created for the specified Keeper path, i.e. we are the first replica
return;
/// We most likely created the path but got a timeout or disconnect
if (code == Coordination::Error::ZNODEEXISTS && zk_retry.isRetry())
{
success = true;
return;
}
if (code != Coordination::Error::ZNONODE)
throw zkutil::KeeperException(
code, "Failed to create table on path {} because a table with same UUID already exists", zk_root_path);
/// ZNONODE means we dropped zk_tables_path but didn't finish drop completely
}
if (client->exists(zk_dropped_path))
{
LOG_INFO(log, "Removing leftover nodes");
bool drop_finished = false;
if (zk_retry.isRetry() && metadata_drop_lock != nullptr && drop_lock_version != -1)
{
/// if we have leftover lock from previous try, we need to recreate the ephemeral with our session
Coordination::Requests drop_lock_requests{
zkutil::makeRemoveRequest(zk_dropped_lock_path, drop_lock_version),
zkutil::makeCreateRequest(zk_dropped_lock_path, "", zkutil::CreateMode::Ephemeral),
};
Coordination::Responses drop_lock_responses;
auto lock_code = client->tryMulti(drop_lock_requests, drop_lock_responses);
if (lock_code == Coordination::Error::ZBADVERSION)
{
LOG_INFO(log, "Someone else is removing leftover nodes");
metadata_drop_lock->setAlreadyRemoved();
metadata_drop_lock.reset();
return;
}
if (drop_lock_responses[0]->error == Coordination::Error::ZNONODE)
{
/// someone else removed metadata nodes or the previous ephemeral node expired
/// we will try creating dropped lock again to make sure
metadata_drop_lock->setAlreadyRemoved();
metadata_drop_lock.reset();
}
else if (lock_code == Coordination::Error::ZOK)
{
metadata_drop_lock->setAlreadyRemoved();
metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(zk_dropped_lock_path, *client);
drop_lock_version = -1;
Coordination::Stat lock_stat;
client->get(zk_dropped_lock_path, &lock_stat);
drop_lock_version = lock_stat.version;
if (!dropTable(client, metadata_drop_lock))
{
metadata_drop_lock.reset();
return;
}
drop_finished = true;
}
}
if (!drop_finished)
{
auto code = client->tryCreate(zk_dropped_lock_path, "", zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNONODE)
{
LOG_INFO(log, "Someone else removed leftover nodes");
}
else if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "Someone else is removing leftover nodes");
return;
}
else if (code != Coordination::Error::ZOK)
{
throw Coordination::Exception::fromPath(code, zk_dropped_lock_path);
}
else
{
metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(zk_dropped_lock_path, *client);
drop_lock_version = -1;
Coordination::Stat lock_stat;
client->get(zk_dropped_lock_path, &lock_stat);
drop_lock_version = lock_stat.version;
if (!dropTable(client, metadata_drop_lock))
{
metadata_drop_lock.reset();
return;
}
}
}
}
Coordination::Requests create_requests{
zkutil::makeCreateRequest(zk_metadata_path, metadata_string, zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest(zk_data_path, metadata_string, zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest(zk_tables_path, "", zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest(zk_table_path, "", zkutil::CreateMode::Persistent),
};
Coordination::Responses create_responses;
auto code = client->tryMulti(create_requests, create_responses);
if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(
log, "It looks like a table on path {} was created by another server at the same moment, will retry", zk_root_path);
return;
}
else if (code != Coordination::Error::ZOK)
{
zkutil::KeeperMultiException::check(code, create_requests, create_responses);
}
table_status = TableStatus::VALID;
/// we are the first table created for the specified Keeper path, i.e. we are the first replica
success = true;
});
if (success)
return;
}
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot create metadata for table, because it is removed concurrently or because "
"of wrong zk_root_path ({})", zk_root_path);
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Cannot create metadata for table, because it is removed concurrently or because "
"of wrong zk_root_path ({})",
zk_root_path);
}
@@ -518,7 +606,7 @@ Pipe StorageKeeperMap::read(
size_t max_block_size,
size_t num_streams)
{
checkTable<true>();
checkTable<true>(context_);
storage_snapshot->check(column_names);
FieldVectorPtr filtered_keys;
@@ -592,13 +680,13 @@ Pipe StorageKeeperMap::read(
SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
{
checkTable<true>();
checkTable<true>(local_context);
return std::make_shared<StorageKeeperMapSink>(*this, metadata_snapshot->getSampleBlock(), local_context);
}
void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
{
checkTable<true>();
checkTable<true>(local_context);
const auto & settings = local_context->getSettingsRef();
ZooKeeperRetriesControl zk_retry{
getName(),
@@ -657,7 +745,7 @@ bool StorageKeeperMap::dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::E
void StorageKeeperMap::drop()
{
auto current_table_status = getTableStatus();
auto current_table_status = getTableStatus(getContext());
if (current_table_status == TableStatus::UNKNOWN)
{
static constexpr auto error_msg = "Failed to activate table because of connection issues. It will be activated "
@@ -666,7 +754,7 @@ void StorageKeeperMap::drop()
}
/// if only column metadata is wrong we can still drop the table correctly
if (current_table_status == TableStatus::INVALID_KEEPER_STRUCTURE)
if (current_table_status == TableStatus::INVALID_METADATA)
return;
auto client = getClient();
@@ -1029,7 +1117,7 @@ UInt64 StorageKeeperMap::keysLimit() const
return keys_limit;
}
StorageKeeperMap::TableStatus StorageKeeperMap::getTableStatus() const
StorageKeeperMap::TableStatus StorageKeeperMap::getTableStatus(const ContextPtr & local_context) const
{
std::lock_guard lock{init_mutex};
if (table_status != TableStatus::UNKNOWN)
@@ -1039,57 +1127,70 @@ StorageKeeperMap::TableStatus StorageKeeperMap::getTableStatus() const
{
try
{
auto client = getClient();
const auto & settings = local_context->getSettingsRef();
ZooKeeperRetriesControl zk_retry{
getName(),
getLogger(getName()),
ZooKeeperRetriesInfo{
settings.keeper_max_retries,
settings.keeper_retry_initial_backoff_ms,
settings.keeper_retry_max_backoff_ms},
local_context->getProcessListElement()};
Coordination::Stat metadata_stat;
auto stored_metadata_string = client->get(zk_metadata_path, &metadata_stat);
if (metadata_stat.numChildren == 0)
zk_retry.retryLoop([&]
{
auto client = getClient();
Coordination::Stat metadata_stat;
auto stored_metadata_string = client->get(zk_metadata_path, &metadata_stat);
if (metadata_stat.numChildren == 0)
{
table_status = TableStatus::INVALID_KEEPER_STRUCTURE;
return;
}
if (metadata_string != stored_metadata_string)
{
LOG_ERROR(
log,
"Table definition does not match to the one stored in the path {}. Stored definition: {}",
zk_root_path,
stored_metadata_string);
table_status = TableStatus::INVALID_METADATA;
return;
}
// validate all metadata and data nodes are present
Coordination::Requests requests;
requests.push_back(zkutil::makeCheckRequest(zk_table_path, -1));
requests.push_back(zkutil::makeCheckRequest(zk_data_path, -1));
requests.push_back(zkutil::makeCheckRequest(zk_dropped_path, -1));
Coordination::Responses responses;
client->tryMulti(requests, responses);
table_status = TableStatus::INVALID_KEEPER_STRUCTURE;
return;
}
if (responses[0]->error != Coordination::Error::ZOK)
{
LOG_ERROR(log, "Table node ({}) is missing", zk_table_path);
return;
}
if (metadata_string != stored_metadata_string)
{
LOG_ERROR(
log,
"Table definition does not match to the one stored in the path {}. Stored definition: {}",
zk_root_path,
stored_metadata_string);
table_status = TableStatus::INVALID_METADATA;
return;
}
if (responses[1]->error != Coordination::Error::ZOK)
{
LOG_ERROR(log, "Data node ({}) is missing", zk_data_path);
return;
}
// validate all metadata and data nodes are present
Coordination::Requests requests;
requests.push_back(zkutil::makeCheckRequest(zk_table_path, -1));
requests.push_back(zkutil::makeCheckRequest(zk_data_path, -1));
requests.push_back(zkutil::makeCheckRequest(zk_dropped_path, -1));
if (responses[2]->error == Coordination::Error::ZOK)
{
LOG_ERROR(log, "Tables with root node {} are being dropped", zk_root_path);
return;
}
Coordination::Responses responses;
client->tryMulti(requests, responses);
table_status = TableStatus::INVALID_KEEPER_STRUCTURE;
if (responses[0]->error != Coordination::Error::ZOK)
{
LOG_ERROR(log, "Table node ({}) is missing", zk_table_path);
return;
}
if (responses[1]->error != Coordination::Error::ZOK)
{
LOG_ERROR(log, "Data node ({}) is missing", zk_data_path);
return;
}
if (responses[2]->error == Coordination::Error::ZOK)
{
LOG_ERROR(log, "Tables with root node {} are being dropped", zk_root_path);
return;
}
table_status = TableStatus::VALID;
table_status = TableStatus::VALID;
});
}
catch (const Coordination::Exception & e)
{
@@ -1227,7 +1328,7 @@ void StorageKeeperMap::checkMutationIsPossible(const MutationCommands & commands
void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr local_context)
{
checkTable<true>();
checkTable<true>(local_context);
if (commands.empty())
return;
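
One consequence of retrying the create is visible in the hunk above: when a create is re-issued after a timeout or disconnect, ZNODEEXISTS can simply mean that the first attempt already went through on the server. A compact sketch of that decision, with a placeholder enum standing in for Coordination::Error (the real code also handles multi-request failures separately):

    #include <cassert>
    #include <stdexcept>

    // Placeholder for Coordination::Error; only the codes used in the sketch.
    enum class KeeperError { ZOK, ZNODEEXISTS, ZNONODE };

    // Should the create of the per-replica table node be treated as success?
    bool createSucceeded(KeeperError code, bool is_retry)
    {
        if (code == KeeperError::ZOK)
            return true;
        // On a retry, the node most likely exists because our previous attempt was
        // applied before the connection dropped, so it is not an error.
        if (code == KeeperError::ZNODEEXISTS && is_retry)
            return true;
        // ZNONODE: the table path was dropped but not cleaned up completely; the
        // caller falls through to leftover-node removal and full re-creation.
        if (code == KeeperError::ZNONODE)
            return false;
        throw std::runtime_error("a table with the same UUID already exists");
    }

    int main()
    {
        assert(createSucceeded(KeeperError::ZOK, false));
        assert(createSucceeded(KeeperError::ZNODEEXISTS, /*is_retry=*/ true));
        assert(!createSucceeded(KeeperError::ZNONODE, false));
    }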

View File

@@ -78,9 +78,9 @@ public:
UInt64 keysLimit() const;
template <bool throw_on_error>
void checkTable() const
void checkTable(const ContextPtr & local_context) const
{
auto current_table_status = getTableStatus();
auto current_table_status = getTableStatus(local_context);
if (table_status == TableStatus::UNKNOWN)
{
static constexpr auto error_msg = "Failed to activate table because of connection issues. It will be activated "
@@ -119,7 +119,7 @@ private:
VALID
};
TableStatus getTableStatus() const;
TableStatus getTableStatus(const ContextPtr & context) const;
void restoreDataImpl(
const BackupPtr & backup,

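In the header, checkTable stays a template over throw_on_error and now takes the query context so that getTableStatus can run its Keeper reads under the same retry control. A minimal sketch of the compile-time throw-versus-log choice that the template parameter presumably encodes (status values copied from the diff, the rest is illustrative):

    #include <iostream>
    #include <stdexcept>

    enum class TableStatus { UNKNOWN, INVALID_METADATA, INVALID_KEEPER_STRUCTURE, VALID };

    // Compile-time switch: strict call sites (read/write/mutate) instantiate
    // checkTable<true> and get an exception, while attach uses checkTable<false>
    // and only logs, deferring validation to the first real operation.
    template <bool throw_on_error>
    void checkTable(TableStatus status)
    {
        if (status == TableStatus::VALID)
            return;

        const char * error_msg = "Table is in an invalid state";
        if constexpr (throw_on_error)
            throw std::runtime_error(error_msg);
        else
            std::cerr << error_msg << "\n";
    }

    int main()
    {
        checkTable<false>(TableStatus::INVALID_METADATA);        // logs and continues
        try
        {
            checkTable<true>(TableStatus::INVALID_METADATA);     // throws
        }
        catch (const std::exception & e)
        {
            std::cerr << e.what() << "\n";
        }
    }
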
View File

@@ -0,0 +1,14 @@
<clickhouse>
<profiles>
<default>
<insert_keeper_max_retries>0</insert_keeper_max_retries>
<keeper_max_retries>0</keeper_max_retries>
</default>
</profiles>
<users>
<default>
<password></password>
<profile>default</profile>
</default>
</users>
</clickhouse>

View File

@@ -10,6 +10,7 @@ cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/enable_keeper_map.xml"],
user_configs=["configs/keeper_retries.xml"],
with_zookeeper=True,
stay_alive=True,
)
@@ -49,7 +50,6 @@ def assert_keeper_exception_after_partition(query):
error = node.query_and_get_error_with_retry(
query,
sleep_time=1,
settings={"insert_keeper_max_retries": 1, "keeper_max_retries": 1},
)
assert "Coordination::Exception" in error
except:
@@ -91,7 +91,6 @@ def test_keeper_map_without_zk(started_cluster):
error = node.query_and_get_error_with_retry(
"SELECT * FROM test_keeper_map_without_zk",
sleep_time=1,
settings={"keeper_max_retries": 1},
)
assert "Failed to activate table because of connection issues" in error
except:

View File

@@ -1,5 +1,6 @@
<clickhouse>
<zookeeper>
<enable_fault_injections_during_startup>1</enable_fault_injections_during_startup>
<send_fault_probability>0.005</send_fault_probability>
<recv_fault_probability>0.005</recv_fault_probability>
</zookeeper>
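
The test config enables fault injection during startup so that the new retries in the constructor are actually exercised. Loosely, a send fault makes a request fail before it reaches Keeper, while a recv fault loses the response after the request has been applied; the latter is the case the create path must treat as a possible success. A generic sketch of that behaviour (names and structure are illustrative, not ClickHouse's client):

    #include <functional>
    #include <iostream>
    #include <random>
    #include <stdexcept>

    // With probability send_p the request "fails" before it is sent; with
    // probability recv_p it fails after the server has applied it, so the
    // caller sees an error even though the operation succeeded.
    class FaultInjectingClient
    {
    public:
        FaultInjectingClient(double send_p, double recv_p) : send_dist(send_p), recv_dist(recv_p) {}

        void execute(const std::function<void()> & request)
        {
            if (send_dist(rng))
                throw std::runtime_error("injected fault before send");
            request();                      // the server-side effect happens here
            if (recv_dist(rng))
                throw std::runtime_error("injected fault after receive");
        }

    private:
        std::mt19937 rng{std::random_device{}()};
        std::bernoulli_distribution send_dist;
        std::bernoulli_distribution recv_dist;
    };

    int main()
    {
        FaultInjectingClient client(0.005, 0.005);
        int applied = 0, failed = 0;
        for (int i = 0; i < 10000; ++i)
        {
            try { client.execute([&] { ++applied; }); }
            catch (const std::exception &) { ++failed; }
        }
        std::cout << "applied=" << applied << " observed failures=" << failed << "\n";
    }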

View File

@@ -0,0 +1,14 @@
<clickhouse>
<profiles>
<default>
<keeper_max_retries>20</keeper_max_retries>
<keeper_retry_max_backoff_ms>10000</keeper_retry_max_backoff_ms>
</default>
</profiles>
<users>
<default>
<password></password>
<profile>default</profile>
</default>
</users>
</clickhouse>
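
This profile raises the retry budget for the fault-injection test: up to 20 retries with the backoff capped at 10 seconds. Assuming the backoff starts at the unchanged initial value, taken here as 100 ms purely for illustration, and doubles per attempt (also an assumption about the policy), the worst case adds up roughly as follows:

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    // Worst-case sleep time if every attempt fails: the backoff doubles per retry
    // (assumed policy) and is capped at max_ms.
    uint64_t totalBackoffMs(uint64_t retries, uint64_t initial_ms, uint64_t max_ms)
    {
        uint64_t total = 0;
        uint64_t backoff = initial_ms;
        for (uint64_t i = 0; i < retries; ++i)
        {
            total += backoff;
            backoff = std::min(backoff * 2, max_ms);
        }
        return total;
    }

    int main()
    {
        // 100 + 200 + ... + 6400 + 13 * 10000 = 142700 ms, i.e. under two and a half minutes
        std::cout << totalBackoffMs(20, 100, 10000) << " ms\n";
    }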

View File

@@ -11,6 +11,7 @@ cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/enable_keeper_map.xml"],
user_configs=["configs/keeper_retries.xml"],
with_zookeeper=True,
stay_alive=True,
)
@@ -42,10 +43,6 @@ def repeat_query(query, repeat):
for _ in range(repeat):
node.query(
query,
settings={
"keeper_max_retries": 20,
"keeper_retry_max_backoff_ms": 10000,
},
)
@@ -53,10 +50,6 @@ def test_queries(started_cluster):
start_clean_clickhouse()
node.query("DROP TABLE IF EXISTS keeper_map_retries SYNC")
node.query(
"CREATE TABLE keeper_map_retries (a UInt64, b UInt64) Engine=KeeperMap('/keeper_map_retries') PRIMARY KEY a"
)
node.stop_clickhouse()
node.copy_file_to_container(
os.path.join(CONFIG_DIR, "fault_injection.xml"),
@@ -66,6 +59,10 @@ def test_queries(started_cluster):
repeat_count = 10
node.query(
"CREATE TABLE keeper_map_retries (a UInt64, b UInt64) Engine=KeeperMap('/keeper_map_retries') PRIMARY KEY a",
)
repeat_query(
"INSERT INTO keeper_map_retries SELECT number, number FROM numbers(500)",
repeat_count,