deduplicate async inserts in the same block earlier

This commit is contained in:
Han Fei 2022-12-28 17:25:36 +01:00
parent 05c27f4e15
commit 66111ef241
2 changed files with 156 additions and 69 deletions

View File

@ -2,9 +2,16 @@
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <Interpreters/PartLog.h>
#include "Common/Exception.h"
#include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ThreadFuzzer.h>
#include "Columns/IColumn.h"
#include "Core/ColumnWithTypeAndName.h"
#include "Core/ColumnsWithTypeAndName.h"
#include "DataTypes/Serializations/ISerialization.h"
#include "Processors/Chunk.h"
#include "Storages/MergeTree/MergeTreeDataWriter.h"
#include <DataTypes/ObjectUtils.h>
#include <Core/Block.h>
#include <IO/Operators.h>
@ -41,15 +48,17 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
{
struct Partition
{
Poco::Logger * log;
MergeTreeDataWriter::TemporaryPart temp_part;
UInt64 elapsed_ns;
BlockIDsType block_id;
BlockWithPartition block_with_partition;
std::unordered_map<String, size_t> block_id_to_offset_idx;
std::unordered_map<String, std::vector<size_t>> block_id_to_offset_idx;
Partition() = default;
Partition(MergeTreeDataWriter::TemporaryPart && temp_part_, UInt64 elapsed_ns_, BlockIDsType && block_id_, BlockWithPartition && block_)
: temp_part(std::move(temp_part_)),
Partition(Poco::Logger * log_, MergeTreeDataWriter::TemporaryPart && temp_part_, UInt64 elapsed_ns_, BlockIDsType && block_id_, BlockWithPartition && block_)
: log(log_),
temp_part(std::move(temp_part_)),
elapsed_ns(elapsed_ns_),
block_id(std::move(block_id_)),
block_with_partition(std::move(block_))
@ -64,55 +73,56 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
block_id_to_offset_idx.clear();
for (size_t i = 0; i < block_id.size(); ++i)
{
block_id_to_offset_idx[block_id[i]] = i;
block_id_to_offset_idx[block_id[i]].push_back(i);
}
}
}
};
DelayedChunk() = default;
explicit DelayedChunk(size_t replicas_num_) : replicas_num(replicas_num_) {}
size_t replicas_num = 0;
std::vector<Partition> partitions;
};
namespace
bool checkSelfDeduplicate()
{
/// Convert block id vector to string. Output at most 50 ids.
template<typename T>
inline String toString(const std::vector<T> & vec)
if constexpr (async_insert)
{
size_t size = vec.size();
if (size > 50) size = 50;
return fmt::format("({})", fmt::join(vec.begin(), vec.begin() + size, ","));
std::vector<String> dup_block_ids;
for (const auto & [hash_id, offset_idxes] : block_id_to_offset_idx)
{
if (offset_idxes.size() > 1)
dup_block_ids.push_back(hash_id);
}
if (dup_block_ids.empty())
return false;
rewriteBlock(dup_block_ids, true);
return true;
}
return false;
}
/// remove the conflict parts of block for rewriting again.
void rewriteBlock(Poco::Logger * log, typename ReplicatedMergeTreeSinkImpl<true>::DelayedChunk::Partition & partition, const std::vector<String> & block_paths)
void rewriteBlock(const std::vector<String> & block_paths, bool self_dedup)
{
if constexpr (async_insert)
{
std::vector<size_t> offset_idx;
for (const auto & raw_path : block_paths)
{
std::filesystem::path p(raw_path);
String conflict_block_id = p.filename();
auto it = partition.block_id_to_offset_idx.find(conflict_block_id);
if (it == partition.block_id_to_offset_idx.end())
auto it = block_id_to_offset_idx.find(conflict_block_id);
if (it == block_id_to_offset_idx.end())
throw Exception("Unknown conflict path " + conflict_block_id, ErrorCodes::LOGICAL_ERROR);
offset_idx.push_back(it->second);
/// in case of self dedup, we should remain one insert
offset_idx.insert(std::end(offset_idx), std::begin(it->second) + self_dedup, std::end(it->second));
}
std::sort(offset_idx.begin(), offset_idx.end());
auto & offsets = partition.block_with_partition.offsets->offsets;
auto & offsets = block_with_partition.offsets->offsets;
size_t idx = 0, remove_count = 0;
auto it = offset_idx.begin();
std::vector<size_t> new_offsets;
std::vector<String> new_block_ids;
/// construct filter
size_t rows = partition.block_with_partition.block.rows();
size_t rows = block_with_partition.block.rows();
auto filter_col = ColumnUInt8::create(rows, 1u);
ColumnUInt8::Container & vec = filter_col->getData();
UInt8 * pos = vec.data();
@ -133,7 +143,7 @@ namespace
else
{
new_offsets.push_back(offset - remove_count);
new_block_ids.push_back(partition.block_id[idx]);
new_block_ids.push_back(block_id[idx]);
}
idx++;
}
@ -141,17 +151,67 @@ namespace
LOG_TRACE(log, "New block IDs: {}, new offsets: {}, size: {}", toString(new_block_ids), toString(new_offsets), new_offsets.size());
offsets = std::move(new_offsets);
partition.block_id = std::move(new_block_ids);
auto cols = partition.block_with_partition.block.getColumns();
block_id = std::move(new_block_ids);
auto cols = block_with_partition.block.getColumns();
for (auto & col : cols)
{
col = col->filter(vec, rows - remove_count);
}
partition.block_with_partition.block.setColumns(cols);
block_with_partition.block.setColumns(cols);
LOG_TRACE(log, "New block rows {}", partition.block_with_partition.block.rows());
LOG_TRACE(log, "New block rows {}", block_with_partition.block.rows());
partition.initBlockIDMap();
initBlockIDMap();
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "sync insert should not call rewriteBlock");
}
}
};
DelayedChunk() = default;
explicit DelayedChunk(size_t replicas_num_) : replicas_num(replicas_num_) {}
size_t replicas_num = 0;
std::vector<Partition> partitions;
};
std::vector<Int64> testSelfDeduplicate(std::vector<Int64> data, std::vector<size_t> offsets, std::vector<String> hashes)
{
MutableColumnPtr column = DataTypeInt64().createColumn();
auto offset_ptr = std::make_shared<ChunkOffsets>(offsets);
for (auto datum : data)
{
column->insert(datum);
}
Block block({ColumnWithTypeAndName(std::move(column), DataTypePtr(new DataTypeInt64()), "a")});
BlockWithPartition block1(std::move(block), Row(), offset_ptr);
ReplicatedMergeTreeSinkImpl<true>::DelayedChunk::Partition part(
&Poco::Logger::get("testSelfDeduplicate"), MergeTreeDataWriter::TemporaryPart(), 0, std::move(hashes), std::move(block1));
part.checkSelfDeduplicate();
ColumnPtr col = part.block_with_partition.block.getColumns()[0];
std::vector<Int64> result;
for (size_t i = 0; i < col->size(); i++)
{
result.push_back(col->getInt(i));
}
return result;
}
namespace
{
/// Convert block id vector to string. Output at most 50 ids.
template<typename T>
inline String toString(const std::vector<T> & vec)
{
size_t size = vec.size();
if (size > 50) size = 50;
return fmt::format("({})", fmt::join(vec.begin(), vec.begin() + size, ","));
}
std::vector<String> getHashesForBlocks(BlockWithPartition & block, String partition_id)
@ -416,6 +476,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
}
partitions.emplace_back(DelayedPartition(
log,
std::move(temp_part),
elapsed_ns,
std::move(block_id),
@ -479,6 +540,14 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
for (auto & partition: delayed_chunk->partitions)
{
int retry_times = 0;
/// users may have lots of same inserts. It will be helpful to deduplicate in advance.
if (partition.checkSelfDeduplicate())
{
LOG_TRACE(log, "found duplicated inserts in the block");
partition.block_with_partition.partition = std::move(partition.temp_part.part->partition.value);
partition.temp_part = storage.writer.writeTempPart(partition.block_with_partition, metadata_snapshot, context);
}
while (true)
{
partition.temp_part.finalize();
@ -488,7 +557,7 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
++retry_times;
LOG_DEBUG(log, "Found duplicate block IDs: {}, retry times {}", toString(conflict_block_ids), retry_times);
/// partition clean conflict
rewriteBlock(log, partition, conflict_block_ids);
partition.rewriteBlock(conflict_block_ids, false);
if (partition.block_id.empty())
break;
partition.block_with_partition.partition = std::move(partition.temp_part.part->partition.value);

View File

@ -1,3 +1,4 @@
#include "Storages/MergeTree/ReplicatedMergeTreeSink.h"
#include "config.h"
#include <gtest/gtest.h>
@ -42,4 +43,21 @@ TEST(AsyncInsertsTest, testScatterOffsetsBySelector)
test_impl({3,6,10}, {1,1,1,2,2,2,0,0,0,0}, 3, {{4},{3},{3}});
}
std::vector<Int64> testSelfDeduplicate(std::vector<Int64> data, std::vector<size_t> offsets, std::vector<String> hashes);
TEST(AsyncInsertsTest, testSelfDeduplicate)
{
auto test_impl = [](std::vector<Int64> data, std::vector<size_t> offsets, std::vector<String> hashes, std::vector<Int64> answer)
{
auto result = testSelfDeduplicate(data, offsets, hashes);
ASSERT_EQ(answer.size(), result.size());
for (size_t i = 0; i < result.size(); i++)
ASSERT_EQ(answer[i], result[i]);
};
test_impl({1,2,3,1,2,3,4,5,6,1,2,3},{3,6,9,12},{"a","a","b","a"},{1,2,3,4,5,6});
test_impl({1,2,3,1,2,3,1,2,3,1,2,3},{2,3,5,6,8,9,11,12},{"a","b","a","b","a","b","a","b"},{1,2,3});
test_impl({1,2,3,1,2,4,1,2,5,1,2},{2,3,5,6,8,9,11},{"a","b","a","c","a","d","a"},{1,2,3,4,5});
test_impl({1,2,1,2,1,2,1,2,1,2},{2,4,6,8,10},{"a","a","a","a","a"},{1,2});
}
}