Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-12-03 13:02:00 +00:00
Don't ignore duplicate parts written to replicas
commit e44e1ad0d4
parent f0eafed520
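For orientation before the diff: the write path now retries instead of bailing out when a block is already registered in ZooKeeper. The following is a condensed sketch of the new ReplicatedMergeTreeBlockOutputStream::commitPart control flow, pieced together from the hunks below; quorum bookkeeping, logging and most error handling are elided, so treat it as a reading aid rather than compilable code.

// Condensed sketch of the new commitPart flow (see the full diff below).
while (true)
{
    /// Try to register the block; returns nullptr if /blocks/<block_id> already exists.
    auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);

    String existing_part_name;
    if (block_number_lock)
    {
        /// Fresh block: assign a new block number and a new part name.
        part->name = part->getNewName(part->info);
    }
    else
    {
        /// Duplicate block: some replica already wrote it. Reuse that part name.
        existing_part_name = zookeeper->get(storage.zookeeper_path + "/blocks/" + block_id);
        if (storage.getActiveContainingPart(existing_part_name))
            return;                       /// the data is already present locally, nothing to do
        part->name = existing_part_name;  /// otherwise write the part locally under that name
        block_id_path.clear();            /// and skip the second deduplication check
    }

    /// renameTempPartAndAdd now returns false (or throws DUPLICATE_DATA_PART) if the part
    /// is already present; previously its result was ignored.
    if (!storage.renameTempPartAndAdd(part, nullptr, &transaction))
        return;

    if (zookeeper->tryMultiNoThrow(ops, responses) == Coordination::Error::ZNODEEXISTS)
    {
        /// Lost a race with a concurrent insert of the same block:
        /// roll back, rename the part back to its temporary name and retry.
        transaction.rollback();
        part->renameTo(temporary_part_name);
        continue;
    }

    transaction.commit();
    break;
}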
MergeTreeData.cpp
@@ -1754,16 +1754,27 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
 }


-void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
+bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
 {
-    auto removed = renameTempPartAndReplace(part, increment, out_transaction);
-    if (!removed.empty())
-        throw Exception("Added part " + part->name + " covers " + toString(removed.size())
-            + " existing part(s) (including " + removed[0]->name + ")", ErrorCodes::LOGICAL_ERROR);
+    if (out_transaction && &out_transaction->data != this)
+        throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
+            ErrorCodes::LOGICAL_ERROR);
+
+    DataPartsVector covered_parts;
+    {
+        auto lock = lockParts();
+        if (!renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts))
+            return false;
+    }
+    if (!covered_parts.empty())
+        throw Exception("Added part " + part->name + " covers " + toString(covered_parts.size())
+            + " existing part(s) (including " + covered_parts[0]->name + ")", ErrorCodes::LOGICAL_ERROR);
+
+    return true;
 }


-void MergeTreeData::renameTempPartAndReplace(
+bool MergeTreeData::renameTempPartAndReplace(
     MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction,
     std::unique_lock<std::mutex> & lock, DataPartsVector * out_covered_parts)
 {
@@ -1816,7 +1827,7 @@ void MergeTreeData::renameTempPartAndReplace(
     if (covering_part)
     {
         LOG_WARNING(log, "Tried to add obsolete part {} covered by {}", part_name, covering_part->getNameWithState());
-        return;
+        return false;
     }

     /// All checks are passed. Now we can rename the part on disk.
@@ -1854,6 +1865,8 @@ void MergeTreeData::renameTempPartAndReplace(
         for (DataPartPtr & covered_part : covered_parts)
             out_covered_parts->emplace_back(std::move(covered_part));
     }
+
+    return true;
 }

 MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
MergeTreeData.h
@@ -423,7 +423,8 @@ public:
     /// If out_transaction != nullptr, adds the part in the PreCommitted state (the part will be added to the
     /// active set later with out_transaction->commit()).
     /// Else, commits the part immediately.
-    void renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
+    /// Returns true if part was added. Returns false if part is covered by bigger part.
+    bool renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);

     /// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
     /// Returns all parts covered by the added part (in ascending order).
@@ -432,7 +433,7 @@ public:
         MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);

     /// Low-level version of previous one, doesn't lock mutex
-    void renameTempPartAndReplace(
+    bool renameTempPartAndReplace(
         MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, DataPartsLock & lock,
         DataPartsVector * out_covered_parts = nullptr);

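Since renameTempPartAndAdd now reports whether the part actually entered the working set, callers are expected to check the result. A minimal sketch of the calling pattern; it mirrors the new code in ReplicatedMergeTreeBlockOutputStream::commitPart further down, with the surrounding retry loop and ZooKeeper ops omitted.

// Sketch only; the real caller lives in ReplicatedMergeTreeBlockOutputStream::commitPart below.
MergeTreeData::Transaction transaction(storage);

bool renamed = false;
try
{
    renamed = storage.renameTempPartAndAdd(part, nullptr, &transaction);
}
catch (const Exception & e)
{
    /// A part with the same name may already exist on disk, written by a concurrent insert.
    if (e.code() != ErrorCodes::DUPLICATE_DATA_PART)
        throw;
}

if (!renamed)
    return;   /// duplicate or covered part: nothing to commit to ZooKeeper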
ReplicatedMergeTreeBlockOutputStream.cpp
@@ -27,6 +27,7 @@ namespace ErrorCodes
     extern const int INSERT_WAS_DEDUPLICATED;
     extern const int TIMEOUT_EXCEEDED;
     extern const int NO_ACTIVE_REPLICAS;
+    extern const int DUPLICATE_DATA_PART;
 }

@@ -204,165 +205,223 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
     storage.check(part->getColumns());
     assertSessionIsNotExpired(zookeeper);

-    /// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
-    /// We remove the lock just after renaming the part. In case of exception, block number will be marked as abandoned.
-    /// Also, make deduplication check. If a duplicate is detected, no nodes are created.
-
-    /// Allocate new block number and check for duplicates
-    bool deduplicate_block = !block_id.empty();
-    String block_id_path = deduplicate_block ? storage.zookeeper_path + "/blocks/" + block_id : "";
-    auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);
-
-    if (!block_number_lock)
-    {
-        LOG_INFO(log, "Block with ID {} already exists; ignoring it.", block_id);
-        part->is_duplicate = true;
-        last_block_is_duplicate = true;
-        ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
-        return;
-    }
-
-    Int64 block_number = block_number_lock->getNumber();
-
-    /// Set part attributes according to part_number. Prepare an entry for log.
-
-    part->info.min_block = block_number;
-    part->info.max_block = block_number;
-    part->info.level = 0;
-
-    String part_name = part->getNewName(part->info);
-    part->name = part_name;
-
-    StorageReplicatedMergeTree::LogEntry log_entry;
-    log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
-    log_entry.create_time = time(nullptr);
-    log_entry.source_replica = storage.replica_name;
-    log_entry.new_part_name = part_name;
-    log_entry.quorum = quorum;
-    log_entry.block_id = block_id;
-
-    /// Simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.
-
-    /// Information about the part.
-    Coordination::Requests ops;
-
-    storage.getCommitPartOps(ops, part, block_id_path);
-
-    /// Replication log.
-    ops.emplace_back(zkutil::makeCreateRequest(
-        storage.zookeeper_path + "/log/log-",
-        log_entry.toString(),
-        zkutil::CreateMode::PersistentSequential));
-
-    /// Deletes the information that the block number is used for writing.
-    block_number_lock->getUnlockOps(ops);
-
-    /** If you need a quorum - create a node in which the quorum is monitored.
-      * (If such a node already exists, then someone has managed to make another quorum record at the same time, but for it the quorum has not yet been reached.
-      * You can not do the next quorum record at this time.)
-      */
-    if (quorum)
-    {
-        ReplicatedMergeTreeQuorumEntry quorum_entry;
-        quorum_entry.part_name = part_name;
-        quorum_entry.required_number_of_replicas = quorum;
-        quorum_entry.replicas.insert(storage.replica_name);
-
-        /** At this point, this node will contain information that the current replica received a part.
-          * When other replicas will receive this part (in the usual way, processing the replication log),
-          * they will add themselves to the contents of this node.
-          * When it contains information about `quorum` number of replicas, this node is deleted,
-          * which indicates that the quorum has been reached.
-          */
-
-        ops.emplace_back(
-            zkutil::makeCreateRequest(
-                quorum_info.status_path,
-                quorum_entry.toString(),
-                zkutil::CreateMode::Persistent));
-
-        /// Make sure that during the insertion time, the replica was not reinitialized or disabled (when the server is finished).
-        ops.emplace_back(
-            zkutil::makeCheckRequest(
-                storage.replica_path + "/is_active",
-                quorum_info.is_active_node_version));
-
-        /// Unfortunately, just checking the above is not enough, because `is_active` node can be deleted and reappear with the same version.
-        /// But then the `host` value will change. We will check this.
-        /// It's great that these two nodes change in the same transaction (see MergeTreeRestartingThread).
-        ops.emplace_back(
-            zkutil::makeCheckRequest(
-                storage.replica_path + "/host",
-                quorum_info.host_node_version));
-    }
-
-    MergeTreeData::Transaction transaction(storage); /// If you can not add a part to ZK, we'll remove it back from the working set.
-    storage.renameTempPartAndAdd(part, nullptr, &transaction);
-
-    Coordination::Responses responses;
-    Coordination::Error multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT
-
-    if (multi_code == Coordination::Error::ZOK)
-    {
-        transaction.commit();
-        storage.merge_selecting_task->schedule();
-
-        /// Lock nodes have been already deleted, do not delete them in destructor
-        block_number_lock->assumeUnlocked();
-    }
-    else if (multi_code == Coordination::Error::ZCONNECTIONLOSS
-        || multi_code == Coordination::Error::ZOPERATIONTIMEOUT)
-    {
-        /** If the connection is lost, and we do not know if the changes were applied, we can not delete the local part
-          * if the changes were applied, the inserted block appeared in `/blocks/`, and it can not be inserted again.
-          */
-        transaction.commit();
-        storage.enqueuePartForCheck(part->name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
-
-        /// We do not know whether or not data has been inserted.
-        throw Exception("Unknown status, client must retry. Reason: " + String(Coordination::errorMessage(multi_code)),
-            ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
-    }
-    else if (Coordination::isUserError(multi_code))
-    {
-        String failed_op_path = zkutil::KeeperMultiException(multi_code, ops, responses).getPathForFirstFailedOp();
-
-        if (multi_code == Coordination::Error::ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path)
-        {
-            /// Block with the same id have just appeared in table (or other replica), rollback thee insertion.
-            LOG_INFO(log, "Block with ID {} already exists; ignoring it (removing part {})", block_id, part->name);
-
-            part->is_duplicate = true;
-            transaction.rollback();
-            last_block_is_duplicate = true;
-            ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
-        }
-        else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
-        {
-            transaction.rollback();
-
-            throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
-        }
-        else
-        {
-            /// NOTE: We could be here if the node with the quorum existed, but was quickly removed.
-            transaction.rollback();
-            throw Exception("Unexpected logical error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
-                + Coordination::errorMessage(multi_code) + ", path " + failed_op_path,
-                ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
-        }
-    }
-    else if (Coordination::isHardwareError(multi_code))
-    {
-        transaction.rollback();
-        throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
-            + Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
-    }
-    else
-    {
-        transaction.rollback();
-        throw Exception("Unexpected ZooKeeper error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
-            + Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
-    }
+    String temporary_part_name = part->name;
+
+    while (true)
+    {
+        /// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
+        /// We remove the lock just after renaming the part. In case of exception, block number will be marked as abandoned.
+        /// Also, make deduplication check. If a duplicate is detected, no nodes are created.
+
+        /// Allocate new block number and check for duplicates
+        bool deduplicate_block = !block_id.empty();
+        String block_id_path = deduplicate_block ? storage.zookeeper_path + "/blocks/" + block_id : "";
+        auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);
+
+        Int64 block_number;
+        String existing_part_name;
+        if (block_number_lock)
+        {
+            block_number = block_number_lock->getNumber();
+
+            /// Set part attributes according to part_number. Prepare an entry for log.
+
+            part->info.min_block = block_number;
+            part->info.max_block = block_number;
+            part->info.level = 0;
+
+            part->name = part->getNewName(part->info);
+        }
+        else
+        {
+            /// This block was already written to some replica. Get the part name for it.
+            /// Note: race condition with DROP PARTITION operation is possible. User will get "No node" exception and it is Ok.
+            existing_part_name = zookeeper->get(storage.zookeeper_path + "/blocks/" + block_id);
+
+            /// If it exists on our replica, ignore it.
+            if (storage.getActiveContainingPart(existing_part_name))
+            {
+                LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it.", block_id, existing_part_name);
+                part->is_duplicate = true;
+                last_block_is_duplicate = true;
+                ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
+                return;
+            }
+
+            LOG_INFO(log, "Block with ID {} already exists on other replicas as part {}; will write it locally with that name.",
+                block_id, existing_part_name);
+
+            /// If it does not exist, we will write a new part with existing name.
+            /// Note that it may also appear on filesystem right now in PreCommitted state due to concurrent inserts of the same data.
+            /// It will be checked when we will try to rename directory.
+
+            part->name = existing_part_name;
+            part->info = MergeTreePartInfo::fromPartName(existing_part_name, storage.format_version);
+
+            /// Don't do subsequent duplicate check.
+            block_id_path.clear();
+        }
+
+        StorageReplicatedMergeTree::LogEntry log_entry;
+        log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
+        log_entry.create_time = time(nullptr);
+        log_entry.source_replica = storage.replica_name;
+        log_entry.new_part_name = part->name;
+        log_entry.quorum = quorum;
+        log_entry.block_id = block_id;
+
+        /// Simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.
+
+        /// Information about the part.
+        Coordination::Requests ops;
+
+        storage.getCommitPartOps(ops, part, block_id_path);
+
+        /// Replication log.
+        ops.emplace_back(zkutil::makeCreateRequest(
+            storage.zookeeper_path + "/log/log-",
+            log_entry.toString(),
+            zkutil::CreateMode::PersistentSequential));
+
+        /// Deletes the information that the block number is used for writing.
+        if (block_number_lock)
+            block_number_lock->getUnlockOps(ops);
+
+        /** If you need a quorum - create a node in which the quorum is monitored.
+          * (If such a node already exists, then someone has managed to make another quorum record at the same time,
+          * but for it the quorum has not yet been reached.
+          * You can not do the next quorum record at this time.)
+          */
+        if (quorum) /// TODO Duplicate blocks.
+        {
+            ReplicatedMergeTreeQuorumEntry quorum_entry;
+            quorum_entry.part_name = part->name;
+            quorum_entry.required_number_of_replicas = quorum;
+            quorum_entry.replicas.insert(storage.replica_name);
+
+            /** At this point, this node will contain information that the current replica received a part.
+              * When other replicas will receive this part (in the usual way, processing the replication log),
+              * they will add themselves to the contents of this node.
+              * When it contains information about `quorum` number of replicas, this node is deleted,
+              * which indicates that the quorum has been reached.
+              */
+
+            ops.emplace_back(
+                zkutil::makeCreateRequest(
+                    quorum_info.status_path,
+                    quorum_entry.toString(),
+                    zkutil::CreateMode::Persistent));
+
+            /// Make sure that during the insertion time, the replica was not reinitialized or disabled (when the server is finished).
+            ops.emplace_back(
+                zkutil::makeCheckRequest(
+                    storage.replica_path + "/is_active",
+                    quorum_info.is_active_node_version));
+
+            /// Unfortunately, just checking the above is not enough, because `is_active` node can be deleted and reappear with the same version.
+            /// But then the `host` value will change. We will check this.
+            /// It's great that these two nodes change in the same transaction (see MergeTreeRestartingThread).
+            ops.emplace_back(
+                zkutil::makeCheckRequest(
+                    storage.replica_path + "/host",
+                    quorum_info.host_node_version));
+        }
+
+        MergeTreeData::Transaction transaction(storage); /// If you can not add a part to ZK, we'll remove it back from the working set.
+        bool renamed = false;
+        try
+        {
+            renamed = storage.renameTempPartAndAdd(part, nullptr, &transaction);
+        }
+        catch (const Exception & e)
+        {
+            if (e.code() != ErrorCodes::DUPLICATE_DATA_PART)
+                throw;
+        }
+        if (!renamed)
+        {
+            if (!existing_part_name.empty())
+            {
+                LOG_INFO(log, "Part {} is duplicate and it is already written by concurrent request; ignoring it.", block_id, existing_part_name);
+                return;
+            }
+            else
+                throw Exception("Part with name {} is already written by concurrent request. It should not happen for non-duplicate data parts because unique names are assigned for them. It's a bug", ErrorCodes::LOGICAL_ERROR);
+        }
+
+        Coordination::Responses responses;
+        Coordination::Error multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT
+
+        if (multi_code == Coordination::Error::ZOK)
+        {
+            transaction.commit();
+            storage.merge_selecting_task->schedule();
+
+            /// Lock nodes have been already deleted, do not delete them in destructor
+            if (block_number_lock)
+                block_number_lock->assumeUnlocked();
+        }
+        else if (multi_code == Coordination::Error::ZCONNECTIONLOSS
+            || multi_code == Coordination::Error::ZOPERATIONTIMEOUT)
+        {
+            /** If the connection is lost, and we do not know if the changes were applied, we can not delete the local part
+              * if the changes were applied, the inserted block appeared in `/blocks/`, and it can not be inserted again.
+              */
+            transaction.commit();
+            storage.enqueuePartForCheck(part->name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
+
+            /// We do not know whether or not data has been inserted.
+            throw Exception("Unknown status, client must retry. Reason: " + String(Coordination::errorMessage(multi_code)),
+                ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
+        }
+        else if (Coordination::isUserError(multi_code))
+        {
+            String failed_op_path = zkutil::KeeperMultiException(multi_code, ops, responses).getPathForFirstFailedOp();
+
+            if (multi_code == Coordination::Error::ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path)
+            {
+                /// Block with the same id have just appeared in table (or other replica), rollback thee insertion.
+                LOG_INFO(log, "Block with ID {} already exists (it was just appeared). Renaming part {} back to {}. Will retry write.",
+                    block_id, part->name, temporary_part_name);
+
+                transaction.rollback();
+
+                part->is_duplicate = true;
+                part->is_temp = true;
+                part->state = MergeTreeDataPartState::Temporary;
+                part->renameTo(temporary_part_name);
+
+                continue;
+            }
+            else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
+            {
+                transaction.rollback();
+
+                throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
+            }
+            else
+            {
+                /// NOTE: We could be here if the node with the quorum existed, but was quickly removed.
+                transaction.rollback();
+                throw Exception("Unexpected logical error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+                    + Coordination::errorMessage(multi_code) + ", path " + failed_op_path,
+                    ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
+            }
+        }
+        else if (Coordination::isHardwareError(multi_code))
+        {
+            transaction.rollback();
+            throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+                + Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
+        }
+        else
+        {
+            transaction.rollback();
+            throw Exception("Unexpected ZooKeeper error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+                + Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
+        }
+
+        break;
+    }

     if (quorum)
@@ -386,7 +445,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
             ReplicatedMergeTreeQuorumEntry quorum_entry(value);

             /// If the node has time to disappear, and then appear again for the next insert.
-            if (quorum_entry.part_name != part_name)
+            if (quorum_entry.part_name != part->name)
                 break;

             if (!event->tryWait(quorum_timeout_ms))
tests/queries/0_stateless/01319_manual_write_to_replicas.reference (new file)
@@ -0,0 +1,6 @@
+Hello, world
+---
+Hello, world
+Hello, world
+Hello, world
+Hello, world
tests/queries/0_stateless/01319_manual_write_to_replicas.sql (new file)
@@ -0,0 +1,25 @@
+DROP TABLE IF EXISTS r1;
+DROP TABLE IF EXISTS r2;
+
+CREATE TABLE r1 (x String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/r', 'r1') ORDER BY x;
+CREATE TABLE r2 (x String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/r', 'r2') ORDER BY x;
+
+SYSTEM STOP REPLICATED SENDS;
+
+INSERT INTO r1 VALUES ('Hello, world');
+SELECT * FROM r1;
+SELECT * FROM r2;
+INSERT INTO r2 VALUES ('Hello, world');
+SELECT '---';
+SELECT * FROM r1;
+SELECT * FROM r2;
+
+SYSTEM START REPLICATED SENDS;
+SYSTEM SYNC REPLICA r1;
+SYSTEM SYNC REPLICA r2;
+
+SELECT * FROM r1;
+SELECT * FROM r2;
+
+DROP TABLE r1;
+DROP TABLE r2;
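Reading the test against the reference file above: with SYSTEM STOP REPLICATED SENDS in effect, r2 cannot obtain the part written through r1 in the usual way, and the second INSERT carries a block ID that is already registered in ZooKeeper. With this change r2 writes the part locally under the existing part name instead of silently ignoring the insert, which is why both replicas already return the row before sends are re-enabled; five 'Hello, world' rows plus the '---' separator make up the six lines of the reference file.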