Merge pull request #43116 from ClickHouse/revert-42607-igor/insert_zk_retries_retry

Revert " Keeper retries during insert (clean)"
Alexander Tokmakov 2022-11-10 14:12:58 +03:00 committed by GitHub
commit 64e6a00f94
55 changed files with 250 additions and 1451 deletions


@ -34,7 +34,7 @@ using TestKeeperRequestPtr = std::shared_ptr<TestKeeperRequest>;
class TestKeeper final : public IKeeper
{
public:
explicit TestKeeper(const zkutil::ZooKeeperArgs & args_);
TestKeeper(const zkutil::ZooKeeperArgs & args_);
~TestKeeper() override;
bool isExpired() const override { return expired; }


@ -156,7 +156,7 @@ public:
using Ptr = std::shared_ptr<ZooKeeper>;
using ErrorsList = std::initializer_list<Coordination::Error>;
explicit ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
/** Config of the form:
<zookeeper>


@ -658,11 +658,6 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, multiple_joins_try_to_keep_original_names, false, "Do not add aliases to top level expression list on multiple joins rewrite", 0) \
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, optimize_sorting_by_input_stream_properties, true, "Optimize sorting by sorting properties of input stream", 0) \
M(UInt64, insert_keeper_max_retries, 0, "Max retries for keeper operations during insert", 0) \
M(UInt64, insert_keeper_retry_initial_backoff_ms, 100, "Initial backoff timeout for keeper operations during insert", 0) \
M(UInt64, insert_keeper_retry_max_backoff_ms, 10000, "Max backoff timeout for keeper operations during insert", 0) \
M(Float, insert_keeper_fault_injection_probability, 0.0f, "Approximate probability of failure for a keeper request during insert. Valid value is in interval [0.0f, 1.0f]", 0) \
M(UInt64, insert_keeper_fault_injection_seed, 0, "0 - random seed, otherwise the setting value", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS and move obsolete settings to OBSOLETE_SETTINGS.
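
For context on the five insert_keeper_* settings removed above: before this revert they parameterized retries with exponential backoff for Keeper operations during INSERT (see the ZooKeeperRetries.h file deleted later in this commit). Below is a minimal, self-contained sketch, not part of the commit, of the backoff schedule implied by the default values shown in the diff; the count of 5 retries is an arbitrary example.

// Illustration only: doubling-with-cap backoff as configured by
// insert_keeper_retry_initial_backoff_ms / insert_keeper_retry_max_backoff_ms.
#include <algorithm>
#include <cstdint>
#include <iostream>

int main()
{
    const uint64_t max_retries = 5;         // stands in for insert_keeper_max_retries (0 disables retries)
    uint64_t backoff_ms = 100;              // insert_keeper_retry_initial_backoff_ms default
    const uint64_t max_backoff_ms = 10000;  // insert_keeper_retry_max_backoff_ms default

    for (uint64_t retry = 1; retry <= max_retries; ++retry)
    {
        std::cout << "retry " << retry << ": wait " << backoff_ms << " ms\n";
        backoff_ms = std::min(backoff_ms * 2, max_backoff_ms);  // same rule as the deleted ZooKeeperRetriesInfo helper
    }
    return 0;
}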


@ -2,7 +2,6 @@
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/logger_useful.h>
#include <base/types.h>
#include <Storages/MergeTree/ZooKeeperWithFaultInjection.h>
namespace DB
@ -13,22 +12,22 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
EphemeralLockInZooKeeper::EphemeralLockInZooKeeper(const String & path_prefix_, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & path_)
: zookeeper(zookeeper_), path_prefix(path_prefix_), path(path_)
EphemeralLockInZooKeeper::EphemeralLockInZooKeeper(const String & path_prefix_, zkutil::ZooKeeper & zookeeper_, const String & path_)
: zookeeper(&zookeeper_), path_prefix(path_prefix_), path(path_)
{
if (path.size() <= path_prefix.size())
throw Exception("Logical error: name of the main node is shorter than prefix.", ErrorCodes::LOGICAL_ERROR);
}
std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & deduplication_path)
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, const String & deduplication_path)
{
String path;
if (deduplication_path.empty())
{
String holder_path = temp_path + "/" + EphemeralLockInZooKeeper::LEGACY_LOCK_OTHER;
path = zookeeper_->create(path_prefix_, holder_path, zkutil::CreateMode::EphemeralSequential);
path = zookeeper_.create(path_prefix_, holder_path, zkutil::CreateMode::EphemeralSequential);
}
else
{
@ -40,15 +39,11 @@ std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
ops.emplace_back(zkutil::makeRemoveRequest(deduplication_path, -1));
ops.emplace_back(zkutil::makeCreateRequest(path_prefix_, holder_path, zkutil::CreateMode::EphemeralSequential));
Coordination::Responses responses;
Coordination::Error e = zookeeper_->tryMulti(ops, responses);
Coordination::Error e = zookeeper_.tryMulti(ops, responses);
if (e != Coordination::Error::ZOK)
{
if (responses[0]->error == Coordination::Error::ZNODEEXISTS)
{
LOG_DEBUG(
&Poco::Logger::get("createEphemeralLockInZooKeeper"),
"Deduplication path already exists: deduplication_path={}",
deduplication_path);
return {};
}
else
@ -87,31 +82,9 @@ EphemeralLockInZooKeeper::~EphemeralLockInZooKeeper()
{
unlock();
}
catch (const zkutil::KeeperException & e)
{
if (Coordination::isHardwareError(e.code))
LOG_DEBUG(
&Poco::Logger::get("EphemeralLockInZooKeeper"),
"ZooKeeper communication error during unlock: code={} message='{}'",
e.code,
e.message());
else if (e.code == Coordination::Error::ZNONODE)
/// To avoid additional round-trip for unlocking,
/// ephemeral node can be deleted explicitly as part of another multi op request to ZK
/// and marked as such via assumeUnlocked() if we got successful response.
/// But it's possible that the multi op request can be executed on server side, and client will not get response due to network issue.
/// In such case, assumeUnlocked() will not be called, so we'll get ZNONODE error here since the noded is already deleted
LOG_DEBUG(
&Poco::Logger::get("EphemeralLockInZooKeeper"),
"ZooKeeper node was already deleted: code={} message={}",
e.code,
e.message());
else
tryLogCurrentException("EphemeralLockInZooKeeper");
}
catch (...)
{
tryLogCurrentException("EphemeralLockInZooKeeper");
tryLogCurrentException("~EphemeralLockInZooKeeper");
}
}
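
The destructor comment removed above refers to the assumeUnlocked() pattern: the ephemeral lock node can be deleted as part of a larger multi request, after which the lock object must be told not to remove it again. A hedged sketch of that flow, assuming an existing EphemeralLockInZooKeeper named lock and a ZooKeeper handle named zookeeper (illustration only, not part of this commit):

// Sketch: release the ephemeral lock inside the same transaction as the other requests.
Coordination::Requests ops;
ops.emplace_back(zkutil::makeRemoveRequest(lock.getPath(), -1));  // drop the lock node together with the rest of the multi op
/* ... the other requests of the transaction ... */
Coordination::Responses responses = zookeeper->multi(ops);        // one round trip for everything
lock.assumeUnlocked();                                            // so ~EphemeralLockInZooKeeper() will not try to remove the node again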


@ -12,8 +12,6 @@
namespace DB
{
class ZooKeeperWithFaultInjection;
using ZooKeeperWithFaultInjectionPtr = std::shared_ptr<ZooKeeperWithFaultInjection>;
namespace ErrorCodes
{
@ -27,14 +25,13 @@ namespace ErrorCodes
class EphemeralLockInZooKeeper : public boost::noncopyable
{
friend std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & deduplication_path);
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, const String & deduplication_path);
protected:
EphemeralLockInZooKeeper(const String & path_prefix_, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & path_);
EphemeralLockInZooKeeper() = delete;
EphemeralLockInZooKeeper(const String & path_prefix_, zkutil::ZooKeeper & zookeeper_, const String & path_);
public:
EphemeralLockInZooKeeper() = delete;
/// Fake "secondary node" names for blocks with and without "deduplication_path"
static constexpr const char * LEGACY_LOCK_INSERT = "abandonable_lock-insert";
static constexpr const char * LEGACY_LOCK_OTHER = "abandonable_lock-other";
@ -56,7 +53,7 @@ public:
bool isLocked() const
{
return zookeeper.get();
return zookeeper;
}
String getPath() const
@ -94,13 +91,13 @@ public:
~EphemeralLockInZooKeeper();
private:
ZooKeeperWithFaultInjectionPtr zookeeper;
zkutil::ZooKeeper * zookeeper = nullptr;
String path_prefix;
String path;
};
std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & deduplication_path);
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, const String & deduplication_path);
/// Acquires block number locks in all partitions.
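
For reference, the factory declared above is called from StorageReplicatedMergeTree::allocateBlockNumber() (the last .cpp file in this diff). A condensed sketch of that call site, not part of this commit:

// Sketch: allocate a block number under .../block_numbers/<partition>/block-,
// optionally guarded by a deduplication check on zookeeper_block_id_path.
std::optional<EphemeralLockInZooKeeper> lock = createEphemeralLockInZooKeeper(
    fs::path(partition_path) / "block-",       // prefix of the ephemeral sequential node
    fs::path(zookeeper_table_path) / "temp",   // holder path for the legacy "abandonable_lock-*" markers
    zookeeper,                                 // zkutil::ZooKeeper & after this revert, ZooKeeperWithFaultInjectionPtr before it
    zookeeper_block_id_path);                  // non-empty enables deduplication; an empty optional means a duplicate block
if (!lock)
    return;                                    // duplicate detected, nothing to insert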


@ -112,8 +112,6 @@ void ReplicatedMergeTreeMergeStrategyPicker::refreshState()
&& now - last_refresh_time < REFRESH_STATE_MINIMUM_INTERVAL_SECONDS)
return;
LOG_DEBUG(storage.log, "Updating strategy picker state");
auto zookeeper = storage.getZooKeeper();
auto all_replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas");
@ -156,8 +154,6 @@ void ReplicatedMergeTreeMergeStrategyPicker::refreshState()
last_refresh_time = now;
current_replica_index = current_replica_index_tmp;
active_replicas = active_replicas_tmp;
LOG_DEBUG(storage.log, "Strategy picker state updated, current replica: {}, active replicas: [{}]", current_replica_index, fmt::join(active_replicas, ", "));
}


@ -9,6 +9,7 @@
#include <Core/Block.h>
#include <IO/Operators.h>
namespace ProfileEvents
{
extern const Event DuplicatedInsertedBlocks;
@ -31,7 +32,6 @@ namespace ErrorCodes
extern const int DUPLICATE_DATA_PART;
extern const int PART_IS_TEMPORARILY_LOCKED;
extern const int LOGICAL_ERROR;
extern const int TABLE_IS_READ_ONLY;
extern const int QUERY_WAS_CANCELLED;
}
@ -84,7 +84,7 @@ ReplicatedMergeTreeSink::ReplicatedMergeTreeSink(
ReplicatedMergeTreeSink::~ReplicatedMergeTreeSink() = default;
/// Allow to verify that the session in ZooKeeper is still alive.
static void assertSessionIsNotExpired(const zkutil::ZooKeeperPtr & zookeeper)
static void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper)
{
if (!zookeeper)
throw Exception("No ZooKeeper session.", ErrorCodes::NO_ZOOKEEPER);
@ -93,7 +93,7 @@ static void assertSessionIsNotExpired(const zkutil::ZooKeeperPtr & zookeeper)
throw Exception("ZooKeeper session has been expired.", ErrorCodes::NO_ZOOKEEPER);
}
size_t ReplicatedMergeTreeSink::checkQuorumPrecondition(const ZooKeeperWithFaultInjectionPtr & zookeeper)
size_t ReplicatedMergeTreeSink::checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper)
{
if (!isQuorumEnabled())
return 0;
@ -103,7 +103,6 @@ size_t ReplicatedMergeTreeSink::checkQuorumPrecondition(const ZooKeeperWithFault
Strings replicas = zookeeper->getChildren(fs::path(storage.zookeeper_path) / "replicas");
Strings exists_paths;
exists_paths.reserve(replicas.size());
for (const auto & replica : replicas)
if (replica != storage.replica_name)
exists_paths.emplace_back(fs::path(storage.zookeeper_path) / "replicas" / replica / "is_active");
@ -111,28 +110,20 @@ size_t ReplicatedMergeTreeSink::checkQuorumPrecondition(const ZooKeeperWithFault
auto exists_result = zookeeper->exists(exists_paths);
auto get_results = zookeeper->get(Strings{storage.replica_path + "/is_active", storage.replica_path + "/host"});
Coordination::Error keeper_error = Coordination::Error::ZOK;
size_t active_replicas = 1; /// Assume current replica is active (will check below)
for (size_t i = 0; i < exists_paths.size(); ++i)
{
auto error = exists_result[i].error;
if (error == Coordination::Error::ZOK)
auto status = exists_result[i];
if (status.error == Coordination::Error::ZOK)
++active_replicas;
else if (Coordination::isHardwareError(error))
keeper_error = error;
}
size_t replicas_number = replicas.size();
size_t quorum_size = getQuorumSize(replicas_number);
if (active_replicas < quorum_size)
{
if (Coordination::isHardwareError(keeper_error))
throw Coordination::Exception("Failed to check number of alive replicas", keeper_error);
throw Exception(ErrorCodes::TOO_FEW_LIVE_REPLICAS, "Number of alive replicas ({}) is less than requested quorum ({}/{}).",
active_replicas, quorum_size, replicas_number);
}
/** Is there a quorum for the last part for which a quorum is needed?
* Write of all the parts with the included quorum is linearly ordered.
@ -165,34 +156,15 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
{
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
const auto & settings = context->getSettingsRef();
zookeeper_retries_info = ZooKeeperRetriesInfo(
"ReplicatedMergeTreeSink::consume",
settings.insert_keeper_max_retries ? log : nullptr,
settings.insert_keeper_max_retries,
settings.insert_keeper_retry_initial_backoff_ms,
settings.insert_keeper_retry_max_backoff_ms);
ZooKeeperWithFaultInjectionPtr zookeeper = ZooKeeperWithFaultInjection::createInstance(
settings.insert_keeper_fault_injection_probability,
settings.insert_keeper_fault_injection_seed,
storage.getZooKeeper(),
"ReplicatedMergeTreeSink::consume",
log);
auto zookeeper = storage.getZooKeeper();
assertSessionIsNotExpired(zookeeper);
/** If write is with quorum, then we check that the required number of replicas is now live,
* and also that for all previous parts for which quorum is required, this quorum is reached.
* And also check that during the insertion, the replica was not reinitialized or disabled (by the value of `is_active` node).
* TODO Too complex logic, you can do better.
*/
size_t replicas_num = 0;
ZooKeeperRetriesControl quorum_retries_ctl("checkQuorumPrecondition", zookeeper_retries_info);
quorum_retries_ctl.retryLoop(
[&]()
{
zookeeper->setKeeper(storage.getZooKeeper());
replicas_num = checkQuorumPrecondition(zookeeper);
});
size_t replicas_num = checkQuorumPrecondition(zookeeper);
if (!storage_snapshot->object_columns.empty())
convertDynamicColumnsToTuples(block, storage_snapshot);
@ -204,6 +176,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
size_t streams = 0;
bool support_parallel_write = false;
const Settings & settings = context->getSettingsRef();
for (auto & current_block : part_blocks)
{
@ -283,7 +256,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
finishDelayedChunk(zookeeper);
}
void ReplicatedMergeTreeSink::finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper)
void ReplicatedMergeTreeSink::finishDelayedChunk(zkutil::ZooKeeperPtr & zookeeper)
{
if (!delayed_chunk)
return;
@ -298,7 +271,7 @@ void ReplicatedMergeTreeSink::finishDelayedChunk(const ZooKeeperWithFaultInjecti
try
{
commitPart(zookeeper, part, partition.block_id, delayed_chunk->replicas_num, false);
commitPart(zookeeper, part, partition.block_id, delayed_chunk->replicas_num);
last_block_is_duplicate = last_block_is_duplicate || part->is_duplicate;
@ -321,9 +294,8 @@ void ReplicatedMergeTreeSink::writeExistingPart(MergeTreeData::MutableDataPartPt
{
/// NOTE: No delay in this case. That's Ok.
auto origin_zookeeper = storage.getZooKeeper();
assertSessionIsNotExpired(origin_zookeeper);
auto zookeeper = std::make_shared<ZooKeeperWithFaultInjection>(origin_zookeeper);
auto zookeeper = storage.getZooKeeper();
assertSessionIsNotExpired(zookeeper);
size_t replicas_num = checkQuorumPrecondition(zookeeper);
@ -332,7 +304,7 @@ void ReplicatedMergeTreeSink::writeExistingPart(MergeTreeData::MutableDataPartPt
try
{
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
commitPart(zookeeper, part, "", replicas_num, true);
commitPart(zookeeper, part, "", replicas_num);
PartLog::addNewPart(storage.getContext(), part, watch.elapsed());
}
catch (...)
@ -343,11 +315,10 @@ void ReplicatedMergeTreeSink::writeExistingPart(MergeTreeData::MutableDataPartPt
}
void ReplicatedMergeTreeSink::commitPart(
const ZooKeeperWithFaultInjectionPtr & zookeeper,
zkutil::ZooKeeperPtr & zookeeper,
MergeTreeData::MutableDataPartPtr & part,
const String & block_id,
size_t replicas_num,
bool writing_existing_part)
size_t replicas_num)
{
/// It is possible that we alter a part with different types of source columns.
/// In this case, if column was not altered, the result type will be different with what we have in metadata.
@ -355,6 +326,8 @@ void ReplicatedMergeTreeSink::commitPart(
///
/// metadata_snapshot->check(part->getColumns());
assertSessionIsNotExpired(zookeeper);
String temporary_part_relative_path = part->getDataPartStorage().getPartDirectory();
/// There is one case when we need to retry transaction in a loop.
@ -364,65 +337,14 @@ void ReplicatedMergeTreeSink::commitPart(
bool is_already_existing_part = false;
/// for retries due to keeper error
bool part_committed_locally_but_zookeeper = false;
Coordination::Error write_part_info_keeper_error = Coordination::Error::ZOK;
ZooKeeperRetriesControl retries_ctl("commitPart", zookeeper_retries_info);
retries_ctl.retryLoop([&]()
while (true)
{
zookeeper->setKeeper(storage.getZooKeeper());
if (storage.is_readonly)
{
/// stop retries if in shutdown
if (storage.shutdown_called)
throw Exception(
ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode due to shutdown: replica_path={}", storage.replica_path);
/// When we attach existing parts it's okay to be in read-only mode
/// For example during RESTORE REPLICA.
if (!writing_existing_part)
{
retries_ctl.setUserError(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode: replica_path={}", storage.replica_path);
return;
}
}
if (retries_ctl.isRetry())
{
/// If we are retrying, check if last iteration was actually successful,
/// we could get network error on committing part to zk
/// but the operation could be completed by zk server
/// If this flag is true, then part is in Active state, and we'll not retry anymore
/// we only check if part was committed to zk and return success or failure correspondingly
/// Note: if commit to zk failed then cleanup thread will mark the part as Outdated later
if (part_committed_locally_but_zookeeper)
{
/// check that info about the part was actually written in zk
if (zookeeper->exists(fs::path(storage.replica_path) / "parts" / part->name))
{
LOG_DEBUG(log, "Part was successfully committed on previous iteration: part_id={}", part->name);
}
else
{
retries_ctl.setUserError(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Insert failed due to zookeeper error. Please retry. Reason: {}",
Coordination::errorMessage(write_part_info_keeper_error));
}
retries_ctl.stopRetries();
return;
}
}
/// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
/// We remove the lock just after renaming the part. In case of exception, block number will be marked as abandoned.
/// Also, make deduplication check. If a duplicate is detected, no nodes are created.
/// Allocate new block number and check for duplicates
const bool deduplicate_block = !block_id.empty();
bool deduplicate_block = !block_id.empty();
String block_id_path = deduplicate_block ? storage.zookeeper_path + "/blocks/" + block_id : "";
auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);
ThreadFuzzer::maybeInjectSleep();
@ -546,13 +468,7 @@ void ReplicatedMergeTreeSink::commitPart(
else
quorum_path = storage.zookeeper_path + "/quorum/status";
if (!retries_ctl.callAndCatchAll(
[&]()
{
waitForQuorum(
zookeeper, existing_part_name, quorum_path, quorum_info.is_active_node_version, replicas_num);
}))
return;
waitForQuorum(zookeeper, existing_part_name, quorum_path, quorum_info.is_active_node_version, replicas_num);
}
else
{
@ -561,7 +477,6 @@ void ReplicatedMergeTreeSink::commitPart(
return;
}
LOG_INFO(log, "Block with ID {} already exists on other replicas as part {}; will write it locally with that name.",
block_id, existing_part_name);
@ -593,7 +508,8 @@ void ReplicatedMergeTreeSink::commitPart(
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::DUPLICATE_DATA_PART && e.code() != ErrorCodes::PART_IS_TEMPORARILY_LOCKED)
if (e.code() != ErrorCodes::DUPLICATE_DATA_PART
&& e.code() != ErrorCodes::PART_IS_TEMPORARILY_LOCKED)
throw;
}
@ -610,26 +526,15 @@ void ReplicatedMergeTreeSink::commitPart(
part->name);
}
try
{
ThreadFuzzer::maybeInjectSleep();
storage.lockSharedData(*part, zookeeper, false, {});
ThreadFuzzer::maybeInjectSleep();
}
catch (const Exception &)
{
transaction.rollbackPartsToTemporaryState();
ThreadFuzzer::maybeInjectSleep();
part->is_temp = true;
part->renameTo(temporary_part_relative_path, false);
throw;
}
storage.lockSharedData(*part, false, {});
ThreadFuzzer::maybeInjectSleep();
Coordination::Responses responses;
Coordination::Error multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT
if (multi_code == Coordination::Error::ZOK)
{
transaction.commit();
@ -644,32 +549,18 @@ void ReplicatedMergeTreeSink::commitPart(
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED,
"Insert query (for block {}) was cancelled by concurrent ALTER PARTITION", block_number_lock->getPath());
}
else if (Coordination::isHardwareError(multi_code))
else if (multi_code == Coordination::Error::ZCONNECTIONLOSS
|| multi_code == Coordination::Error::ZOPERATIONTIMEOUT)
{
write_part_info_keeper_error = multi_code;
/** If the connection is lost, and we do not know if the changes were applied, we can not delete the local part
* if the changes were applied, the inserted block appeared in `/blocks/`, and it can not be inserted again.
*/
* if the changes were applied, the inserted block appeared in `/blocks/`, and it can not be inserted again.
*/
transaction.commit();
/// Setting this flag is point of no return
/// On next retry, we'll just check if actually operation succeed or failed
/// and return ok or error correspondingly
part_committed_locally_but_zookeeper = true;
/// if all retries will be exhausted by accessing zookeeper on fresh retry -> we'll add committed part to queue in the action
/// here lambda capture part name, it's ok since we'll not generate new one for this insert,
/// see comments around 'part_committed_locally_but_zookeeper' flag
retries_ctl.actionAfterLastFailedRetry(
[&storage = storage, part_name = part->name]()
{ storage.enqueuePartForCheck(part_name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER); });
storage.enqueuePartForCheck(part->name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
/// We do not know whether or not data has been inserted.
retries_ctl.setUserError(
ErrorCodes::UNKNOWN_STATUS_OF_INSERT,
"Unknown status, client must retry. Reason: {}",
Coordination::errorMessage(multi_code));
return;
throw Exception("Unknown status, client must retry. Reason: " + String(Coordination::errorMessage(multi_code)),
ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
}
else if (Coordination::isUserError(multi_code))
{
@ -689,72 +580,62 @@ void ReplicatedMergeTreeSink::commitPart(
part->renameTo(temporary_part_relative_path, false);
/// If this part appeared on other replica than it's better to try to write it locally one more time. If it's our part
/// than it will be ignored on the next iteration.
/// than it will be ignored on the next itration.
++loop_counter;
if (loop_counter == max_iterations)
{
part->is_duplicate = true; /// Part is duplicate, just remove it from local FS
throw Exception("Too many transaction retries - it may indicate an error", ErrorCodes::DUPLICATE_DATA_PART);
}
retries_ctl.requestUnconditionalRetry(); /// we want one more iteration w/o counting it as a try and timeout
return;
continue;
}
else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
{
storage.unlockSharedData(*part, zookeeper);
storage.unlockSharedData(*part);
transaction.rollback();
throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
}
else
{
storage.unlockSharedData(*part, zookeeper);
storage.unlockSharedData(*part);
/// NOTE: We could be here if the node with the quorum existed, but was quickly removed.
transaction.rollback();
throw Exception(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Unexpected logical error while adding block {} with ID '{}': {}, path {}",
block_number,
block_id,
Coordination::errorMessage(multi_code),
failed_op_path);
throw Exception("Unexpected logical error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+ Coordination::errorMessage(multi_code) + ", path " + failed_op_path,
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
}
else if (Coordination::isHardwareError(multi_code))
{
storage.unlockSharedData(*part);
transaction.rollback();
throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+ Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
else
{
storage.unlockSharedData(*part, zookeeper);
storage.unlockSharedData(*part);
transaction.rollback();
throw Exception(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Unexpected ZooKeeper error while adding block {} with ID '{}': {}",
block_number,
block_id,
Coordination::errorMessage(multi_code));
throw Exception("Unexpected ZooKeeper error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+ Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
},
[&zookeeper]() { zookeeper->cleanupEphemeralNodes(); });
break;
}
if (isQuorumEnabled())
{
ZooKeeperRetriesControl quorum_retries_ctl("waitForQuorum", zookeeper_retries_info);
quorum_retries_ctl.retryLoop([&]()
if (is_already_existing_part)
{
zookeeper->setKeeper(storage.getZooKeeper());
/// We get duplicate part without fetch
/// Check if this quorum insert is parallel or not
if (zookeeper->exists(storage.zookeeper_path + "/quorum/parallel/" + part->name))
storage.updateQuorum(part->name, true);
else if (zookeeper->exists(storage.zookeeper_path + "/quorum/status"))
storage.updateQuorum(part->name, false);
}
if (is_already_existing_part)
{
/// We get duplicate part without fetch
/// Check if this quorum insert is parallel or not
if (zookeeper->exists(storage.zookeeper_path + "/quorum/parallel/" + part->name))
storage.updateQuorum(part->name, true);
else if (zookeeper->exists(storage.zookeeper_path + "/quorum/status"))
storage.updateQuorum(part->name, false);
}
if (!quorum_retries_ctl.callAndCatchAll(
[&]()
{ waitForQuorum(zookeeper, part->name, quorum_info.status_path, quorum_info.is_active_node_version, replicas_num); }))
return;
});
waitForQuorum(zookeeper, part->name, quorum_info.status_path, quorum_info.is_active_node_version, replicas_num);
}
}
@ -769,11 +650,11 @@ void ReplicatedMergeTreeSink::onFinish()
{
auto zookeeper = storage.getZooKeeper();
assertSessionIsNotExpired(zookeeper);
finishDelayedChunk(std::make_shared<ZooKeeperWithFaultInjection>(zookeeper));
finishDelayedChunk(zookeeper);
}
void ReplicatedMergeTreeSink::waitForQuorum(
const ZooKeeperWithFaultInjectionPtr & zookeeper,
zkutil::ZooKeeperPtr & zookeeper,
const std::string & part_name,
const std::string & quorum_path,
Int32 is_active_node_version,
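
To summarize the behaviour reverted in this file: commitPart() used to run inside a ZooKeeperRetriesControl loop and, after a lost connection during the commit, would only probe whether the part node had actually appeared instead of inserting again. A condensed, illustrative sketch of that reverted flow (not the literal code):

// Sketch of the reverted retry/idempotency logic around committing a part.
ZooKeeperRetriesControl retries_ctl("commitPart", zookeeper_retries_info);
retries_ctl.retryLoop(
    [&]()
    {
        zookeeper->setKeeper(storage.getZooKeeper());  // refresh the Keeper session on every attempt

        if (retries_ctl.isRetry() && part_committed_locally_but_zookeeper)
        {
            // The previous attempt may have succeeded server side even though the client saw an error:
            // check whether the part node exists and stop retrying either way.
            if (!zookeeper->exists(fs::path(storage.replica_path) / "parts" / part->name))
                retries_ctl.setUserError(
                    ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
                    "Insert failed due to zookeeper error. Please retry. Reason: {}",
                    Coordination::errorMessage(write_part_info_keeper_error));
            retries_ctl.stopRetries();
            return;
        }

        /* ... allocate block number, build ops, tryMultiNoThrow, translate multi_code ... */
    },
    [&]() { zookeeper->cleanupEphemeralNodes(); });    // per-iteration cleanup of leftovers from injected faults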


@ -3,8 +3,6 @@
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <base/types.h>
#include <Storages/MergeTree/ZooKeeperRetries.h>
#include <Storages/MergeTree/ZooKeeperWithFaultInjection.h>
namespace Poco { class Logger; }
@ -62,7 +60,6 @@ public:
}
private:
ZooKeeperRetriesInfo zookeeper_retries_info;
struct QuorumInfo
{
String status_path;
@ -74,24 +71,20 @@ private:
/// Checks active replicas.
/// Returns total number of replicas.
size_t checkQuorumPrecondition(const ZooKeeperWithFaultInjectionPtr & zookeeper);
size_t checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper);
/// Rename temporary part and commit to ZooKeeper.
void commitPart(
const ZooKeeperWithFaultInjectionPtr & zookeeper,
zkutil::ZooKeeperPtr & zookeeper,
MergeTreeData::MutableDataPartPtr & part,
const String & block_id,
size_t replicas_num,
bool writing_existing_part);
size_t replicas_num);
/// Wait for quorum to be satisfied on path (quorum_path) form part (part_name)
/// Also checks that replica still alive.
void waitForQuorum(
const ZooKeeperWithFaultInjectionPtr & zookeeper,
const std::string & part_name,
const std::string & quorum_path,
int is_active_node_version,
size_t replicas_num) const;
zkutil::ZooKeeperPtr & zookeeper, const std::string & part_name,
const std::string & quorum_path, int is_active_node_version, size_t replicas_num) const;
StorageReplicatedMergeTree & storage;
StorageMetadataPtr metadata_snapshot;
@ -123,7 +116,7 @@ private:
struct DelayedChunk;
std::unique_ptr<DelayedChunk> delayed_chunk;
void finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper);
void finishDelayedChunk(zkutil::ZooKeeperPtr & zookeeper);
};
}


@ -1,265 +0,0 @@
#pragma once
#include <base/sleep.h>
#include <Common/Exception.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int OK;
}
struct ZooKeeperRetriesInfo
{
ZooKeeperRetriesInfo() = default;
ZooKeeperRetriesInfo(std::string name_, Poco::Logger * logger_, UInt64 max_retries_, UInt64 initial_backoff_ms_, UInt64 max_backoff_ms_)
: name(std::move(name_))
, logger(logger_)
, max_retries(max_retries_)
, curr_backoff_ms(std::min(initial_backoff_ms_, max_backoff_ms_))
, max_backoff_ms(max_backoff_ms_)
{
}
std::string name;
Poco::Logger * logger = nullptr;
UInt64 max_retries = 0;
UInt64 curr_backoff_ms = 0;
UInt64 max_backoff_ms = 0;
UInt64 retry_count = 0;
};
class ZooKeeperRetriesControl
{
public:
ZooKeeperRetriesControl(std::string name_, ZooKeeperRetriesInfo & retries_info_) : name(std::move(name_)), retries_info(retries_info_)
{
}
void retryLoop(auto && f)
{
retryLoop(f, []() {});
}
void retryLoop(auto && f, auto && iteration_cleanup)
{
while (canTry())
{
try
{
f();
iteration_cleanup();
}
catch (const zkutil::KeeperException & e)
{
iteration_cleanup();
if (!Coordination::isHardwareError(e.code))
throw;
setKeeperError(e.code, e.message());
}
catch (...)
{
iteration_cleanup();
throw;
}
}
}
bool callAndCatchAll(auto && f)
{
try
{
f();
return true;
}
catch (const zkutil::KeeperException & e)
{
setKeeperError(e.code, e.message());
}
catch (const Exception & e)
{
setUserError(e.code(), e.what());
}
return false;
}
void setUserError(int code, std::string message)
{
if (retries_info.logger)
LOG_TRACE(
retries_info.logger, "ZooKeeperRetriesControl: {}/{}: setUserError: error={} message={}", retries_info.name, name, code, message);
/// if current iteration is already failed, keep initial error
if (!iteration_succeeded)
return;
iteration_succeeded = false;
user_error.code = code;
user_error.message = std::move(message);
keeper_error = KeeperError{};
}
template <typename... Args>
void setUserError(int code, fmt::format_string<Args...> fmt, Args &&... args)
{
setUserError(code, fmt::format(fmt, std::forward<Args>(args)...));
}
void setKeeperError(Coordination::Error code, std::string message)
{
if (retries_info.logger)
LOG_TRACE(
retries_info.logger, "ZooKeeperRetriesControl: {}/{}: setKeeperError: error={} message={}", retries_info.name, name, code, message);
/// if current iteration is already failed, keep initial error
if (!iteration_succeeded)
return;
iteration_succeeded = false;
keeper_error.code = code;
keeper_error.message = std::move(message);
user_error = UserError{};
}
void stopRetries() { stop_retries = true; }
void requestUnconditionalRetry() { unconditional_retry = true; }
bool isLastRetry() const { return retries_info.retry_count >= retries_info.max_retries; }
bool isRetry() const { return retries_info.retry_count > 0; }
Coordination::Error getLastKeeperErrorCode() const { return keeper_error.code; }
/// action will be called only once and only after latest failed retry
void actionAfterLastFailedRetry(std::function<void()> f) { action_after_last_failed_retry = std::move(f); }
private:
struct KeeperError
{
using Code = Coordination::Error;
Code code = Code::ZOK;
std::string message;
};
struct UserError
{
int code = ErrorCodes::OK;
std::string message;
};
bool canTry()
{
++iteration_count;
/// first iteration is ordinary execution, no further checks needed
if (0 == iteration_count)
return true;
if (unconditional_retry)
{
unconditional_retry = false;
return true;
}
/// iteration succeeded -> no need to retry
if (iteration_succeeded)
{
/// avoid unnecessary logs, - print something only in case of retries
if (retries_info.logger && iteration_count > 1)
LOG_DEBUG(
retries_info.logger,
"ZooKeeperRetriesControl: {}/{}: succeeded after: iterations={} total_retries={}",
retries_info.name,
name,
iteration_count,
retries_info.retry_count);
return false;
}
if (stop_retries)
{
logLastError("stop retries on request");
action_after_last_failed_retry();
throwIfError();
return false;
}
if (retries_info.retry_count >= retries_info.max_retries)
{
logLastError("retry limit is reached");
action_after_last_failed_retry();
throwIfError();
return false;
}
/// retries
++retries_info.retry_count;
logLastError("will retry due to error");
sleepForMilliseconds(retries_info.curr_backoff_ms);
retries_info.curr_backoff_ms = std::min(retries_info.curr_backoff_ms * 2, retries_info.max_backoff_ms);
/// reset the flag, it will be set to false in case of error
iteration_succeeded = true;
return true;
}
void throwIfError() const
{
if (user_error.code != ErrorCodes::OK)
throw Exception(user_error.code, user_error.message);
if (keeper_error.code != KeeperError::Code::ZOK)
throw zkutil::KeeperException(keeper_error.code, keeper_error.message);
}
void logLastError(std::string_view header)
{
if (user_error.code == ErrorCodes::OK)
{
if (retries_info.logger)
LOG_DEBUG(
retries_info.logger,
"ZooKeeperRetriesControl: {}/{}: {}: retry_count={} timeout={}ms error={} message={}",
retries_info.name,
name,
header,
retries_info.retry_count,
retries_info.curr_backoff_ms,
keeper_error.code,
keeper_error.message);
}
else
{
if (retries_info.logger)
LOG_DEBUG(
retries_info.logger,
"ZooKeeperRetriesControl: {}/{}: {}: retry_count={} timeout={}ms error={} message={}",
retries_info.name,
name,
header,
retries_info.retry_count,
retries_info.curr_backoff_ms,
user_error.code,
user_error.message);
}
}
std::string name;
ZooKeeperRetriesInfo & retries_info;
Int64 iteration_count = -1;
UserError user_error;
KeeperError keeper_error;
std::function<void()> action_after_last_failed_retry = []() {};
bool unconditional_retry = false;
bool iteration_succeeded = true;
bool stop_retries = false;
};
}
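
For context, before this revert the helper above was driven from ReplicatedMergeTreeSink::consume() roughly as follows (reconstructed from lines removed earlier in this diff; illustration only):

ZooKeeperRetriesInfo retries_info(
    "ReplicatedMergeTreeSink::consume",
    settings.insert_keeper_max_retries ? log : nullptr,   // log only when retries are actually enabled
    settings.insert_keeper_max_retries,
    settings.insert_keeper_retry_initial_backoff_ms,
    settings.insert_keeper_retry_max_backoff_ms);

ZooKeeperRetriesControl quorum_retries_ctl("checkQuorumPrecondition", retries_info);
quorum_retries_ctl.retryLoop(
    [&]()
    {
        zookeeper->setKeeper(storage.getZooKeeper());      // take a fresh session before each attempt
        replicas_num = checkQuorumPrecondition(zookeeper); // the Keeper-facing operation being retried
    });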


@ -1,527 +0,0 @@
#pragma once
#include <random>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/randomSeed.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class RandomFaultInjection
{
public:
RandomFaultInjection(double probability, UInt64 seed_) : rndgen(seed_), distribution(probability) { }
void beforeOperation()
{
if (distribution(rndgen))
throw zkutil::KeeperException("Fault injection before operation", Coordination::Error::ZSESSIONEXPIRED);
}
void afterOperation()
{
if (distribution(rndgen))
throw zkutil::KeeperException("Fault injection after operation", Coordination::Error::ZOPERATIONTIMEOUT);
}
private:
std::mt19937_64 rndgen;
std::bernoulli_distribution distribution;
};
///
/// ZooKeeperWithFaultInjection mimics ZooKeeper interface and inject failures according to failure policy if set
///
class ZooKeeperWithFaultInjection
{
using zk = zkutil::ZooKeeper;
zk::Ptr keeper;
zk::Ptr keeper_prev;
std::unique_ptr<RandomFaultInjection> fault_policy;
std::string name;
Poco::Logger * logger = nullptr;
UInt64 calls_total = 0;
UInt64 calls_without_fault_injection = 0;
const UInt64 seed = 0;
std::vector<std::string> ephemeral_nodes;
ZooKeeperWithFaultInjection(
zk::Ptr const & keeper_,
double fault_injection_probability,
UInt64 fault_injection_seed,
std::string name_,
Poco::Logger * logger_)
: keeper(keeper_), name(std::move(name_)), logger(logger_), seed(fault_injection_seed)
{
fault_policy = std::make_unique<RandomFaultInjection>(fault_injection_probability, fault_injection_seed);
if (unlikely(logger))
LOG_TRACE(
logger,
"ZooKeeperWithFaultInjection created: name={} seed={} fault_probability={}",
name,
seed,
fault_injection_probability);
}
public:
using Ptr = std::shared_ptr<ZooKeeperWithFaultInjection>;
static ZooKeeperWithFaultInjection::Ptr createInstance(
double fault_injection_probability, UInt64 fault_injection_seed, const zk::Ptr & zookeeper, std::string name, Poco::Logger * logger)
{
/// validate all parameters here, constructor just accept everything
if (fault_injection_probability < 0.0)
fault_injection_probability = .0;
else if (fault_injection_probability > 1.0)
fault_injection_probability = 1.0;
if (0 == fault_injection_seed)
fault_injection_seed = randomSeed();
if (fault_injection_probability > 0.0)
return std::shared_ptr<ZooKeeperWithFaultInjection>(
new ZooKeeperWithFaultInjection(zookeeper, fault_injection_probability, fault_injection_seed, std::move(name), logger));
/// if no fault injection provided, create instance which will not log anything
return std::make_shared<ZooKeeperWithFaultInjection>(zookeeper);
}
explicit ZooKeeperWithFaultInjection(zk::Ptr const & keeper_) : keeper(keeper_) { }
~ZooKeeperWithFaultInjection()
{
if (unlikely(logger))
LOG_TRACE(
logger,
"ZooKeeperWithFaultInjection report: name={} seed={} calls_total={} calls_succeeded={} calls_failed={} failure_rate={}",
name,
seed,
calls_total,
calls_without_fault_injection,
calls_total - calls_without_fault_injection,
float(calls_total - calls_without_fault_injection) / calls_total);
}
void setKeeper(zk::Ptr const & keeper_) { keeper = keeper_; }
bool isNull() const { return keeper.get() == nullptr; }
///
/// mirror ZooKeeper interface
///
Strings getChildren(
const std::string & path,
Coordination::Stat * stat = nullptr,
const zkutil::EventPtr & watch = nullptr,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
{
return access("getChildren", path, [&]() { return keeper->getChildren(path, stat, watch, list_request_type); });
}
Coordination::Error tryGetChildren(
const std::string & path,
Strings & res,
Coordination::Stat * stat = nullptr,
const zkutil::EventPtr & watch = nullptr,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
{
return access("tryGetChildren", path, [&]() { return keeper->tryGetChildren(path, res, stat, watch, list_request_type); });
}
zk::FutureExists asyncExists(const std::string & path, Coordination::WatchCallback watch_callback = {})
{
return access("asyncExists", path, [&]() { return keeper->asyncExists(path, watch_callback); });
}
zk::FutureGet asyncTryGet(const std::string & path)
{
return access("asyncTryGet", path, [&]() { return keeper->asyncTryGet(path); });
}
bool tryGet(
const std::string & path,
std::string & res,
Coordination::Stat * stat = nullptr,
const zkutil::EventPtr & watch = nullptr,
Coordination::Error * code = nullptr)
{
return access("tryGet", path, [&]() { return keeper->tryGet(path, res, stat, watch, code); });
}
Coordination::Error tryMulti(const Coordination::Requests & requests, Coordination::Responses & responses)
{
constexpr auto method = "tryMulti";
auto error = access(
method,
!requests.empty() ? requests.front()->getPath() : "",
[&]() { return keeper->tryMulti(requests, responses); },
[&](const Coordination::Error & original_error)
{
if (original_error == Coordination::Error::ZOK)
faultInjectionPostAction(method, requests, responses);
},
[&]()
{
responses.clear();
for (size_t i = 0; i < requests.size(); ++i)
responses.emplace_back(std::make_shared<Coordination::ZooKeeperErrorResponse>());
});
/// collect ephemeral nodes when no fault was injected (to clean up on demand)
if (unlikely(fault_policy) && Coordination::Error::ZOK == error)
{
doForEachCreatedEphemeralNode(
method, requests, responses, [&](const String & path_created) { ephemeral_nodes.push_back(path_created); });
}
return error;
}
Coordination::Error tryMultiNoThrow(const Coordination::Requests & requests, Coordination::Responses & responses)
{
constexpr auto method = "tryMultiNoThrow";
constexpr auto no_throw = true;
constexpr auto inject_failure_before_op = false;
auto error = access<no_throw, inject_failure_before_op>(
method,
!requests.empty() ? requests.front()->getPath() : "",
[&]() { return keeper->tryMultiNoThrow(requests, responses); },
[&](const Coordination::Error & original_error)
{
if (original_error == Coordination::Error::ZOK)
faultInjectionPostAction(method, requests, responses);
},
[&]()
{
responses.clear();
for (size_t i = 0; i < requests.size(); ++i)
responses.emplace_back(std::make_shared<Coordination::ZooKeeperErrorResponse>());
});
/// collect ephemeral nodes when no fault was injected (to clean up later)
if (unlikely(fault_policy) && Coordination::Error::ZOK == error)
{
doForEachCreatedEphemeralNode(
method, requests, responses, [&](const String & path_created) { ephemeral_nodes.push_back(path_created); });
}
return error;
}
std::string get(const std::string & path, Coordination::Stat * stat = nullptr, const zkutil::EventPtr & watch = nullptr)
{
return access("get", path, [&]() { return keeper->get(path, stat, watch); });
}
zkutil::ZooKeeper::MultiGetResponse get(const std::vector<std::string> & paths)
{
return access("get", !paths.empty() ? paths.front() : "", [&]() { return keeper->get(paths); });
}
bool exists(const std::string & path, Coordination::Stat * stat = nullptr, const zkutil::EventPtr & watch = nullptr)
{
return access("exists", path, [&]() { return keeper->exists(path, stat, watch); });
}
zkutil::ZooKeeper::MultiExistsResponse exists(const std::vector<std::string> & paths)
{
return access("exists", !paths.empty() ? paths.front() : "", [&]() { return keeper->exists(paths); });
}
std::string create(const std::string & path, const std::string & data, int32_t mode)
{
auto path_created = access(
"create",
path,
[&]() { return keeper->create(path, data, mode); },
[&](std::string const & result_path)
{
try
{
if (mode == zkutil::CreateMode::EphemeralSequential || mode == zkutil::CreateMode::Ephemeral)
{
keeper->remove(result_path);
if (unlikely(logger))
LOG_TRACE(logger, "ZooKeeperWithFaultInjection cleanup: seed={} func={} path={}", seed, "create", result_path);
}
}
catch (const zkutil::KeeperException & e)
{
if (unlikely(logger))
LOG_TRACE(
logger,
"ZooKeeperWithFaultInjection cleanup FAILED: seed={} func={} path={} code={} message={} ",
seed,
"create",
result_path,
e.code,
e.message());
}
});
/// collect ephemeral nodes when no fault was injected (to clean up later)
if (unlikely(fault_policy))
{
if (mode == zkutil::CreateMode::EphemeralSequential || mode == zkutil::CreateMode::Ephemeral)
ephemeral_nodes.push_back(path_created);
}
return path_created;
}
Coordination::Responses multi(const Coordination::Requests & requests)
{
constexpr auto method = "multi";
auto result = access(
method,
!requests.empty() ? requests.front()->getPath() : "",
[&]() { return keeper->multi(requests); },
[&](Coordination::Responses & responses) { faultInjectionPostAction(method, requests, responses); });
/// collect ephemeral nodes to clean up
if (unlikely(fault_policy))
{
doForEachCreatedEphemeralNode(
method, requests, result, [&](const String & path_created) { ephemeral_nodes.push_back(path_created); });
}
return result;
}
void createAncestors(const std::string & path)
{
access("createAncestors", path, [&]() { return keeper->createAncestors(path); });
}
Coordination::Error tryRemove(const std::string & path, int32_t version = -1)
{
return access("tryRemove", path, [&]() { return keeper->tryRemove(path, version); });
}
void cleanupEphemeralNodes()
{
for (const auto & path : ephemeral_nodes)
{
try
{
if (keeper_prev)
keeper_prev->tryRemove(path);
}
catch (...)
{
if (unlikely(logger))
tryLogCurrentException(logger, "Exception during ephemeral nodes clean up");
}
}
ephemeral_nodes.clear();
}
private:
void faultInjectionBefore(std::function<void()> fault_cleanup)
{
try
{
if (unlikely(fault_policy))
fault_policy->beforeOperation();
}
catch (const zkutil::KeeperException &)
{
fault_cleanup();
throw;
}
}
void faultInjectionAfter(std::function<void()> fault_cleanup)
{
try
{
if (unlikely(fault_policy))
fault_policy->afterOperation();
}
catch (const zkutil::KeeperException &)
{
fault_cleanup();
throw;
}
}
void doForEachCreatedEphemeralNode(
const char * method, const Coordination::Requests & requests, const Coordination::Responses & responses, auto && action)
{
if (responses.empty())
return;
if (responses.size() != requests.size())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Number of responses doesn't match number of requests: method={} requests={} responses={}",
method,
requests.size(),
responses.size());
/// find create request with ephemeral flag
std::vector<std::pair<size_t, const Coordination::CreateRequest *>> create_requests;
for (size_t i = 0; i < requests.size(); ++i)
{
const auto * create_req = dynamic_cast<const Coordination::CreateRequest *>(requests[i].get());
if (create_req && create_req->is_ephemeral)
create_requests.emplace_back(i, create_req);
}
for (auto && [i, req] : create_requests)
{
const auto * create_resp = dynamic_cast<const Coordination::CreateResponse *>(responses.at(i).get());
if (!create_resp)
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Response should be CreateResponse: method={} index={} path={}", method, i, req->path);
action(create_resp->path_created);
}
}
void faultInjectionPostAction(const char * method, const Coordination::Requests & requests, Coordination::Responses & responses)
{
doForEachCreatedEphemeralNode(method, requests, responses, [&](const String & path_created) { keeper->remove(path_created); });
}
template <typename T>
struct FaultCleanupTypeImpl
{
using Type = std::function<void(T &)>;
};
template <>
struct FaultCleanupTypeImpl<void>
{
using Type = std::function<void()>;
};
template <typename T>
using FaultCleanupType = typename FaultCleanupTypeImpl<T>::Type;
template <
bool no_throw_access = false,
bool inject_failure_before_op = true,
int inject_failure_after_op = true,
typename Operation,
typename Result = std::invoke_result_t<Operation>>
Result access(
const char * func_name,
const std::string & path,
Operation operation,
FaultCleanupType<Result> fault_after_op_cleanup = {},
FaultCleanupType<void> fault_before_op_cleanup = {})
{
try
{
++calls_total;
if (!keeper)
throw zkutil::KeeperException(
"Session is considered to be expired due to fault injection", Coordination::Error::ZSESSIONEXPIRED);
if constexpr (inject_failure_before_op)
{
faultInjectionBefore(
[&]
{
if (fault_before_op_cleanup)
fault_before_op_cleanup();
});
}
if constexpr (!std::is_same_v<Result, void>)
{
Result res = operation();
/// if connectivity error occurred w/o fault injection -> just return it
if constexpr (std::is_same_v<Coordination::Error, Result>)
{
if (Coordination::isHardwareError(res))
return res;
}
if constexpr (inject_failure_after_op)
{
faultInjectionAfter(
[&]
{
if (fault_after_op_cleanup)
fault_after_op_cleanup(res);
});
}
++calls_without_fault_injection;
if (unlikely(logger))
LOG_TRACE(logger, "ZooKeeperWithFaultInjection call SUCCEEDED: seed={} func={} path={}", seed, func_name, path);
return res;
}
else
{
operation();
if constexpr (inject_failure_after_op)
{
faultInjectionAfter(
[&fault_after_op_cleanup]
{
if (fault_after_op_cleanup)
fault_after_op_cleanup();
});
}
++calls_without_fault_injection;
if (unlikely(logger))
LOG_TRACE(logger, "ZooKeeperWithFaultInjection call SUCCEEDED: seed={} func={} path={}", seed, func_name, path);
}
}
catch (const zkutil::KeeperException & e)
{
if (unlikely(logger))
LOG_TRACE(
logger,
"ZooKeeperWithFaultInjection call FAILED: seed={} func={} path={} code={} message={} ",
seed,
func_name,
path,
e.code,
e.message());
/// save valid pointer to clean up ephemeral nodes later if necessary
if (keeper)
keeper_prev = keeper;
keeper.reset();
/// for try*NoThrow() methods
if constexpr (no_throw_access)
return e.code;
if constexpr (std::is_same_v<Coordination::Error, Result>)
{
/// try*() methods throws at least on hardware error and return only on user errors
/// todo: the methods return only on subset of user errors, and throw on another errors
/// to mimic the methods exactly - we need to specify errors on which to return for each such method
if (Coordination::isHardwareError(e.code))
throw;
return e.code;
}
throw;
}
}
};
using ZooKeeperWithFaultInjectionPtr = ZooKeeperWithFaultInjection::Ptr;
}
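
For context, the wrapper above was created once per insert in ReplicatedMergeTreeSink::consume() and then used through the same interface as zkutil::ZooKeeper (reconstructed from lines removed earlier in this diff; illustration only):

ZooKeeperWithFaultInjectionPtr zookeeper = ZooKeeperWithFaultInjection::createInstance(
    settings.insert_keeper_fault_injection_probability,   // 0.0 makes the wrapper a transparent pass-through
    settings.insert_keeper_fault_injection_seed,          // 0 means "pick a random seed"
    storage.getZooKeeper(),
    "ReplicatedMergeTreeSink::consume",
    log);

// Calls look exactly like plain zkutil::ZooKeeper calls; faults may be injected before or after them.
if (zookeeper->exists(storage.zookeeper_path + "/quorum/parallel/" + part->name))
    storage.updateQuorum(part->name, true);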


@ -4479,16 +4479,9 @@ void StorageReplicatedMergeTree::assertNotReadonly() const
SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
/// If table is read-only because it doesn't have metadata in zk yet, then it's not possible to insert into it
/// Without this check, we'll write data parts on disk, and afterwards will remove them since we'll fail to commit them into zk
/// In case of remote storage like s3, it'll generate unnecessary PUT requests
if (is_readonly && (!has_metadata_in_zookeeper.has_value() || false == has_metadata_in_zookeeper.value()))
throw Exception(
ErrorCodes::TABLE_IS_READ_ONLY,
"Table is in readonly mode since table metadata was not found in zookeeper: replica_path={}",
replica_path);
const auto storage_settings_ptr = getSettings();
assertNotReadonly();
const Settings & query_settings = local_context->getSettingsRef();
bool deduplicate = storage_settings_ptr->replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
@ -5003,7 +4996,8 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const St
Int64 mutation_version;
{
delimiting_block_lock = allocateBlockNumber(partition_id, getZooKeeper());
auto zookeeper = getZooKeeper();
delimiting_block_lock = allocateBlockNumber(partition_id, zookeeper);
right = delimiting_block_lock->getNumber();
/// Make sure we cover all parts in drop range.
/// There might be parts with mutation version greater than current block number
@ -5284,7 +5278,7 @@ void StorageReplicatedMergeTree::rename(const String & new_path_to_table_data, c
}
bool StorageReplicatedMergeTree::existsNodeCached(const ZooKeeperWithFaultInjectionPtr & zookeeper, const std::string & path) const
bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path) const
{
{
std::lock_guard lock(existing_nodes_cache_mutex);
@ -5292,7 +5286,7 @@ bool StorageReplicatedMergeTree::existsNodeCached(const ZooKeeperWithFaultInject
return true;
}
bool res = zookeeper->exists(path);
bool res = getZooKeeper()->exists(path);
if (res)
{
@ -5304,22 +5298,9 @@ bool StorageReplicatedMergeTree::existsNodeCached(const ZooKeeperWithFaultInject
}
std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::allocateBlockNumber(
const String & partition_id,
const zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_block_id_path,
const String & zookeeper_path_prefix) const
{
return allocateBlockNumber(
partition_id, std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), zookeeper_block_id_path, zookeeper_path_prefix);
}
std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::allocateBlockNumber(
const String & partition_id,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
const String & zookeeper_block_id_path,
const String & zookeeper_path_prefix) const
std::optional<EphemeralLockInZooKeeper>
StorageReplicatedMergeTree::allocateBlockNumber(
const String & partition_id, const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_block_id_path, const String & zookeeper_path_prefix) const
{
String zookeeper_table_path;
if (zookeeper_path_prefix.empty())
@ -5330,7 +5311,7 @@ std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::allocateBloc
String block_numbers_path = fs::path(zookeeper_table_path) / "block_numbers";
String partition_path = fs::path(block_numbers_path) / partition_id;
if (!existsNodeCached(zookeeper, partition_path))
if (!existsNodeCached(partition_path))
{
Coordination::Requests ops;
/// Check that table is not being dropped ("host" is the first node that is removed on replica drop)
@ -5348,9 +5329,10 @@ std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::allocateBloc
}
return createEphemeralLockInZooKeeper(
fs::path(partition_path) / "block-", fs::path(zookeeper_table_path) / "temp", zookeeper, zookeeper_block_id_path);
fs::path(partition_path) / "block-", fs::path(zookeeper_table_path) / "temp", *zookeeper, zookeeper_block_id_path);
}
Strings StorageReplicatedMergeTree::tryWaitForAllReplicasToProcessLogEntry(
const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout)
{
@ -7103,7 +7085,7 @@ CancellationCode StorageReplicatedMergeTree::killPartMoveToShard(const UUID & ta
void StorageReplicatedMergeTree::getCommitPartOps(
Coordination::Requests & ops,
const DataPartPtr & part,
MutableDataPartPtr & part,
const String & block_id_path) const
{
const String & part_name = part->name;
@ -7685,28 +7667,11 @@ void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_nam
String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
LOG_TRACE(log, "Set zookeeper temporary ephemeral lock {}", zookeeper_node);
createZeroCopyLockNode(
std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), zookeeper_node, zkutil::CreateMode::Ephemeral, false);
createZeroCopyLockNode(zookeeper, zookeeper_node, zkutil::CreateMode::Ephemeral, false);
}
}
void StorageReplicatedMergeTree::lockSharedData(
const IMergeTreeDataPart & part,
bool replace_existing_lock,
std::optional<HardlinkedFiles> hardlinked_files) const
{
auto zookeeper = tryGetZooKeeper();
if (zookeeper)
return lockSharedData(part, std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), replace_existing_lock, hardlinked_files);
else
return lockSharedData(part, std::make_shared<ZooKeeperWithFaultInjection>(nullptr), replace_existing_lock, hardlinked_files);
}
void StorageReplicatedMergeTree::lockSharedData(
const IMergeTreeDataPart & part,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
bool replace_existing_lock,
std::optional<HardlinkedFiles> hardlinked_files) const
void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock, std::optional<HardlinkedFiles> hardlinked_files) const
{
auto settings = getSettings();
@ -7716,7 +7681,8 @@ void StorageReplicatedMergeTree::lockSharedData(
if (!part.getDataPartStorage().supportZeroCopyReplication())
return;
if (zookeeper->isNull())
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
return;
String id = part.getUniqueId();
@ -7750,14 +7716,7 @@ void StorageReplicatedMergeTree::lockSharedData(
}
}
std::pair<bool, NameSet>
StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part) const
{
return unlockSharedData(part, std::make_shared<ZooKeeperWithFaultInjection>(nullptr));
}
std::pair<bool, NameSet>
StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part, const ZooKeeperWithFaultInjectionPtr & zookeeper) const
std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part) const
{
auto settings = getSettings();
if (!settings->allow_remote_fs_zero_copy_replication)
@ -7803,10 +7762,11 @@ StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part, co
/// We remove parts during table shutdown. If exception happen, restarting thread will be already turned
/// off and nobody will reconnect our zookeeper connection. In this case we use zookeeper connection from
/// context.
zkutil::ZooKeeperPtr zookeeper;
if (shutdown_called.load())
zookeeper->setKeeper(getZooKeeperIfTableShutDown());
zookeeper = getZooKeeperIfTableShutDown();
else
zookeeper->setKeeper(getZooKeeper());
zookeeper = getZooKeeper();
/// It can happen that we didn't had the connection to zookeeper during table creation, but actually
/// table is completely dropped, so we can drop it without any additional checks.
@ -7831,7 +7791,7 @@ namespace
/// But sometimes we need an opposite. When we deleting all_0_0_0_1 it can be non replicated to other replicas, so we are the only owner of this part.
/// In this case when we will drop all_0_0_0_1 we will drop blobs for all_0_0_0. But it will lead to dataloss. For such case we need to check that other replicas
/// still need parent part.
std::pair<bool, NameSet> getParentLockedBlobs(const ZooKeeperWithFaultInjectionPtr & zookeeper_ptr, const std::string & zero_copy_part_path_prefix, const std::string & part_info_str, MergeTreeDataFormatVersion format_version, Poco::Logger * log)
std::pair<bool, NameSet> getParentLockedBlobs(zkutil::ZooKeeperPtr zookeeper_ptr, const std::string & zero_copy_part_path_prefix, const std::string & part_info_str, MergeTreeDataFormatVersion format_version, Poco::Logger * log)
{
NameSet files_not_to_remove;
@ -7893,7 +7853,7 @@ std::pair<bool, NameSet> getParentLockedBlobs(const ZooKeeperWithFaultInjectionP
std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
String part_id, const String & table_uuid, const String & part_name,
const String & replica_name_, const std::string & disk_type, const ZooKeeperWithFaultInjectionPtr & zookeeper_ptr, const MergeTreeSettings & settings,
const String & replica_name_, const std::string & disk_type, zkutil::ZooKeeperPtr zookeeper_ptr, const MergeTreeSettings & settings,
Poco::Logger * logger, const String & zookeeper_path_old, MergeTreeDataFormatVersion data_format_version)
{
boost::replace_all(part_id, "/", "_");
@ -7912,8 +7872,7 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
if (!files_not_to_remove_str.empty())
boost::split(files_not_to_remove, files_not_to_remove_str, boost::is_any_of("\n "));
auto [has_parent, parent_not_to_remove] = getParentLockedBlobs(
zookeeper_ptr, fs::path(zc_zookeeper_path).parent_path(), part_name, data_format_version, logger);
auto [has_parent, parent_not_to_remove] = getParentLockedBlobs(zookeeper_ptr, fs::path(zc_zookeeper_path).parent_path(), part_name, data_format_version, logger);
files_not_to_remove.insert(parent_not_to_remove.begin(), parent_not_to_remove.end());
String zookeeper_part_uniq_node = fs::path(zc_zookeeper_path) / part_id;
@ -8428,7 +8387,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
void StorageReplicatedMergeTree::createZeroCopyLockNode(
const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, int32_t mode,
const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node, int32_t mode,
bool replace_existing_lock, const String & path_to_set_hardlinked_files, const NameSet & hardlinked_files)
{
/// In rare case other replica can remove path between createAncestors and createIfNotExists
@ -8545,7 +8504,7 @@ bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const St
id, table_uuid, part_name,
detached_replica_name,
toString(disk->getDataSourceDescription().type),
std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), local_context->getReplicatedMergeTreeSettings(),
zookeeper, local_context->getReplicatedMergeTreeSettings(),
&Poco::Logger::get("StorageReplicatedMergeTree"),
detached_zookeeper_path,
MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING);
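Context for the API change reverted above: before this revert, call sites wrapped the plain ZooKeeper client in the fault-injection decorator and passed the wrapper down, e.g. std::make_shared<ZooKeeperWithFaultInjection>(zookeeper) in removeSharedDetachedPart, or a wrapper that is filled later via setKeeper() and checked with isNull(). The following is only a rough sketch of that decorator pattern with hypothetical names; it is not the actual ZooKeeperWithFaultInjection class.

#include <cstdint>
#include <memory>
#include <random>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-ins; not the real zkutil / ZooKeeperWithFaultInjection interfaces.
struct KeeperClient
{
    virtual std::string get(const std::string & path) = 0;
    virtual ~KeeperClient() = default;
};

// Decorator that owns (or is later given) a real client and may throw before delegating.
class FaultInjectingKeeper
{
public:
    explicit FaultInjectingKeeper(std::shared_ptr<KeeperClient> inner_ = nullptr, double failure_probability_ = 0.0, uint64_t seed_ = 0)
        : inner(std::move(inner_)), failure_probability(failure_probability_), rng(seed_) {}

    bool isNull() const { return inner == nullptr; }                                       // cf. the zookeeper->isNull() check above
    void setKeeper(std::shared_ptr<KeeperClient> keeper) { inner = std::move(keeper); }    // cf. zookeeper->setKeeper(...)

    std::string get(const std::string & path)
    {
        std::uniform_real_distribution<double> dist(0.0, 1.0);
        if (dist(rng) < failure_probability)
            throw std::runtime_error("injected Keeper fault");  // simulate a failed request
        return inner->get(path);                                 // otherwise forward unchanged (callers check isNull() first)
    }

private:
    std::shared_ptr<KeeperClient> inner;
    double failure_probability;
    std::mt19937_64 rng;
};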

View File

@ -83,9 +83,6 @@ namespace DB
* as the time will take the time of creation the appropriate part on any of the replicas.
*/
class ZooKeeperWithFaultInjection;
using ZooKeeperWithFaultInjectionPtr = std::shared_ptr<ZooKeeperWithFaultInjection>;
class StorageReplicatedMergeTree final : public MergeTreeData
{
public:
@ -270,11 +267,6 @@ public:
/// Lock part in zookeeper for use shared data in several nodes
void lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock, std::optional<HardlinkedFiles> hardlinked_files) const override;
void lockSharedData(
const IMergeTreeDataPart & part,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
bool replace_existing_lock,
std::optional<HardlinkedFiles> hardlinked_files) const;
void lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const;
@ -282,23 +274,13 @@ public:
/// Return true if data unlocked
/// Return false if data is still used by another node
std::pair<bool, NameSet> unlockSharedData(const IMergeTreeDataPart & part) const override;
std::pair<bool, NameSet>
unlockSharedData(const IMergeTreeDataPart & part, const ZooKeeperWithFaultInjectionPtr & zookeeper) const;
/// Unlock shared data part in zookeeper by part id
/// Return true if data unlocked
/// Return false if data is still used by another node
static std::pair<bool, NameSet> unlockSharedDataByID(
String part_id,
const String & table_uuid,
const String & part_name,
const String & replica_name_,
const std::string & disk_type,
const ZooKeeperWithFaultInjectionPtr & zookeeper_,
const MergeTreeSettings & settings,
Poco::Logger * logger,
const String & zookeeper_path_old,
MergeTreeDataFormatVersion data_format_version);
static std::pair<bool, NameSet> unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name, const String & replica_name_,
const std::string & disk_type, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings, Poco::Logger * logger,
const String & zookeeper_path_old, MergeTreeDataFormatVersion data_format_version);
/// Fetch part only if some replica has it on shared storage like S3
MutableDataPartStoragePtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
@ -552,7 +534,7 @@ private:
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
void getCommitPartOps(Coordination::Requests & ops, const DataPartPtr & part, const String & block_id_path = "") const;
void getCommitPartOps(Coordination::Requests & ops, MutableDataPartPtr & part, const String & block_id_path = "") const;
/// Adds actions to `ops` that remove a part from ZooKeeper.
/// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes).
@ -729,11 +711,6 @@ private:
std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
const String & partition_id, const zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_block_id_path = "", const String & zookeeper_path_prefix = "") const;
std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
const String & partition_id,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
const String & zookeeper_block_id_path = "",
const String & zookeeper_path_prefix = "") const;
/** Wait until all replicas, including this one, execute the specified action from the log.
* If replicas are added at the same time, it may not wait for the added replica.
@ -771,7 +748,7 @@ private:
/// Check for a node in ZK. If it exists, remember this information, and then immediately answer true.
mutable std::unordered_set<std::string> existing_nodes_cache;
mutable std::mutex existing_nodes_cache_mutex;
bool existsNodeCached(const ZooKeeperWithFaultInjectionPtr & zookeeper, const std::string & path) const;
bool existsNodeCached(const std::string & path) const;
/// Cancels INSERTs in the block range by removing ephemeral block numbers
void clearLockedBlockNumbersInPartition(zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
@ -859,7 +836,7 @@ private:
const String & part_name, const String & zookeeper_path_old);
static void createZeroCopyLockNode(
const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node,
const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node,
int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});

View File

@ -71,7 +71,6 @@ ln -sf $SRC_PATH/users.d/no_fsync_metadata.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/filelog.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/enable_blobs_check.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/marks.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/insert_keeper_retries.xml $DEST_SERVER_PATH/users.d/
# FIXME DataPartsExchange may hang for http_send_timeout seconds
# when nobody is going to read from the other side of socket (due to "Fetching of part was cancelled"),

View File

@ -1,8 +0,0 @@
<clickhouse>
<profiles>
<default>
<insert_keeper_max_retries>0</insert_keeper_max_retries>
<insert_keeper_fault_injection_probability>0.0</insert_keeper_fault_injection_probability>
</default>
</profiles>
</clickhouse>
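For context, the deleted profile above pinned insert_keeper_max_retries to 0 (no retries) and insert_keeper_fault_injection_probability to 0.0 (no injected failures) for tests. The reverted feature retried failed Keeper requests during INSERT, waiting between attempts with a backoff capped by insert_keeper_retry_max_backoff_ms. The sketch below assumes a simple doubling policy and generic names; it only illustrates the retry shape such settings govern, not the removed implementation.

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <thread>

// Hypothetical sketch of a retry loop governed by settings such as
// insert_keeper_max_retries and insert_keeper_retry_max_backoff_ms.
// The doubling backoff policy and the names are assumptions, not the reverted code.
template <typename Op>
void retryKeeperOp(Op && op, uint64_t max_retries, uint64_t initial_backoff_ms, uint64_t max_backoff_ms)
{
    uint64_t backoff_ms = initial_backoff_ms;
    for (uint64_t attempt = 0;; ++attempt)
    {
        try
        {
            op();       // one Keeper round trip, e.g. committing a block for an INSERT
            return;
        }
        catch (...)
        {
            if (attempt >= max_retries)
                throw;  // retries exhausted: the INSERT sees the Keeper error (as with max_retries=0 above)
            std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms));
            backoff_ms = std::min<uint64_t>(backoff_ms * 2, max_backoff_ms);  // grow the wait up to the cap
        }
    }
}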

View File

@ -1,100 +0,0 @@
#!/usr/bin/env python3
import pytest
import time
import threading
from helpers.cluster import ClickHouseCluster
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.client import QueryRuntimeException
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
global cluster
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_replica_inserts_with_keeper_restart(started_cluster):
try:
node1.query(
"CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '0') ORDER BY tuple()"
)
p = Pool(1)
zk_stopped_event = threading.Event()
def zoo_restart(zk_stopped_event):
cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
zk_stopped_event.set()
cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
job = p.apply_async(zoo_restart, (zk_stopped_event,))
zk_stopped_event.wait(90)
node1.query(
"INSERT INTO r SELECT number, toString(number) FROM numbers(10) SETTINGS insert_keeper_max_retries=20"
)
node1.query(
"INSERT INTO r SELECT number, toString(number) FROM numbers(10, 10) SETTINGS insert_keeper_max_retries=20"
)
job.wait()
p.close()
p.join()
assert node1.query("SELECT COUNT() FROM r") == "20\n"
finally:
node1.query("DROP TABLE IF EXISTS r SYNC")
def test_replica_inserts_with_keeper_disconnect(started_cluster):
try:
node1.query(
"CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '0') ORDER BY tuple()"
)
p = Pool(1)
disconnect_event = threading.Event()
def keeper_disconnect(node, event):
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node)
event.set()
job = p.apply_async(
keeper_disconnect,
(
node1,
disconnect_event,
),
)
disconnect_event.wait(90)
node1.query(
"INSERT INTO r SELECT number, toString(number) FROM numbers(10) SETTINGS insert_keeper_max_retries=20"
)
node1.query(
"INSERT INTO r SELECT number, toString(number) FROM numbers(10, 10) SETTINGS insert_keeper_max_retries=20"
)
job.wait()
p.close()
p.join()
assert node1.query("SELECT COUNT() FROM r") == "20\n"
finally:
node1.query("DROP TABLE IF EXISTS r SYNC")

View File

@ -7,9 +7,6 @@ from helpers.test_tools import assert_eq_with_retry
def fill_nodes(nodes):
for node in nodes:
node.query("DROP TABLE IF EXISTS test SYNC")
for node in nodes:
node.query(
"""
@ -32,7 +29,11 @@ nodes = [node_1, node_2, node_3]
def fill_table():
fill_nodes(nodes)
node_1.query("TRUNCATE TABLE test")
for node in nodes:
node.query("SYSTEM SYNC REPLICA test")
check_data(0, 0)
# it will create multiple parts in each partition and probably cause merges

View File

@ -1,14 +1,14 @@
-- Tags: zookeeper, no-replicated-database
-- Tag no-replicated-database: Old syntax is not allowed
DROP TABLE IF EXISTS alter_00121 SYNC;
DROP TABLE IF EXISTS alter_00121;
set allow_deprecated_syntax_for_merge_tree=1;
CREATE TABLE alter_00121 (d Date, x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/alter_00121/t1', 'r1', d, (d), 8192);
INSERT INTO alter_00121 VALUES ('2014-01-01', 1);
ALTER TABLE alter_00121 DROP COLUMN x;
DROP TABLE alter_00121 SYNC;
DROP TABLE alter_00121;
CREATE TABLE alter_00121 (d Date) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/alter_00121/t2', 'r1', d, (d), 8192);
@ -23,4 +23,4 @@ SELECT * FROM alter_00121 ORDER BY d;
ALTER TABLE alter_00121 DROP COLUMN x;
SELECT * FROM alter_00121 ORDER BY d;
DROP TABLE alter_00121 SYNC;
DROP TABLE alter_00121;

View File

@ -1,12 +1,11 @@
-- Tags: long, replica
SET replication_alter_partitions_sync = 2;
SET insert_keeper_fault_injection_probability=0;
SELECT '*** Not partitioned ***';
DROP TABLE IF EXISTS not_partitioned_replica1_00502 SYNC;
DROP TABLE IF EXISTS not_partitioned_replica2_00502 SYNC;
DROP TABLE IF EXISTS not_partitioned_replica1_00502;
DROP TABLE IF EXISTS not_partitioned_replica2_00502;
CREATE TABLE not_partitioned_replica1_00502(x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/not_partitioned_00502', '1') ORDER BY x;
CREATE TABLE not_partitioned_replica2_00502(x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/not_partitioned_00502', '2') ORDER BY x;
@ -15,7 +14,7 @@ INSERT INTO not_partitioned_replica1_00502 VALUES (4), (5);
SELECT 'Parts before OPTIMIZE:';
SELECT partition, name FROM system.parts WHERE database = currentDatabase() AND table = 'not_partitioned_replica1_00502' AND active ORDER BY name;
SYSTEM SYNC REPLICA not_partitioned_replica2_00502;
SYSTEM SYNC REPLICA not_partitioned_replica1_00502;
OPTIMIZE TABLE not_partitioned_replica1_00502 PARTITION tuple() FINAL;
SELECT 'Parts after OPTIMIZE:';
SELECT partition, name FROM system.parts WHERE database = currentDatabase() AND table = 'not_partitioned_replica2_00502' AND active ORDER BY name;
@ -26,13 +25,13 @@ ALTER TABLE not_partitioned_replica1_00502 DETACH PARTITION ID 'all';
SELECT 'Sum after DETACH PARTITION:';
SELECT sum(x) FROM not_partitioned_replica2_00502;
DROP TABLE not_partitioned_replica1_00502 SYNC;
DROP TABLE not_partitioned_replica2_00502 SYNC;
DROP TABLE not_partitioned_replica1_00502;
DROP TABLE not_partitioned_replica2_00502;
SELECT '*** Partitioned by week ***';
DROP TABLE IF EXISTS partitioned_by_week_replica1 SYNC;
DROP TABLE IF EXISTS partitioned_by_week_replica2 SYNC;
DROP TABLE IF EXISTS partitioned_by_week_replica1;
DROP TABLE IF EXISTS partitioned_by_week_replica2;
CREATE TABLE partitioned_by_week_replica1(d Date, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test/partitioned_by_week_00502', '1') PARTITION BY toMonday(d) ORDER BY x;
CREATE TABLE partitioned_by_week_replica2(d Date, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test/partitioned_by_week_00502', '2') PARTITION BY toMonday(d) ORDER BY x;
@ -42,7 +41,7 @@ INSERT INTO partitioned_by_week_replica1 VALUES ('2000-01-03', 4), ('2000-01-03'
SELECT 'Parts before OPTIMIZE:'; -- Select parts on the first replica to avoid waiting for replication.
SELECT partition, name FROM system.parts WHERE database = currentDatabase() AND table = 'partitioned_by_week_replica1' AND active ORDER BY name;
SYSTEM SYNC REPLICA partitioned_by_week_replica2;
SYSTEM SYNC REPLICA partitioned_by_week_replica1;
OPTIMIZE TABLE partitioned_by_week_replica1 PARTITION '2000-01-03' FINAL;
SELECT 'Parts after OPTIMIZE:'; -- After OPTIMIZE with replication_alter_partitions_sync=2 replicas must be in sync.
SELECT partition, name FROM system.parts WHERE database = currentDatabase() AND table = 'partitioned_by_week_replica2' AND active ORDER BY name;
@ -53,13 +52,13 @@ ALTER TABLE partitioned_by_week_replica1 DROP PARTITION '1999-12-27';
SELECT 'Sum after DROP PARTITION:';
SELECT sum(x) FROM partitioned_by_week_replica2;
DROP TABLE partitioned_by_week_replica1 SYNC;
DROP TABLE partitioned_by_week_replica2 SYNC;
DROP TABLE partitioned_by_week_replica1;
DROP TABLE partitioned_by_week_replica2;
SELECT '*** Partitioned by a (Date, UInt8) tuple ***';
DROP TABLE IF EXISTS partitioned_by_tuple_replica1_00502 SYNC;
DROP TABLE IF EXISTS partitioned_by_tuple_replica2_00502 SYNC;
DROP TABLE IF EXISTS partitioned_by_tuple_replica1_00502;
DROP TABLE IF EXISTS partitioned_by_tuple_replica2_00502;
CREATE TABLE partitioned_by_tuple_replica1_00502(d Date, x UInt8, y UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/partitioned_by_tuple_00502', '1') ORDER BY x PARTITION BY (d, x);
CREATE TABLE partitioned_by_tuple_replica2_00502(d Date, x UInt8, y UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/partitioned_by_tuple_00502', '2') ORDER BY x PARTITION BY (d, x);
@ -68,7 +67,7 @@ INSERT INTO partitioned_by_tuple_replica1_00502 VALUES ('2000-01-02', 1, 4), ('2
SELECT 'Parts before OPTIMIZE:';
SELECT partition, name FROM system.parts WHERE database = currentDatabase() AND table = 'partitioned_by_tuple_replica1_00502' AND active ORDER BY name;
SYSTEM SYNC REPLICA partitioned_by_tuple_replica2_00502;
SYSTEM SYNC REPLICA partitioned_by_tuple_replica1_00502;
OPTIMIZE TABLE partitioned_by_tuple_replica1_00502 PARTITION ('2000-01-01', 1) FINAL;
OPTIMIZE TABLE partitioned_by_tuple_replica1_00502 PARTITION ('2000-01-02', 1) FINAL;
SELECT 'Parts after OPTIMIZE:';
@ -80,13 +79,13 @@ ALTER TABLE partitioned_by_tuple_replica1_00502 DETACH PARTITION ID '20000101-1'
SELECT 'Sum after DETACH PARTITION:';
SELECT sum(y) FROM partitioned_by_tuple_replica2_00502;
DROP TABLE partitioned_by_tuple_replica1_00502 SYNC;
DROP TABLE partitioned_by_tuple_replica2_00502 SYNC;
DROP TABLE partitioned_by_tuple_replica1_00502;
DROP TABLE partitioned_by_tuple_replica2_00502;
SELECT '*** Partitioned by String ***';
DROP TABLE IF EXISTS partitioned_by_string_replica1 SYNC;
DROP TABLE IF EXISTS partitioned_by_string_replica2 SYNC;
DROP TABLE IF EXISTS partitioned_by_string_replica1;
DROP TABLE IF EXISTS partitioned_by_string_replica2;
CREATE TABLE partitioned_by_string_replica1(s String, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test/partitioned_by_string_00502', '1') PARTITION BY s ORDER BY x;
CREATE TABLE partitioned_by_string_replica2(s String, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test/partitioned_by_string_00502', '2') PARTITION BY s ORDER BY x;
@ -106,13 +105,13 @@ ALTER TABLE partitioned_by_string_replica1 DROP PARTITION 'bbb';
SELECT 'Sum after DROP PARTITION:';
SELECT sum(x) FROM partitioned_by_string_replica2;
DROP TABLE partitioned_by_string_replica1 SYNC;
DROP TABLE partitioned_by_string_replica2 SYNC;
DROP TABLE partitioned_by_string_replica1;
DROP TABLE partitioned_by_string_replica2;
SELECT '*** Table without columns with fixed size ***';
DROP TABLE IF EXISTS without_fixed_size_columns_replica1 SYNC;
DROP TABLE IF EXISTS without_fixed_size_columns_replica2 SYNC;
DROP TABLE IF EXISTS without_fixed_size_columns_replica1;
DROP TABLE IF EXISTS without_fixed_size_columns_replica2;
CREATE TABLE without_fixed_size_columns_replica1(s String) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test/without_fixed_size_columns_00502', '1') PARTITION BY length(s) ORDER BY s;
CREATE TABLE without_fixed_size_columns_replica2(s String) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test/without_fixed_size_columns_00502', '2') PARTITION BY length(s) ORDER BY s;
@ -131,5 +130,5 @@ ALTER TABLE without_fixed_size_columns_replica1 DROP PARTITION 1;
SELECT 'After DROP PARTITION:';
SELECT * FROM without_fixed_size_columns_replica2 ORDER BY s;
DROP TABLE without_fixed_size_columns_replica1 SYNC;
DROP TABLE without_fixed_size_columns_replica2 SYNC;
DROP TABLE without_fixed_size_columns_replica1;
DROP TABLE without_fixed_size_columns_replica2;

View File

@ -9,8 +9,8 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=./mergetree_mutations.lib
. "$CURDIR"/mergetree_mutations.lib
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS mutations_r1 SYNC"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS mutations_r2 SYNC"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS mutations_r1"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS mutations_r2"
${CLICKHOUSE_CLIENT} --allow_deprecated_syntax_for_merge_tree=1 --query="CREATE TABLE mutations_r1(d Date, x UInt32, s String, m MATERIALIZED x + 2) ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/mutations', 'r1', d, intDiv(x, 10), 8192)"
${CLICKHOUSE_CLIENT} --allow_deprecated_syntax_for_merge_tree=1 --query="CREATE TABLE mutations_r2(d Date, x UInt32, s String, m MATERIALIZED x + 2) ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/mutations', 'r2', d, intDiv(x, 10), 8192)"
@ -19,9 +19,9 @@ ${CLICKHOUSE_CLIENT} --allow_deprecated_syntax_for_merge_tree=1 --query="CREATE
${CLICKHOUSE_CLIENT} --query="ALTER TABLE mutations_r1 DELETE WHERE x = 1 SETTINGS mutations_sync = 2"
# Insert some data
${CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --query="INSERT INTO mutations_r1(d, x, s) VALUES \
${CLICKHOUSE_CLIENT} --query="INSERT INTO mutations_r1(d, x, s) VALUES \
('2000-01-01', 1, 'a')"
${CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --query="INSERT INTO mutations_r1(d, x, s) VALUES \
${CLICKHOUSE_CLIENT} --query="INSERT INTO mutations_r1(d, x, s) VALUES \
('2000-01-01', 2, 'b'), ('2000-01-01', 3, 'c'), ('2000-01-01', 4, 'd') \
('2000-02-01', 2, 'b'), ('2000-02-01', 3, 'c'), ('2000-02-01', 4, 'd')"
@ -35,7 +35,7 @@ ${CLICKHOUSE_CLIENT} --query="ALTER TABLE mutations_r1 DELETE WHERE s = 'd' SETT
${CLICKHOUSE_CLIENT} --query="ALTER TABLE mutations_r1 DELETE WHERE m = 3 SETTINGS mutations_sync = 2"
# Insert more data
${CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --query="INSERT INTO mutations_r1(d, x, s) VALUES \
${CLICKHOUSE_CLIENT} --query="INSERT INTO mutations_r1(d, x, s) VALUES \
('2000-01-01', 5, 'e'), ('2000-02-01', 5, 'e')"
${CLICKHOUSE_CLIENT} --query "SYSTEM SYNC REPLICA mutations_r2"
@ -49,8 +49,8 @@ ${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, command, block_numbers.partiti
${CLICKHOUSE_CLIENT} --query="SELECT '*** Test mutations cleaner ***'"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS mutations_cleaner_r1 SYNC"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS mutations_cleaner_r2 SYNC"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS mutations_cleaner_r1"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS mutations_cleaner_r2"
# Create 2 replicas with finished_mutations_to_keep = 2
${CLICKHOUSE_CLIENT} --query="CREATE TABLE mutations_cleaner_r1(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/mutations_cleaner', 'r1') ORDER BY x SETTINGS \
@ -63,7 +63,7 @@ ${CLICKHOUSE_CLIENT} --query="CREATE TABLE mutations_cleaner_r2(x UInt32) ENGINE
cleanup_delay_period_random_add = 0"
# Insert some data
${CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --query="INSERT INTO mutations_cleaner_r1(x) VALUES (1), (2), (3), (4), (5)"
${CLICKHOUSE_CLIENT} --query="INSERT INTO mutations_cleaner_r1(x) VALUES (1), (2), (3), (4), (5)"
# Add some mutations and wait for their execution
${CLICKHOUSE_CLIENT} --query="ALTER TABLE mutations_cleaner_r1 DELETE WHERE x = 1 SETTINGS mutations_sync = 2"
@ -80,8 +80,8 @@ sleep 1.5
# Check that the first mutation is cleaned
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, command, is_done FROM system.mutations WHERE database = '$CLICKHOUSE_DATABASE' and table = 'mutations_cleaner_r2' ORDER BY mutation_id"
${CLICKHOUSE_CLIENT} --query="DROP TABLE mutations_r1 SYNC"
${CLICKHOUSE_CLIENT} --query="DROP TABLE mutations_r2 SYNC"
${CLICKHOUSE_CLIENT} --query="DROP TABLE mutations_r1"
${CLICKHOUSE_CLIENT} --query="DROP TABLE mutations_r2"
${CLICKHOUSE_CLIENT} --query="DROP TABLE mutations_cleaner_r1 SYNC"
${CLICKHOUSE_CLIENT} --query="DROP TABLE mutations_cleaner_r2 SYNC"
${CLICKHOUSE_CLIENT} --query="DROP TABLE mutations_cleaner_r1"
${CLICKHOUSE_CLIENT} --query="DROP TABLE mutations_cleaner_r2"

View File

@ -2,8 +2,8 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS partitioned_by_tuple_replica1_00661 SYNC;
DROP TABLE IF EXISTS partitioned_by_tuple_replica2_00661 SYNC;
DROP TABLE IF EXISTS partitioned_by_tuple_replica1_00661;
DROP TABLE IF EXISTS partitioned_by_tuple_replica2_00661;
CREATE TABLE partitioned_by_tuple_replica1_00661(d Date, x UInt8, w String, y UInt8) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{database}/test/partitioned_by_tuple_00661', '1') PARTITION BY (d, x) ORDER BY (d, x, w);
CREATE TABLE partitioned_by_tuple_replica2_00661(d Date, x UInt8, w String, y UInt8) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{database}/test/partitioned_by_tuple_00661', '2') PARTITION BY (d, x) ORDER BY (d, x, w);
@ -21,5 +21,5 @@ OPTIMIZE TABLE partitioned_by_tuple_replica1_00661 FINAL;
SYSTEM SYNC REPLICA partitioned_by_tuple_replica2_00661;
SELECT * FROM partitioned_by_tuple_replica2_00661 ORDER BY d, x, w, y;
DROP TABLE partitioned_by_tuple_replica1_00661 SYNC;
DROP TABLE partitioned_by_tuple_replica2_00661 SYNC;
DROP TABLE partitioned_by_tuple_replica1_00661;
DROP TABLE partitioned_by_tuple_replica2_00661;

View File

@ -9,8 +9,8 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
${CLICKHOUSE_CLIENT} -n --query="
DROP TABLE IF EXISTS fetches_r1 SYNC;
DROP TABLE IF EXISTS fetches_r2 SYNC"
DROP TABLE IF EXISTS fetches_r1;
DROP TABLE IF EXISTS fetches_r2"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE fetches_r1(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/fetches', 'r1') ORDER BY x"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE fetches_r2(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/fetches', 'r2') ORDER BY x \
@ -18,7 +18,6 @@ ${CLICKHOUSE_CLIENT} --query="CREATE TABLE fetches_r2(x UInt32) ENGINE Replicate
prefer_fetch_merged_part_size_threshold=0"
${CLICKHOUSE_CLIENT} -n --query="
SET insert_keeper_fault_injection_probability=0;
INSERT INTO fetches_r1 VALUES (1);
INSERT INTO fetches_r1 VALUES (2);
INSERT INTO fetches_r1 VALUES (3)"
@ -52,5 +51,5 @@ ${CLICKHOUSE_CLIENT} --query="SELECT '*** Check data after fetch/clone of mutate
${CLICKHOUSE_CLIENT} --query="SELECT _part, * FROM fetches_r2 ORDER BY x"
${CLICKHOUSE_CLIENT} -n --query="
DROP TABLE fetches_r1 SYNC;
DROP TABLE fetches_r2 SYNC"
DROP TABLE fetches_r1;
DROP TABLE fetches_r2"

View File

@ -12,7 +12,6 @@ $CLICKHOUSE_CLIENT -nm -q "
DROP TABLE IF EXISTS part_header_r1;
DROP TABLE IF EXISTS part_header_r2;
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
SET replication_alter_partitions_sync = 2;
CREATE TABLE part_header_r1(x UInt32, y UInt32)

View File

@ -1,10 +1,9 @@
-- Tags: zookeeper
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
SET check_query_single_value_result = 0;
SET send_logs_level = 'fatal';
DROP TABLE IF EXISTS mt_without_pk SYNC;
DROP TABLE IF EXISTS mt_without_pk;
CREATE TABLE mt_without_pk (SomeField1 Int64, SomeField2 Double) ENGINE = MergeTree() ORDER BY tuple();
@ -12,9 +11,9 @@ INSERT INTO mt_without_pk VALUES (1, 2);
CHECK TABLE mt_without_pk;
DROP TABLE IF EXISTS mt_without_pk SYNC;
DROP TABLE IF EXISTS mt_without_pk;
DROP TABLE IF EXISTS replicated_mt_without_pk SYNC;
DROP TABLE IF EXISTS replicated_mt_without_pk;
CREATE TABLE replicated_mt_without_pk (SomeField1 Int64, SomeField2 Double) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test_01037/replicated_mt_without_pk', '1') ORDER BY tuple();
@ -22,4 +21,4 @@ INSERT INTO replicated_mt_without_pk VALUES (1, 2);
CHECK TABLE replicated_mt_without_pk;
DROP TABLE IF EXISTS replicated_mt_without_pk SYNC;
DROP TABLE IF EXISTS replicated_mt_without_pk;

View File

@ -52,8 +52,7 @@ ${CLICKHOUSE_CLIENT} --query="CREATE TABLE replicated_table_for_mutations(k UInt
${CLICKHOUSE_CLIENT} --query="SYSTEM STOP MERGES replicated_table_for_mutations"
# test relies on part ids, which are non-deterministic with keeper fault injections, so disable it
${CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --query="INSERT INTO replicated_table_for_mutations select number, number from numbers(100000)"
${CLICKHOUSE_CLIENT} --query="INSERT INTO replicated_table_for_mutations select number, number from numbers(100000)"
${CLICKHOUSE_CLIENT} --query="SELECT sum(v1) FROM replicated_table_for_mutations"

View File

@ -1,8 +1,8 @@
-- Tags: long, zookeeper, no-replicated-database
-- Tag no-replicated-database: Fails due to additional replicas or shards
DROP TABLE IF EXISTS mutations_and_quorum1 SYNC;
DROP TABLE IF EXISTS mutations_and_quorum2 SYNC;
DROP TABLE IF EXISTS mutations_and_quorum1;
DROP TABLE IF EXISTS mutations_and_quorum2;
CREATE TABLE mutations_and_quorum1 (`server_date` Date, `something` String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test_01090/mutations_and_quorum', '1') PARTITION BY toYYYYMM(server_date) ORDER BY (server_date, something);
CREATE TABLE mutations_and_quorum2 (`server_date` Date, `something` String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test_01090/mutations_and_quorum', '2') PARTITION BY toYYYYMM(server_date) ORDER BY (server_date, something);
@ -10,9 +10,6 @@ CREATE TABLE mutations_and_quorum2 (`server_date` Date, `something` String) ENGI
-- Should not be larger than 600e6 (default timeout in clickhouse-test)
SET insert_quorum=2, insert_quorum_parallel=0, insert_quorum_timeout=300e3;
SET insert_keeper_max_retries=100;
SET insert_keeper_retry_max_backoff_ms=10;
INSERT INTO mutations_and_quorum1 VALUES ('2019-01-01', 'test1'), ('2019-02-01', 'test2'), ('2019-03-01', 'test3'), ('2019-04-01', 'test4'), ('2019-05-01', 'test1'), ('2019-06-01', 'test2'), ('2019-07-01', 'test3'), ('2019-08-01', 'test4'), ('2019-09-01', 'test1'), ('2019-10-01', 'test2'), ('2019-11-01', 'test3'), ('2019-12-01', 'test4');
ALTER TABLE mutations_and_quorum1 DELETE WHERE something = 'test1' SETTINGS mutations_sync=2;
@ -22,5 +19,5 @@ SELECT COUNT() FROM mutations_and_quorum2;
SELECT COUNT() FROM system.mutations WHERE database = currentDatabase() AND table like 'mutations_and_quorum%' and is_done = 0;
DROP TABLE IF EXISTS mutations_and_quorum1 SYNC;
DROP TABLE IF EXISTS mutations_and_quorum2 SYNC;
DROP TABLE IF EXISTS mutations_and_quorum1;
DROP TABLE IF EXISTS mutations_and_quorum2;

View File

@ -1,6 +1,6 @@
-- Tags: zookeeper
DROP TABLE IF EXISTS default_table SYNC;
DROP TABLE IF EXISTS default_table;
CREATE TABLE default_table
(
@ -26,4 +26,4 @@ ALTER TABLE default_table MODIFY COLUMN enum_column Enum8('undefined' = 0, 'fox'
SHOW CREATE TABLE default_table;
DROP TABLE IF EXISTS default_table SYNC;
DROP TABLE IF EXISTS default_table;

View File

@ -1,7 +1,5 @@
-- Tags: zookeeper
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
set send_logs_level='error';
drop table if exists mt;
drop table if exists rmt sync;

View File

@ -1,16 +1,13 @@
-- Tags: long, zookeeper, no-replicated-database, no-polymorphic-parts
-- Tag no-replicated-database: Fails due to additional replicas or shards
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
drop table if exists rmt sync;
drop table if exists rmt;
-- cleanup code will perform extra Exists
-- (so the .reference will not match)
create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n settings cleanup_delay_period=86400, replicated_can_become_leader=0;
system sync replica rmt;
insert into rmt values (1);
insert into rmt values (1);
system sync replica rmt;
system flush logs;
select 'log';
@ -33,7 +30,7 @@ from system.zookeeper_log
where (session_id, xid) in (select session_id, xid from system.zookeeper_log where path like '/test/01158/' || currentDatabase() || '/rmt/blocks/%' and op_num not in (1, 12, 500))
order by xid, type, request_idx;
drop table rmt sync;
drop table rmt;
system flush logs;
select 'duration_ms';

View File

@ -1,9 +1,5 @@
-- Tags: long, replica
-- in case of keeper fault injection on insert, set a bigger number of retries because inserts go into multiple partitions
set insert_keeper_max_retries=100;
set insert_keeper_retry_max_backoff_ms=10;
-- Testing basic functionality with compact parts
set replication_alter_partitions_sync = 2;
drop table if exists mt_compact;

View File

@ -5,7 +5,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS mutation_table SYNC"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS mutation_table"
$CLICKHOUSE_CLIENT --query "
CREATE TABLE mutation_table(
@ -17,10 +17,9 @@ $CLICKHOUSE_CLIENT --query "
PARTITION BY key % 10
"
# disable keeper fault injection during insert since the test checks part names. Part names can differ in case of retries during insert
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 --query "INSERT INTO mutation_table select number, toString(number) from numbers(100000) where number % 10 != 0"
$CLICKHOUSE_CLIENT --query "INSERT INTO mutation_table select number, toString(number) from numbers(100000) where number % 10 != 0"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 --query "INSERT INTO mutation_table VALUES(0, 'hello')"
$CLICKHOUSE_CLIENT --query "INSERT INTO mutation_table VALUES(0, 'hello')"
$CLICKHOUSE_CLIENT --query "SELECT COUNT() FROM mutation_table"
@ -72,4 +71,4 @@ $CLICKHOUSE_CLIENT --query "SELECT is_done, parts_to_do FROM system.mutations wh
$CLICKHOUSE_CLIENT --query "SELECT type, new_part_name FROM system.replication_queue WHERE table='mutation_table' and database='$CLICKHOUSE_DATABASE'"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS mutation_table SYNC"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS mutation_table"

View File

@ -1,10 +1,7 @@
-- Tags: long, replica
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
SET replication_alter_partitions_sync=2;
DROP TABLE IF EXISTS test SYNC;
DROP TABLE IF EXISTS test2 SYNC;
DROP TABLE IF EXISTS test;
DROP TABLE IF EXISTS test2;
CREATE TABLE test (x Enum('hello' = 1, 'world' = 2), y String) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_01346/table', 'r1') PARTITION BY x ORDER BY y;
CREATE TABLE test2 (x Enum('hello' = 1, 'world' = 2), y String) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_01346/table', 'r2') PARTITION BY x ORDER BY y;
@ -20,6 +17,7 @@ ALTER TABLE test MODIFY COLUMN x Enum('hello' = 1, 'world' = 2, 'goodbye' = 3);
INSERT INTO test VALUES ('goodbye', 'test');
OPTIMIZE TABLE test FINAL;
SELECT * FROM test ORDER BY x;
SYSTEM SYNC REPLICA test2;
SELECT * FROM test2 ORDER BY x;
SELECT name, partition, partition_id FROM system.parts WHERE database = currentDatabase() AND table = 'test' AND active ORDER BY partition;
SELECT name, partition, partition_id FROM system.parts WHERE database = currentDatabase() AND table = 'test2' AND active ORDER BY partition;
@ -33,6 +31,7 @@ ALTER TABLE test MODIFY COLUMN x Int8;
INSERT INTO test VALUES (111, 'abc');
OPTIMIZE TABLE test FINAL;
SELECT * FROM test ORDER BY x;
SYSTEM SYNC REPLICA test2;
SELECT * FROM test2 ORDER BY x;
SELECT name, partition, partition_id FROM system.parts WHERE database = currentDatabase() AND table = 'test' AND active ORDER BY partition;
SELECT name, partition, partition_id FROM system.parts WHERE database = currentDatabase() AND table = 'test2' AND active ORDER BY partition;
@ -49,5 +48,5 @@ ALTER TABLE test RENAME COLUMN y TO z; -- { serverError 524 }
ALTER TABLE test DROP COLUMN x; -- { serverError 47 }
ALTER TABLE test DROP COLUMN y; -- { serverError 47 }
DROP TABLE test SYNC;
DROP TABLE test2 SYNC;
DROP TABLE test;
DROP TABLE test2;

View File

@ -12,9 +12,9 @@ FREEZE_OUT_STRUCTURE='backup_name String, backup_path String , part_backup_path
# setup
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS table_for_freeze_replicated SYNC;"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS table_for_freeze_replicated;"
${CLICKHOUSE_CLIENT} --query "CREATE TABLE table_for_freeze_replicated (key UInt64, value String) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table_for_freeze_replicated', '1') ORDER BY key PARTITION BY key % 10;"
${CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --query "INSERT INTO table_for_freeze_replicated SELECT number, toString(number) from numbers(10);"
${CLICKHOUSE_CLIENT} --query "INSERT INTO table_for_freeze_replicated SELECT number, toString(number) from numbers(10);"
${CLICKHOUSE_CLIENT} --query "ALTER TABLE table_for_freeze_replicated FREEZE WITH NAME 'test_01417' FORMAT TSVWithNames SETTINGS alter_partition_verbose_result = 1;" \
| ${CLICKHOUSE_LOCAL} --structure "$ALTER_OUT_STRUCTURE, $FREEZE_OUT_STRUCTURE" \
@ -25,7 +25,7 @@ ${CLICKHOUSE_CLIENT} --query "ALTER TABLE table_for_freeze_replicated FREEZE PAR
--query "SELECT command_type, partition_id, part_name, backup_name FROM table"
${CLICKHOUSE_CLIENT} --query "ALTER TABLE table_for_freeze_replicated DETACH PARTITION '3';"
${CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --query "INSERT INTO table_for_freeze_replicated VALUES (3, '3');"
${CLICKHOUSE_CLIENT} --query "INSERT INTO table_for_freeze_replicated VALUES (3, '3');"
${CLICKHOUSE_CLIENT} --query "ALTER TABLE table_for_freeze_replicated ATTACH PARTITION '3' FORMAT TSVWithNames SETTINGS alter_partition_verbose_result = 1;" \
| ${CLICKHOUSE_LOCAL} --structure "$ALTER_OUT_STRUCTURE, $ATTACH_OUT_STRUCTURE" \
@ -38,4 +38,4 @@ ${CLICKHOUSE_CLIENT} --query "ALTER TABLE table_for_freeze_replicated FREEZE PAR
--query "SELECT command_type, partition_id, part_name, backup_name, old_part_name FROM table"
# teardown
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS table_for_freeze_replicated SYNC;"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS table_for_freeze_replicated;"

View File

@ -1,6 +1,5 @@
-- Tags: long, replica, no-replicated-database
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
SET replication_alter_partitions_sync = 2;

View File

@ -1,11 +1,10 @@
-- Tags: long, replica, no-replicated-database
-- Tag no-replicated-database: Fails due to additional replicas or shards
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
SET replication_alter_partitions_sync = 2;
DROP TABLE IF EXISTS replica1 SYNC;
DROP TABLE IF EXISTS replica2 SYNC;
DROP TABLE IF EXISTS replica1;
DROP TABLE IF EXISTS replica2;
CREATE TABLE replica1 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/'||currentDatabase()||'test/01451/attach', 'r1') order by tuple() settings max_replicated_merges_in_queue = 0;
CREATE TABLE replica2 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/'||currentDatabase()||'test/01451/attach', 'r2') order by tuple() settings max_replicated_merges_in_queue = 0;
@ -49,5 +48,5 @@ SELECT v FROM replica1 ORDER BY v;
SELECT name FROM system.parts WHERE table = 'replica2' AND active AND database = currentDatabase();
DROP TABLE replica1 SYNC;
DROP TABLE replica2 SYNC;
DROP TABLE replica1;
DROP TABLE replica2;

View File

@ -11,7 +11,7 @@ NUM_REPLICAS=10
for i in $(seq 1 $NUM_REPLICAS); do
$CLICKHOUSE_CLIENT -n -q "
DROP TABLE IF EXISTS r$i SYNC;
DROP TABLE IF EXISTS r$i;
CREATE TABLE r$i (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/r', 'r$i') ORDER BY x;
"
done
@ -20,11 +20,7 @@ function thread {
for x in {0..99}; do
# sometimes we can try to commit an obsolete part if fetches are quite fast,
# so suppress warning messages like "Tried to commit obsolete part ... covered by ..."
# (2) disable keeper fault injection for inserts because
# it can cause deduplicated parts to be visible to SELECTs for some time (until the cleanup thread removes them),
# so the same SELECT on different replicas can return different results, i.e. the test output will be non-deterministic
# (see #9712)
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 --query "INSERT INTO r$1 SELECT $x % $NUM_REPLICAS = $1 ? $x - 1 : $x" 2>/dev/null # Replace some records as duplicates so they will be written by other replicas
$CLICKHOUSE_CLIENT --query "INSERT INTO r$1 SELECT $x % $NUM_REPLICAS = $1 ? $x - 1 : $x" 2>/dev/null # Replace some records as duplicates so they will be written by other replicas
done
}
@ -41,5 +37,5 @@ for i in $(seq 1 $NUM_REPLICAS); do
done
for i in $(seq 1 $NUM_REPLICAS); do
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS r$i SYNC;"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS r$i;"
done

View File

@ -12,7 +12,7 @@ NUM_REPLICAS=10
for i in $(seq 1 $NUM_REPLICAS); do
$CLICKHOUSE_CLIENT -n -q "
DROP TABLE IF EXISTS r$i SYNC;
DROP TABLE IF EXISTS r$i;
CREATE TABLE r$i (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/r', 'r$i') ORDER BY x;
"
done
@ -22,7 +22,7 @@ valid_exceptions_to_retry='Quorum for previous write has not been satisfied yet|
function thread {
for x in {0..99}; do
while true; do
$CLICKHOUSE_CLIENT --insert_quorum 5 --insert_quorum_parallel 0 --insert_keeper_fault_injection_probability=0 --query "INSERT INTO r$1 SELECT $x" 2>&1 | grep -qE "$valid_exceptions_to_retry" || break
$CLICKHOUSE_CLIENT --insert_quorum 5 --insert_quorum_parallel 0 --query "INSERT INTO r$1 SELECT $x" 2>&1 | grep -qE "$valid_exceptions_to_retry" || break
done
done
}
@ -40,5 +40,5 @@ for i in $(seq 1 $NUM_REPLICAS); do
done
for i in $(seq 1 $NUM_REPLICAS); do
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS r$i SYNC;"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS r$i;"
done

View File

@ -12,7 +12,7 @@ NUM_REPLICAS=6
for i in $(seq 1 $NUM_REPLICAS); do
$CLICKHOUSE_CLIENT -n -q "
DROP TABLE IF EXISTS r$i SYNC;
DROP TABLE IF EXISTS r$i;
CREATE TABLE r$i (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/parallel_quorum_many', 'r$i') ORDER BY x;
"
done
@ -20,7 +20,7 @@ done
function thread {
i=0 retries=300
while [[ $i -lt $retries ]]; do # server can be dead
$CLICKHOUSE_CLIENT --insert_quorum 3 --insert_quorum_parallel 1 --insert_keeper_max_retries=100 --insert_keeper_retry_max_backoff_ms=10 --query "INSERT INTO r$1 SELECT $2" && break
$CLICKHOUSE_CLIENT --insert_quorum 3 --insert_quorum_parallel 1 --query "INSERT INTO r$1 SELECT $2" && break
((++i))
sleep 0.1
done

View File

@ -9,8 +9,8 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q1 SYNC"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q2 SYNC"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q1"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q2"
$CLICKHOUSE_CLIENT -q "CREATE TABLE parallel_q1 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/parallel_q', 'r1') ORDER BY tuple() SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0"
@ -19,10 +19,9 @@ $CLICKHOUSE_CLIENT -q "CREATE TABLE parallel_q2 (x UInt64) ENGINE=ReplicatedMerg
$CLICKHOUSE_CLIENT -q "SYSTEM STOP REPLICATION QUEUES parallel_q2"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "INSERT INTO parallel_q1 VALUES (1)"
$CLICKHOUSE_CLIENT -q "INSERT INTO parallel_q1 VALUES (1)"
# disable keeper fault injection during insert since the test checks part names. Part names can differ in case of retries during insert
$CLICKHOUSE_CLIENT --insert_quorum 2 --insert_quorum_parallel 1 --insert_keeper_fault_injection_probability=0 --query="INSERT INTO parallel_q1 VALUES (2)" &
$CLICKHOUSE_CLIENT --insert_quorum 2 --insert_quorum_parallel 1 --query="INSERT INTO parallel_q1 VALUES (2)" &
part_count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT() FROM system.parts WHERE table='parallel_q1' and database='${CLICKHOUSE_DATABASE}'")
@ -67,5 +66,5 @@ $CLICKHOUSE_CLIENT --query="SELECT event_type FROM system.part_log WHERE table='
$CLICKHOUSE_CLIENT --query="SELECT COUNT() FROM parallel_q2"
$CLICKHOUSE_CLIENT --query="SELECT COUNT() FROM parallel_q1"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q1 SYNC"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q2 SYNC"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q1"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q2"

View File

@ -16,9 +16,6 @@ CREATE TABLE r2 (
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/01509_parallel_quorum_insert_no_replicas', '2')
ORDER BY tuple();
SET insert_keeper_max_retries=100;
SET insert_keeper_retry_max_backoff_ms=10;
SET insert_quorum_parallel=1;
SET insert_quorum=3;
@ -82,11 +79,11 @@ SYSTEM STOP FETCHES r2;
SET insert_quorum_timeout=0;
INSERT INTO r1 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (4, '4'); -- { serverError 319 }
INSERT INTO r1 VALUES (4, '4'); -- { serverError 319 }
-- retry should fail despite the insert_deduplicate enabled
INSERT INTO r1 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (4, '4'); -- { serverError 319 }
INSERT INTO r1 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (4, '4'); -- { serverError 319 }
INSERT INTO r1 VALUES (4, '4'); -- { serverError 319 }
INSERT INTO r1 VALUES (4, '4'); -- { serverError 319 }
SELECT * FROM r2 WHERE key=4;
SYSTEM START FETCHES r2;
@ -102,5 +99,5 @@ SELECT 'insert happened';
SELECT COUNT() FROM r1;
SELECT COUNT() FROM r2;
DROP TABLE IF EXISTS r1 SYNC;
DROP TABLE IF EXISTS r2 SYNC;
DROP TABLE IF EXISTS r1;
DROP TABLE IF EXISTS r2;

View File

@ -2,8 +2,6 @@
-- Tag no-replicated-database: Fails due to additional replicas or shards
-- Tag no-parallel: static zk path
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
DROP TABLE IF EXISTS execute_on_single_replica_r1 NO DELAY;
DROP TABLE IF EXISTS execute_on_single_replica_r2 NO DELAY;

View File

@ -1,6 +1,6 @@
-- Tags: zookeeper
DROP TABLE IF EXISTS merge_tree_pk SYNC;
DROP TABLE IF EXISTS merge_tree_pk;
CREATE TABLE merge_tree_pk
(
@ -24,9 +24,9 @@ ATTACH TABLE merge_tree_pk;
SELECT * FROM merge_tree_pk FINAL ORDER BY key, value;
DROP TABLE IF EXISTS merge_tree_pk SYNC;
DROP TABLE IF EXISTS merge_tree_pk;
DROP TABLE IF EXISTS merge_tree_pk_sql SYNC;
DROP TABLE IF EXISTS merge_tree_pk_sql;
CREATE TABLE merge_tree_pk_sql
(
@ -60,9 +60,9 @@ SELECT * FROM merge_tree_pk_sql FINAL ORDER BY key, value;
SHOW CREATE TABLE merge_tree_pk_sql;
DROP TABLE IF EXISTS merge_tree_pk_sql SYNC;
DROP TABLE IF EXISTS merge_tree_pk_sql;
DROP TABLE IF EXISTS replicated_merge_tree_pk_sql SYNC;
DROP TABLE IF EXISTS replicated_merge_tree_pk_sql;
CREATE TABLE replicated_merge_tree_pk_sql
(
@ -99,4 +99,4 @@ ATTACH TABLE replicated_merge_tree_pk_sql;
SHOW CREATE TABLE replicated_merge_tree_pk_sql;
DROP TABLE IF EXISTS replicated_merge_tree_pk_sql SYNC;
DROP TABLE IF EXISTS replicated_merge_tree_pk_sql;

View File

@ -1,6 +1,6 @@
-- Tags: replica
DROP TABLE IF EXISTS replicated_mutations_empty_partitions SYNC;
DROP TABLE IF EXISTS replicated_mutations_empty_partitions;
CREATE TABLE replicated_mutations_empty_partitions
(
@ -11,8 +11,7 @@ ENGINE = ReplicatedMergeTree('/clickhouse/test/'||currentDatabase()||'/01586_rep
ORDER BY key
PARTITION by key;
-- insert_keeper* settings are adjusted since several actual inserts happen behind one statement due to partitioning, i.e. inserts into different partitions
INSERT INTO replicated_mutations_empty_partitions SETTINGS insert_keeper_max_retries=100, insert_keeper_retry_max_backoff_ms=10 SELECT number, toString(number) FROM numbers(10);
INSERT INTO replicated_mutations_empty_partitions SELECT number, toString(number) FROM numbers(10);
SELECT count(distinct value) FROM replicated_mutations_empty_partitions;
@ -32,4 +31,4 @@ SELECT sum(value) FROM replicated_mutations_empty_partitions;
SHOW CREATE TABLE replicated_mutations_empty_partitions;
DROP TABLE IF EXISTS replicated_mutations_empty_partitions SYNC;
DROP TABLE IF EXISTS replicated_mutations_empty_partitions;

View File

@ -5,11 +5,11 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_mutate_kill SYNC"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_mutate_kill"
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_mutate_kill (key UInt64, value String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/concurrent_mutate_kill', '1') ORDER BY key PARTITION BY key % 100 SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0,max_replicated_merges_in_queue=1000"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 --query "INSERT INTO concurrent_mutate_kill SELECT number, toString(number) FROM numbers(1000000)"
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_mutate_kill SELECT number, toString(number) FROM numbers(1000000)"
function alter_thread
{
@ -67,4 +67,4 @@ done
$CLICKHOUSE_CLIENT --query "SHOW CREATE TABLE concurrent_mutate_kill"
$CLICKHOUSE_CLIENT --query "OPTIMIZE TABLE concurrent_mutate_kill FINAL"
$CLICKHOUSE_CLIENT --query "SELECT sum(value) FROM concurrent_mutate_kill"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_mutate_kill SYNC"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_mutate_kill"

View File

@ -1,8 +1,7 @@
-- Tags: long, zookeeper, no-replicated-database
-- Tag no-replicated-database: Fails due to additional replicas or shards
SET insert_keeper_fault_injection_probability=0;
DROP TABLE IF EXISTS partitioned_table SYNC;
DROP TABLE IF EXISTS partitioned_table;
CREATE TABLE partitioned_table (
key UInt64,
@ -48,4 +47,4 @@ SELECT partition_id, name FROM system.parts WHERE table = 'partitioned_table' AN
SELECT substring(name, 1, 2), value FROM system.zookeeper WHERE path='/clickhouse/' || currentDatabase() || '/01650_drop_part_and_deduplication_partitioned_table/blocks/' ORDER BY value;
DROP TABLE IF EXISTS partitioned_table SYNC;
DROP TABLE IF EXISTS partitioned_table;

View File

@ -1,7 +1,7 @@
-- Tags: long, zookeeper
DROP TABLE IF EXISTS i20203_1 SYNC;
DROP TABLE IF EXISTS i20203_2 SYNC;
DROP TABLE IF EXISTS i20203_1;
DROP TABLE IF EXISTS i20203_2;
CREATE TABLE i20203_1 (a Int8)
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/01715_background_checker_i20203', 'r1')
@ -26,5 +26,5 @@ WHERE table = 'i20203_2' AND database = currentDatabase();
ATTACH TABLE i20203_1;
DROP TABLE i20203_1 SYNC;
DROP TABLE i20203_2 SYNC;
DROP TABLE IF EXISTS i20203_1;
DROP TABLE IF EXISTS i20203_2;

View File

@ -51,9 +51,6 @@ $CLICKHOUSE_CLIENT -nm -q """
partition by key%100
settings max_part_removal_threads=10, concurrent_part_removal_threshold=99, min_bytes_for_wide_part=0;
SET insert_keeper_max_retries=1000;
SET insert_keeper_retry_max_backoff_ms=10;
insert into rep_data_01810 select * from numbers(100);
drop table rep_data_01810 settings log_queries=1;
system flush logs;

View File

@ -2,8 +2,6 @@
{% for engine in ["ReplicatedMergeTree('/clickhouse/tables/{database}/test_01825_3/t_json_3', 'r1') ORDER BY tuple()", "Memory"] -%}
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
SET allow_experimental_object_type = 1;
DROP TABLE IF EXISTS t_json_3;

View File

@ -1,6 +1,3 @@
SET insert_keeper_max_retries=100;
SET insert_keeper_retry_max_backoff_ms=10;
SELECT 'simple partition key:';
DROP TABLE IF EXISTS table1 SYNC;
CREATE TABLE table1 (id Int64, v UInt64)
@ -18,7 +15,7 @@ select 'where id % 200 < 0:';
select id from table1 where id % 200 < 0 order by id;
SELECT 'tuple as partition key:';
DROP TABLE IF EXISTS table2 SYNC;
DROP TABLE IF EXISTS table2;
CREATE TABLE table2 (id Int64, v UInt64)
ENGINE = MergeTree()
PARTITION BY (toInt32(id / 2) % 3, id % 200) ORDER BY id;
@ -27,7 +24,7 @@ INSERT INTO table2 SELECT number-205, number FROM numbers(400, 10);
SELECT partition as p FROM system.parts WHERE table='table2' and database=currentDatabase() ORDER BY p;
SELECT 'recursive modulo partition key:';
DROP TABLE IF EXISTS table3 SYNC;
DROP TABLE IF EXISTS table3;
CREATE TABLE table3 (id Int64, v UInt64)
ENGINE = MergeTree()
PARTITION BY (id % 200, (id % 200) % 10, toInt32(round((id % 200) / 2, 0))) ORDER BY id;
@ -41,7 +38,7 @@ SELECT 'After detach:';
SELECT partition as p FROM system.parts WHERE table='table3' and database=currentDatabase() ORDER BY p;
SELECT 'Indexes:';
DROP TABLE IF EXISTS table4 SYNC;
DROP TABLE IF EXISTS table4;
CREATE TABLE table4 (id Int64, v UInt64, s String,
INDEX a (id * 2, s) TYPE minmax GRANULARITY 3
) ENGINE = MergeTree() PARTITION BY id % 10 ORDER BY v;

View File

@ -1,5 +1,4 @@
-- Tags: long, no-parallel
SET insert_keeper_fault_injection_probability=0; -- to succeed this test can require too many retries due to 1024 partitions, so disable fault injections
-- regression for MEMORY_LIMIT_EXCEEDED error because of deferred final part flush
@ -9,8 +8,8 @@ insert into data_02228 select number, 1, number from numbers_mt(100e3) settings
insert into data_02228 select number, 1, number from numbers_mt(100e3) settings max_memory_usage='300Mi', max_partitions_per_insert_block=1024, max_insert_delayed_streams_for_parallel_write=10000000; -- { serverError MEMORY_LIMIT_EXCEEDED }
drop table data_02228;
drop table if exists data_rep_02228 SYNC;
drop table if exists data_rep_02228;
create table data_rep_02228 (key1 UInt32, sign Int8, s UInt64) engine = ReplicatedCollapsingMergeTree('/clickhouse/{database}', 'r1', sign) order by (key1) partition by key1 % 1024;
insert into data_rep_02228 select number, 1, number from numbers_mt(100e3) settings max_memory_usage='300Mi', max_partitions_per_insert_block=1024, max_insert_delayed_streams_for_parallel_write=0;
insert into data_rep_02228 select number, 1, number from numbers_mt(100e3) settings max_memory_usage='300Mi', max_partitions_per_insert_block=1024, max_insert_delayed_streams_for_parallel_write=10000000; -- { serverError MEMORY_LIMIT_EXCEEDED }
drop table data_rep_02228 SYNC;
drop table data_rep_02228;

View File

@ -10,7 +10,7 @@ function check_refcnt_for_table()
local table=$1 && shift
$CLICKHOUSE_CLIENT -q "system stop merges $table"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "insert into $table select number, number%4 from numbers(200)"
$CLICKHOUSE_CLIENT -q "insert into $table select number, number%4 from numbers(200)"
local query_id
query_id="$table-$(random_str 10)"
@ -52,7 +52,7 @@ $CLICKHOUSE_CLIENT -nmq "
check_refcnt_for_table data_02340
$CLICKHOUSE_CLIENT -nmq "
drop table if exists data_02340_rep sync;
drop table if exists data_02340_rep;
create table data_02340_rep (key Int, part Int) engine=ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX', '1') partition by part order by key settings index_granularity=1;
" || exit 1
check_refcnt_for_table data_02340_rep

View File

@ -1,18 +1,18 @@
#!/usr/bin/env bash
# Tags: long, zookeeper
# Tags: zookeeper
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists rmt1 sync;"
$CLICKHOUSE_CLIENT -q "drop table if exists rmt2 sync;"
$CLICKHOUSE_CLIENT -q "drop table if exists rmt1;"
$CLICKHOUSE_CLIENT -q "drop table if exists rmt2;"
$CLICKHOUSE_CLIENT -q "create table rmt1 (n int) engine=ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{database}', '1') order by n;"
$CLICKHOUSE_CLIENT -q "create table rmt2 (n int) engine=ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{database}', '2') order by n;"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "insert into rmt1 values (1);"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "insert into rmt1 values (2);"
$CLICKHOUSE_CLIENT -q "insert into rmt1 values (1);"
$CLICKHOUSE_CLIENT -q "insert into rmt1 values (2);"
$CLICKHOUSE_CLIENT -q "system sync replica rmt1;"
$CLICKHOUSE_CLIENT -q "system sync replica rmt2;"
@ -32,7 +32,7 @@ $CLICKHOUSE_CLIENT -q "select * from rmt1;" 2>/dev/null
$CLICKHOUSE_CLIENT -q "detach table rmt1;"
$CLICKHOUSE_CLIENT -q "attach table rmt1;"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "insert into rmt1 values (3);"
$CLICKHOUSE_CLIENT -q "insert into rmt1 values (3);"
$CLICKHOUSE_CLIENT -q "system start merges rmt2;"
$CLICKHOUSE_CLIENT -q "system sync replica rmt1;"
$CLICKHOUSE_CLIENT -q "optimize table rmt1 final;"
@ -42,5 +42,5 @@ $CLICKHOUSE_CLIENT -q "system sync replica rmt2;"
$CLICKHOUSE_CLIENT -q "select 3, *, _part from rmt1 order by n;"
$CLICKHOUSE_CLIENT -q "select 4, *, _part from rmt2 order by n;"
$CLICKHOUSE_CLIENT -q "drop table rmt1 sync;"
$CLICKHOUSE_CLIENT -q "drop table rmt2 sync;"
$CLICKHOUSE_CLIENT -q "drop table rmt1;"
$CLICKHOUSE_CLIENT -q "drop table rmt2;"

View File

@ -1,7 +1,5 @@
-- Tags: long
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
drop table if exists rmt1;
drop table if exists rmt2;
create table rmt1 (n int) engine=ReplicatedMergeTree('/test/02448/{database}/rmt', '1') order by tuple()

View File

@ -1,26 +0,0 @@
-- Tags: replica
DROP TABLE IF EXISTS keeper_retries_r1 SYNC;
DROP TABLE IF EXISTS keeper_retries_r2 SYNC;
CREATE TABLE keeper_retries_r1(a UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/02456_keeper_retries_during_insert', 'r1') ORDER BY tuple ();
CREATE TABLE keeper_retries_r2(a UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/02456_keeper_retries_during_insert', 'r2') ORDER BY tuple();
INSERT INTO keeper_retries_r1 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (1);
INSERT INTO keeper_retries_r1 SETTINGS insert_keeper_fault_injection_probability=1, insert_keeper_max_retries=0 VALUES (2); -- { serverError KEEPER_EXCEPTION }
INSERT INTO keeper_retries_r1 SETTINGS insert_keeper_fault_injection_probability=1, insert_keeper_retry_max_backoff_ms=10 VALUES (3); -- { serverError KEEPER_EXCEPTION }
SET insert_quorum=2;
INSERT INTO keeper_retries_r1 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (11);
INSERT INTO keeper_retries_r1 SETTINGS insert_keeper_fault_injection_probability=1, insert_keeper_max_retries=0 VALUES (12); -- { serverError KEEPER_EXCEPTION }
INSERT INTO keeper_retries_r1 SETTINGS insert_keeper_fault_injection_probability=1, insert_keeper_retry_max_backoff_ms=1 VALUES (13); -- { serverError KEEPER_EXCEPTION }
-- INSERT INTO keeper_retries_r1 SETTINGS insert_keeper_fault_injection_mode=1, insert_keeper_fault_injection_probability=0.05, insert_keeper_fault_injection_seed=1 VALUES (21);
-- INSERT INTO keeper_retries_r1 SETTINGS insert_keeper_fault_injection_mode=1, insert_keeper_fault_injection_probability=0.2, insert_keeper_max_retries=100, insert_keeper_retry_max_backoff_ms=1, insert_keeper_fault_injection_seed=2 VALUES (22);
-- INSERT INTO keeper_retries_r1 SETTINGS insert_keeper_fault_injection_mode=1, insert_keeper_fault_injection_probability=0.3, insert_keeper_max_retries=100, insert_keeper_retry_max_backoff_ms=1, insert_keeper_fault_injection_seed=3 VALUES (23);
-- INSERT INTO keeper_retries_r1 SETTINGS insert_keeper_fault_injection_mode=1, insert_keeper_fault_injection_probability=0.4, insert_keeper_max_retries=100, insert_keeper_retry_max_backoff_ms=1, insert_keeper_fault_injection_seed=4 VALUES (24);
SELECT * FROM keeper_retries_r1 order by a;
DROP TABLE keeper_retries_r1 SYNC;
DROP TABLE keeper_retries_r2 SYNC;
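The deleted test above leans on the semantics of the fault-injection settings: probability 0 never injects a failure, probability 1 fails every injected Keeper call (hence the expected KEEPER_EXCEPTION for those INSERTs), and insert_keeper_fault_injection_seed makes a given failure pattern reproducible. Below is a tiny self-contained sketch of that probability/seed contract, with hypothetical names; it is an illustration, not the server's implementation.

#include <cassert>
#include <cstdint>
#include <random>

// Hypothetical model of the probability/seed contract exercised by the deleted test.
struct FaultInjector
{
    double probability;
    std::mt19937_64 rng;

    FaultInjector(double probability_, uint64_t seed_) : probability(probability_), rng(seed_) {}

    bool shouldFail()
    {
        std::uniform_real_distribution<double> dist(0.0, 1.0);
        return dist(rng) < probability;   // probability 0.0 -> never fails, 1.0 -> always fails
    }
};

int main()
{
    FaultInjector never(0.0, /*seed*/ 42);
    FaultInjector always(1.0, /*seed*/ 42);
    for (int i = 0; i < 1000; ++i)
    {
        assert(!never.shouldFail());      // matches inserts with insert_keeper_fault_injection_probability=0
        assert(always.shouldFail());      // matches the expected KEEPER_EXCEPTION cases with probability=1
    }
    // The same seed yields the same sequence of decisions, which is what a setting
    // like insert_keeper_fault_injection_seed provides for reproducible runs.
}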