[remove before merging] Shuffle CHECK TABLE results

This commit is contained in:
vdimir 2023-10-23 10:38:39 +00:00
parent 3332f70275
commit 8bb2378952
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862

View File

@ -11,6 +11,7 @@
#include <Common/typeid_cast.h>
#include <Interpreters/ProcessList.h>
#include <algorithm>
#include <random>
#include <Columns/IColumn.h>
@ -140,6 +141,44 @@ private:
std::atomic_bool is_value_emitted{false};
};
class RandomShuffleTransform : public IAccumulatingTransform
{
public:
RandomShuffleTransform(const Block & header)
: IAccumulatingTransform(header, header)
{}
String getName() const override { return "TableCheckResultEmitter"; }
void consume(Chunk chunk) override
{
chunks.emplace_back(std::move(chunk));
}
Chunk generate() override
{
if (!is_shuffled.exchange(true))
{
std::random_device rd;
std::mt19937 gen(rd());
std::shuffle(chunks.begin(), chunks.end(), gen);
}
if (chunks.empty())
return {};
auto chunk = std::move(chunks.back());
chunks.pop_back();
return chunk;
}
private:
Chunks chunks;
std::atomic_bool is_shuffled{false};
};
}
InterpreterCheckQuery::InterpreterCheckQuery(const ASTPtr & query_ptr_, ContextPtr context_)
@ -190,6 +229,18 @@ BlockIO InterpreterCheckQuery::execute()
resize_outport = &resize_processor->getOutputs().front();
}
/// TODO: for tesing only, remove after CI passed once
if (num_streams > 1)
{
auto shuffle_processor = std::make_shared<RandomShuffleTransform>(resize_outport->getHeader());
auto * input_port = &shuffle_processor->getInputPort();
connect(*resize_outport, *input_port);
processors->emplace_back(shuffle_processor);
assert(resize_processor->getOutputs().size() == 1);
resize_outport = &shuffle_processor->getOutputs().front();
}
if (settings.check_query_single_value_result)
{
auto emitter_processor = std::make_shared<TableCheckResultEmitter>();