Merge branch 'master' into tighten-limits-functional-tests

This commit is contained in:
Alexey Milovidov 2024-08-01 23:57:54 +02:00
commit a6339ac75e
17 changed files with 622 additions and 277 deletions

View File

@ -3494,18 +3494,22 @@ DDLWorker & Context::getDDLWorker() const
if (shared->ddl_worker_startup_task)
waitLoad(shared->ddl_worker_startup_task); // Just wait and do not prioritize, because it depends on all load and startup tasks
SharedLockGuard lock(shared->mutex);
if (!shared->ddl_worker)
{
if (!hasZooKeeper())
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "There is no Zookeeper configuration in server config");
if (!hasDistributedDDL())
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "There is no DistributedDDL configuration in server config");
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "DDL background thread is not initialized");
/// Only acquire the lock for reading ddl_worker field.
/// hasZooKeeper() and hasDistributedDDL() acquire the same lock as well and double acquisition of the lock in shared mode can lead
/// to a deadlock if an exclusive lock attempt is made in the meantime by another thread.
SharedLockGuard lock(shared->mutex);
if (shared->ddl_worker)
return *shared->ddl_worker;
}
return *shared->ddl_worker;
if (!hasZooKeeper())
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "There is no Zookeeper configuration in server config");
if (!hasDistributedDDL())
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "There is no DistributedDDL configuration in server config");
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "DDL background thread is not initialized");
}
zkutil::ZooKeeperPtr Context::getZooKeeper() const

View File

