2014-04-02 07:59:43 +00:00
|
|
|
#pragma once
|
|
|
|
|
2021-07-23 19:33:59 +00:00
|
|
|
#include <Processors/Sinks/SinkToStorage.h>
|
2017-06-25 00:01:10 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeData.h>
|
2021-10-02 07:13:14 +00:00
|
|
|
#include <base/types.h>
|
2014-04-02 07:59:43 +00:00
|
|
|
|
2017-06-25 00:01:10 +00:00
|
|
|
|
2017-02-02 22:08:19 +00:00
|
|
|
/// Forward declaration only: this header uses Poco::Logger through a pointer,
/// so the complete type is not needed here.
namespace Poco
{
class Logger;
}
|
|
|
|
|
2017-06-25 00:01:10 +00:00
|
|
|
namespace zkutil
{
/// Forward declaration: code in this header only passes ZooKeeper around via ZooKeeperPtr,
/// which does not require the complete type.
class ZooKeeper;

/// Shared-ownership handle to a ZooKeeper object.
using ZooKeeperPtr = std::shared_ptr<ZooKeeper>;
}
|
|
|
|
|
2014-04-02 07:59:43 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2016-01-17 05:22:22 +00:00
|
|
|
/// Forward declaration: ReplicatedMergeTreeSink only refers to the storage by reference in this header.
class StorageReplicatedMergeTree;
|
2016-01-11 21:46:36 +00:00
|
|
|
|
|
|
|
|
2021-07-23 19:33:59 +00:00
|
|
|
class ReplicatedMergeTreeSink : public SinkToStorage
|
2014-04-02 07:59:43 +00:00
|
|
|
{
|
|
|
|
public:
|
2021-07-23 19:33:59 +00:00
|
|
|
ReplicatedMergeTreeSink(
|
2020-06-16 15:51:29 +00:00
|
|
|
StorageReplicatedMergeTree & storage_,
|
|
|
|
const StorageMetadataPtr & metadata_snapshot_,
|
|
|
|
size_t quorum_,
|
|
|
|
size_t quorum_timeout_ms_,
|
|
|
|
size_t max_parts_per_block_,
|
2020-09-30 23:16:27 +00:00
|
|
|
bool quorum_parallel_,
|
2020-11-13 07:54:05 +00:00
|
|
|
bool deduplicate_,
|
2021-02-10 14:12:49 +00:00
|
|
|
ContextPtr context_,
|
2021-02-15 15:06:48 +00:00
|
|
|
// special flag to determine the ALTER TABLE ATTACH PART without the query context,
|
|
|
|
// needed to set the special LogEntryType::ATTACH_PART
|
|
|
|
bool is_attach_ = false);
|
2015-09-11 02:13:59 +00:00
|
|
|
|
2022-02-01 10:36:51 +00:00
|
|
|
~ReplicatedMergeTreeSink() override;
|
|
|
|
|
2021-07-26 14:47:29 +00:00
|
|
|
void onStart() override;
|
2021-07-23 19:33:59 +00:00
|
|
|
void consume(Chunk chunk) override;
|
2022-02-01 10:36:51 +00:00
|
|
|
void onFinish() override;
|
2021-07-23 19:33:59 +00:00
|
|
|
|
|
|
|
String getName() const override { return "ReplicatedMergeTreeSink"; }
|
2014-04-02 07:59:43 +00:00
|
|
|
|
2017-06-25 00:51:51 +00:00
|
|
|
/// For ATTACHing existing data on filesystem.
|
|
|
|
void writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
|
|
|
|
|
2017-10-24 19:32:23 +00:00
|
|
|
/// For proper deduplication in MaterializedViews
|
2021-08-31 13:50:07 +00:00
|
|
|
bool lastBlockIsDuplicate() const override
|
2017-10-24 19:32:23 +00:00
|
|
|
{
|
2021-08-31 13:50:07 +00:00
|
|
|
/// If MV is responsible for deduplication, block is not considered duplicating.
|
|
|
|
if (context->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views)
|
|
|
|
return false;
|
|
|
|
|
2017-10-24 19:32:23 +00:00
|
|
|
return last_block_is_duplicate;
|
|
|
|
}
|
|
|
|
|
2014-04-02 07:59:43 +00:00
|
|
|
private:
|
2017-06-25 00:01:10 +00:00
|
|
|
struct QuorumInfo
|
|
|
|
{
|
|
|
|
String status_path;
|
|
|
|
String is_active_node_value;
|
|
|
|
int is_active_node_version = -1;
|
|
|
|
int host_node_version = -1;
|
|
|
|
};
|
|
|
|
|
|
|
|
QuorumInfo quorum_info;
|
|
|
|
void checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper);
|
|
|
|
|
|
|
|
/// Rename temporary part and commit to ZooKeeper.
|
2022-06-23 16:21:46 +00:00
|
|
|
void commitPart(
|
|
|
|
zkutil::ZooKeeperPtr & zookeeper,
|
|
|
|
MergeTreeData::MutableDataPartPtr & part,
|
|
|
|
const String & block_id,
|
|
|
|
DataPartStorageBuilderPtr part_builder);
|
2017-06-25 00:01:10 +00:00
|
|
|
|
2021-04-08 10:35:38 +00:00
|
|
|
/// Wait for quorum to be satisfied on path (quorum_path) form part (part_name)
|
|
|
|
/// Also checks that replica still alive.
|
|
|
|
void waitForQuorum(
|
|
|
|
zkutil::ZooKeeperPtr & zookeeper, const std::string & part_name,
|
2021-04-08 10:38:40 +00:00
|
|
|
const std::string & quorum_path, const std::string & is_active_node_value) const;
|
2021-04-08 10:35:38 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
StorageReplicatedMergeTree & storage;
|
2020-06-16 15:51:29 +00:00
|
|
|
StorageMetadataPtr metadata_snapshot;
|
2017-04-01 07:20:54 +00:00
|
|
|
size_t quorum;
|
|
|
|
size_t quorum_timeout_ms;
|
2019-03-29 09:33:39 +00:00
|
|
|
size_t max_parts_per_block;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2021-02-15 15:06:48 +00:00
|
|
|
bool is_attach = false;
|
2020-09-30 23:16:27 +00:00
|
|
|
bool quorum_parallel = false;
|
insert_deduplication_token setting for INSERT statement
The setting allows a user to provide own deduplication semantic in Replicated*MergeTree
If provided, it's used instead of data digest to generate block ID
So, for example, by providing a unique value for the setting in each INSERT statement,
user can avoid the same inserted data being deduplicated
Inserting data within the same INSERT statement are split into blocks
according to the *insert_block_size* settings
(max_insert_block_size, min_insert_block_size_rows, min_insert_block_size_bytes).
Each block with the same INSERT statement will get an ordinal number.
The ordinal number is added to insert_deduplication_token to get block dedup token
i.e. <token>_0, <token>_1, ... Deduplication is done per block
So, to guarantee deduplication for two same INSERT queries,
dedup token and number of blocks to have to be the same
Issue: #7461
2021-11-21 20:39:42 +00:00
|
|
|
const bool deduplicate = true;
|
2017-10-24 19:32:23 +00:00
|
|
|
bool last_block_is_duplicate = false;
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
using Logger = Poco::Logger;
|
2020-05-30 21:57:37 +00:00
|
|
|
Poco::Logger * log;
|
2020-11-13 07:54:05 +00:00
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
ContextPtr context;
|
insert_deduplication_token setting for INSERT statement
The setting allows a user to provide own deduplication semantic in Replicated*MergeTree
If provided, it's used instead of data digest to generate block ID
So, for example, by providing a unique value for the setting in each INSERT statement,
user can avoid the same inserted data being deduplicated
Inserting data within the same INSERT statement are split into blocks
according to the *insert_block_size* settings
(max_insert_block_size, min_insert_block_size_rows, min_insert_block_size_bytes).
Each block with the same INSERT statement will get an ordinal number.
The ordinal number is added to insert_deduplication_token to get block dedup token
i.e. <token>_0, <token>_1, ... Deduplication is done per block
So, to guarantee deduplication for two same INSERT queries,
dedup token and number of blocks to have to be the same
Issue: #7461
2021-11-21 20:39:42 +00:00
|
|
|
UInt64 chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token
|
2022-02-01 10:36:51 +00:00
|
|
|
|
|
|
|
/// We can delay processing for previous chunk and start writing a new one.
|
|
|
|
struct DelayedChunk;
|
|
|
|
std::unique_ptr<DelayedChunk> delayed_chunk;
|
|
|
|
|
|
|
|
void finishDelayedChunk(zkutil::ZooKeeperPtr & zookeeper);
|
2014-04-02 07:59:43 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
}
|