Add randomized test FullSortingJoin.AsofGeneratedTestData

This commit is contained in:
vdimir 2023-09-21 11:19:20 +00:00
parent 984d94e5f1
commit 6330b466aa
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
4 changed files with 116 additions and 17 deletions

View File

@ -6,12 +6,17 @@ namespace DB
{
/// Convenience overload: produces the random string using the thread-local generator.
String getRandomASCIIString(size_t length)
{
    auto & generator = thread_local_rng;
    return getRandomASCIIString(length, generator);
}
/// Builds a string of `length` characters, each drawn uniformly from 'a'..'z'.
/// All randomness comes from the caller-supplied `rng`, so a seeded generator
/// yields a reproducible string.
String getRandomASCIIString(size_t length, pcg64 & rng)
{
    std::uniform_int_distribution<int> distribution('a', 'z');
    String res;
    res.resize(length);
    /// Draw from `rng` only: the thread-local generator must not be touched here,
    /// otherwise the result would not depend on the generator passed in.
    for (auto & c : res)
        c = static_cast<char>(distribution(rng));
    return res;
}

View File

@ -2,11 +2,14 @@
#include <Core/Types.h>
#include <pcg_random.hpp>
namespace DB
{
/// Slow random string. Useful for random names and things like this. Not for generating data.
/// Returns `length` characters, each a lowercase letter in 'a'..'z'.
/// This overload takes its randomness from the thread-local generator.
String getRandomASCIIString(size_t length);
/// Same as above, but draws the characters from the caller-supplied generator `rng`.
String getRandomASCIIString(size_t length, pcg64 & rng);
}

View File

@ -874,7 +874,7 @@ MergeJoinAlgorithm::Status MergeJoinAlgorithm::asofJoin()
for (auto & col : rcols)
result.addColumn(std::move(col));
}
UNUSED(asof_inequality);
return Status(std::move(result));
}

View File