@ -43,6 +43,7 @@
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Backups/BackupEntriesCollector.h>
#include <Backups/IBackupCoordination.h>
@ -78,6 +79,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int LIMIT_EXCEEDED;
extern const int CANNOT_RESTORE_TABLE;
extern const int INVALID_STATE;
}
namespace
@ -120,7 +122,7 @@ public:
: SinkToStorage(header), storage(storage_), context(std::move(context_))
{
auto primary_key = storage.getPrimaryKey();
assert(primary_key.size() == 1);
chassert(primary_key.size() == 1);
primary_key_pos = getHeader().getPositionByName(primary_key[0]);
}
@ -171,81 +173,94 @@ public:
template <bool for_update>
void finalize(bool strict)
{
auto zookeeper = storage.getClient();
const auto & settings = context->getSettingsRef();
auto keys_limit = storage.keysLimit();
ZooKeeperRetriesControl zk_retry{
getName(),
getLogger(getName()),
ZooKeeperRetriesInfo{
settings.insert_keeper_max_retries,
settings.insert_keeper_retry_initial_backoff_ms,
settings.insert_keeper_retry_max_backoff_ms},
context->getProcessListElement()};
size_t current_keys_num = 0;
size_t new_keys_num = 0;
// We use keys limit as a soft limit so we ignore some cases when it can be still exceeded
// (e.g if parallel insert queries are being run)
if (keys_limit != 0)
zk_retry.retryLoop([&]()
{
Coordination::Stat data_stat;
zookeeper->get(storage.dataPath(), &data_stat);
current_keys_num = data_stat.numChildren;
}
auto zookeeper = storage.getClient();
auto keys_limit = storage.keysLimit();
std::vector<std::string> key_paths;
key_paths.reserve(new_values.size());
for (const auto & [key, _] : new_values)
key_paths.push_back(storage.fullPathForKey(key));
size_t current_keys_num = 0;
size_t new_keys_num = 0;
zkutil::ZooKeeper::MultiExistsResponse results;
if constexpr (!for_update)
{
if (!strict)
results = zookeeper->exists(key_paths);
}
Coordination::Requests requests;
requests.reserve(key_paths.size());
for (size_t i = 0; i < key_paths.size(); ++i)
{
auto key = fs::path(key_paths[i]).filename();
if constexpr (for_update)
// We use keys limit as a soft limit so we ignore some cases when it can be still exceeded
// (e.g if parallel insert queries are being run)
if (keys_limit != 0)
{
int32_t version = -1;
if (strict)
version = versions.at(key);
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], version));
Coordination::Stat data_stat;
zookeeper->get(storage.dataPath(), &data_stat);
current_keys_num = data_stat.numChildren;
}
else
std::vector<std::string> key_paths;
key_paths.reserve(new_values.size());
for (const auto & [key, _] : new_values)
key_paths.push_back(storage.fullPathForKey(key));
zkutil::ZooKeeper::MultiExistsResponse results;
if constexpr (!for_update)
{
if (!strict && results[i].error == Coordination::Error::ZOK)
if (!strict)
results = zookeeper->exists(key_paths);
}
Coordination::Requests requests;
requests.reserve(key_paths.size());
for (size_t i = 0; i < key_paths.size(); ++i)
{
auto key = fs::path(key_paths[i]).filename();
if constexpr (for_update)
{
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1));
int32_t version = -1;
if (strict)
version = versions.at(key);
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], version));
}
else
{
requests.push_back(zkutil::makeCreateRequest(key_paths[i], new_values[key], zkutil::CreateMode::Persistent));
++new_keys_num;
if (!strict && results[i].error == Coordination::Error::ZOK)
{
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1));
}
else
{
requests.push_back(zkutil::makeCreateRequest(key_paths[i], new_values[key], zkutil::CreateMode::Persistent));
++new_keys_num;
}
}
}
}
if (new_keys_num != 0)
{
auto will_be = current_keys_num + new_keys_num;
if (keys_limit != 0 && will_be > keys_limit)
throw Exception(
ErrorCodes::LIMIT_EXCEEDED,
"Limit would be exceeded by inserting {} new key(s). Limit is {}, while the number of keys would be {}",
new_keys_num,
keys_limit,
will_be);
}
if (new_keys_num != 0)
{
auto will_be = current_keys_num + new_keys_num;
if (keys_limit != 0 && will_be > keys_limit)
throw Exception(
ErrorCodes::LIMIT_EXCEEDED,
"Limit would be exceeded by inserting {} new key(s). Limit is {}, while the number of keys would be {}",
new_keys_num,
keys_limit,
will_be);
}
zookeeper->multi(requests, /* check_session_valid */ true);
zookeeper->multi(requests, /* check_session_valid */ true);
});
}
};
template <typename KeyContainer>
class StorageKeeperMapSource : public ISource
class StorageKeeperMapSource : public ISource, WithContext
{
const StorageKeeperMap & storage;
size_t max_block_size;
@ -276,8 +291,15 @@ public:
KeyContainerPtr container_,
KeyContainerIter begin_,
KeyContainerIter end_,
bool with_version_column_)
: ISource(getHeader(header, with_version_column_)), storage(storage_), max_block_size(max_block_size_), container(std::move(container_)), it(begin_), end(end_)
bool with_version_column_,
ContextPtr context_)
: ISource(getHeader(header, with_version_column_))
, WithContext(std::move(context_))
, storage(storage_)
, max_block_size(max_block_size_)
, container(std::move(container_))
, it(begin_)
, end(end_)
, with_version_column(with_version_column_)
{
}
@ -302,12 +324,12 @@ public:
for (auto & raw_key : raw_keys)
raw_key = base64Encode(raw_key, /* url_encoding */ true);
return storage.getBySerializedKeys(raw_keys, nullptr, with_version_column);
return storage.getBySerializedKeys(raw_keys, nullptr, with_version_column, getContext());
}
else
{
size_t elem_num = std::min(max_block_size, static_cast<size_t>(end - it));
auto chunk = storage.getBySerializedKeys(std::span{it, it + elem_num}, nullptr, with_version_column);
auto chunk = storage.getBySerializedKeys(std::span{it, it + elem_num}, nullptr, with_version_column, getContext());
it += elem_num;
return chunk;
}
@ -386,104 +408,192 @@ StorageKeeperMap::StorageKeeperMap(
if (attach)
{
checkTable<false>();
checkTable<false>(context_);
return;
}
auto client = getClient();
const auto & settings = context_->getSettingsRef();
ZooKeeperRetriesControl zk_retry{
getName(),
getLogger(getName()),
ZooKeeperRetriesInfo{settings.keeper_max_retries, settings.keeper_retry_initial_backoff_ms, settings.keeper_retry_max_backoff_ms},
context_->getProcessListElement()};
if (zk_root_path != "/" && !client->exists(zk_root_path))
{
LOG_TRACE(log, "Creating root path {}", zk_root_path);
client->createAncestors(zk_root_path);
client->createIfNotExists(zk_root_path, "");
}
zk_retry.retryLoop(
[&]
{
auto client = getClient();
if (zk_root_path != "/" && !client->exists(zk_root_path))
{
LOG_TRACE(log, "Creating root path {}", zk_root_path);
client->createAncestors(zk_root_path);
client->createIfNotExists(zk_root_path, "");
}
});
std::shared_ptr<zkutil::EphemeralNodeHolder> metadata_drop_lock;
int32_t drop_lock_version = -1;
for (size_t i = 0; i < 1000; ++i)
{
std::string stored_metadata_string;
auto exists = client->tryGet(zk_metadata_path, stored_metadata_string);
if (exists)
{
// this requires same name for columns
// maybe we can do a smarter comparison for columns and primary key expression
if (stored_metadata_string != metadata_string)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Path {} is already used but the stored table definition doesn't match. Stored metadata: {}",
zk_root_path,
stored_metadata_string);
auto code = client->tryCreate(zk_table_path, "", zkutil::CreateMode::Persistent);
/// A table on the same Keeper path already exists, we just appended our table id to subscribe as a new replica
/// We still don't know if the table matches the expected metadata so table_is_valid is not changed
/// It will be checked lazily on the first operation
if (code == Coordination::Error::ZOK)
return;
if (code != Coordination::Error::ZNONODE)
throw zkutil::KeeperException(code, "Failed to create table on path {} because a table with same UUID already exists", zk_root_path);
/// ZNONODE means we dropped zk_tables_path but didn't finish drop completely
}
if (client->exists(zk_dropped_path))
{
LOG_INFO(log, "Removing leftover nodes");
auto code = client->tryCreate(zk_dropped_lock_path, "", zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNONODE)
bool success = false;
zk_retry.retryLoop(
[&]
{
LOG_INFO(log, "Someone else removed leftover nodes");
}
else if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "Someone else is removing leftover nodes");
continue;
}
else if (code != Coordination::Error::ZOK)
{
throw Coordination::Exception::fromPath(code, zk_dropped_lock_path);
}
else
{
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(zk_dropped_lock_path, *client);
if (!dropTable(client, metadata_drop_lock))
continue;
}
}
auto client = getClient();
std::string stored_metadata_string;
auto exists = client->tryGet(zk_metadata_path, stored_metadata_string);
Coordination::Requests create_requests
{
zkutil::makeCreateRequest(zk_metadata_path, metadata_string, zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest(zk_data_path, metadata_string, zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest(zk_tables_path, "", zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest(zk_table_path, "", zkutil::CreateMode::Persistent),
};
if (exists)
{
// this requires same name for columns
// maybe we can do a smarter comparison for columns and primary key expression
if (stored_metadata_string != metadata_string)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Path {} is already used but the stored table definition doesn't match. Stored metadata: {}",
zk_root_path,
stored_metadata_string);
Coordination::Responses create_responses;
auto code = client->tryMulti(create_requests, create_responses);
if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "It looks like a table on path {} was created by another server at the same moment, will retry", zk_root_path);
continue;
}
else if (code != Coordination::Error::ZOK)
{
zkutil::KeeperMultiException::check(code, create_requests, create_responses);
}
auto code = client->tryCreate(zk_table_path, "", zkutil::CreateMode::Persistent);
/// A table on the same Keeper path already exists, we just appended our table id to subscribe as a new replica
/// We still don't know if the table matches the expected metadata so table_is_valid is not changed
/// It will be checked lazily on the first operation
if (code == Coordination::Error::ZOK)
{
success = true;
return;
}
table_is_valid = true;
/// we are the first table created for the specified Keeper path, i.e. we are the first replica
return;
/// We most likely created the path but got a timeout or disconnect
if (code == Coordination::Error::ZNODEEXISTS && zk_retry.isRetry())
{
success = true;
return;
}
if (code != Coordination::Error::ZNONODE)
throw zkutil::KeeperException(
code, "Failed to create table on path {} because a table with same UUID already exists", zk_root_path);
/// ZNONODE means we dropped zk_tables_path but didn't finish drop completely
}
if (client->exists(zk_dropped_path))
{
LOG_INFO(log, "Removing leftover nodes");
bool drop_finished = false;
if (zk_retry.isRetry() && metadata_drop_lock != nullptr && drop_lock_version != -1)
{
/// if we have leftover lock from previous try, we need to recreate the ephemeral with our session
Coordination::Requests drop_lock_requests{
zkutil::makeRemoveRequest(zk_dropped_lock_path, drop_lock_version),
zkutil::makeCreateRequest(zk_dropped_lock_path, "", zkutil::CreateMode::Ephemeral),
};
Coordination::Responses drop_lock_responses;
auto lock_code = client->tryMulti(drop_lock_requests, drop_lock_responses);
if (lock_code == Coordination::Error::ZBADVERSION)
{
LOG_INFO(log, "Someone else is removing leftover nodes");
metadata_drop_lock->setAlreadyRemoved();
metadata_drop_lock.reset();
return;
}
if (drop_lock_responses[0]->error == Coordination::Error::ZNONODE)
{
/// someone else removed metadata nodes or the previous ephemeral node expired
/// we will try creating dropped lock again to make sure
metadata_drop_lock->setAlreadyRemoved();
metadata_drop_lock.reset();
}
else if (lock_code == Coordination::Error::ZOK)
{
metadata_drop_lock->setAlreadyRemoved();
metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(zk_dropped_lock_path, *client);
drop_lock_version = -1;
Coordination::Stat lock_stat;
client->get(zk_dropped_lock_path, &lock_stat);
drop_lock_version = lock_stat.version;
if (!dropTable(client, metadata_drop_lock))
{
metadata_drop_lock.reset();
return;
}
drop_finished = true;
}
}
if (!drop_finished)
{
auto code = client->tryCreate(zk_dropped_lock_path, "", zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNONODE)
{
LOG_INFO(log, "Someone else removed leftover nodes");
}
else if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "Someone else is removing leftover nodes");
return;
}
else if (code != Coordination::Error::ZOK)
{
throw Coordination::Exception::fromPath(code, zk_dropped_lock_path);
}
else
{
metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(zk_dropped_lock_path, *client);
drop_lock_version = -1;
Coordination::Stat lock_stat;
client->get(zk_dropped_lock_path, &lock_stat);
drop_lock_version = lock_stat.version;
if (!dropTable(client, metadata_drop_lock))
{
metadata_drop_lock.reset();
return;
}
}
}
}
Coordination::Requests create_requests{
zkutil::makeCreateRequest(zk_metadata_path, metadata_string, zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest(zk_data_path, metadata_string, zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest(zk_tables_path, "", zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest(zk_table_path, "", zkutil::CreateMode::Persistent),
};
Coordination::Responses create_responses;
auto code = client->tryMulti(create_requests, create_responses);
if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(
log, "It looks like a table on path {} was created by another server at the same moment, will retry", zk_root_path);
return;
}
else if (code != Coordination::Error::ZOK)
{
zkutil::KeeperMultiException::check(code, create_requests, create_responses);
}
table_status = TableStatus::VALID;
/// we are the first table created for the specified Keeper path, i.e. we are the first replica
success = true;
});
if (success)
return;
}
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot create metadata for table, because it is removed concurrently or because "
"of wrong zk_root_path ({})", zk_root_path);
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Cannot create metadata for table, because it is removed concurrently or because "
"of wrong zk_root_path ({})",
zk_root_path);
}
@ -496,7 +606,7 @@ Pipe StorageKeeperMap::read(
size_t max_block_size,
size_t num_streams)
{
checkTable<true>();
checkTable<true>(context_);
storage_snapshot->check(column_names);
FieldVectorPtr filtered_keys;
@ -529,8 +639,8 @@ Pipe StorageKeeperMap::read(
size_t num_keys = keys->size();
size_t num_threads = std::min<size_t>(num_streams, keys->size());
assert(num_keys <= std::numeric_limits<uint32_t>::max());
assert(num_threads <= std::numeric_limits<uint32_t>::max());
chassert(num_keys <= std::numeric_limits<uint32_t>::max());
chassert(num_threads <= std::numeric_limits<uint32_t>::max());
for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx)
{
@ -539,29 +649,59 @@ Pipe StorageKeeperMap::read(
using KeyContainer = typename KeyContainerPtr::element_type;
pipes.emplace_back(std::make_shared<StorageKeeperMapSource<KeyContainer>>(
*this, sample_block, max_block_size, keys, keys->begin() + begin, keys->begin() + end, with_version_column));
*this, sample_block, max_block_size, keys, keys->begin() + begin, keys->begin() + end, with_version_column, context_));
}
return Pipe::unitePipes(std::move(pipes));
};
auto client = getClient();
if (all_scan)
return process_keys(std::make_shared<std::vector<std::string>>(client->getChildren(zk_data_path)));
{
const auto & settings = context_->getSettingsRef();
ZooKeeperRetriesControl zk_retry{
getName(),
getLogger(getName()),
ZooKeeperRetriesInfo{
settings.keeper_max_retries,
settings.keeper_retry_initial_backoff_ms,
settings.keeper_retry_max_backoff_ms},
context_->getProcessListElement()};
std::vector<std::string> children;
zk_retry.retryLoop([&]
{
auto client = getClient();
children = client->getChildren(zk_data_path);
});
return process_keys(std::make_shared<std::vector<std::string>>(std::move(children)));
}
return process_keys(std::move(filtered_keys));
}
SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
{
checkTable<true>();
checkTable<true>(local_context);
return std::make_shared<StorageKeeperMapSink>(*this, metadata_snapshot->getSampleBlock(), local_context);
}
void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
{
checkTable<true>();
auto client = getClient();
client->tryRemoveChildrenRecursive(zk_data_path, true);
checkTable<true>(local_context);
const auto & settings = local_context->getSettingsRef();
ZooKeeperRetriesControl zk_retry{
getName(),
getLogger(getName()),
ZooKeeperRetriesInfo{
settings.keeper_max_retries,
settings.keeper_retry_initial_backoff_ms,
settings.keeper_retry_max_backoff_ms},
local_context->getProcessListElement()};
zk_retry.retryLoop([&]
{
auto client = getClient();
client->tryRemoveChildrenRecursive(zk_data_path, true);
});
}
bool StorageKeeperMap::dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock)
@ -605,7 +745,18 @@ bool StorageKeeperMap::dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::E
void StorageKeeperMap::drop()
{
checkTable<true>();
auto current_table_status = getTableStatus(getContext());
if (current_table_status == TableStatus::UNKNOWN)
{
static constexpr auto error_msg = "Failed to activate table because of connection issues. It will be activated "
"once a connection is established and metadata is verified";
throw Exception(ErrorCodes::INVALID_STATE, error_msg);
}
/// if only column metadata is wrong we can still drop the table correctly
if (current_table_status == TableStatus::INVALID_METADATA)
return;
auto client = getClient();
// we allow ZNONODE in case we got hardware error on previous drop
@ -966,78 +1117,91 @@ UInt64 StorageKeeperMap::keysLimit() const
return keys_limit;
}
std::optional<bool> StorageKeeperMap::isTableValid() const
StorageKeeperMap::TableStatus StorageKeeperMap::getTableStatus(const ContextPtr & local_context) const
{
std::lock_guard lock{init_mutex};
if (table_is_valid.has_value())
return table_is_valid;
if (table_status != TableStatus::UNKNOWN)
return table_status;
[&]
{
try
{
auto client = getClient();
const auto & settings = local_context->getSettingsRef();
ZooKeeperRetriesControl zk_retry{
getName(),
getLogger(getName()),
ZooKeeperRetriesInfo{
settings.keeper_max_retries,
settings.keeper_retry_initial_backoff_ms,
settings.keeper_retry_max_backoff_ms},
local_context->getProcessListElement()};
Coordination::Stat metadata_stat;
auto stored_metadata_string = client->get(zk_metadata_path, &metadata_stat);
if (metadata_stat.numChildren == 0)
zk_retry.retryLoop([&]
{
table_is_valid = false;
return;
}
auto client = getClient();
if (metadata_string != stored_metadata_string)
{
LOG_ERROR(
log,
"Table definition does not match to the one stored in the path {}. Stored definition: {}",
zk_root_path,
stored_metadata_string);
table_is_valid = false;
return;
}
Coordination::Stat metadata_stat;
auto stored_metadata_string = client->get(zk_metadata_path, &metadata_stat);
// validate all metadata and data nodes are present
Coordination::Requests requests;
requests.push_back(zkutil::makeCheckRequest(zk_table_path, -1));
requests.push_back(zkutil::makeCheckRequest(zk_data_path, -1));
requests.push_back(zkutil::makeCheckRequest(zk_dropped_path, -1));
if (metadata_stat.numChildren == 0)
{
table_status = TableStatus::INVALID_KEEPER_STRUCTURE;
return;
}
Coordination::Responses responses;
client->tryMulti(requests, responses);
if (metadata_string != stored_metadata_string)
{
LOG_ERROR(
log,
"Table definition does not match to the one stored in the path {}. Stored definition: {}",
zk_root_path,
stored_metadata_string);
table_status = TableStatus::INVALID_METADATA;
return;
}
table_is_valid = false;
if (responses[0]->error != Coordination::Error::ZOK)
{
LOG_ERROR(log, "Table node ({}) is missing", zk_table_path);
return;
}
// validate all metadata and data nodes are present
Coordination::Requests requests;
requests.push_back(zkutil::makeCheckRequest(zk_table_path, -1));
requests.push_back(zkutil::makeCheckRequest(zk_data_path, -1));
requests.push_back(zkutil::makeCheckRequest(zk_dropped_path, -1));
if (responses[1]->error != Coordination::Error::ZOK)
{
LOG_ERROR(log, "Data node ({}) is missing", zk_data_path);
return;
}
Coordination::Responses responses;
client->tryMulti(requests, responses);
if (responses[2]->error == Coordination::Error::ZOK)
{
LOG_ERROR(log, "Tables with root node {} are being dropped", zk_root_path);
return;
}
table_status = TableStatus::INVALID_KEEPER_STRUCTURE;
if (responses[0]->error != Coordination::Error::ZOK)
{
LOG_ERROR(log, "Table node ({}) is missing", zk_table_path);
return;
}
table_is_valid = true;
if (responses[1]->error != Coordination::Error::ZOK)
{
LOG_ERROR(log, "Data node ({}) is missing", zk_data_path);
return;
}
if (responses[2]->error == Coordination::Error::ZOK)
{
LOG_ERROR(log, "Tables with root node {} are being dropped", zk_root_path);
return;
}
table_status = TableStatus::VALID;
});
}
catch (const Coordination::Exception & e)
{
tryLogCurrentException(log);
if (!Coordination::isHardwareError(e.code))
table_is_valid = false;
table_status = TableStatus::INVALID_KEEPER_STRUCTURE;
}
}();
return table_is_valid;
return table_status;
}
Chunk StorageKeeperMap::getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & null_map, const Names &) const
@ -1050,10 +1214,11 @@ Chunk StorageKeeperMap::getByKeys(const ColumnsWithTypeAndName & keys, PaddedPOD
if (raw_keys.size() != keys[0].column->size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Assertion failed: {} != {}", raw_keys.size(), keys[0].column->size());
return getBySerializedKeys(raw_keys, &null_map, /* version_column */ false);
return getBySerializedKeys(raw_keys, &null_map, /* version_column */ false, getContext());
}
Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map, bool with_version) const
Chunk StorageKeeperMap::getBySerializedKeys(
const std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map, bool with_version, const ContextPtr & local_context) const
{
Block sample_block = getInMemoryMetadataPtr()->getSampleBlock();
MutableColumns columns = sample_block.cloneEmptyColumns();
@ -1070,17 +1235,27 @@ Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> k
null_map->resize_fill(keys.size(), 1);
}
auto client = getClient();
Strings full_key_paths;
full_key_paths.reserve(keys.size());
for (const auto & key : keys)
{
full_key_paths.emplace_back(fullPathForKey(key));
}
auto values = client->tryGet(full_key_paths);
const auto & settings = local_context->getSettingsRef();
ZooKeeperRetriesControl zk_retry{
getName(),
getLogger(getName()),
ZooKeeperRetriesInfo{
settings.keeper_max_retries,
settings.keeper_retry_initial_backoff_ms,
settings.keeper_retry_max_backoff_ms},
local_context->getProcessListElement()};
zkutil::ZooKeeper::MultiTryGetResponse values;
zk_retry.retryLoop([&]{
auto client = getClient();
values = client->tryGet(full_key_paths);
});
for (size_t i = 0; i < keys.size(); ++i)
{
@ -1153,14 +1328,14 @@ void StorageKeeperMap::checkMutationIsPossible(const MutationCommands & commands
void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr local_context)
{
checkTable<true>();
checkTable<true>(local_context);
if (commands.empty())
return;
bool strict = local_context->getSettingsRef().keeper_map_strict_mode;
assert(commands.size() == 1);
chassert(commands.size() == 1);
auto metadata_snapshot = getInMemoryMetadataPtr();
auto storage = getStorageID();
@ -1168,16 +1343,16 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
if (commands.front().type == MutationCommand::Type::DELETE)
{
MutationsInterpreter::Settings settings(true);
settings.return_all_columns = true;
settings.return_mutated_rows = true;
MutationsInterpreter::Settings mutation_settings(true);
mutation_settings.return_all_columns = true;
mutation_settings.return_mutated_rows = true;
auto interpreter = std::make_unique<MutationsInterpreter>(
storage_ptr,
metadata_snapshot,
commands,
local_context,
settings);
mutation_settings);
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);
@ -1186,8 +1361,6 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
auto primary_key_pos = header.getPositionByName(primary_key);
auto version_position = header.getPositionByName(std::string{version_column_name});
auto client = getClient();
Block block;
while (executor.pull(block))
{
@ -1215,7 +1388,23 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
}
Coordination::Responses responses;
auto status = client->tryMulti(delete_requests, responses, /* check_session_valid */ true);
const auto & settings = local_context->getSettingsRef();
ZooKeeperRetriesControl zk_retry{
getName(),
getLogger(getName()),
ZooKeeperRetriesInfo{
settings.keeper_max_retries,
settings.keeper_retry_initial_backoff_ms,
settings.keeper_retry_max_backoff_ms},
local_context->getProcessListElement()};
Coordination::Error status;
zk_retry.retryLoop([&]
{
auto client = getClient();
status = client->tryMulti(delete_requests, responses, /* check_session_valid */ true);
});
if (status == Coordination::Error::ZOK)
return;
@ -1227,16 +1416,21 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
for (const auto & delete_request : delete_requests)
{
auto code = client->tryRemove(delete_request->getPath());
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
throw zkutil::KeeperException::fromPath(code, delete_request->getPath());
zk_retry.retryLoop([&]
{
auto client = getClient();
status = client->tryRemove(delete_request->getPath());
});
if (status != Coordination::Error::ZOK && status != Coordination::Error::ZNONODE)
throw zkutil::KeeperException::fromPath(status, delete_request->getPath());
}
}
return;
}
assert(commands.front().type == MutationCommand::Type::UPDATE);
chassert(commands.front().type == MutationCommand::Type::UPDATE);
if (commands.front().column_to_update_expression.contains(primary_key))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Primary key cannot be updated (cannot update column {})", primary_key);

