update ut and refine logs

This commit is contained in:
Han Fei 2022-11-23 21:07:59 +01:00
parent 3125aca59d
commit d0f5fcdf09
2 changed files with 57 additions and 4 deletions

View File

@ -80,12 +80,19 @@ struct ReplicatedMergeTreeSink<async_insert>::DelayedChunk
namespace
{
/// Convert block id vector to string. Output at most 50 rows.
template<typename T>
inline String toString(const std::vector<T> & vec)
{
String res = "{";
for (const auto & item : vec)
res += DB::toString(item) + ",";
size_t size = vec.size();
if (size > 50) size = 50;
for (size_t i = 0; i < size; ++i)
{
res += DB::toString(vec[i]);
if (i + 1 < size)
res += ",";
}
return res + "}";
}
@ -137,7 +144,7 @@ namespace
idx++;
}
LOG_TRACE(log, "New block IDs: {}, new offsets: {}", toString(new_block_ids), toString(new_offsets));
LOG_TRACE(log, "New block IDs: {}, new offsets: {}, size: {}", toString(new_block_ids), toString(new_offsets), new_offsets.size());
offsets = std::move(new_offsets);
partition.block_id = std::move(new_block_ids);
@ -476,13 +483,14 @@ void ReplicatedMergeTreeSink<true>::finishDelayedChunk(const ZooKeeperWithFaultI
for (auto & partition: delayed_chunk->partitions)
{
int retry_times = 0;
while (true)
{
partition.temp_part.finalize();
auto conflict_block_ids = commitPart(zookeeper, partition.temp_part.part, partition.block_id, delayed_chunk->replicas_num, false);
if (conflict_block_ids.empty())
break;
LOG_DEBUG(log, "Found depulicate block IDs: {}", toString(conflict_block_ids));
LOG_DEBUG(log, "Found depulicate block IDs: {}, retry times {}", toString(conflict_block_ids), ++retry_times);
/// partition clean conflict
rewriteBlock(log, partition, conflict_block_ids);
if (partition.block_id.empty())

View File

@ -0,0 +1,45 @@
#include "config.h"
#include <gtest/gtest.h>
#include <Processors/Chunk.h>
#include <Columns/IColumn.h>
#include <Common/PODArray.h>
namespace DB {
std::vector<ChunkOffsetsPtr> scatterOffsetsBySelector(ChunkOffsetsPtr chunk_offsets, const IColumn::Selector & selector, size_t partition_num);
class AsyncInsertsTest : public ::testing::TestPartResult
{};
TEST(AsyncInsertsTest, testScatterOffsetsBySelector)
{
auto testImpl = [](std::vector<size_t> offsets, std::vector<size_t> selector_data, size_t part_num, std::vector<std::vector<size_t>> expected)
{
auto offset_ptr = std::make_shared<ChunkOffsets>(offsets);
IColumn::Selector selector(selector_data.size());
size_t num_rows = selector_data.size();
for (size_t i = 0; i < num_rows; i++)
selector[i] = selector_data[i];
auto results = scatterOffsetsBySelector(offset_ptr, selector, part_num);
ASSERT_EQ(results.size(), expected.size());
for (size_t i = 0; i < results.size(); i++)
{
auto result = results[i]->offsets;
auto expect = expected[i];
ASSERT_EQ(result.size(), expect.size());
for (size_t j = 0; j < result.size(); j++)
ASSERT_EQ(result[j], expect[j]);
}
};
testImpl({5}, {0,1,0,1,0}, 2, {{3},{2}});
testImpl({5,10}, {0,1,0,1,0,1,0,1,0,1}, 2, {{3,5},{2,5}});
testImpl({4,8,12}, {0,1,0,1,0,2,0,2,1,2,1,2}, 3, {{2,4},{2,4},{2,4}});
testImpl({1,2,3,4,5}, {0,1,2,3,4}, 5, {{1},{1},{1},{1},{1}});
testImpl({3,6,10}, {1,1,1,2,2,2,0,0,0,0}, 3, {{4},{3},{3}});
}
}