#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Interpreters/PartLog.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <IO/Operators.h>


namespace ProfileEvents
{
    extern const Event DuplicatedInsertedBlocks;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int TOO_FEW_LIVE_REPLICAS;
    extern const int UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE;
    extern const int UNEXPECTED_ZOOKEEPER_ERROR;
    extern const int NO_ZOOKEEPER;
    extern const int READONLY;
    extern const int UNKNOWN_STATUS_OF_INSERT;
    extern const int INSERT_WAS_DEDUPLICATED;
    extern const int TIMEOUT_EXCEEDED;
    extern const int NO_ACTIVE_REPLICAS;
    extern const int DUPLICATE_DATA_PART;
    extern const int PART_IS_TEMPORARILY_LOCKED;
    extern const int LOGICAL_ERROR;
}


ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
    StorageReplicatedMergeTree & storage_,
    const StorageMetadataPtr & metadata_snapshot_,
    size_t quorum_,
    size_t quorum_timeout_ms_,
    size_t max_parts_per_block_,
    bool quorum_parallel_,
    bool deduplicate_,
    bool optimize_on_insert_,
    bool is_attach_)
    : storage(storage_)
    , metadata_snapshot(metadata_snapshot_)
    , quorum(quorum_)
    , quorum_timeout_ms(quorum_timeout_ms_)
    , max_parts_per_block(max_parts_per_block_)
    , quorum_parallel(quorum_parallel_)
    , deduplicate(deduplicate_)
    , log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
    , optimize_on_insert(optimize_on_insert_)
    , is_attach(is_attach_)
{
    /// The quorum value `1` has the same meaning as if it is disabled.
    if (quorum == 1)
        quorum = 0;
}


Block ReplicatedMergeTreeBlockOutputStream::getHeader() const
{
    return metadata_snapshot->getSampleBlock();
}


/// Allow to verify that the session in ZooKeeper is still alive.
static void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper)
{
    if (!zookeeper)
        throw Exception("No ZooKeeper session.", ErrorCodes::NO_ZOOKEEPER);

    if (zookeeper->expired())
        throw Exception("ZooKeeper session has expired.", ErrorCodes::NO_ZOOKEEPER);
}


void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper)
{
    quorum_info.status_path = storage.zookeeper_path + "/quorum/status";

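    /// Fetch this replica's `is_active` and `host` nodes asynchronously; the versions saved below are
    /// re-checked in the commit transaction, so the insert fails if the replica was reinitialized in between.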
    std::future<Coordination::GetResponse> is_active_future = zookeeper->asyncTryGet(storage.replica_path + "/is_active");
    std::future<Coordination::GetResponse> host_future = zookeeper->asyncTryGet(storage.replica_path + "/host");

    /// List of live replicas. All of them register an ephemeral node for leader_election.

    Coordination::Stat leader_election_stat;
    zookeeper->get(storage.zookeeper_path + "/leader_election", &leader_election_stat);

    if (leader_election_stat.numChildren < static_cast<int32_t>(quorum))
        throw Exception("Number of alive replicas ("
            + toString(leader_election_stat.numChildren) + ") is less than requested quorum (" + toString(quorum) + ").",
            ErrorCodes::TOO_FEW_LIVE_REPLICAS);

    /** Is there a quorum for the last part for which a quorum is needed?
      * Writes of all parts with quorum enabled are linearly ordered.
      * This means that at any time there can be only one part
      * for which the quorum is needed but has not yet been reached.
      * Information about this part will be located in the `/quorum/status` node.
      * If the quorum is reached, then the node is deleted.
      */

    String quorum_status;
    if (!quorum_parallel && zookeeper->tryGet(quorum_info.status_path, quorum_status))
        throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status,
                        ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);

    /// Both checks are also made implicitly later (otherwise there would be a race condition).

    auto is_active = is_active_future.get();
    auto host = host_future.get();

    if (is_active.error == Coordination::Error::ZNONODE || host.error == Coordination::Error::ZNONODE)
        throw Exception("Replica is not active right now", ErrorCodes::READONLY);

    quorum_info.is_active_node_value = is_active.data;
    quorum_info.is_active_node_version = is_active.stat.version;
    quorum_info.host_node_version = host.stat.version;
}


void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
{
    last_block_is_duplicate = false;

    auto zookeeper = storage.getZooKeeper();
    assertSessionIsNotExpired(zookeeper);

    /** If the write uses a quorum, we check that the required number of replicas is alive now,
      * and that the quorum has been reached for all previous parts that required one.
      * We also check that during the insertion the replica was not reinitialized or disabled (by the value of the `is_active` node).
      * TODO Too complex logic, you can do better.
      */
    if (quorum)
        checkQuorumPrecondition(zookeeper);

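    /// Split the block into chunks by partition key; each chunk is written as a separate part.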
    auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);

    for (auto & current_block : part_blocks)
    {
        Stopwatch watch;

        /// Write part to the filesystem under temporary name. Calculate a checksum.

        MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, optimize_on_insert);

        String block_id;

        if (deduplicate)
        {
            SipHash hash;
            part->checksums.computeTotalChecksumDataOnly(hash);
            union
            {
                char bytes[16];
                UInt64 words[2];
            } hash_value;
            hash.get128(hash_value.bytes);

            /// We add the hash of the data and the partition identifier to the deduplication ID.
            /// That is, do not insert the same data into the same partition twice.
            block_id = part->info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);

            LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows());
        }
        else
        {
            LOG_DEBUG(log, "Wrote block with {} rows", current_block.block.rows());
        }

        try
        {
            commitPart(zookeeper, part, block_id);

            /// Set a special error code if the block is duplicate
            int error = (deduplicate && last_block_is_duplicate) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
            PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus(error));
        }
        catch (...)
        {
            PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
            throw;
        }
    }
}


void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
{
    last_block_is_duplicate = false;

    /// NOTE: No delay in this case. That's Ok.

    auto zookeeper = storage.getZooKeeper();
    assertSessionIsNotExpired(zookeeper);

    if (quorum)
        checkQuorumPrecondition(zookeeper);

    Stopwatch watch;

    try
    {
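        /// An empty block_id means commitPart skips the deduplication check for this part.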
        commitPart(zookeeper, part, "");
        PartLog::addNewPart(storage.global_context, part, watch.elapsed());
    }
    catch (...)
    {
        PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
        throw;
    }
}


void ReplicatedMergeTreeBlockOutputStream::commitPart(
    zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id)
{
    metadata_snapshot->check(part->getColumns());
    assertSessionIsNotExpired(zookeeper);

    String temporary_part_relative_path = part->relative_path;

    /// There is one case when we need to retry the transaction in a loop.
    /// But don't do it too many times - just as a defensive measure.
    size_t loop_counter = 0;
    constexpr size_t max_iterations = 10;

    bool is_already_existing_part = false;

    while (true)
    {
        /// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
        /// We remove the lock just after renaming the part. In case of exception, block number will be marked as abandoned.
        /// Also, perform a deduplication check. If a duplicate is detected, no nodes are created.

        /// Allocate new block number and check for duplicates
        bool deduplicate_block = !block_id.empty();
        String block_id_path = deduplicate_block ? storage.zookeeper_path + "/blocks/" + block_id : "";
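        /// If the block is a duplicate (block_id_path already exists under /blocks), no lock is returned
        /// and we take the "already existing part" branch below.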
        auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);

        /// Prepare transaction to ZooKeeper
        /// It will simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.
        Coordination::Requests ops;

        Int64 block_number = 0;
        String existing_part_name;
        if (block_number_lock)
        {
            is_already_existing_part = false;
            block_number = block_number_lock->getNumber();

            /// Set part attributes according to part_number. Prepare an entry for the log.

            part->info.min_block = block_number;
            part->info.max_block = block_number;
            part->info.level = 0;
            part->info.mutation = 0;

            part->name = part->getNewName(part->info);

            StorageReplicatedMergeTree::LogEntry log_entry;

            if (is_attach)
            {
                log_entry.type = StorageReplicatedMergeTree::LogEntry::ATTACH_PART;

                /// We don't need to involve ZooKeeper to obtain the checksums, as by the time we get
                /// the MutableDataPartPtr here, we already have the data and thus are able to
                /// calculate the checksums.
                log_entry.part_checksum = part->checksums.getTotalChecksumHex();
            }
            else
                log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;

            log_entry.create_time = time(nullptr);
            log_entry.source_replica = storage.replica_name;
            log_entry.new_part_name = part->name;
            /// TODO maybe add UUID here as well?
            log_entry.quorum = quorum;
            log_entry.block_id = block_id;
            log_entry.new_part_type = part->getType();

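            /// The replication log entry is what other replicas will execute to obtain this part
            /// (GET_PART) or attach it (ATTACH_PART).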
            ops.emplace_back(zkutil::makeCreateRequest(
                storage.zookeeper_path + "/log/log-",
                log_entry.toString(),
                zkutil::CreateMode::PersistentSequential));

            /// Deletes the information that the block number is used for writing.
            block_number_lock->getUnlockOps(ops);

            /** If we need a quorum - create a node in which the quorum is monitored.
              * (If such a node already exists, then someone has managed to make another quorum record at the same time,
              * but for it the quorum has not yet been reached.
              * You cannot make the next quorum record at this time.)
              */
            if (quorum)
            {
                ReplicatedMergeTreeQuorumEntry quorum_entry;
                quorum_entry.part_name = part->name;
                quorum_entry.required_number_of_replicas = quorum;
                quorum_entry.replicas.insert(storage.replica_name);

                /** At this point, this node will contain information that the current replica received a part.
                  * When other replicas receive this part (in the usual way, processing the replication log),
                  * they will add themselves to the contents of this node.
                  * When it contains information about `quorum` number of replicas, this node is deleted,
                  * which indicates that the quorum has been reached.
                  */

                if (quorum_parallel)
                    quorum_info.status_path = storage.zookeeper_path + "/quorum/parallel/" + part->name;

                ops.emplace_back(
                    zkutil::makeCreateRequest(
                        quorum_info.status_path,
                        quorum_entry.toString(),
                        zkutil::CreateMode::Persistent));

                /// Make sure that during the insertion time, the replica was not reinitialized or disabled (e.g. when the server is shutting down).
                ops.emplace_back(
                    zkutil::makeCheckRequest(
                        storage.replica_path + "/is_active",
                        quorum_info.is_active_node_version));

                /// Unfortunately, just checking the above is not enough, because the `is_active`
                /// node can be deleted and reappear with the same version.
                /// But then the `host` value will change. We will check this.
                /// It's great that these two nodes change in the same transaction (see MergeTreeRestartingThread).
                ops.emplace_back(
                    zkutil::makeCheckRequest(
                        storage.replica_path + "/host",
                        quorum_info.host_node_version));
            }
        }
        else
        {
            is_already_existing_part = true;

            /// This block was already written to some replica. Get the part name for it.
            /// Note: race condition with DROP PARTITION operation is possible. User will get "No node" exception and it is Ok.
            existing_part_name = zookeeper->get(storage.zookeeper_path + "/blocks/" + block_id);

            /// If it exists on our replica, ignore it.
            if (storage.getActiveContainingPart(existing_part_name))
            {
                LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it.", block_id, existing_part_name);
                part->is_duplicate = true;
                last_block_is_duplicate = true;
                ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
                return;
            }

            LOG_INFO(log, "Block with ID {} already exists on other replicas as part {}; will write it locally with that name.",
                block_id, existing_part_name);

            /// If it does not exist locally, we will write a new part with the existing name.
            /// Note that it may also appear on the filesystem right now in PreCommitted state due to concurrent inserts of the same data.
            /// It will be checked when we try to rename the directory.

            part->name = existing_part_name;
            part->info = MergeTreePartInfo::fromPartName(existing_part_name, storage.format_version);
            /// Used only for exception messages.
            block_number = part->info.min_block;

            /// Do not check for duplicate on commit to ZK.
            block_id_path.clear();
        }

        /// Information about the part.
        storage.getCommitPartOps(ops, part, block_id_path);

        MergeTreeData::Transaction transaction(storage); /// If you can not add a part to ZK, we'll remove it back from the working set.
        bool renamed = false;
        try
        {
            renamed = storage.renameTempPartAndAdd(part, nullptr, &transaction);
        }
        catch (const Exception & e)
        {
            if (e.code() != ErrorCodes::DUPLICATE_DATA_PART
                && e.code() != ErrorCodes::PART_IS_TEMPORARILY_LOCKED)
                throw;
        }
        if (!renamed)
        {
            if (is_already_existing_part)
            {
                LOG_INFO(log, "Part {} is duplicate and it is already written by concurrent request or fetched; ignoring it.", part->name);
                return;
            }
            else
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Part with name {} is already written by concurrent request."
                    " It should not happen for non-duplicate data parts because unique names are assigned for them. It's a bug",
                    part->name);
        }

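        /// Send the accumulated requests to ZooKeeper as a single atomic multi request; the result is handled below.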
        Coordination::Responses responses;
        Coordination::Error multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT

        if (multi_code == Coordination::Error::ZOK)
        {
            transaction.commit();
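
            /// A new part may make new merges possible, so wake up the merge selecting task.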
            storage.merge_selecting_task->schedule();

            /// Lock nodes have been already deleted, do not delete them in destructor
            if (block_number_lock)
                block_number_lock->assumeUnlocked();
        }
        else if (multi_code == Coordination::Error::ZCONNECTIONLOSS
            || multi_code == Coordination::Error::ZOPERATIONTIMEOUT)
        {
            /** If the connection is lost and we do not know whether the changes were applied, we cannot delete the local part:
              * if the changes were applied, the inserted block appeared in `/blocks/` and it cannot be inserted again.
              */
            transaction.commit();
            storage.enqueuePartForCheck(part->name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);

            /// We do not know whether or not data has been inserted.
            throw Exception("Unknown status, client must retry. Reason: " + String(Coordination::errorMessage(multi_code)),
                ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
        }
        else if (Coordination::isUserError(multi_code))
        {
            String failed_op_path = zkutil::KeeperMultiException(multi_code, ops, responses).getPathForFirstFailedOp();

            if (multi_code == Coordination::Error::ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path)
            {
                /// A block with the same id has just appeared in the table (or on another replica); roll back the insertion.
                LOG_INFO(log, "Block with ID {} already exists (it has just appeared). Renaming part {} back to {}. Will retry write.",
                    block_id, part->name, temporary_part_relative_path);

                /// We will try to add this part again on the new iteration as it's just a new part.
                /// So remove it from the storage parts set immediately and transfer its state to temporary.
                transaction.rollbackPartsToTemporaryState();

                part->is_temp = true;
                part->renameTo(temporary_part_relative_path, false);

                /// If this part appeared on another replica, it's better to try to write it locally one more time.
                /// If it's our part, it will be ignored on the next iteration.
                ++loop_counter;
                if (loop_counter == max_iterations)
                {
                    part->is_duplicate = true; /// Part is duplicate, just remove it from local FS
                    throw Exception("Too many transaction retries - it may indicate an error", ErrorCodes::DUPLICATE_DATA_PART);
                }
                continue;
            }
            else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
            {
                transaction.rollback();
                throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
            }
            else
            {
                /// NOTE: We could be here if the node with the quorum existed, but was quickly removed.
                transaction.rollback();
                throw Exception("Unexpected logical error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
                    + Coordination::errorMessage(multi_code) + ", path " + failed_op_path,
                    ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
            }
        }
        else if (Coordination::isHardwareError(multi_code))
        {
            transaction.rollback();
            throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
                + Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
        }
        else
        {
            transaction.rollback();
            throw Exception("Unexpected ZooKeeper error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
                + Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
        }

        break;
    }

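    /// For a quorum insert, do not return to the client until the quorum is satisfied
    /// or the status of the insert becomes unknown.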
    if (quorum)
    {
        if (is_already_existing_part)
        {
            /// We got a duplicate part without a fetch.
            /// Check whether this quorum insert is parallel or not.
            if (zookeeper->exists(storage.zookeeper_path + "/quorum/parallel/" + part->name))
                storage.updateQuorum(part->name, true);
            else if (zookeeper->exists(storage.zookeeper_path + "/quorum/status"))
                storage.updateQuorum(part->name, false);
        }

        /// We are waiting for quorum to be satisfied.
        LOG_TRACE(log, "Waiting for quorum");

        try
        {
            while (true)
            {
                zkutil::EventPtr event = std::make_shared<Poco::Event>();

                std::string value;
                /// `get` instead of `exists` so that `watch` does not leak if the node is no longer there.
                if (!zookeeper->tryGet(quorum_info.status_path, value, nullptr, event))
                    break;

                LOG_TRACE(log, "Quorum node {} still exists, will wait for updates", quorum_info.status_path);

                ReplicatedMergeTreeQuorumEntry quorum_entry(value);

                /// The node might have had time to disappear and then reappear for the next insert.
                if (quorum_entry.part_name != part->name)
                    break;

                if (!event->tryWait(quorum_timeout_ms))
                    throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED);

                LOG_TRACE(log, "Quorum {} updated, will check quorum node still exists", quorum_info.status_path);
            }

            /// What if the current replica has ceased to be active in the meantime,
            /// and the quorum was marked as failed and deleted?
            String value;
            if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, nullptr)
                || value != quorum_info.is_active_node_value)
                throw Exception("Replica became inactive while waiting for quorum", ErrorCodes::NO_ACTIVE_REPLICAS);
        }
        catch (...)
        {
            /// We do not know whether or not data has been inserted
            /// - whether other replicas had time to download the part and mark the quorum as done.
            throw Exception("Unknown status, client must retry. Reason: " + getCurrentExceptionMessage(false),
                ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
        }

        LOG_TRACE(log, "Quorum satisfied");
    }
}


void ReplicatedMergeTreeBlockOutputStream::writePrefix()
{
    /// Only check "too many parts" before write,
    /// because interrupting a long-running INSERT query in the middle is not convenient for users.
    storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);
}


}
|