View File

@ -54,7 +54,8 @@ public:
Names getPrimaryKey() const override { return {primary_key}; }
Chunk getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & null_map, const Names &) const override;
Chunk getBySerializedKeys(std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map, bool with_version) const;
Chunk getBySerializedKeys(
std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map, bool with_version, const ContextPtr & local_context) const;
Block getSampleBlock(const Names &) const override;
@ -77,10 +78,10 @@ public:
UInt64 keysLimit() const;
template <bool throw_on_error>
void checkTable() const
void checkTable(const ContextPtr & local_context) const
{
auto is_table_valid = isTableValid();
if (!is_table_valid.has_value())
auto current_table_status = getTableStatus(local_context);
if (table_status == TableStatus::UNKNOWN)
{
static constexpr auto error_msg = "Failed to activate table because of connection issues. It will be activated "
"once a connection is established and metadata is verified";
@ -93,10 +94,10 @@ public:
}
}
if (!*is_table_valid)
if (current_table_status != TableStatus::VALID)
{
static constexpr auto error_msg
= "Failed to activate table because of invalid metadata in ZooKeeper. Please DETACH table";
= "Failed to activate table because of invalid metadata in ZooKeeper. Please DROP/DETACH table";
if constexpr (throw_on_error)
throw Exception(ErrorCodes::INVALID_STATE, error_msg);
else
@ -110,7 +111,15 @@ public:
private:
bool dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock);
std::optional<bool> isTableValid() const;
enum class TableStatus : uint8_t
{
UNKNOWN,
INVALID_METADATA,
INVALID_KEEPER_STRUCTURE,
VALID
};
TableStatus getTableStatus(const ContextPtr & context) const;
void restoreDataImpl(
const BackupPtr & backup,
@ -142,7 +151,8 @@ private:
mutable zkutil::ZooKeeperPtr zookeeper_client{nullptr};
mutable std::mutex init_mutex;
mutable std::optional<bool> table_is_valid;
mutable TableStatus table_status{TableStatus::UNKNOWN};
LoggerPtr log;
};

