2014-04-02 07:59:43 +00:00
|
|
|
#pragma once
|
|
|
|
|
2021-07-23 19:33:59 +00:00
|
|
|
#include <Processors/Sinks/SinkToStorage.h>
|
2017-06-25 00:01:10 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeData.h>
|
2021-10-02 07:13:14 +00:00
|
|
|
#include <base/types.h>
|
2022-11-10 12:14:04 +00:00
|
|
|
#include <Storages/MergeTree/ZooKeeperRetries.h>
|
2022-12-22 13:31:42 +00:00
|
|
|
#include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>
|
2023-01-10 12:19:12 +00:00
|
|
|
#include <Storages/MergeTree/AsyncBlockIDsCache.h>
|
2014-04-02 07:59:43 +00:00
|
|
|
|
2017-06-25 00:01:10 +00:00
|
|
|
|
2017-02-02 22:08:19 +00:00
|
|
|
namespace Poco { class Logger; }
|
|
|
|
|
2017-06-25 00:01:10 +00:00
|
|
|
namespace zkutil
|
|
|
|
{
|
|
|
|
class ZooKeeper;
|
|
|
|
using ZooKeeperPtr = std::shared_ptr<ZooKeeper>;
|
|
|
|
}
|
|
|
|
|
2014-04-02 07:59:43 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2016-01-17 05:22:22 +00:00
|
|
|
class StorageReplicatedMergeTree;
|
2022-08-25 18:44:40 +00:00
|
|
|
struct StorageSnapshot;
|
|
|
|
using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;
|
2016-01-11 21:46:36 +00:00
|
|
|
|
|
|
|
|
2022-11-06 22:56:26 +00:00
|
|
|
/// ReplicatedMergeTreeSink will sink data to replicated merge tree with deduplication.
|
|
|
|
/// The template argument "async_insert" indicates whether this sink serves for async inserts.
|
|
|
|
/// Async inserts will have different deduplication policy. We use a vector of "block ids" to
|
|
|
|
/// identify different async inserts inside the same part. It will remove the duplicate inserts
|
|
|
|
/// when it encounters lock and retries.
|
|
|
|
template<bool async_insert>
|
2022-12-07 22:40:52 +00:00
|
|
|
class ReplicatedMergeTreeSinkImpl : public SinkToStorage
|
2014-04-02 07:59:43 +00:00
|
|
|
{
|
|
|
|
public:
|
2022-12-07 22:40:52 +00:00
|
|
|
ReplicatedMergeTreeSinkImpl(
|
2020-06-16 15:51:29 +00:00
|
|
|
StorageReplicatedMergeTree & storage_,
|
|
|
|
const StorageMetadataPtr & metadata_snapshot_,
|
|
|
|
size_t quorum_,
|
|
|
|
size_t quorum_timeout_ms_,
|
|
|
|
size_t max_parts_per_block_,
|
2020-09-30 23:16:27 +00:00
|
|
|
bool quorum_parallel_,
|
2020-11-13 07:54:05 +00:00
|
|
|
bool deduplicate_,
|
2022-08-08 05:23:49 +00:00
|
|
|
bool majority_quorum_,
|
2021-02-10 14:12:49 +00:00
|
|
|
ContextPtr context_,
|
2021-02-15 15:06:48 +00:00
|
|
|
// special flag to determine the ALTER TABLE ATTACH PART without the query context,
|
|
|
|
// needed to set the special LogEntryType::ATTACH_PART
|
|
|
|
bool is_attach_ = false);
|
2015-09-11 02:13:59 +00:00
|
|
|
|
2022-12-07 22:40:52 +00:00
|
|
|
~ReplicatedMergeTreeSinkImpl() override;
|
2022-02-01 10:36:51 +00:00
|
|
|
|
2021-07-26 14:47:29 +00:00
|
|
|
void onStart() override;
|
2021-07-23 19:33:59 +00:00
|
|
|
void consume(Chunk chunk) override;
|
2022-02-01 10:36:51 +00:00
|
|
|
void onFinish() override;
|
2021-07-23 19:33:59 +00:00
|
|
|
|
|
|
|
String getName() const override { return "ReplicatedMergeTreeSink"; }
|
2014-04-02 07:59:43 +00:00
|
|
|
|
2017-06-25 00:51:51 +00:00
|
|
|
/// For ATTACHing existing data on filesystem.
|
|
|
|
void writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
|
|
|
|
|
2017-10-24 19:32:23 +00:00
|
|
|
/// For proper deduplication in MaterializedViews
|
2021-08-31 13:50:07 +00:00
|
|
|
bool lastBlockIsDuplicate() const override
|
2017-10-24 19:32:23 +00:00
|
|
|
{
|
2021-08-31 13:50:07 +00:00
|
|
|
/// If MV is responsible for deduplication, block is not considered duplicating.
|
|
|
|
if (context->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views)
|
|
|
|
return false;
|
|
|
|
|
2017-10-24 19:32:23 +00:00
|
|
|
return last_block_is_duplicate;
|
|
|
|
}
|
|
|
|
|
2022-11-06 22:56:26 +00:00
|
|
|
struct DelayedChunk;
|
2014-04-02 07:59:43 +00:00
|
|
|
private:
|
2022-11-06 22:56:26 +00:00
|
|
|
using BlockIDsType = std::conditional_t<async_insert, std::vector<String>, String>;
|
|
|
|
|
2022-11-10 12:14:04 +00:00
|
|
|
ZooKeeperRetriesInfo zookeeper_retries_info;
|
2017-06-25 00:01:10 +00:00
|
|
|
struct QuorumInfo
|
|
|
|
{
|
|
|
|
String status_path;
|
|
|
|
int is_active_node_version = -1;
|
|
|
|
int host_node_version = -1;
|
|
|
|
};
|
|
|
|
|
|
|
|
QuorumInfo quorum_info;
|
2022-09-06 12:09:03 +00:00
|
|
|
|
|
|
|
/// Checks active replicas.
|
|
|
|
/// Returns total number of replicas.
|
2022-11-10 12:14:04 +00:00
|
|
|
size_t checkQuorumPrecondition(const ZooKeeperWithFaultInjectionPtr & zookeeper);
|
2017-06-25 00:01:10 +00:00
|
|
|
|
|
|
|
/// Rename temporary part and commit to ZooKeeper.
|
2023-06-01 19:20:39 +00:00
|
|
|
std::pair<std::vector<String>, bool> commitPart(
|
2022-11-10 12:14:04 +00:00
|
|
|
const ZooKeeperWithFaultInjectionPtr & zookeeper,
|
2022-06-23 16:21:46 +00:00
|
|
|
MergeTreeData::MutableDataPartPtr & part,
|
2022-11-06 22:56:26 +00:00
|
|
|
const BlockIDsType & block_id,
|
2022-11-10 12:14:04 +00:00
|
|
|
size_t replicas_num,
|
|
|
|
bool writing_existing_part);
|
2017-06-25 00:01:10 +00:00
|
|
|
|
2021-04-08 10:35:38 +00:00
|
|
|
/// Wait for quorum to be satisfied on path (quorum_path) form part (part_name)
|
|
|
|
/// Also checks that replica still alive.
|
|
|
|
void waitForQuorum(
|
2022-11-10 12:14:04 +00:00
|
|
|
const ZooKeeperWithFaultInjectionPtr & zookeeper,
|
|
|
|
const std::string & part_name,
|
|
|
|
const std::string & quorum_path,
|
|
|
|
int is_active_node_version,
|
|
|
|
size_t replicas_num) const;
|
2021-04-08 10:35:38 +00:00
|
|
|
|
2014-04-02 07:59:43 +00:00
|
|
|
StorageReplicatedMergeTree & storage;
|
2020-06-16 15:51:29 +00:00
|
|
|
StorageMetadataPtr metadata_snapshot;
|
2022-09-06 12:09:03 +00:00
|
|
|
|
|
|
|
/// Empty means use majority quorum.
|
|
|
|
std::optional<size_t> required_quorum_size;
|
|
|
|
|
|
|
|
size_t getQuorumSize(size_t replicas_num) const;
|
|
|
|
bool isQuorumEnabled() const;
|
2022-09-13 13:49:51 +00:00
|
|
|
String quorumLogMessage(size_t replicas_num) const; /// Used in logs for debug purposes
|
2022-09-06 12:09:03 +00:00
|
|
|
|
2016-01-24 05:00:24 +00:00
|
|
|
size_t quorum_timeout_ms;
|
2019-03-29 09:33:39 +00:00
|
|
|
size_t max_parts_per_block;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2023-01-10 12:19:12 +00:00
|
|
|
UInt64 cache_version = 0;
|
|
|
|
|
2021-02-15 15:06:48 +00:00
|
|
|
bool is_attach = false;
|
2020-09-30 23:16:27 +00:00
|
|
|
bool quorum_parallel = false;
|
insert_deduplication_token setting for INSERT statement
The setting allows a user to provide own deduplication semantic in Replicated*MergeTree
If provided, it's used instead of data digest to generate block ID
So, for example, by providing a unique value for the setting in each INSERT statement,
user can avoid the same inserted data being deduplicated
Inserting data within the same INSERT statement are split into blocks
according to the *insert_block_size* settings
(max_insert_block_size, min_insert_block_size_rows, min_insert_block_size_bytes).
Each block with the same INSERT statement will get an ordinal number.
The ordinal number is added to insert_deduplication_token to get block dedup token
i.e. <token>_0, <token>_1, ... Deduplication is done per block
So, to guarantee deduplication for two same INSERT queries,
dedup token and number of blocks to have to be the same
Issue: #7461
2021-11-21 20:39:42 +00:00
|
|
|
const bool deduplicate = true;
|
2017-10-24 19:32:23 +00:00
|
|
|
bool last_block_is_duplicate = false;
|
|
|
|
|
2017-02-07 15:38:57 +00:00
|
|
|
using Logger = Poco::Logger;
|
2020-05-30 21:57:37 +00:00
|
|
|
Poco::Logger * log;
|
2020-11-13 07:54:05 +00:00
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
ContextPtr context;
|
2022-08-25 18:44:40 +00:00
|
|
|
StorageSnapshotPtr storage_snapshot;
|
|
|
|
|
insert_deduplication_token setting for INSERT statement
The setting allows a user to provide own deduplication semantic in Replicated*MergeTree
If provided, it's used instead of data digest to generate block ID
So, for example, by providing a unique value for the setting in each INSERT statement,
user can avoid the same inserted data being deduplicated
Inserting data within the same INSERT statement are split into blocks
according to the *insert_block_size* settings
(max_insert_block_size, min_insert_block_size_rows, min_insert_block_size_bytes).
Each block with the same INSERT statement will get an ordinal number.
The ordinal number is added to insert_deduplication_token to get block dedup token
i.e. <token>_0, <token>_1, ... Deduplication is done per block
So, to guarantee deduplication for two same INSERT queries,
dedup token and number of blocks to have to be the same
Issue: #7461
2021-11-21 20:39:42 +00:00
|
|
|
UInt64 chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token
|
2022-02-01 10:36:51 +00:00
|
|
|
|
|
|
|
/// We can delay processing for previous chunk and start writing a new one.
|
|
|
|
std::unique_ptr<DelayedChunk> delayed_chunk;
|
|
|
|
|
2022-11-10 12:14:04 +00:00
|
|
|
void finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper);
|
2014-04-02 07:59:43 +00:00
|
|
|
};
|
|
|
|
|
2022-12-07 22:40:52 +00:00
|
|
|
using ReplicatedMergeTreeSinkWithAsyncDeduplicate = ReplicatedMergeTreeSinkImpl<true>;
|
|
|
|
using ReplicatedMergeTreeSink = ReplicatedMergeTreeSinkImpl<false>;
|
|
|
|
|
2014-04-02 07:59:43 +00:00
|
|
|
}
|