Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-30 11:32:03 +00:00)
Merge pull request #51676 from ClickHouse/fix-async-insert-dedup-for-merging-algorithms

Fix async insert with deduplication for ReplicatedMergeTree using merging algorithms

Commit: 3e8358e8a1
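In short: with async_insert_deduplicate=1, each mini-insert inside a squashed block gets a block ID by hashing its row range, delimited by the block's offsets. When optimize_on_insert is enabled and the table uses a non-Ordinary merging mode (Collapsing, Summing, ...), writeTempPart can merge rows away, so the offsets stop describing the written block and the IDs are computed over the wrong rows, breaking deduplication. The diff below fixes this by keeping the original, unmerged block alongside the merged one and using it for hashing and duplicate filtering. A minimal, self-contained sketch of the failure mode and of the fix; the toy types and the FNV-1a hash are illustrative stand-ins, not ClickHouse code:

    #include <cstdint>
    #include <iostream>
    #include <optional>
    #include <string>
    #include <vector>

    /// Toy stand-ins: a "block" is one string column; offsets mark the end row
    /// of each original async insert inside the squashed block.
    struct BlockWithOffsets
    {
        std::vector<std::string> rows;
        std::vector<size_t> offsets;
    };

    /// Stand-in for getHashesForBlocks: one ID per [start, offset) row range.
    std::vector<uint64_t> hashesForBlocks(const BlockWithOffsets & b)
    {
        std::vector<uint64_t> ids;
        size_t start = 0;
        for (size_t offset : b.offsets)
        {
            uint64_t h = 1469598103934665603ull; /// FNV-1a, standing in for SipHash
            for (size_t i = start; i < offset; ++i)
                for (unsigned char c : b.rows[i])
                    h = (h ^ c) * 1099511628211ull;
            ids.push_back(h);
            start = offset;
        }
        return ids;
    }

    /// Stand-in for a merging algorithm: adjacent identical rows collapse into
    /// one, after which the original offsets no longer describe the block.
    BlockWithOffsets collapse(BlockWithOffsets b)
    {
        std::vector<std::string> merged;
        for (auto & r : b.rows)
            if (merged.empty() || merged.back() != r)
                merged.push_back(std::move(r));
        size_t n = merged.size();
        return {std::move(merged), {n}};
    }

    int main()
    {
        BlockWithOffsets block{{"string1", "string1"}, {1, 2}}; /// two async inserts, one row each

        std::optional<BlockWithOffsets> unmerged = block; /// the fix: copy before merging
        block = collapse(std::move(block));               /// merging modifies rows and offsets

        std::cout << hashesForBlocks(block).size() << '\n';     /// 1 ID: per-insert IDs are lost, retries cannot match
        std::cout << hashesForBlocks(*unmerged).size() << '\n'; /// 2 IDs: one stable ID per original insert
    }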
@@ -39,7 +39,6 @@ void ConvertingTransform::onConsume(Chunk chunk)
     expression->execute(block, num_rows);
     chunk.setColumns(block.getColumns(), num_rows);
-    chunk.setChunkInfo(chunk.getChunkInfo());
     cur_chunk = std::move(chunk);
 }

@@ -80,6 +80,11 @@ public:
      */
     TemporaryPart writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);

+    MergeTreeData::MergingParams::Mode getMergingMode() const
+    {
+        return data.merging_params.mode;
+    }
+
     TemporaryPart writeTempPartWithoutPrefix(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, int64_t block_number, ContextPtr context);

     /// For insertion.
@@ -7,6 +7,8 @@
 #include <Common/SipHash.h>
 #include <Common/ZooKeeper/KeeperException.h>
 #include <Common/ThreadFuzzer.h>
+#include <Storages/MergeTree/MergeAlgorithm.h>
+#include <Storages/MergeTree/MergeTreeDataWriter.h>
 #include <Storages/MergeTree/AsyncBlockIDsCache.h>
 #include <DataTypes/ObjectUtils.h>
 #include <Core/Block.h>
@@ -54,6 +56,9 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
         UInt64 elapsed_ns;
         BlockIDsType block_id;
         BlockWithPartition block_with_partition;
+        /// Some merging algorithms can modify the block, which loses the information about the async insert offsets.
+        /// When preprocessing or filtering data for async insert deduplication, we want to use the initial, unmerged block.
+        std::optional<BlockWithPartition> unmerged_block_with_partition;
         std::unordered_map<String, std::vector<size_t>> block_id_to_offset_idx;
         ProfileEvents::Counters part_counters;

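For orientation: the i-th entry of block_id covers the rows between offsets[i-1] and offsets[i] of block_with_partition (or, with this PR, of unmerged_block_with_partition when it is set), and block_id_to_offset_idx maps each ID back to the offset ranges carrying it. A sketch of what initBlockIDMap() plausibly computes — the helper name and shape here are an inference from the surrounding code, not the actual implementation:

    #include <string>
    #include <unordered_map>
    #include <vector>

    /// Map each block ID to the indices of the offset ranges it covers, so a
    /// duplicate ID reported by the keeper can be turned into row ranges to drop.
    std::unordered_map<std::string, std::vector<size_t>>
    buildBlockIDMap(const std::vector<std::string> & block_id)
    {
        std::unordered_map<std::string, std::vector<size_t>> map;
        for (size_t i = 0; i < block_id.size(); ++i)
            map[block_id[i]].push_back(i); /// range i spans [offsets[i - 1], offsets[i])
        return map;
    }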
@@ -63,12 +68,14 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
             UInt64 elapsed_ns_,
             BlockIDsType && block_id_,
             BlockWithPartition && block_,
+            std::optional<BlockWithPartition> && unmerged_block_with_partition_,
             ProfileEvents::Counters && part_counters_)
             : log(log_),
             temp_part(std::move(temp_part_)),
             elapsed_ns(elapsed_ns_),
             block_id(std::move(block_id_)),
             block_with_partition(std::move(block_)),
+            unmerged_block_with_partition(std::move(unmerged_block_with_partition_)),
             part_counters(std::move(part_counters_))
         {
             initBlockIDMap();
@@ -113,6 +120,7 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
         {
             if constexpr (async_insert)
             {
+                auto * current_block_with_partition = unmerged_block_with_partition.has_value() ? &unmerged_block_with_partition.value() : &block_with_partition;
                 std::vector<size_t> offset_idx;
                 for (const auto & raw_path : block_paths)
                 {
@@ -127,14 +135,14 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
                 }
                 std::sort(offset_idx.begin(), offset_idx.end());

-                auto & offsets = block_with_partition.offsets;
+                auto & offsets = current_block_with_partition->offsets;
                 size_t idx = 0, remove_count = 0;
                 auto it = offset_idx.begin();
                 std::vector<size_t> new_offsets;
                 std::vector<String> new_block_ids;

                 /// construct filter
-                size_t rows = block_with_partition.block.rows();
+                size_t rows = current_block_with_partition->block.rows();
                 auto filter_col = ColumnUInt8::create(rows, 1u);
                 ColumnUInt8::Container & vec = filter_col->getData();
                 UInt8 * pos = vec.data();
@@ -162,18 +170,21 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk

                 LOG_TRACE(log, "New block IDs: {}, new offsets: {}, size: {}", toString(new_block_ids), toString(new_offsets), new_offsets.size());

-                block_with_partition.offsets = std::move(new_offsets);
+                current_block_with_partition->offsets = std::move(new_offsets);
                 block_id = std::move(new_block_ids);
-                auto cols = block_with_partition.block.getColumns();
+                auto cols = current_block_with_partition->block.getColumns();
                 for (auto & col : cols)
                 {
                     col = col->filter(vec, rows - remove_count);
                 }
-                block_with_partition.block.setColumns(cols);
+                current_block_with_partition->block.setColumns(cols);

-                LOG_TRACE(log, "New block rows {}", block_with_partition.block.rows());
+                LOG_TRACE(log, "New block rows {}", current_block_with_partition->block.rows());

                 initBlockIDMap();
+
+                if (unmerged_block_with_partition.has_value())
+                    block_with_partition.block = unmerged_block_with_partition->block;
             }
             else
             {
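The two hunks above retarget duplicate filtering at the unmerged block: they build a 0/1 filter column over the rows, zero out the offset ranges whose block IDs were already found, shift the surviving offsets by the number of removed rows, and (new in this PR) copy the filtered unmerged rows back into block_with_partition. A condensed stand-alone sketch of that mask-and-reoffset pattern, with toy vector columns in place of IColumn:

    #include <cstdint>
    #include <utility>
    #include <vector>

    /// Drop the row ranges whose indices appear in sorted `dup_idx`, and rebuild
    /// end-position offsets for the surviving ranges (mirrors the filter
    /// construction in the diff; illustrative, not the ClickHouse code).
    template <typename T>
    void filterRanges(std::vector<T> & col, std::vector<size_t> & offsets, const std::vector<size_t> & dup_idx)
    {
        std::vector<uint8_t> keep(col.size(), 1);
        std::vector<size_t> new_offsets;
        size_t start = 0, removed = 0;
        auto it = dup_idx.begin();
        for (size_t i = 0; i < offsets.size(); ++i)
        {
            size_t end = offsets[i];
            if (it != dup_idx.end() && *it == i)
            {
                for (size_t r = start; r < end; ++r)
                    keep[r] = 0;            /// zero out the duplicate range
                removed += end - start;
                ++it;
            }
            else
                new_offsets.push_back(end - removed); /// shift by rows removed so far
            start = end;
        }
        size_t w = 0;
        for (size_t r = 0; r < col.size(); ++r)
            if (keep[r])
                col[w++] = std::move(col[r]); /// apply the mask in place
        col.resize(w);
        offsets = std::move(new_offsets);
    }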
@@ -202,7 +213,7 @@ std::vector<Int64> testSelfDeduplicate(std::vector<Int64> data, std::vector<size_t> offsets, std::vector<String> hashes)
     BlockWithPartition block1(std::move(block), Row(), std::move(offsets));
     ProfileEvents::Counters profile_counters;
     ReplicatedMergeTreeSinkImpl<true>::DelayedChunk::Partition part(
-        &Poco::Logger::get("testSelfDeduplicate"), MergeTreeDataWriter::TemporaryPart(), 0, std::move(hashes), std::move(block1), std::move(profile_counters));
+        &Poco::Logger::get("testSelfDeduplicate"), MergeTreeDataWriter::TemporaryPart(), 0, std::move(hashes), std::move(block1), std::nullopt, std::move(profile_counters));

     part.filterSelfDuplicate();

@@ -235,8 +246,10 @@ namespace
     {
         SipHash hash;
         for (size_t i = start; i < offset; ++i)
+        {
             for (const auto & col : cols)
                 col->updateHashWithValue(i, hash);
+        }
         union
         {
             char bytes[16];
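For context, this helper hashes every column value of every row in one [start, offset) range into a single 128-bit ID, read out through the 16-byte union; the braces added above merely scope the inner loop. A stand-alone sketch of the same shape, with FNV-1a standing in for SipHash and a 64-bit result for brevity:

    #include <cstdint>
    #include <string>
    #include <vector>

    /// Hash all rows in [start, offset) across all columns into one ID,
    /// mirroring the SipHash loop in the diff (stand-in hash, not SipHash).
    uint64_t hashRange(const std::vector<std::vector<std::string>> & cols, size_t start, size_t offset)
    {
        uint64_t hash = 1469598103934665603ull;
        for (size_t i = start; i < offset; ++i)
        {
            for (const auto & col : cols)
                for (unsigned char c : col[i])
                    hash = (hash ^ c) * 1099511628211ull;
        }
        return hash; /// the real code keeps 128 bits via union { char bytes[16]; ... }
    }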
@@ -432,8 +445,18 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
         ProfileEvents::Counters part_counters;
         auto profile_events_scope = std::make_unique<ProfileEventsScope>(&part_counters);

-        /// Write part to the filesystem under temporary name. Calculate a checksum.
+        /// Some merging algorithms can modify the block, which loses the information about the async insert offsets.
+        /// When preprocessing or filtering data for async insert deduplication, we want to use the initial, unmerged block.
+        std::optional<BlockWithPartition> unmerged_block;
+
+        if constexpr (async_insert)
+        {
+            /// We copy everything but the offsets, which we move, because they are only used by async inserts.
+            if (settings.optimize_on_insert && storage.writer.getMergingMode() != MergeTreeData::MergingParams::Mode::Ordinary)
+                unmerged_block.emplace(Block(current_block.block), Row(current_block.partition), std::move(current_block.offsets));
+        }
+
+        /// Write part to the filesystem under temporary name. Calculate a checksum.
         auto temp_part = storage.writer.writeTempPart(current_block, metadata_snapshot, context);

         /// If optimize_on_insert setting is true, current_block could become empty after merge
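Note the asymmetry in the emplace above: the block and partition are copied, because the normal write path still consumes current_block, while the offsets are moved out, because only async-insert deduplication reads them and it now reads them from the copy. A toy illustration of that move/copy split (hypothetical ToyBlock type, not BlockWithPartition):

    #include <cstddef>
    #include <optional>
    #include <string>
    #include <utility>
    #include <vector>

    struct ToyBlock /// stand-in for BlockWithPartition
    {
        std::vector<std::string> rows;
        std::vector<size_t> offsets;
    };

    int main()
    {
        ToyBlock current{{"a", "b"}, {1, 2}};
        std::optional<ToyBlock> unmerged;

        /// Copy the rows (the merged write path still needs current), but move
        /// the offsets: only the dedup path reads them, and it reads the copy.
        unmerged.emplace(ToyBlock{current.rows, std::move(current.offsets)});

        /// current.rows is intact; current.offsets is left empty by the move.
    }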
@@ -446,31 +469,35 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
         if constexpr (async_insert)
         {
             /// TODO consider insert_deduplication_token
-            block_id = getHashesForBlocks(current_block, temp_part.part->info.partition_id);
+            block_id = getHashesForBlocks(unmerged_block.has_value() ? *unmerged_block : current_block, temp_part.part->info.partition_id);
             LOG_TRACE(log, "async insert part, part id {}, block id {}, offsets {}, size {}", temp_part.part->info.partition_id, toString(block_id), toString(current_block.offsets), current_block.offsets.size());
         }
-        else if (deduplicate)
-        {
-            String block_dedup_token;
-
-            /// We add the hash from the data and the partition identifier to the deduplication ID.
-            /// That is, do not insert the same data into the same partition twice.
-
-            const String & dedup_token = settings.insert_deduplication_token;
-            if (!dedup_token.empty())
-            {
-                /// multiple blocks can be inserted within the same insert query
-                /// an ordinal number is added to the dedup token to generate a distinctive block id for each block
-                block_dedup_token = fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum);
-                ++chunk_dedup_seqnum;
-            }
-
-            block_id = temp_part.part->getZeroLevelPartBlockID(block_dedup_token);
-            LOG_DEBUG(log, "Wrote block with ID '{}', {} rows{}", block_id, current_block.block.rows(), quorumLogMessage(replicas_num));
-        }
         else
         {
-            LOG_DEBUG(log, "Wrote block with {} rows{}", current_block.block.rows(), quorumLogMessage(replicas_num));
+            if (deduplicate)
+            {
+                String block_dedup_token;
+
+                /// We add the hash from the data and the partition identifier to the deduplication ID.
+                /// That is, do not insert the same data into the same partition twice.
+
+                const String & dedup_token = settings.insert_deduplication_token;
+                if (!dedup_token.empty())
+                {
+                    /// multiple blocks can be inserted within the same insert query
+                    /// an ordinal number is added to the dedup token to generate a distinctive block id for each block
+                    block_dedup_token = fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum);
+                    ++chunk_dedup_seqnum;
+                }
+
+                block_id = temp_part.part->getZeroLevelPartBlockID(block_dedup_token);
+                LOG_DEBUG(log, "Wrote block with ID '{}', {} rows{}", block_id, current_block.block.rows(), quorumLogMessage(replicas_num));
+            }
+            else
+            {
+                LOG_DEBUG(log, "Wrote block with {} rows{}", current_block.block.rows(), quorumLogMessage(replicas_num));
+            }
         }

         profile_events_scope.reset();
@@ -501,6 +528,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
             elapsed_ns,
             std::move(block_id),
             std::move(current_block),
+            std::move(unmerged_block),
             std::move(part_counters) /// profile_events_scope must be reset here.
         ));
     }
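On the synchronous path, the restructured branch keeps the existing behavior: a user-supplied insert_deduplication_token is suffixed with an ordinal per block, so each block of one INSERT gets a distinct deduplication ID. A tiny sketch of that numbering, with plain string concatenation standing in for the fmt::format call:

    #include <cstddef>
    #include <iostream>
    #include <string>

    int main()
    {
        std::string dedup_token = "user_token"; /// settings.insert_deduplication_token
        size_t chunk_dedup_seqnum = 0;

        /// Each block inserted by the same query gets "<token>_<ordinal>",
        /// mirroring fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum).
        for (int block = 0; block < 3; ++block)
        {
            std::string block_dedup_token = dedup_token + "_" + std::to_string(chunk_dedup_seqnum);
            ++chunk_dedup_seqnum;
            std::cout << block_dedup_token << '\n'; /// user_token_0, user_token_1, user_token_2
        }
    }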
tests/queries/0_stateless/02810_async_insert_dedup_replicated_collapsing.reference (new file)
@@ -0,0 +1,8 @@
+string1
+------------
+string1
+------------
+string1
+string1
+string2
+------------
tests/queries/0_stateless/02810_async_insert_dedup_replicated_collapsing.sh (new executable file, 39 lines)
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS 02810_async_insert_dedup_collapsing"
+${CLICKHOUSE_CLIENT} -q "CREATE TABLE 02810_async_insert_dedup_collapsing (stringvalue String, sign Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/{database}/02810_async_insert_dedup', 'r1', sign) ORDER BY stringvalue"
+
+url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1&async_insert_busy_timeout_ms=3000&async_insert_deduplicate=1"
+
+# insert values with the same key and sign so they are collapsed on insert
+${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string1', 1)" &
+${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string1', 1)" &
+
+wait
+
+${CLICKHOUSE_CLIENT} -q "SELECT stringvalue FROM 02810_async_insert_dedup_collapsing ORDER BY stringvalue"
+${CLICKHOUSE_CLIENT} -q "SELECT '------------'"
+
+# trigger the same collapsing algorithm, but also deduplication
+${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string1', 1)" &
+${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string1', 1)" &
+
+wait
+
+${CLICKHOUSE_CLIENT} -q "SELECT stringvalue FROM 02810_async_insert_dedup_collapsing ORDER BY stringvalue"
+${CLICKHOUSE_CLIENT} -q "SELECT '------------'"
+
+${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string2', 1)" &
+${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string2', 1), ('string1', 1)" &
+${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string2', 1)" &
+
+wait
+
+${CLICKHOUSE_CLIENT} -q "SELECT stringvalue FROM 02810_async_insert_dedup_collapsing ORDER BY stringvalue"
+${CLICKHOUSE_CLIENT} -q "SELECT '------------'"
+
+${CLICKHOUSE_CLIENT} -q "DROP TABLE 02810_async_insert_dedup_collapsing"