View File

@ -2,7 +2,7 @@ version: '2.3'
services:
minio1:
image: minio/minio:RELEASE.2023-09-30T07-02-29Z
image: minio/minio:RELEASE.2024-07-31T05-46-26Z
volumes:
- data1-1:/data1
- ${MINIO_CERTS_DIR:-}:/certs

View File

@ -3922,7 +3922,11 @@ class ClickHouseInstance:
)
def contains_in_log(
self, substring, from_host=False, filename="clickhouse-server.log"
self,
substring,
from_host=False,
filename="clickhouse-server.log",
exclusion_substring="",
):
if from_host:
# We check fist file exists but want to look for all rotated logs as well
@ -3930,7 +3934,7 @@ class ClickHouseInstance:
[
"bash",
"-c",
f'[ -f {self.logs_dir}/{filename} ] && zgrep -aH "{substring}" {self.logs_dir}/{filename}* || true',
f'[ -f {self.logs_dir}/{filename} ] && zgrep -aH "{substring}" {self.logs_dir}/{filename}* | ( [ -z "{exclusion_substring}" ] && cat || grep -v "${exclusion_substring}" ) || true',
]
)
else:
@ -3938,7 +3942,7 @@ class ClickHouseInstance:
[
"bash",
"-c",
f'[ -f /var/log/clickhouse-server/{filename} ] && zgrep -aH "{substring}" /var/log/clickhouse-server/{filename} || true',
f'[ -f /var/log/clickhouse-server/{filename} ] && zgrep -aH "{substring}" /var/log/clickhouse-server/{filename} | ( [ -z "{exclusion_substring}" ] && cat || grep -v "${exclusion_substring}" ) || true',
]
)
return len(result) > 0

