mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 03:22:14 +00:00
Merge remote-tracking branch 'origin/master' into pr-local-plan
This commit is contained in:
commit
36036c3da9
@ -34,17 +34,13 @@ curl https://clickhouse.com/ | sh
|
||||
|
||||
Every month we get together with the community (users, contributors, customers, those interested in learning more about ClickHouse) to discuss what is coming in the latest release. If you are interested in sharing what you've built on ClickHouse, let us know.
|
||||
|
||||
* [v24.7 Community Call](https://clickhouse.com/company/events/v24-7-community-release-call) - Jul 30
|
||||
* [v24.8 Community Call](https://clickhouse.com/company/events/v24-8-community-release-call) - August 29
|
||||
|
||||
## Upcoming Events
|
||||
|
||||
Keep an eye out for upcoming meetups and events around the world. Somewhere else you want us to be? Please feel free to reach out to tyler `<at>` clickhouse `<dot>` com. You can also peruse [ClickHouse Events](https://clickhouse.com/company/news-events) for a list of all upcoming trainings, meetups, speaking engagements, etc.
|
||||
|
||||
* [ClickHouse Meetup in Paris](https://www.meetup.com/clickhouse-france-user-group/events/300783448/) - Jul 9
|
||||
* [ClickHouse Cloud - Live Update Call](https://clickhouse.com/company/events/202407-cloud-update-live) - Jul 9
|
||||
* [ClickHouse Meetup @ Ramp - New York City](https://www.meetup.com/clickhouse-new-york-user-group/events/300595845/) - Jul 9
|
||||
* [AWS Summit in New York](https://clickhouse.com/company/events/2024-07-awssummit-nyc) - Jul 10
|
||||
* [ClickHouse Meetup @ Klaviyo - Boston](https://www.meetup.com/clickhouse-boston-user-group/events/300907870) - Jul 11
|
||||
* MORE COMING SOON!
|
||||
|
||||
## Recent Recordings
|
||||
* **Recent Meetup Videos**: [Meetup Playlist](https://www.youtube.com/playlist?list=PL0Z2YDlm0b3iNDUzpY1S3L_iV4nARda_U) Whenever possible recordings of the ClickHouse Community Meetups are edited and presented as individual talks. Current featuring "Modern SQL in 2023", "Fast, Concurrent, and Consistent Asynchronous INSERTS in ClickHouse", and "Full-Text Indices: Design and Experiments"
|
||||
|
@ -103,8 +103,6 @@ Default: 2
|
||||
|
||||
The policy on how to perform a scheduling for background merges and mutations. Possible values are: `round_robin` and `shortest_task_first`.
|
||||
|
||||
## background_merges_mutations_scheduling_policy
|
||||
|
||||
Algorithm used to select next merge or mutation to be executed by background thread pool. Policy may be changed at runtime without server restart.
|
||||
Could be applied from the `default` profile for backward compatibility.
|
||||
|
||||
|
@ -3494,18 +3494,22 @@ DDLWorker & Context::getDDLWorker() const
|
||||
if (shared->ddl_worker_startup_task)
|
||||
waitLoad(shared->ddl_worker_startup_task); // Just wait and do not prioritize, because it depends on all load and startup tasks
|
||||
|
||||
SharedLockGuard lock(shared->mutex);
|
||||
if (!shared->ddl_worker)
|
||||
{
|
||||
if (!hasZooKeeper())
|
||||
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "There is no Zookeeper configuration in server config");
|
||||
|
||||
if (!hasDistributedDDL())
|
||||
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "There is no DistributedDDL configuration in server config");
|
||||
|
||||
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "DDL background thread is not initialized");
|
||||
/// Only acquire the lock for reading ddl_worker field.
|
||||
/// hasZooKeeper() and hasDistributedDDL() acquire the same lock as well and double acquisition of the lock in shared mode can lead
|
||||
/// to a deadlock if an exclusive lock attempt is made in the meantime by another thread.
|
||||
SharedLockGuard lock(shared->mutex);
|
||||
if (shared->ddl_worker)
|
||||
return *shared->ddl_worker;
|
||||
}
|
||||
return *shared->ddl_worker;
|
||||
|
||||
if (!hasZooKeeper())
|
||||
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "There is no Zookeeper configuration in server config");
|
||||
|
||||
if (!hasDistributedDDL())
|
||||
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "There is no DistributedDDL configuration in server config");
|
||||
|
||||
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "DDL background thread is not initialized");
|
||||
}
|
||||
|
||||
zkutil::ZooKeeperPtr Context::getZooKeeper() const
|
||||
|
@ -43,6 +43,7 @@
|
||||
#include <Common/ZooKeeper/Types.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperConstants.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperRetries.h>
|
||||
|
||||
#include <Backups/BackupEntriesCollector.h>
|
||||
#include <Backups/IBackupCoordination.h>
|
||||
@ -78,6 +79,7 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int LIMIT_EXCEEDED;
|
||||
extern const int CANNOT_RESTORE_TABLE;
|
||||
extern const int INVALID_STATE;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -120,7 +122,7 @@ public:
|
||||
: SinkToStorage(header), storage(storage_), context(std::move(context_))
|
||||
{
|
||||
auto primary_key = storage.getPrimaryKey();
|
||||
assert(primary_key.size() == 1);
|
||||
chassert(primary_key.size() == 1);
|
||||
primary_key_pos = getHeader().getPositionByName(primary_key[0]);
|
||||
}
|
||||
|
||||
@ -171,81 +173,94 @@ public:
|
||||
template <bool for_update>
|
||||
void finalize(bool strict)
|
||||
{
|
||||
auto zookeeper = storage.getClient();
|
||||
const auto & settings = context->getSettingsRef();
|
||||
|
||||
auto keys_limit = storage.keysLimit();
|
||||
ZooKeeperRetriesControl zk_retry{
|
||||
getName(),
|
||||
getLogger(getName()),
|
||||
ZooKeeperRetriesInfo{
|
||||
settings.insert_keeper_max_retries,
|
||||
settings.insert_keeper_retry_initial_backoff_ms,
|
||||
settings.insert_keeper_retry_max_backoff_ms},
|
||||
context->getProcessListElement()};
|
||||
|
||||
size_t current_keys_num = 0;
|
||||
size_t new_keys_num = 0;
|
||||
|
||||
// We use keys limit as a soft limit so we ignore some cases when it can be still exceeded
|
||||
// (e.g if parallel insert queries are being run)
|
||||
if (keys_limit != 0)
|
||||
zk_retry.retryLoop([&]()
|
||||
{
|
||||
Coordination::Stat data_stat;
|
||||
zookeeper->get(storage.dataPath(), &data_stat);
|
||||
current_keys_num = data_stat.numChildren;
|
||||
}
|
||||
auto zookeeper = storage.getClient();
|
||||
auto keys_limit = storage.keysLimit();
|
||||
|
||||
std::vector<std::string> key_paths;
|
||||
key_paths.reserve(new_values.size());
|
||||
for (const auto & [key, _] : new_values)
|
||||
key_paths.push_back(storage.fullPathForKey(key));
|
||||
size_t current_keys_num = 0;
|
||||
size_t new_keys_num = 0;
|
||||
|
||||
zkutil::ZooKeeper::MultiExistsResponse results;
|
||||
|
||||
if constexpr (!for_update)
|
||||
{
|
||||
if (!strict)
|
||||
results = zookeeper->exists(key_paths);
|
||||
}
|
||||
|
||||
Coordination::Requests requests;
|
||||
requests.reserve(key_paths.size());
|
||||
for (size_t i = 0; i < key_paths.size(); ++i)
|
||||
{
|
||||
auto key = fs::path(key_paths[i]).filename();
|
||||
|
||||
if constexpr (for_update)
|
||||
// We use keys limit as a soft limit so we ignore some cases when it can be still exceeded
|
||||
// (e.g if parallel insert queries are being run)
|
||||
if (keys_limit != 0)
|
||||
{
|
||||
int32_t version = -1;
|
||||
if (strict)
|
||||
version = versions.at(key);
|
||||
|
||||
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], version));
|
||||
Coordination::Stat data_stat;
|
||||
zookeeper->get(storage.dataPath(), &data_stat);
|
||||
current_keys_num = data_stat.numChildren;
|
||||
}
|
||||
else
|
||||
|
||||
std::vector<std::string> key_paths;
|
||||
key_paths.reserve(new_values.size());
|
||||
for (const auto & [key, _] : new_values)
|
||||
key_paths.push_back(storage.fullPathForKey(key));
|
||||
|
||||
zkutil::ZooKeeper::MultiExistsResponse results;
|
||||
|
||||
if constexpr (!for_update)
|
||||
{
|
||||
if (!strict && results[i].error == Coordination::Error::ZOK)
|
||||
if (!strict)
|
||||
results = zookeeper->exists(key_paths);
|
||||
}
|
||||
|
||||
Coordination::Requests requests;
|
||||
requests.reserve(key_paths.size());
|
||||
for (size_t i = 0; i < key_paths.size(); ++i)
|
||||
{
|
||||
auto key = fs::path(key_paths[i]).filename();
|
||||
|
||||
if constexpr (for_update)
|
||||
{
|
||||
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1));
|
||||
int32_t version = -1;
|
||||
if (strict)
|
||||
version = versions.at(key);
|
||||
|
||||
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], version));
|
||||
}
|
||||
else
|
||||
{
|
||||
requests.push_back(zkutil::makeCreateRequest(key_paths[i], new_values[key], zkutil::CreateMode::Persistent));
|
||||
++new_keys_num;
|
||||
if (!strict && results[i].error == Coordination::Error::ZOK)
|
||||
{
|
||||
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1));
|
||||
}
|
||||
else
|
||||
{
|
||||
requests.push_back(zkutil::makeCreateRequest(key_paths[i], new_values[key], zkutil::CreateMode::Persistent));
|
||||
++new_keys_num;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (new_keys_num != 0)
|
||||
{
|
||||
auto will_be = current_keys_num + new_keys_num;
|
||||
if (keys_limit != 0 && will_be > keys_limit)
|
||||
throw Exception(
|
||||
ErrorCodes::LIMIT_EXCEEDED,
|
||||
"Limit would be exceeded by inserting {} new key(s). Limit is {}, while the number of keys would be {}",
|
||||
new_keys_num,
|
||||
keys_limit,
|
||||
will_be);
|
||||
}
|
||||
if (new_keys_num != 0)
|
||||
{
|
||||
auto will_be = current_keys_num + new_keys_num;
|
||||
if (keys_limit != 0 && will_be > keys_limit)
|
||||
throw Exception(
|
||||
ErrorCodes::LIMIT_EXCEEDED,
|
||||
"Limit would be exceeded by inserting {} new key(s). Limit is {}, while the number of keys would be {}",
|
||||
new_keys_num,
|
||||
keys_limit,
|
||||
will_be);
|
||||
}
|
||||
|
||||
zookeeper->multi(requests, /* check_session_valid */ true);
|
||||
zookeeper->multi(requests, /* check_session_valid */ true);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
template <typename KeyContainer>
|
||||
class StorageKeeperMapSource : public ISource
|
||||
class StorageKeeperMapSource : public ISource, WithContext
|
||||
{
|
||||
const StorageKeeperMap & storage;
|
||||
size_t max_block_size;
|
||||
@ -276,8 +291,15 @@ public:
|
||||
KeyContainerPtr container_,
|
||||
KeyContainerIter begin_,
|
||||
KeyContainerIter end_,
|
||||
bool with_version_column_)
|
||||
: ISource(getHeader(header, with_version_column_)), storage(storage_), max_block_size(max_block_size_), container(std::move(container_)), it(begin_), end(end_)
|
||||
bool with_version_column_,
|
||||
ContextPtr context_)
|
||||
: ISource(getHeader(header, with_version_column_))
|
||||
, WithContext(std::move(context_))
|
||||
, storage(storage_)
|
||||
, max_block_size(max_block_size_)
|
||||
, container(std::move(container_))
|
||||
, it(begin_)
|
||||
, end(end_)
|
||||
, with_version_column(with_version_column_)
|
||||
{
|
||||
}
|
||||
@ -302,12 +324,12 @@ public:
|
||||
for (auto & raw_key : raw_keys)
|
||||
raw_key = base64Encode(raw_key, /* url_encoding */ true);
|
||||
|
||||
return storage.getBySerializedKeys(raw_keys, nullptr, with_version_column);
|
||||
return storage.getBySerializedKeys(raw_keys, nullptr, with_version_column, getContext());
|
||||
}
|
||||
else
|
||||
{
|
||||
size_t elem_num = std::min(max_block_size, static_cast<size_t>(end - it));
|
||||
auto chunk = storage.getBySerializedKeys(std::span{it, it + elem_num}, nullptr, with_version_column);
|
||||
auto chunk = storage.getBySerializedKeys(std::span{it, it + elem_num}, nullptr, with_version_column, getContext());
|
||||
it += elem_num;
|
||||
return chunk;
|
||||
}
|
||||
@ -386,104 +408,192 @@ StorageKeeperMap::StorageKeeperMap(
|
||||
|
||||
if (attach)
|
||||
{
|
||||
checkTable<false>();
|
||||
checkTable<false>(context_);
|
||||
return;
|
||||
}
|
||||
|
||||
auto client = getClient();
|
||||
const auto & settings = context_->getSettingsRef();
|
||||
ZooKeeperRetriesControl zk_retry{
|
||||
getName(),
|
||||
getLogger(getName()),
|
||||
ZooKeeperRetriesInfo{settings.keeper_max_retries, settings.keeper_retry_initial_backoff_ms, settings.keeper_retry_max_backoff_ms},
|
||||
context_->getProcessListElement()};
|
||||
|
||||
if (zk_root_path != "/" && !client->exists(zk_root_path))
|
||||
{
|
||||
LOG_TRACE(log, "Creating root path {}", zk_root_path);
|
||||
client->createAncestors(zk_root_path);
|
||||
client->createIfNotExists(zk_root_path, "");
|
||||
}
|
||||
zk_retry.retryLoop(
|
||||
[&]
|
||||
{
|
||||
auto client = getClient();
|
||||
|
||||
if (zk_root_path != "/" && !client->exists(zk_root_path))
|
||||
{
|
||||
LOG_TRACE(log, "Creating root path {}", zk_root_path);
|
||||
client->createAncestors(zk_root_path);
|
||||
client->createIfNotExists(zk_root_path, "");
|
||||
}
|
||||
});
|
||||
|
||||
std::shared_ptr<zkutil::EphemeralNodeHolder> metadata_drop_lock;
|
||||
int32_t drop_lock_version = -1;
|
||||
for (size_t i = 0; i < 1000; ++i)
|
||||
{
|
||||
std::string stored_metadata_string;
|
||||
auto exists = client->tryGet(zk_metadata_path, stored_metadata_string);
|
||||
|
||||
if (exists)
|
||||
{
|
||||
// this requires same name for columns
|
||||
// maybe we can do a smarter comparison for columns and primary key expression
|
||||
if (stored_metadata_string != metadata_string)
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Path {} is already used but the stored table definition doesn't match. Stored metadata: {}",
|
||||
zk_root_path,
|
||||
stored_metadata_string);
|
||||
|
||||
auto code = client->tryCreate(zk_table_path, "", zkutil::CreateMode::Persistent);
|
||||
|
||||
/// A table on the same Keeper path already exists, we just appended our table id to subscribe as a new replica
|
||||
/// We still don't know if the table matches the expected metadata so table_is_valid is not changed
|
||||
/// It will be checked lazily on the first operation
|
||||
if (code == Coordination::Error::ZOK)
|
||||
return;
|
||||
|
||||
if (code != Coordination::Error::ZNONODE)
|
||||
throw zkutil::KeeperException(code, "Failed to create table on path {} because a table with same UUID already exists", zk_root_path);
|
||||
|
||||
/// ZNONODE means we dropped zk_tables_path but didn't finish drop completely
|
||||
}
|
||||
|
||||
if (client->exists(zk_dropped_path))
|
||||
{
|
||||
LOG_INFO(log, "Removing leftover nodes");
|
||||
auto code = client->tryCreate(zk_dropped_lock_path, "", zkutil::CreateMode::Ephemeral);
|
||||
|
||||
if (code == Coordination::Error::ZNONODE)
|
||||
bool success = false;
|
||||
zk_retry.retryLoop(
|
||||
[&]
|
||||
{
|
||||
LOG_INFO(log, "Someone else removed leftover nodes");
|
||||
}
|
||||
else if (code == Coordination::Error::ZNODEEXISTS)
|
||||
{
|
||||
LOG_INFO(log, "Someone else is removing leftover nodes");
|
||||
continue;
|
||||
}
|
||||
else if (code != Coordination::Error::ZOK)
|
||||
{
|
||||
throw Coordination::Exception::fromPath(code, zk_dropped_lock_path);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(zk_dropped_lock_path, *client);
|
||||
if (!dropTable(client, metadata_drop_lock))
|
||||
continue;
|
||||
}
|
||||
}
|
||||
auto client = getClient();
|
||||
std::string stored_metadata_string;
|
||||
auto exists = client->tryGet(zk_metadata_path, stored_metadata_string);
|
||||
|
||||
Coordination::Requests create_requests
|
||||
{
|
||||
zkutil::makeCreateRequest(zk_metadata_path, metadata_string, zkutil::CreateMode::Persistent),
|
||||
zkutil::makeCreateRequest(zk_data_path, metadata_string, zkutil::CreateMode::Persistent),
|
||||
zkutil::makeCreateRequest(zk_tables_path, "", zkutil::CreateMode::Persistent),
|
||||
zkutil::makeCreateRequest(zk_table_path, "", zkutil::CreateMode::Persistent),
|
||||
};
|
||||
if (exists)
|
||||
{
|
||||
// this requires same name for columns
|
||||
// maybe we can do a smarter comparison for columns and primary key expression
|
||||
if (stored_metadata_string != metadata_string)
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Path {} is already used but the stored table definition doesn't match. Stored metadata: {}",
|
||||
zk_root_path,
|
||||
stored_metadata_string);
|
||||
|
||||
Coordination::Responses create_responses;
|
||||
auto code = client->tryMulti(create_requests, create_responses);
|
||||
if (code == Coordination::Error::ZNODEEXISTS)
|
||||
{
|
||||
LOG_INFO(log, "It looks like a table on path {} was created by another server at the same moment, will retry", zk_root_path);
|
||||
continue;
|
||||
}
|
||||
else if (code != Coordination::Error::ZOK)
|
||||
{
|
||||
zkutil::KeeperMultiException::check(code, create_requests, create_responses);
|
||||
}
|
||||
auto code = client->tryCreate(zk_table_path, "", zkutil::CreateMode::Persistent);
|
||||
|
||||
/// A table on the same Keeper path already exists, we just appended our table id to subscribe as a new replica
|
||||
/// We still don't know if the table matches the expected metadata so table_is_valid is not changed
|
||||
/// It will be checked lazily on the first operation
|
||||
if (code == Coordination::Error::ZOK)
|
||||
{
|
||||
success = true;
|
||||
return;
|
||||
}
|
||||
|
||||
table_is_valid = true;
|
||||
/// we are the first table created for the specified Keeper path, i.e. we are the first replica
|
||||
return;
|
||||
/// We most likely created the path but got a timeout or disconnect
|
||||
if (code == Coordination::Error::ZNODEEXISTS && zk_retry.isRetry())
|
||||
{
|
||||
success = true;
|
||||
return;
|
||||
}
|
||||
|
||||
if (code != Coordination::Error::ZNONODE)
|
||||
throw zkutil::KeeperException(
|
||||
code, "Failed to create table on path {} because a table with same UUID already exists", zk_root_path);
|
||||
|
||||
/// ZNONODE means we dropped zk_tables_path but didn't finish drop completely
|
||||
}
|
||||
|
||||
if (client->exists(zk_dropped_path))
|
||||
{
|
||||
LOG_INFO(log, "Removing leftover nodes");
|
||||
|
||||
bool drop_finished = false;
|
||||
if (zk_retry.isRetry() && metadata_drop_lock != nullptr && drop_lock_version != -1)
|
||||
{
|
||||
/// if we have leftover lock from previous try, we need to recreate the ephemeral with our session
|
||||
Coordination::Requests drop_lock_requests{
|
||||
zkutil::makeRemoveRequest(zk_dropped_lock_path, drop_lock_version),
|
||||
zkutil::makeCreateRequest(zk_dropped_lock_path, "", zkutil::CreateMode::Ephemeral),
|
||||
};
|
||||
|
||||
Coordination::Responses drop_lock_responses;
|
||||
auto lock_code = client->tryMulti(drop_lock_requests, drop_lock_responses);
|
||||
if (lock_code == Coordination::Error::ZBADVERSION)
|
||||
{
|
||||
LOG_INFO(log, "Someone else is removing leftover nodes");
|
||||
metadata_drop_lock->setAlreadyRemoved();
|
||||
metadata_drop_lock.reset();
|
||||
return;
|
||||
}
|
||||
|
||||
if (drop_lock_responses[0]->error == Coordination::Error::ZNONODE)
|
||||
{
|
||||
/// someone else removed metadata nodes or the previous ephemeral node expired
|
||||
/// we will try creating dropped lock again to make sure
|
||||
metadata_drop_lock->setAlreadyRemoved();
|
||||
metadata_drop_lock.reset();
|
||||
}
|
||||
else if (lock_code == Coordination::Error::ZOK)
|
||||
{
|
||||
metadata_drop_lock->setAlreadyRemoved();
|
||||
metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(zk_dropped_lock_path, *client);
|
||||
drop_lock_version = -1;
|
||||
Coordination::Stat lock_stat;
|
||||
client->get(zk_dropped_lock_path, &lock_stat);
|
||||
drop_lock_version = lock_stat.version;
|
||||
if (!dropTable(client, metadata_drop_lock))
|
||||
{
|
||||
metadata_drop_lock.reset();
|
||||
return;
|
||||
}
|
||||
drop_finished = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!drop_finished)
|
||||
{
|
||||
auto code = client->tryCreate(zk_dropped_lock_path, "", zkutil::CreateMode::Ephemeral);
|
||||
|
||||
if (code == Coordination::Error::ZNONODE)
|
||||
{
|
||||
LOG_INFO(log, "Someone else removed leftover nodes");
|
||||
}
|
||||
else if (code == Coordination::Error::ZNODEEXISTS)
|
||||
{
|
||||
LOG_INFO(log, "Someone else is removing leftover nodes");
|
||||
return;
|
||||
}
|
||||
else if (code != Coordination::Error::ZOK)
|
||||
{
|
||||
throw Coordination::Exception::fromPath(code, zk_dropped_lock_path);
|
||||
}
|
||||
else
|
||||
{
|
||||
metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(zk_dropped_lock_path, *client);
|
||||
drop_lock_version = -1;
|
||||
Coordination::Stat lock_stat;
|
||||
client->get(zk_dropped_lock_path, &lock_stat);
|
||||
drop_lock_version = lock_stat.version;
|
||||
if (!dropTable(client, metadata_drop_lock))
|
||||
{
|
||||
metadata_drop_lock.reset();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Coordination::Requests create_requests{
|
||||
zkutil::makeCreateRequest(zk_metadata_path, metadata_string, zkutil::CreateMode::Persistent),
|
||||
zkutil::makeCreateRequest(zk_data_path, metadata_string, zkutil::CreateMode::Persistent),
|
||||
zkutil::makeCreateRequest(zk_tables_path, "", zkutil::CreateMode::Persistent),
|
||||
zkutil::makeCreateRequest(zk_table_path, "", zkutil::CreateMode::Persistent),
|
||||
};
|
||||
|
||||
Coordination::Responses create_responses;
|
||||
auto code = client->tryMulti(create_requests, create_responses);
|
||||
if (code == Coordination::Error::ZNODEEXISTS)
|
||||
{
|
||||
LOG_INFO(
|
||||
log, "It looks like a table on path {} was created by another server at the same moment, will retry", zk_root_path);
|
||||
return;
|
||||
}
|
||||
else if (code != Coordination::Error::ZOK)
|
||||
{
|
||||
zkutil::KeeperMultiException::check(code, create_requests, create_responses);
|
||||
}
|
||||
|
||||
table_status = TableStatus::VALID;
|
||||
/// we are the first table created for the specified Keeper path, i.e. we are the first replica
|
||||
success = true;
|
||||
});
|
||||
|
||||
if (success)
|
||||
return;
|
||||
}
|
||||
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Cannot create metadata for table, because it is removed concurrently or because "
|
||||
"of wrong zk_root_path ({})", zk_root_path);
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Cannot create metadata for table, because it is removed concurrently or because "
|
||||
"of wrong zk_root_path ({})",
|
||||
zk_root_path);
|
||||
}
|
||||
|
||||
|
||||
@ -496,7 +606,7 @@ Pipe StorageKeeperMap::read(
|
||||
size_t max_block_size,
|
||||
size_t num_streams)
|
||||
{
|
||||
checkTable<true>();
|
||||
checkTable<true>(context_);
|
||||
storage_snapshot->check(column_names);
|
||||
|
||||
FieldVectorPtr filtered_keys;
|
||||
@ -529,8 +639,8 @@ Pipe StorageKeeperMap::read(
|
||||
size_t num_keys = keys->size();
|
||||
size_t num_threads = std::min<size_t>(num_streams, keys->size());
|
||||
|
||||
assert(num_keys <= std::numeric_limits<uint32_t>::max());
|
||||
assert(num_threads <= std::numeric_limits<uint32_t>::max());
|
||||
chassert(num_keys <= std::numeric_limits<uint32_t>::max());
|
||||
chassert(num_threads <= std::numeric_limits<uint32_t>::max());
|
||||
|
||||
for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx)
|
||||
{
|
||||
@ -539,29 +649,59 @@ Pipe StorageKeeperMap::read(
|
||||
|
||||
using KeyContainer = typename KeyContainerPtr::element_type;
|
||||
pipes.emplace_back(std::make_shared<StorageKeeperMapSource<KeyContainer>>(
|
||||
*this, sample_block, max_block_size, keys, keys->begin() + begin, keys->begin() + end, with_version_column));
|
||||
*this, sample_block, max_block_size, keys, keys->begin() + begin, keys->begin() + end, with_version_column, context_));
|
||||
}
|
||||
return Pipe::unitePipes(std::move(pipes));
|
||||
};
|
||||
|
||||
auto client = getClient();
|
||||
if (all_scan)
|
||||
return process_keys(std::make_shared<std::vector<std::string>>(client->getChildren(zk_data_path)));
|
||||
{
|
||||
const auto & settings = context_->getSettingsRef();
|
||||
ZooKeeperRetriesControl zk_retry{
|
||||
getName(),
|
||||
getLogger(getName()),
|
||||
ZooKeeperRetriesInfo{
|
||||
settings.keeper_max_retries,
|
||||
settings.keeper_retry_initial_backoff_ms,
|
||||
settings.keeper_retry_max_backoff_ms},
|
||||
context_->getProcessListElement()};
|
||||
|
||||
std::vector<std::string> children;
|
||||
zk_retry.retryLoop([&]
|
||||
{
|
||||
auto client = getClient();
|
||||
children = client->getChildren(zk_data_path);
|
||||
});
|
||||
return process_keys(std::make_shared<std::vector<std::string>>(std::move(children)));
|
||||
}
|
||||
|
||||
return process_keys(std::move(filtered_keys));
|
||||
}
|
||||
|
||||
SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
|
||||
{
|
||||
checkTable<true>();
|
||||
checkTable<true>(local_context);
|
||||
return std::make_shared<StorageKeeperMapSink>(*this, metadata_snapshot->getSampleBlock(), local_context);
|
||||
}
|
||||
|
||||
void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
|
||||
void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
|
||||
{
|
||||
checkTable<true>();
|
||||
auto client = getClient();
|
||||
client->tryRemoveChildrenRecursive(zk_data_path, true);
|
||||
checkTable<true>(local_context);
|
||||
const auto & settings = local_context->getSettingsRef();
|
||||
ZooKeeperRetriesControl zk_retry{
|
||||
getName(),
|
||||
getLogger(getName()),
|
||||
ZooKeeperRetriesInfo{
|
||||
settings.keeper_max_retries,
|
||||
settings.keeper_retry_initial_backoff_ms,
|
||||
settings.keeper_retry_max_backoff_ms},
|
||||
local_context->getProcessListElement()};
|
||||
|
||||
zk_retry.retryLoop([&]
|
||||
{
|
||||
auto client = getClient();
|
||||
client->tryRemoveChildrenRecursive(zk_data_path, true);
|
||||
});
|
||||
}
|
||||
|
||||
bool StorageKeeperMap::dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock)
|
||||
@ -605,7 +745,18 @@ bool StorageKeeperMap::dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::E
|
||||
|
||||
void StorageKeeperMap::drop()
|
||||
{
|
||||
checkTable<true>();
|
||||
auto current_table_status = getTableStatus(getContext());
|
||||
if (current_table_status == TableStatus::UNKNOWN)
|
||||
{
|
||||
static constexpr auto error_msg = "Failed to activate table because of connection issues. It will be activated "
|
||||
"once a connection is established and metadata is verified";
|
||||
throw Exception(ErrorCodes::INVALID_STATE, error_msg);
|
||||
}
|
||||
|
||||
/// if only column metadata is wrong we can still drop the table correctly
|
||||
if (current_table_status == TableStatus::INVALID_METADATA)
|
||||
return;
|
||||
|
||||
auto client = getClient();
|
||||
|
||||
// we allow ZNONODE in case we got hardware error on previous drop
|
||||
@ -966,78 +1117,91 @@ UInt64 StorageKeeperMap::keysLimit() const
|
||||
return keys_limit;
|
||||
}
|
||||
|
||||
std::optional<bool> StorageKeeperMap::isTableValid() const
|
||||
StorageKeeperMap::TableStatus StorageKeeperMap::getTableStatus(const ContextPtr & local_context) const
|
||||
{
|
||||
std::lock_guard lock{init_mutex};
|
||||
if (table_is_valid.has_value())
|
||||
return table_is_valid;
|
||||
if (table_status != TableStatus::UNKNOWN)
|
||||
return table_status;
|
||||
|
||||
[&]
|
||||
{
|
||||
try
|
||||
{
|
||||
auto client = getClient();
|
||||
const auto & settings = local_context->getSettingsRef();
|
||||
ZooKeeperRetriesControl zk_retry{
|
||||
getName(),
|
||||
getLogger(getName()),
|
||||
ZooKeeperRetriesInfo{
|
||||
settings.keeper_max_retries,
|
||||
settings.keeper_retry_initial_backoff_ms,
|
||||
settings.keeper_retry_max_backoff_ms},
|
||||
local_context->getProcessListElement()};
|
||||
|
||||
Coordination::Stat metadata_stat;
|
||||
auto stored_metadata_string = client->get(zk_metadata_path, &metadata_stat);
|
||||
|
||||
if (metadata_stat.numChildren == 0)
|
||||
zk_retry.retryLoop([&]
|
||||
{
|
||||
table_is_valid = false;
|
||||
return;
|
||||
}
|
||||
auto client = getClient();
|
||||
|
||||
if (metadata_string != stored_metadata_string)
|
||||
{
|
||||
LOG_ERROR(
|
||||
log,
|
||||
"Table definition does not match to the one stored in the path {}. Stored definition: {}",
|
||||
zk_root_path,
|
||||
stored_metadata_string);
|
||||
table_is_valid = false;
|
||||
return;
|
||||
}
|
||||
Coordination::Stat metadata_stat;
|
||||
auto stored_metadata_string = client->get(zk_metadata_path, &metadata_stat);
|
||||
|
||||
// validate all metadata and data nodes are present
|
||||
Coordination::Requests requests;
|
||||
requests.push_back(zkutil::makeCheckRequest(zk_table_path, -1));
|
||||
requests.push_back(zkutil::makeCheckRequest(zk_data_path, -1));
|
||||
requests.push_back(zkutil::makeCheckRequest(zk_dropped_path, -1));
|
||||
if (metadata_stat.numChildren == 0)
|
||||
{
|
||||
table_status = TableStatus::INVALID_KEEPER_STRUCTURE;
|
||||
return;
|
||||
}
|
||||
|
||||
Coordination::Responses responses;
|
||||
client->tryMulti(requests, responses);
|
||||
if (metadata_string != stored_metadata_string)
|
||||
{
|
||||
LOG_ERROR(
|
||||
log,
|
||||
"Table definition does not match to the one stored in the path {}. Stored definition: {}",
|
||||
zk_root_path,
|
||||
stored_metadata_string);
|
||||
table_status = TableStatus::INVALID_METADATA;
|
||||
return;
|
||||
}
|
||||
|
||||
table_is_valid = false;
|
||||
if (responses[0]->error != Coordination::Error::ZOK)
|
||||
{
|
||||
LOG_ERROR(log, "Table node ({}) is missing", zk_table_path);
|
||||
return;
|
||||
}
|
||||
// validate all metadata and data nodes are present
|
||||
Coordination::Requests requests;
|
||||
requests.push_back(zkutil::makeCheckRequest(zk_table_path, -1));
|
||||
requests.push_back(zkutil::makeCheckRequest(zk_data_path, -1));
|
||||
requests.push_back(zkutil::makeCheckRequest(zk_dropped_path, -1));
|
||||
|
||||
if (responses[1]->error != Coordination::Error::ZOK)
|
||||
{
|
||||
LOG_ERROR(log, "Data node ({}) is missing", zk_data_path);
|
||||
return;
|
||||
}
|
||||
Coordination::Responses responses;
|
||||
client->tryMulti(requests, responses);
|
||||
|
||||
if (responses[2]->error == Coordination::Error::ZOK)
|
||||
{
|
||||
LOG_ERROR(log, "Tables with root node {} are being dropped", zk_root_path);
|
||||
return;
|
||||
}
|
||||
table_status = TableStatus::INVALID_KEEPER_STRUCTURE;
|
||||
if (responses[0]->error != Coordination::Error::ZOK)
|
||||
{
|
||||
LOG_ERROR(log, "Table node ({}) is missing", zk_table_path);
|
||||
return;
|
||||
}
|
||||
|
||||
table_is_valid = true;
|
||||
if (responses[1]->error != Coordination::Error::ZOK)
|
||||
{
|
||||
LOG_ERROR(log, "Data node ({}) is missing", zk_data_path);
|
||||
return;
|
||||
}
|
||||
|
||||
if (responses[2]->error == Coordination::Error::ZOK)
|
||||
{
|
||||
LOG_ERROR(log, "Tables with root node {} are being dropped", zk_root_path);
|
||||
return;
|
||||
}
|
||||
|
||||
table_status = TableStatus::VALID;
|
||||
});
|
||||
}
|
||||
catch (const Coordination::Exception & e)
|
||||
{
|
||||
tryLogCurrentException(log);
|
||||
|
||||
if (!Coordination::isHardwareError(e.code))
|
||||
table_is_valid = false;
|
||||
table_status = TableStatus::INVALID_KEEPER_STRUCTURE;
|
||||
}
|
||||
}();
|
||||
|
||||
return table_is_valid;
|
||||
return table_status;
|
||||
}
|
||||
|
||||
Chunk StorageKeeperMap::getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & null_map, const Names &) const
|
||||
@ -1050,10 +1214,11 @@ Chunk StorageKeeperMap::getByKeys(const ColumnsWithTypeAndName & keys, PaddedPOD
|
||||
if (raw_keys.size() != keys[0].column->size())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Assertion failed: {} != {}", raw_keys.size(), keys[0].column->size());
|
||||
|
||||
return getBySerializedKeys(raw_keys, &null_map, /* version_column */ false);
|
||||
return getBySerializedKeys(raw_keys, &null_map, /* version_column */ false, getContext());
|
||||
}
|
||||
|
||||
Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map, bool with_version) const
|
||||
Chunk StorageKeeperMap::getBySerializedKeys(
|
||||
const std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map, bool with_version, const ContextPtr & local_context) const
|
||||
{
|
||||
Block sample_block = getInMemoryMetadataPtr()->getSampleBlock();
|
||||
MutableColumns columns = sample_block.cloneEmptyColumns();
|
||||
@ -1070,17 +1235,27 @@ Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> k
|
||||
null_map->resize_fill(keys.size(), 1);
|
||||
}
|
||||
|
||||
auto client = getClient();
|
||||
|
||||
Strings full_key_paths;
|
||||
full_key_paths.reserve(keys.size());
|
||||
|
||||
for (const auto & key : keys)
|
||||
{
|
||||
full_key_paths.emplace_back(fullPathForKey(key));
|
||||
}
|
||||
|
||||
auto values = client->tryGet(full_key_paths);
|
||||
const auto & settings = local_context->getSettingsRef();
|
||||
ZooKeeperRetriesControl zk_retry{
|
||||
getName(),
|
||||
getLogger(getName()),
|
||||
ZooKeeperRetriesInfo{
|
||||
settings.keeper_max_retries,
|
||||
settings.keeper_retry_initial_backoff_ms,
|
||||
settings.keeper_retry_max_backoff_ms},
|
||||
local_context->getProcessListElement()};
|
||||
|
||||
zkutil::ZooKeeper::MultiTryGetResponse values;
|
||||
zk_retry.retryLoop([&]{
|
||||
auto client = getClient();
|
||||
values = client->tryGet(full_key_paths);
|
||||
});
|
||||
|
||||
for (size_t i = 0; i < keys.size(); ++i)
|
||||
{
|
||||
@ -1153,14 +1328,14 @@ void StorageKeeperMap::checkMutationIsPossible(const MutationCommands & commands
|
||||
|
||||
void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr local_context)
|
||||
{
|
||||
checkTable<true>();
|
||||
checkTable<true>(local_context);
|
||||
|
||||
if (commands.empty())
|
||||
return;
|
||||
|
||||
bool strict = local_context->getSettingsRef().keeper_map_strict_mode;
|
||||
|
||||
assert(commands.size() == 1);
|
||||
chassert(commands.size() == 1);
|
||||
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
auto storage = getStorageID();
|
||||
@ -1168,16 +1343,16 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
|
||||
|
||||
if (commands.front().type == MutationCommand::Type::DELETE)
|
||||
{
|
||||
MutationsInterpreter::Settings settings(true);
|
||||
settings.return_all_columns = true;
|
||||
settings.return_mutated_rows = true;
|
||||
MutationsInterpreter::Settings mutation_settings(true);
|
||||
mutation_settings.return_all_columns = true;
|
||||
mutation_settings.return_mutated_rows = true;
|
||||
|
||||
auto interpreter = std::make_unique<MutationsInterpreter>(
|
||||
storage_ptr,
|
||||
metadata_snapshot,
|
||||
commands,
|
||||
local_context,
|
||||
settings);
|
||||
mutation_settings);
|
||||
|
||||
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
|
||||
PullingPipelineExecutor executor(pipeline);
|
||||
@ -1186,8 +1361,6 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
|
||||
auto primary_key_pos = header.getPositionByName(primary_key);
|
||||
auto version_position = header.getPositionByName(std::string{version_column_name});
|
||||
|
||||
auto client = getClient();
|
||||
|
||||
Block block;
|
||||
while (executor.pull(block))
|
||||
{
|
||||
@ -1215,7 +1388,23 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
|
||||
}
|
||||
|
||||
Coordination::Responses responses;
|
||||
auto status = client->tryMulti(delete_requests, responses, /* check_session_valid */ true);
|
||||
|
||||
const auto & settings = local_context->getSettingsRef();
|
||||
ZooKeeperRetriesControl zk_retry{
|
||||
getName(),
|
||||
getLogger(getName()),
|
||||
ZooKeeperRetriesInfo{
|
||||
settings.keeper_max_retries,
|
||||
settings.keeper_retry_initial_backoff_ms,
|
||||
settings.keeper_retry_max_backoff_ms},
|
||||
local_context->getProcessListElement()};
|
||||
|
||||
Coordination::Error status;
|
||||
zk_retry.retryLoop([&]
|
||||
{
|
||||
auto client = getClient();
|
||||
status = client->tryMulti(delete_requests, responses, /* check_session_valid */ true);
|
||||
});
|
||||
|
||||
if (status == Coordination::Error::ZOK)
|
||||
return;
|
||||
@ -1227,16 +1416,21 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
|
||||
|
||||
for (const auto & delete_request : delete_requests)
|
||||
{
|
||||
auto code = client->tryRemove(delete_request->getPath());
|
||||
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
|
||||
throw zkutil::KeeperException::fromPath(code, delete_request->getPath());
|
||||
zk_retry.retryLoop([&]
|
||||
{
|
||||
auto client = getClient();
|
||||
status = client->tryRemove(delete_request->getPath());
|
||||
});
|
||||
|
||||
if (status != Coordination::Error::ZOK && status != Coordination::Error::ZNONODE)
|
||||
throw zkutil::KeeperException::fromPath(status, delete_request->getPath());
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
assert(commands.front().type == MutationCommand::Type::UPDATE);
|
||||
chassert(commands.front().type == MutationCommand::Type::UPDATE);
|
||||
if (commands.front().column_to_update_expression.contains(primary_key))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Primary key cannot be updated (cannot update column {})", primary_key);
|
||||
|
||||
|
@ -54,7 +54,8 @@ public:
|
||||
Names getPrimaryKey() const override { return {primary_key}; }
|
||||
|
||||
Chunk getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & null_map, const Names &) const override;
|
||||
Chunk getBySerializedKeys(std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map, bool with_version) const;
|
||||
Chunk getBySerializedKeys(
|
||||
std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map, bool with_version, const ContextPtr & local_context) const;
|
||||
|
||||
Block getSampleBlock(const Names &) const override;
|
||||
|
||||
@ -77,10 +78,10 @@ public:
|
||||
UInt64 keysLimit() const;
|
||||
|
||||
template <bool throw_on_error>
|
||||
void checkTable() const
|
||||
void checkTable(const ContextPtr & local_context) const
|
||||
{
|
||||
auto is_table_valid = isTableValid();
|
||||
if (!is_table_valid.has_value())
|
||||
auto current_table_status = getTableStatus(local_context);
|
||||
if (table_status == TableStatus::UNKNOWN)
|
||||
{
|
||||
static constexpr auto error_msg = "Failed to activate table because of connection issues. It will be activated "
|
||||
"once a connection is established and metadata is verified";
|
||||
@ -93,10 +94,10 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
if (!*is_table_valid)
|
||||
if (current_table_status != TableStatus::VALID)
|
||||
{
|
||||
static constexpr auto error_msg
|
||||
= "Failed to activate table because of invalid metadata in ZooKeeper. Please DETACH table";
|
||||
= "Failed to activate table because of invalid metadata in ZooKeeper. Please DROP/DETACH table";
|
||||
if constexpr (throw_on_error)
|
||||
throw Exception(ErrorCodes::INVALID_STATE, error_msg);
|
||||
else
|
||||
@ -110,7 +111,15 @@ public:
|
||||
private:
|
||||
bool dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock);
|
||||
|
||||
std::optional<bool> isTableValid() const;
|
||||
enum class TableStatus : uint8_t
|
||||
{
|
||||
UNKNOWN,
|
||||
INVALID_METADATA,
|
||||
INVALID_KEEPER_STRUCTURE,
|
||||
VALID
|
||||
};
|
||||
|
||||
TableStatus getTableStatus(const ContextPtr & context) const;
|
||||
|
||||
void restoreDataImpl(
|
||||
const BackupPtr & backup,
|
||||
@ -142,7 +151,8 @@ private:
|
||||
mutable zkutil::ZooKeeperPtr zookeeper_client{nullptr};
|
||||
|
||||
mutable std::mutex init_mutex;
|
||||
mutable std::optional<bool> table_is_valid;
|
||||
|
||||
mutable TableStatus table_status{TableStatus::UNKNOWN};
|
||||
|
||||
LoggerPtr log;
|
||||
};
|
||||
|
@ -2,7 +2,7 @@ version: '2.3'
|
||||
|
||||
services:
|
||||
minio1:
|
||||
image: minio/minio:RELEASE.2023-09-30T07-02-29Z
|
||||
image: minio/minio:RELEASE.2024-07-31T05-46-26Z
|
||||
volumes:
|
||||
- data1-1:/data1
|
||||
- ${MINIO_CERTS_DIR:-}:/certs
|
||||
|
14
tests/integration/test_keeper_map/configs/keeper_retries.xml
Normal file
14
tests/integration/test_keeper_map/configs/keeper_retries.xml
Normal file
@ -0,0 +1,14 @@
|
||||
<clickhouse>
|
||||
<profiles>
|
||||
<default>
|
||||
<insert_keeper_max_retries>0</insert_keeper_max_retries>
|
||||
<keeper_max_retries>0</keeper_max_retries>
|
||||
</default>
|
||||
</profiles>
|
||||
<users>
|
||||
<default>
|
||||
<password></password>
|
||||
<profile>default</profile>
|
||||
</default>
|
||||
</users>
|
||||
</clickhouse>
|
@ -10,6 +10,7 @@ cluster = ClickHouseCluster(__file__)
|
||||
node = cluster.add_instance(
|
||||
"node",
|
||||
main_configs=["configs/enable_keeper_map.xml"],
|
||||
user_configs=["configs/keeper_retries.xml"],
|
||||
with_zookeeper=True,
|
||||
stay_alive=True,
|
||||
)
|
||||
@ -46,7 +47,10 @@ def assert_keeper_exception_after_partition(query):
|
||||
with PartitionManager() as pm:
|
||||
pm.drop_instance_zk_connections(node)
|
||||
try:
|
||||
error = node.query_and_get_error_with_retry(query, sleep_time=1)
|
||||
error = node.query_and_get_error_with_retry(
|
||||
query,
|
||||
sleep_time=1,
|
||||
)
|
||||
assert "Coordination::Exception" in error
|
||||
except:
|
||||
print_iptables_rules()
|
||||
@ -63,6 +67,7 @@ def run_query(query):
|
||||
|
||||
|
||||
def test_keeper_map_without_zk(started_cluster):
|
||||
run_query("DROP TABLE IF EXISTS test_keeper_map_without_zk SYNC")
|
||||
assert_keeper_exception_after_partition(
|
||||
"CREATE TABLE test_keeper_map_without_zk (key UInt64, value UInt64) ENGINE = KeeperMap('/test_keeper_map_without_zk') PRIMARY KEY(key);"
|
||||
)
|
||||
@ -84,7 +89,8 @@ def test_keeper_map_without_zk(started_cluster):
|
||||
node.restart_clickhouse(60)
|
||||
try:
|
||||
error = node.query_and_get_error_with_retry(
|
||||
"SELECT * FROM test_keeper_map_without_zk", sleep_time=1
|
||||
"SELECT * FROM test_keeper_map_without_zk",
|
||||
sleep_time=1,
|
||||
)
|
||||
assert "Failed to activate table because of connection issues" in error
|
||||
except:
|
||||
@ -101,12 +107,12 @@ def test_keeper_map_without_zk(started_cluster):
|
||||
)
|
||||
assert "Failed to activate table because of invalid metadata in ZooKeeper" in error
|
||||
|
||||
node.query("DETACH TABLE test_keeper_map_without_zk")
|
||||
|
||||
client.stop()
|
||||
|
||||
|
||||
def test_keeper_map_with_failed_drop(started_cluster):
|
||||
run_query("DROP TABLE IF EXISTS test_keeper_map_with_failed_drop SYNC")
|
||||
run_query("DROP TABLE IF EXISTS test_keeper_map_with_failed_drop_another SYNC")
|
||||
run_query(
|
||||
"CREATE TABLE test_keeper_map_with_failed_drop (key UInt64, value UInt64) ENGINE = KeeperMap('/test_keeper_map_with_failed_drop') PRIMARY KEY(key);"
|
||||
)
|
||||
|
@ -0,0 +1,3 @@
|
||||
<clickhouse>
|
||||
<keeper_map_path_prefix>/test_keeper_map</keeper_map_path_prefix>
|
||||
</clickhouse>
|
@ -0,0 +1,7 @@
|
||||
<clickhouse>
|
||||
<zookeeper>
|
||||
<enable_fault_injections_during_startup>1</enable_fault_injections_during_startup>
|
||||
<send_fault_probability>0.005</send_fault_probability>
|
||||
<recv_fault_probability>0.005</recv_fault_probability>
|
||||
</zookeeper>
|
||||
</clickhouse>
|
@ -0,0 +1,14 @@
|
||||
<clickhouse>
|
||||
<profiles>
|
||||
<default>
|
||||
<keeper_max_retries>20</keeper_max_retries>
|
||||
<keeper_retry_max_backoff_ms>10000</keeper_retry_max_backoff_ms>
|
||||
</default>
|
||||
</profiles>
|
||||
<users>
|
||||
<default>
|
||||
<password></password>
|
||||
<profile>default</profile>
|
||||
</default>
|
||||
</users>
|
||||
</clickhouse>
|
75
tests/integration/test_keeper_map_retries/test.py
Normal file
75
tests/integration/test_keeper_map_retries/test.py
Normal file
@ -0,0 +1,75 @@
|
||||
import pytest
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
import os
|
||||
|
||||
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node = cluster.add_instance(
|
||||
"node",
|
||||
main_configs=["configs/enable_keeper_map.xml"],
|
||||
user_configs=["configs/keeper_retries.xml"],
|
||||
with_zookeeper=True,
|
||||
stay_alive=True,
|
||||
)
|
||||
|
||||
|
||||
def start_clean_clickhouse():
|
||||
# remove fault injection if present
|
||||
if "fault_injection.xml" in node.exec_in_container(
|
||||
["bash", "-c", "ls /etc/clickhouse-server/config.d"]
|
||||
):
|
||||
print("Removing fault injection")
|
||||
node.exec_in_container(
|
||||
["bash", "-c", "rm /etc/clickhouse-server/config.d/fault_injection.xml"]
|
||||
)
|
||||
node.restart_clickhouse()
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def repeat_query(query, repeat):
|
||||
for _ in range(repeat):
|
||||
node.query(
|
||||
query,
|
||||
)
|
||||
|
||||
|
||||
def test_queries(started_cluster):
|
||||
start_clean_clickhouse()
|
||||
|
||||
node.query("DROP TABLE IF EXISTS keeper_map_retries SYNC")
|
||||
node.stop_clickhouse()
|
||||
node.copy_file_to_container(
|
||||
os.path.join(CONFIG_DIR, "fault_injection.xml"),
|
||||
"/etc/clickhouse-server/config.d/fault_injection.xml",
|
||||
)
|
||||
node.start_clickhouse()
|
||||
|
||||
repeat_count = 10
|
||||
|
||||
node.query(
|
||||
"CREATE TABLE keeper_map_retries (a UInt64, b UInt64) Engine=KeeperMap('/keeper_map_retries') PRIMARY KEY a",
|
||||
)
|
||||
|
||||
repeat_query(
|
||||
"INSERT INTO keeper_map_retries SELECT number, number FROM numbers(500)",
|
||||
repeat_count,
|
||||
)
|
||||
repeat_query("SELECT * FROM keeper_map_retries", repeat_count)
|
||||
repeat_query(
|
||||
"ALTER TABLE keeper_map_retries UPDATE b = 3 WHERE a > 2", repeat_count
|
||||
)
|
||||
repeat_query("ALTER TABLE keeper_map_retries DELETE WHERE a > 2", repeat_count)
|
||||
repeat_query("TRUNCATE keeper_map_retries", repeat_count)
|
@ -11,7 +11,12 @@ CREATE VIEW number_view as SELECT * FROM numbers(10) as tb;
|
||||
CREATE MATERIALIZED VIEW null_mv Engine = Log AS SELECT * FROM null_table LEFT JOIN number_view as tb USING number;
|
||||
CREATE TABLE null_table_buffer (number UInt64) ENGINE = Buffer(currentDatabase(), null_table, 1, 1, 1, 100, 200, 10000, 20000);
|
||||
INSERT INTO null_table_buffer VALUES (1);
|
||||
SELECT sleep(3) FORMAT Null;
|
||||
-- OPTIMIZE query should flush Buffer table, but still it is not guaranteed
|
||||
-- (see the comment StorageBuffer::optimize)
|
||||
-- But the combination of OPTIMIZE + sleep + OPTIMIZE should be enough.
|
||||
OPTIMIZE TABLE null_table_buffer;
|
||||
SELECT sleep(1) FORMAT Null;
|
||||
OPTIMIZE TABLE null_table_buffer;
|
||||
-- Insert about should've landed into `null_mv`
|
||||
SELECT count() FROM null_mv;
|
||||
1
|
||||
|
@ -13,7 +13,13 @@ CREATE MATERIALIZED VIEW null_mv Engine = Log AS SELECT * FROM null_table LEFT J
|
||||
|
||||
CREATE TABLE null_table_buffer (number UInt64) ENGINE = Buffer(currentDatabase(), null_table, 1, 1, 1, 100, 200, 10000, 20000);
|
||||
INSERT INTO null_table_buffer VALUES (1);
|
||||
SELECT sleep(3) FORMAT Null;
|
||||
|
||||
-- OPTIMIZE query should flush Buffer table, but still it is not guaranteed
|
||||
-- (see the comment StorageBuffer::optimize)
|
||||
-- But the combination of OPTIMIZE + sleep + OPTIMIZE should be enough.
|
||||
OPTIMIZE TABLE null_table_buffer;
|
||||
SELECT sleep(1) FORMAT Null;
|
||||
OPTIMIZE TABLE null_table_buffer;
|
||||
|
||||
-- Insert about should've landed into `null_mv`
|
||||
SELECT count() FROM null_mv;
|
||||
|
@ -13,20 +13,9 @@ $CLICKHOUSE_CLIENT -nm -q "
|
||||
CREATE TABLE $database_name.02911_backup_restore_keeper_map3 (key UInt64, value String) Engine=KeeperMap('/' || currentDatabase() || '/test02911_different') PRIMARY KEY key;
|
||||
"
|
||||
|
||||
# KeeperMap table engine doesn't have internal retries for interaction with Keeper. Do it on our own, otherwise tests with overloaded server can be flaky.
|
||||
while true
|
||||
do
|
||||
$CLICKHOUSE_CLIENT -nm -q "INSERT INTO $database_name.02911_backup_restore_keeper_map2 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 5000;
|
||||
" 2>&1 | grep -q "KEEPER_EXCEPTION" && sleep 1 && continue
|
||||
break
|
||||
done
|
||||
$CLICKHOUSE_CLIENT -nm -q "INSERT INTO $database_name.02911_backup_restore_keeper_map2 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 5000;"
|
||||
|
||||
while true
|
||||
do
|
||||
$CLICKHOUSE_CLIENT -nm -q "INSERT INTO $database_name.02911_backup_restore_keeper_map3 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 3000;
|
||||
" 2>&1 | grep -q "KEEPER_EXCEPTION" && sleep 1 && continue
|
||||
break
|
||||
done
|
||||
$CLICKHOUSE_CLIENT -nm -q "INSERT INTO $database_name.02911_backup_restore_keeper_map3 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 3000;"
|
||||
|
||||
backup_path="$database_name"
|
||||
for i in $(seq 1 3); do
|
||||
|
Loading…
Reference in New Issue
Block a user