@ -1,30 +1,31 @@
#include <gtest/gtest.h>
#include <Common/randomSeed.h>
#include <pcg_random.hpp>
#include <random>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Sources/SourceFromChunks.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Columns/ColumnsNumber.h>
#include <Common/getRandomASCIIString.h>
#include <Common/randomSeed.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/TableJoin.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/Sources/SourceFromChunks.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/MergeJoinTransform.h>
#include <QueryPipeline/QueryPipeline.h>
using namespace DB;
/// Obtains a seed from randomSeed() and logs it to stderr, so that a failing
/// randomized run can be reproduced with the same value.
/// Returns the seed that was logged.
UInt64 getAndPrintRandomSeed()
{
    UInt64 seed = randomSeed();
    /// Log the seed exactly once, prefixed with the source file and suffixed with
    /// "ull" so the printed value can be pasted back as an unsigned 64-bit literal.
    std::cerr << __FILE__ << "::" << "TEST_RANDOM_SEED = " << seed << "ull" << std::endl;
    return seed;
}
@ -132,6 +133,8 @@ catch (Exception & e)
class SourceChunksBuilder
{
public:
double break_prob = 0.0;
explicit SourceChunksBuilder(const Block & header_)
: header(header_)
{
@ -144,6 +147,10 @@ public:
chassert(row.size() == current_chunk.size());
for (size_t i = 0; i < current_chunk.size(); ++i)
current_chunk[i]->insert(row[i]);
if (break_prob > 0.0 && std::uniform_real_distribution<>(0.0, 1.0)(rng) < break_prob)
addChunk();
return *this;
}
@ -184,7 +191,7 @@ std::vector<std::vector<Field>> getValuesFromBlock(const Block & block, const Na
}
Block executePipeline(QueryPipeline & pipeline)
Block executePipeline(QueryPipeline && pipeline)
{
PullingPipelineExecutor executor(pipeline);
@ -233,7 +240,7 @@ try
left_source, right_source, /* key_length = */ 2,
JoinKind::Inner, JoinStrictness::Asof, ASOFJoinInequality::LessOrEquals);
Block result_block = executePipeline(pipeline);
Block result_block = executePipeline(std::move(pipeline));
auto values = getValuesFromBlock(result_block, {"t1.key", "t1.t", "t2.t", "t2.value"});
ASSERT_EQ(values, (std::vector<std::vector<Field>>{
@ -253,13 +260,12 @@ try
{
auto left_source = oneColumnSource({ {3}, {3, 3, 3}, {3, 5, 5, 6}, {9, 9}, {10, 20} });
UInt64 p = std::uniform_int_distribution<>(0, 2)(rng);
SourceChunksBuilder right_source_builder({
{std::make_shared<DataTypeUInt64>(), "t"},
{std::make_shared<DataTypeUInt64>(), "value"},
});
UInt64 p = std::uniform_int_distribution<>(0, 2)(rng);
double break_prob = p == 0 ? 0.0 : (p == 1 ? 0.5 : 1.0);
std::uniform_real_distribution<> prob_dis(0.0, 1.0);
for (const auto & row : std::vector<std::vector<Field>>{ {1, 101}, {2, 102}, {4, 104}, {5, 105}, {11, 111}, {15, 115} })
@ -274,7 +280,7 @@ try
left_source, right_source, /* key_length = */ 1,
JoinKind::Inner, JoinStrictness::Asof, ASOFJoinInequality::LessOrEquals);
Block result_block = executePipeline(pipeline);
Block result_block = executePipeline(std::move(pipeline));
ASSERT_EQ(
assert_cast<const ColumnUInt64 *>(result_block.getByName("t1.x").column.get())->getData(),
@ -296,3 +302,88 @@ catch (Exception & e)
std::cout << e.getStackTraceString() << std::endl;
throw;
}
/// Randomized ASOF join test.
///
/// Generates two sources sorted by (k1, k2, t). For every left row that is expected
/// to match, the first right row generated for it carries attr = 100 * left_t, while
/// the left row itself carries attr = 10 * left_t. After the join (t1.t <= t2.t) the
/// "t1.attr" column must equal the recorded expectations and "t2.attr" must equal
/// ten times those values.
TEST(FullSortingJoin, AsofGeneratedTestData)
try
{
    auto left_source_builder = SourceChunksBuilder({
        {std::make_shared<DataTypeUInt64>(), "k1"},
        {std::make_shared<DataTypeString>(), "k2"},
        {std::make_shared<DataTypeUInt64>(), "t"},
        {std::make_shared<DataTypeUInt64>(), "attr"},
    });

    auto right_source_builder = SourceChunksBuilder({
        {std::make_shared<DataTypeUInt64>(), "k1"},
        {std::make_shared<DataTypeString>(), "k2"},
        {std::make_shared<DataTypeUInt64>(), "t"},
        {std::make_shared<DataTypeUInt64>(), "attr"},
    });

    /// uniform_int_distribution to have 0.0 and 1.0 probabilities
    left_source_builder.break_prob = std::uniform_int_distribution<>(0, 5)(rng) / 5.0;
    right_source_builder.break_prob = std::uniform_int_distribution<>(0, 5)(rng) / 5.0;

    /// Advances to the next key so that (k1, k2) keeps growing lexicographically:
    /// when the freshly generated k2 is not greater than the current one, k1 is bumped.
    auto get_next_key = [](UInt64 & k1, String & k2)
    {
        size_t str_len = std::uniform_int_distribution<>(1, 10)(rng);
        String new_k2 = getRandomASCIIString(str_len, rng);
        if (new_k2.compare(k2) <= 0)
            ++k1;
        k2 = new_k2;
    };

    ColumnUInt64::Container expected;

    UInt64 k1 = 0;
    String k2 = "asdfg";
    /// Stored as size_t so the loop counters below are compared against the same signedness.
    size_t key_num_total = std::uniform_int_distribution<>(1, 1000)(rng);
    for (size_t key_num = 0; key_num < key_num_total; ++key_num)
    {
        UInt64 left_t = 0;
        size_t num_left_rows = std::uniform_int_distribution<>(1, 100)(rng);
        for (size_t i = 0; i < num_left_rows; ++i)
        {
            left_t += std::uniform_int_distribution<>(1, 10)(rng);

            left_source_builder.addRow({k1, k2, left_t, 10 * left_t});
            expected.push_back(10 * left_t);

            size_t num_matches = 1 + std::poisson_distribution<>(4)(rng);

            /// Same type as left_t, so both sides insert identical field types.
            UInt64 right_t = left_t;
            for (size_t j = 0; j < num_matches; ++j)
            {
                right_t += std::uniform_int_distribution<>(0, 3)(rng);
                /// Only the first candidate carries the marker value that the assertions look for.
                /// NOTE(review): later candidates may get the same `t` as the first one (increment 0);
                /// the expectations then rely on the join picking the first such row - confirm.
                right_source_builder.addRow({k1, k2, right_t, j == 0 ? 100 * left_t : 0});
            }
            /// next left_t should be greater than right_t not to match with previous rows
            left_t = right_t;
        }

        /// generate some rows with greater left_t to check that they are not matched
        num_left_rows = std::uniform_int_distribution<>(1, 100)(rng);
        for (size_t i = 0; i < num_left_rows; ++i)
        {
            left_t += std::uniform_int_distribution<>(1, 10)(rng);
            left_source_builder.addRow({k1, k2, left_t, 10 * left_t});
        }

        get_next_key(k1, k2);
    }

    Block result_block = executePipeline(buildJoinPipeline(
        left_source_builder.build(), right_source_builder.build(),
        /* key_length = */ 3,
        JoinKind::Inner, JoinStrictness::Asof, ASOFJoinInequality::LessOrEquals));

    /// The assertions must read the joined result; no other block exists in this scope.
    ASSERT_EQ(assert_cast<const ColumnUInt64 *>(result_block.getByName("t1.attr").column.get())->getData(), expected);

    /// Right-side marker is 100 * left_t, i.e. ten times the left attr recorded above.
    for (auto & e : expected)
        e = 10 * e;

    ASSERT_EQ(assert_cast<const ColumnUInt64 *>(result_block.getByName("t2.attr").column.get())->getData(), expected);
}
catch (Exception & e)
{
    std::cout << e.getStackTraceString() << std::endl;
    throw;
}