View File

@ -0,0 +1,14 @@
<clickhouse>
<profiles>
<default>
<insert_keeper_max_retries>0</insert_keeper_max_retries>
<keeper_max_retries>0</keeper_max_retries>
</default>
</profiles>
<users>
<default>
<password></password>
<profile>default</profile>
</default>
</users>
</clickhouse>

View File

@ -10,6 +10,7 @@ cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/enable_keeper_map.xml"],
user_configs=["configs/keeper_retries.xml"],
with_zookeeper=True,
stay_alive=True,
)
@ -46,7 +47,10 @@ def assert_keeper_exception_after_partition(query):
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node)
try:
error = node.query_and_get_error_with_retry(query, sleep_time=1)
error = node.query_and_get_error_with_retry(
query,
sleep_time=1,
)
assert "Coordination::Exception" in error
except:
print_iptables_rules()
@ -63,6 +67,7 @@ def run_query(query):
def test_keeper_map_without_zk(started_cluster):
run_query("DROP TABLE IF EXISTS test_keeper_map_without_zk SYNC")
assert_keeper_exception_after_partition(
"CREATE TABLE test_keeper_map_without_zk (key UInt64, value UInt64) ENGINE = KeeperMap('/test_keeper_map_without_zk') PRIMARY KEY(key);"
)
@ -84,7 +89,8 @@ def test_keeper_map_without_zk(started_cluster):
node.restart_clickhouse(60)
try:
error = node.query_and_get_error_with_retry(
"SELECT * FROM test_keeper_map_without_zk", sleep_time=1
"SELECT * FROM test_keeper_map_without_zk",
sleep_time=1,
)
assert "Failed to activate table because of connection issues" in error
except:
@ -101,12 +107,12 @@ def test_keeper_map_without_zk(started_cluster):
)
assert "Failed to activate table because of invalid metadata in ZooKeeper" in error
node.query("DETACH TABLE test_keeper_map_without_zk")
client.stop()
def test_keeper_map_with_failed_drop(started_cluster):
run_query("DROP TABLE IF EXISTS test_keeper_map_with_failed_drop SYNC")
run_query("DROP TABLE IF EXISTS test_keeper_map_with_failed_drop_another SYNC")
run_query(
"CREATE TABLE test_keeper_map_with_failed_drop (key UInt64, value UInt64) ENGINE = KeeperMap('/test_keeper_map_with_failed_drop') PRIMARY KEY(key);"
)

