diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 9cbf6258199..203797a0731 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -1098,6 +1098,8 @@ static std::shared_ptr chooseJoinAlgorithm(std::shared_ptr ana { if (analyzed_join->getClauses().size() != 1) throw Exception("Full sorting merge join is supported only for single-condition joins", ErrorCodes::NOT_IMPLEMENTED); + if (analyzed_join->isSpecialStorage()) + throw Exception("Full sorting merge join is not supported for special storage", ErrorCodes::NOT_IMPLEMENTED); return std::make_shared(analyzed_join, sample_block); } return std::make_shared(analyzed_join, sample_block); diff --git a/src/Interpreters/TableJoin.h b/src/Interpreters/TableJoin.h index f953ebb111e..4210da6ae76 100644 --- a/src/Interpreters/TableJoin.h +++ b/src/Interpreters/TableJoin.h @@ -202,7 +202,7 @@ public: bool forceMergeJoin() const { return join_algorithm == MultiEnum(JoinAlgorithm::PARTIAL_MERGE); } bool allowParallelHashJoin() const; - bool forceFullSortingMergeJoin() const { return join_algorithm.isSet(JoinAlgorithm::FULL_SORTING_MERGE); } + bool forceFullSortingMergeJoin() const { return !isSpecialStorage() && join_algorithm.isSet(JoinAlgorithm::FULL_SORTING_MERGE); } bool forceHashJoin() const { diff --git a/src/Processors/Transforms/MergeJoinTransform.cpp b/src/Processors/Transforms/MergeJoinTransform.cpp index dc7462ce24f..5ad2ea77c13 100644 --- a/src/Processors/Transforms/MergeJoinTransform.cpp +++ b/src/Processors/Transforms/MergeJoinTransform.cpp @@ -320,9 +320,9 @@ static void prepareChunk(Chunk & chunk) void MergeJoinAlgorithm::initialize(Inputs inputs) { if (inputs.size() != 2) - throw Exception("MergeJoinAlgorithm requires exactly two inputs", ErrorCodes::LOGICAL_ERROR); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Two inputs are required, got {}", inputs.size()); - LOG_DEBUG(log, "MergeJoinAlgorithm initialize, number of 
inputs: {}", inputs.size()); + LOG_DEBUG(log, "Initialize, number of inputs: {}", inputs.size()); for (size_t i = 0; i < inputs.size(); ++i) { assert(inputs[i].chunk.getNumColumns() == cursors[i]->sampleBlock().columns()); @@ -861,7 +861,7 @@ MergeJoinTransform::MergeJoinTransform( void MergeJoinTransform::onFinish() { - algorithm.onFinish(total_stopwatch.elapsedSeconds()); + algorithm.logElapsed(total_stopwatch.elapsedSeconds(), true); } } diff --git a/src/Processors/Transforms/MergeJoinTransform.h b/src/Processors/Transforms/MergeJoinTransform.h index d776900e205..5c588d5db94 100644 --- a/src/Processors/Transforms/MergeJoinTransform.h +++ b/src/Processors/Transforms/MergeJoinTransform.h @@ -232,14 +232,19 @@ public: virtual void consume(Input & input, size_t source_num) override; virtual Status merge() override; - void onFinish(double seconds) + void logElapsed(double seconds, bool force) { + /// Do not log more frequently than once per ten seconds + if (seconds - stat.last_log_seconds < 10 && !force) + return; + LOG_TRACE(log, "Finished pocessing in {} seconds" ", left: {} blocks, {} rows; right: {} blocks, {} rows" ", max blocks loaded to memory: {}", seconds, stat.num_blocks[0], stat.num_rows[0], stat.num_blocks[1], stat.num_rows[1], stat.max_blocks_loaded); + stat.last_log_seconds = seconds; } private: @@ -272,7 +277,10 @@ private: size_t num_rows[2] = {0, 0}; size_t max_blocks_loaded = 0; + + double last_log_seconds = 0; }; + Statistic stat; Poco::Logger * log; @@ -280,6 +288,8 @@ private: class MergeJoinTransform final : public IMergingTransform { + using Base = IMergingTransform; + public: MergeJoinTransform( JoinPtr table_join, @@ -293,6 +303,12 @@ public: protected: void onFinish() override; + void work() override + { + algorithm.logElapsed(total_stopwatch.elapsedSeconds(), /* force = */ false); + Base::work(); + } + Poco::Logger * log; }; diff --git a/src/QueryPipeline/QueryPipelineBuilder.cpp b/src/QueryPipeline/QueryPipelineBuilder.cpp index 353f37836de..ec186971b50 
100644 --- a/src/QueryPipeline/QueryPipelineBuilder.cpp +++ b/src/QueryPipeline/QueryPipelineBuilder.cpp @@ -311,7 +311,7 @@ QueryPipelineBuilderPtr QueryPipelineBuilder::mergePipelines( Processors * collected_processors) { if (transform->getOutputs().size() != 1) - throw Exception("Merge transform must have exactly 1 output", ErrorCodes::LOGICAL_ERROR); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Merge transform must have exactly 1 output, got {}", transform->getOutputs().size()); connect(*left->pipe.output_ports.front(), transform->getInputs().front()); connect(*right->pipe.output_ports.front(), transform->getInputs().back()); @@ -323,7 +323,7 @@ QueryPipelineBuilderPtr QueryPipelineBuilder::mergePipelines( left->pipe.processors.emplace_back(transform); left->pipe.processors.insert(left->pipe.processors.end(), right->pipe.processors.begin(), right->pipe.processors.end()); - left->pipe.holder = std::move(right->pipe.holder); + // left->pipe.holder = std::move(right->pipe.holder); // NOTE(review): holder transfer is disabled without explanation - confirm whatever right->pipe.holder owns does not need to outlive this merge, then delete this line or restore it left->pipe.header = left->pipe.output_ports.front()->getHeader(); left->pipe.max_parallel_streams = std::max(left->pipe.max_parallel_streams, right->pipe.max_parallel_streams); return left;