upd full sorting join

This commit is contained in:
vdimir 2022-07-05 14:06:10 +00:00
parent b0c0ec370d
commit 881d352e05
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
5 changed files with 25 additions and 7 deletions

View File

@ -1098,6 +1098,8 @@ static std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> ana
{
if (analyzed_join->getClauses().size() != 1)
throw Exception("Full sorting merge join is supported only for single-condition joins", ErrorCodes::NOT_IMPLEMENTED);
if (analyzed_join->isSpecialStorage())
throw Exception("Full sorting merge join is not supported for special storage", ErrorCodes::NOT_IMPLEMENTED);
return std::make_shared<FullSortingMergeJoin>(analyzed_join, sample_block);
}
return std::make_shared<JoinSwitcher>(analyzed_join, sample_block);

View File

@ -202,7 +202,7 @@ public:
bool forceMergeJoin() const { return join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::PARTIAL_MERGE); }
bool allowParallelHashJoin() const;
bool forceFullSortingMergeJoin() const { return join_algorithm.isSet(JoinAlgorithm::FULL_SORTING_MERGE); }
bool forceFullSortingMergeJoin() const { return !isSpecialStorage() && join_algorithm.isSet(JoinAlgorithm::FULL_SORTING_MERGE); }
bool forceHashJoin() const
{

View File

@ -320,9 +320,9 @@ static void prepareChunk(Chunk & chunk)
void MergeJoinAlgorithm::initialize(Inputs inputs)
{
if (inputs.size() != 2)
throw Exception("MergeJoinAlgorithm requires exactly two inputs", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Two inputs arerequired, got {}", inputs.size());
LOG_DEBUG(log, "MergeJoinAlgorithm initialize, number of inputs: {}", inputs.size());
LOG_DEBUG(log, "Initialize, number of inputs: {}", inputs.size());
for (size_t i = 0; i < inputs.size(); ++i)
{
assert(inputs[i].chunk.getNumColumns() == cursors[i]->sampleBlock().columns());
@ -861,7 +861,7 @@ MergeJoinTransform::MergeJoinTransform(
void MergeJoinTransform::onFinish()
{
algorithm.onFinish(total_stopwatch.elapsedSeconds());
algorithm.logElapsed(total_stopwatch.elapsedSeconds(), true);
}
}

View File

@ -232,14 +232,19 @@ public:
virtual void consume(Input & input, size_t source_num) override;
virtual Status merge() override;
void onFinish(double seconds)
void logElapsed(double seconds, bool force)
{
/// Do not log more frequently than once per ten seconds
if (seconds - stat.last_log_seconds < 10 && !force)
return;
LOG_TRACE(log,
"Finished pocessing in {} seconds"
", left: {} blocks, {} rows; right: {} blocks, {} rows"
", max blocks loaded to memory: {}",
seconds, stat.num_blocks[0], stat.num_rows[0], stat.num_blocks[1], stat.num_rows[1],
stat.max_blocks_loaded);
stat.last_log_seconds = seconds;
}
private:
@ -272,7 +277,10 @@ private:
size_t num_rows[2] = {0, 0};
size_t max_blocks_loaded = 0;
double last_log_seconds = 0;
};
Statistic stat;
Poco::Logger * log;
@ -280,6 +288,8 @@ private:
class MergeJoinTransform final : public IMergingTransform<MergeJoinAlgorithm>
{
using Base = IMergingTransform<MergeJoinAlgorithm>;
public:
MergeJoinTransform(
JoinPtr table_join,
@ -293,6 +303,12 @@ public:
protected:
void onFinish() override;
void work() override
{
algorithm.logElapsed(total_stopwatch.elapsedSeconds(), true);
Base::work();
}
Poco::Logger * log;
};

View File

@ -311,7 +311,7 @@ QueryPipelineBuilderPtr QueryPipelineBuilder::mergePipelines(
Processors * collected_processors)
{
if (transform->getOutputs().size() != 1)
throw Exception("Merge transform must have exactly 1 output", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Merge transform must have exactly 1 output, got {}", transform->getOutputs().size());
connect(*left->pipe.output_ports.front(), transform->getInputs().front());
connect(*right->pipe.output_ports.front(), transform->getInputs().back());
@ -323,7 +323,7 @@ QueryPipelineBuilderPtr QueryPipelineBuilder::mergePipelines(
left->pipe.processors.emplace_back(transform);
left->pipe.processors.insert(left->pipe.processors.end(), right->pipe.processors.begin(), right->pipe.processors.end());
left->pipe.holder = std::move(right->pipe.holder);
// left->pipe.holder = std::move(right->pipe.holder);
left->pipe.header = left->pipe.output_ports.front()->getHeader();
left->pipe.max_parallel_streams = std::max(left->pipe.max_parallel_streams, right->pipe.max_parallel_streams);
return left;