View File

@ -0,0 +1,3 @@
<clickhouse>
<keeper_map_path_prefix>/test_keeper_map</keeper_map_path_prefix>
</clickhouse>

View File

@ -0,0 +1,7 @@
<clickhouse>
<zookeeper>
<enable_fault_injections_during_startup>1</enable_fault_injections_during_startup>
<send_fault_probability>0.005</send_fault_probability>
<recv_fault_probability>0.005</recv_fault_probability>
</zookeeper>
</clickhouse>

View File

@ -0,0 +1,14 @@
<clickhouse>
<profiles>
<default>
<keeper_max_retries>20</keeper_max_retries>
<keeper_retry_max_backoff_ms>10000</keeper_retry_max_backoff_ms>
</default>
</profiles>
<users>
<default>
<password></password>
<profile>default</profile>
</default>
</users>
</clickhouse>

View File

@ -0,0 +1,75 @@
import pytest
from helpers.cluster import ClickHouseCluster
import os
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/enable_keeper_map.xml"],
user_configs=["configs/keeper_retries.xml"],
with_zookeeper=True,
stay_alive=True,
)
def start_clean_clickhouse():
# remove fault injection if present
if "fault_injection.xml" in node.exec_in_container(
["bash", "-c", "ls /etc/clickhouse-server/config.d"]
):
print("Removing fault injection")
node.exec_in_container(
["bash", "-c", "rm /etc/clickhouse-server/config.d/fault_injection.xml"]
)
node.restart_clickhouse()
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def repeat_query(query, repeat):
for _ in range(repeat):
node.query(
query,
)
def test_queries(started_cluster):
start_clean_clickhouse()
node.query("DROP TABLE IF EXISTS keeper_map_retries SYNC")
node.stop_clickhouse()
node.copy_file_to_container(
os.path.join(CONFIG_DIR, "fault_injection.xml"),
"/etc/clickhouse-server/config.d/fault_injection.xml",
)
node.start_clickhouse()
repeat_count = 10
node.query(
"CREATE TABLE keeper_map_retries (a UInt64, b UInt64) Engine=KeeperMap('/keeper_map_retries') PRIMARY KEY a",
)
repeat_query(
"INSERT INTO keeper_map_retries SELECT number, number FROM numbers(500)",
repeat_count,
)
repeat_query("SELECT * FROM keeper_map_retries", repeat_count)
repeat_query(
"ALTER TABLE keeper_map_retries UPDATE b = 3 WHERE a > 2", repeat_count
)
repeat_query("ALTER TABLE keeper_map_retries DELETE WHERE a > 2", repeat_count)
repeat_query("TRUNCATE keeper_map_retries", repeat_count)

View File

@ -13,6 +13,7 @@ node = cluster.add_instance(
with_zookeeper=True,
with_azurite=True,
)
base_search_query = "SELECT COUNT() FROM system.query_log WHERE query LIKE "
@pytest.fixture(scope="module", autouse=True)
@ -35,7 +36,7 @@ def check_logs(must_contain=[], must_not_contain=[]):
.replace("]", "\\]")
.replace("*", "\\*")
)
assert node.contains_in_log(escaped_str)
assert node.contains_in_log(escaped_str, exclusion_substring=base_search_query)
for str in must_not_contain:
escaped_str = (
@ -44,7 +45,9 @@ def check_logs(must_contain=[], must_not_contain=[]):
.replace("]", "\\]")
.replace("*", "\\*")
)
assert not node.contains_in_log(escaped_str)
assert not node.contains_in_log(
escaped_str, exclusion_substring=base_search_query
)
for str in must_contain:
escaped_str = str.replace("'", "\\'")
@ -60,7 +63,7 @@ def system_query_log_contains_search_pattern(search_pattern):
return (
int(
node.query(
f"SELECT COUNT() FROM system.query_log WHERE query LIKE '%{search_pattern}%'"
f"{base_search_query}'%{search_pattern}%' AND query NOT LIKE '{base_search_query}%'"
).strip()
)
>= 1
@ -105,7 +108,6 @@ def test_create_alter_user():
must_not_contain=[
password,
"IDENTIFIED BY",
"IDENTIFIED BY",
"IDENTIFIED WITH plaintext_password BY",
],
)
@ -366,10 +368,7 @@ def test_table_functions():
f"remoteSecure(named_collection_6, addresses_expr = '127.{{2..11}}', database = 'default', table = 'remote_table', user = 'remote_user', password = '{password}')",
f"s3('http://minio1:9001/root/data/test9.csv.gz', 'NOSIGN', 'CSV')",
f"s3('http://minio1:9001/root/data/test10.csv.gz', 'minio', '{password}')",
(
f"deltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')",
"DNS_ERROR",
),
f"deltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')",
f"azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')",
f"azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')",
f"azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')",

View File

@ -1,6 +1,7 @@
#!/usr/bin/env bash
# Tags: no-fasttest
# Tags: no-fasttest, no-parallel
# Tag no-fasttest: needs pv
# Tag no-parallel: reads from a system table
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
@ -12,9 +13,23 @@ ${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS t; CREATE TABLE t (x UInt64)
seq 1 1000 | pv --quiet --rate-limit 400 | ${CLICKHOUSE_CLIENT} --query "INSERT INTO t FORMAT TSV"
# We check that the value of NetworkReceiveElapsedMicroseconds correctly includes the time spent waiting data from the client.
${CLICKHOUSE_CLIENT} --query "SYSTEM FLUSH LOGS;
WITH ProfileEvents['NetworkReceiveElapsedMicroseconds'] AS time
SELECT time >= 1000000 ? 1 : time FROM system.query_log
WHERE current_database = currentDatabase() AND query_kind = 'Insert' AND event_date >= yesterday() AND type = 2 ORDER BY event_time DESC LIMIT 1;"
result=$(${CLICKHOUSE_CLIENT} --query "SYSTEM FLUSH LOGS;
WITH ProfileEvents['NetworkReceiveElapsedMicroseconds'] AS elapsed_us
SELECT elapsed_us FROM system.query_log
WHERE current_database = currentDatabase() AND query_kind = 'Insert' AND event_date >= yesterday() AND type = 'QueryFinish'
ORDER BY event_time DESC LIMIT 1;")
elapsed_us=$(echo $result | sed 's/ .*//')
min_elapsed_us=1000000
if [[ "$elapsed_us" -ge "$min_elapsed_us" ]]; then
echo 1
else
# Print debug info
${CLICKHOUSE_CLIENT} --query "
WITH ProfileEvents['NetworkReceiveElapsedMicroseconds'] AS elapsed_us
SELECT query_start_time_microseconds, event_time_microseconds, query_duration_ms, elapsed_us, query FROM system.query_log
WHERE current_database = currentDatabase() and event_date >= yesterday() AND type = 'QueryFinish' ORDER BY query_start_time;"
fi
${CLICKHOUSE_CLIENT} --query "DROP TABLE t"

View File

@ -11,7 +11,12 @@ CREATE VIEW number_view as SELECT * FROM numbers(10) as tb;
CREATE MATERIALIZED VIEW null_mv Engine = Log AS SELECT * FROM null_table LEFT JOIN number_view as tb USING number;
CREATE TABLE null_table_buffer (number UInt64) ENGINE = Buffer(currentDatabase(), null_table, 1, 1, 1, 100, 200, 10000, 20000);
INSERT INTO null_table_buffer VALUES (1);
SELECT sleep(3) FORMAT Null;
-- OPTIMIZE query should flush Buffer table, but still it is not guaranteed
-- (see the comment StorageBuffer::optimize)
-- But the combination of OPTIMIZE + sleep + OPTIMIZE should be enough.
OPTIMIZE TABLE null_table_buffer;
SELECT sleep(1) FORMAT Null;
OPTIMIZE TABLE null_table_buffer;
-- Insert about should've landed into `null_mv`
SELECT count() FROM null_mv;
1

View File

@ -13,7 +13,13 @@ CREATE MATERIALIZED VIEW null_mv Engine = Log AS SELECT * FROM null_table LEFT J
CREATE TABLE null_table_buffer (number UInt64) ENGINE = Buffer(currentDatabase(), null_table, 1, 1, 1, 100, 200, 10000, 20000);
INSERT INTO null_table_buffer VALUES (1);
SELECT sleep(3) FORMAT Null;
-- OPTIMIZE query should flush Buffer table, but still it is not guaranteed
-- (see the comment StorageBuffer::optimize)
-- But the combination of OPTIMIZE + sleep + OPTIMIZE should be enough.
OPTIMIZE TABLE null_table_buffer;
SELECT sleep(1) FORMAT Null;
OPTIMIZE TABLE null_table_buffer;
-- Insert about should've landed into `null_mv`
SELECT count() FROM null_mv;

View File

@ -13,20 +13,9 @@ $CLICKHOUSE_CLIENT -nm -q "
CREATE TABLE $database_name.02911_backup_restore_keeper_map3 (key UInt64, value String) Engine=KeeperMap('/' || currentDatabase() || '/test02911_different') PRIMARY KEY key;
"
# KeeperMap table engine doesn't have internal retries for interaction with Keeper. Do it on our own, otherwise tests with overloaded server can be flaky.
while true
do
$CLICKHOUSE_CLIENT -nm -q "INSERT INTO $database_name.02911_backup_restore_keeper_map2 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 5000;
" 2>&1 | grep -q "KEEPER_EXCEPTION" && sleep 1 && continue
break
done
$CLICKHOUSE_CLIENT -nm -q "INSERT INTO $database_name.02911_backup_restore_keeper_map2 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 5000;"
while true
do
$CLICKHOUSE_CLIENT -nm -q "INSERT INTO $database_name.02911_backup_restore_keeper_map3 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 3000;
" 2>&1 | grep -q "KEEPER_EXCEPTION" && sleep 1 && continue
break
done
$CLICKHOUSE_CLIENT -nm -q "INSERT INTO $database_name.02911_backup_restore_keeper_map3 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 3000;"
backup_path="$database_name"
for i in $(seq 1 3); do