Fix clang build.

Nikolai Kochetov 2019-04-05 14:27:08 +03:00
parent 7626b1b267
commit 1470fe0a72
4 changed files with 21 additions and 76 deletions
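
The message only says the clang build was fixed, but the renames below (header -> header_before_join / header_before_aggregation / header_before_merge / header_before_transform in InterpreterSelectQuery, finished -> stream_finished in SourceFromInputStream, limit -> limit_rows in the merging code) all remove name collisions, which is the pattern clang's shadowing diagnostics flag. A minimal sketch of the first case follows, assuming a flag such as -Wshadow-all is enabled; Block, addSimpleTransform and the file name are illustrative stand-ins, not the real declarations.

// sketch.cpp -- illustrative only; compile with: clang++ -std=c++17 -Wshadow-all -c sketch.cpp
#include <functional>
#include <string>

struct Block { std::string name; };   // stand-in for the real Block

void addSimpleTransform(const std::function<Block(const Block &)> & transform)
{
    transform(Block{"input"});
}

void before_fix()
{
    Block header;                                 // outer local
    addSimpleTransform([&](const Block & header)  // warning: lambda parameter shadows the outer 'header'
    {
        return header;
    });
}

void after_fix()
{
    Block header_before_join;                     // renamed outer local, nothing is shadowed
    addSimpleTransform([&](const Block & header)
    {
        (void)header_before_join;
        return header;
    });
}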

@@ -741,18 +741,18 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (expressions.hasJoin())
{
-Block header;
+Block header_before_join;
if constexpr (pipeline_with_processors)
{
-header = pipeline.getHeader();
+header_before_join = pipeline.getHeader();
pipeline.addSimpleTransform([&](const Block & header){
return std::make_shared<ExpressionTransform>(header, expressions.before_join);
});
}
else
{
-header = pipeline.firstStream()->getHeader();
+header_before_join = pipeline.firstStream()->getHeader();
/// Applies to all sources except stream_with_non_joined_data.
for (auto & stream : pipeline.streams)
stream = std::make_shared<ExpressionBlockInputStream>(stream, expressions.before_join);
@@ -762,11 +762,11 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (isRightOrFull(join.kind))
{
auto stream = expressions.before_join->createStreamWithNonJoinedDataIfFullOrRightJoin(
-header, settings.max_block_size);
+header_before_join, settings.max_block_size);
if constexpr (pipeline_with_processors)
{
-auto source = std::make_shared<SourceFromInputStream>(header, std::move(stream));
+auto source = std::make_shared<SourceFromInputStream>(header_before_join, std::move(stream));
pipeline.addDelayedStream(source);
}
else
@@ -1434,14 +1434,14 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
-Block header = pipeline.getHeader();
+Block header_before_aggregation = pipeline.getHeader();
ColumnNumbers keys;
for (const auto & name : key_names)
-keys.push_back(header.getPositionByName(name));
+keys.push_back(header_before_aggregation.getPositionByName(name));
for (auto & descr : aggregates)
if (descr.arguments.empty())
for (const auto & name : descr.argument_names)
-descr.arguments.push_back(header.getPositionByName(name));
+descr.arguments.push_back(header_before_aggregation.getPositionByName(name));
const Settings & settings = context.getSettingsRef();
@@ -1451,7 +1451,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
*/
bool allow_to_use_two_level_group_by = pipeline.getNumMainStreams() > 1 || settings.max_bytes_before_external_group_by != 0;
-Aggregator::Params params(header, keys, aggregates,
+Aggregator::Params params(header_before_aggregation, keys, aggregates,
overflow_row, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile,
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
@@ -1546,11 +1546,11 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPipeline & pipeline, bo
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
-Block header = pipeline.getHeader();
+Block header_before_merge = pipeline.getHeader();
ColumnNumbers keys;
for (const auto & name : key_names)
-keys.push_back(header.getPositionByName(name));
+keys.push_back(header_before_merge.getPositionByName(name));
/** There are two modes of distributed aggregation.
*
@@ -1569,7 +1569,7 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPipeline & pipeline, bo
const Settings & settings = context.getSettingsRef();
-Aggregator::Params params(header, keys, aggregates, overflow_row, settings.max_threads);
+Aggregator::Params params(header_before_merge, keys, aggregates, overflow_row, settings.max_threads);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
@@ -1686,16 +1686,16 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modif
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
-Block header = pipeline.getHeader();
+Block header_before_transform = pipeline.getHeader();
ColumnNumbers keys;
for (const auto & name : key_names)
-keys.push_back(header.getPositionByName(name));
+keys.push_back(header_before_transform.getPositionByName(name));
const Settings & settings = context.getSettingsRef();
-Aggregator::Params params(header, keys, aggregates,
+Aggregator::Params params(header_before_transform, keys, aggregates,
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile,
SettingUInt64(0), SettingUInt64(0),

@@ -10,7 +10,7 @@ SourceFromInputStream::SourceFromInputStream(Block header, BlockInputStreamPtr s
Chunk SourceFromInputStream::generate()
{
-if (finished)
+if (stream_finished)
return {};
if (!initialized)
@@ -23,7 +23,7 @@ Chunk SourceFromInputStream::generate()
if (!block)
{
stream->readSuffix();
-finished = true;
+stream_finished = true;
return {};
}

@@ -12,8 +12,6 @@ class SourceFromInputStream : public ISource
{
public:
SourceFromInputStream(Block header, BlockInputStreamPtr stream);
String getName() const override { return "SourceFromInputStream"; }
Chunk generate() override;
@@ -22,7 +20,7 @@ public:
private:
bool initialized = false;
-bool finished = false;
+bool stream_finished = false;
BlockInputStreamPtr stream;
};
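
The finished -> stream_finished rename presumably resolves a field-shadowing diagnostic: if the ISource base class already declares a member named finished (an assumption, it is not visible in this diff), a same-named member in the derived class trips clang's -Wshadow-field. A hedged sketch with stand-in types:

// shadow_field.cpp -- illustrative stand-ins, not the real ClickHouse classes;
// compile with: clang++ -std=c++17 -Wshadow-field -c shadow_field.cpp
struct ISourceLike
{
    bool finished = false;            // assumed base-class member
};

struct SourceFromInputStreamLike : ISourceLike
{
    bool finished = false;            // warning: shadows the member inherited from ISourceLike
    bool stream_finished = false;     // the renamed member avoids the collision
};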

@@ -26,59 +26,6 @@ struct SharedChunk : Chunk
SharedChunk(Chunk && chunk) : Chunk(std::move(chunk)) {}
};
-//template <typename TSortCursor>
-//class Queue
-//{
-//public:
-// bool empty() const { return queue.empty(); }
-// void push(TSortCursor cursor) { queue.push(std::move(cursor)); }
-//
-// bool needUpdateCursor() const { return !empty() && !queue.top()->isLast(); }
-//
-// void updateCursor(TSortCursor cursor)
-// {
-// if (!needUpdateCursor())
-// throw Exception("Do not need to update cursor for sort cursor queue.", ErrorCodes::LOGICAL_ERROR);
-//
-// if (cursor->order != queue.top()->order)
-// throw Exception("Cannot update cursor for sort cursor queue because top cursor order "
-// "(" + toString(queue.top()->order) + ") is not equal to new cursor order "
-// "(" + toString(cursor->order) + ").", ErrorCodes::LOGICAL_ERROR);
-// queue.pop();
-// queue.push(cursor);
-// }
-//
-// void dropCursor()
-// {
-// if (!needUpdateCursor())
-// throw Exception("Do not need to update cursor for sort cursor queue.", ErrorCodes::LOGICAL_ERROR);
-//
-// queue.pop();
-// }
-//
-// const TSortCursor & top() const
-// {
-// if (needUpdateCursor())
-// throw Exception("Cannot get top element from sort cursor queue because "
-// "need to update cursor.", ErrorCodes::LOGICAL_ERROR);
-//
-// return queue.top();
-// }
-//
-// void pop()
-// {
-// if (needUpdateCursor())
-// throw Exception("Cannot pop element from sort cursor queue because "
-// "need to update cursor.", ErrorCodes::LOGICAL_ERROR);
-// queue.pop();
-// }
-//
-//private:
-// /// Queue with SortCursors.
-// using PriorityQueue = std::priority_queue<TSortCursor>;
-// PriorityQueue queue;
-//};
}
using SharedChunkPtr = boost::intrusive_ptr<detail::SharedChunk>;
@@ -132,7 +79,7 @@ protected:
++merged_rows;
}
-void insertFromChunk(Chunk && chunk, size_t limit)
+void insertFromChunk(Chunk && chunk, size_t limit_rows)
{
if (merged_rows)
throw Exception("Cannot insert to MergedData from Chunk because MergedData is not empty.",
@@ -140,9 +87,9 @@ protected:
auto num_rows = chunk.getNumRows();
columns = chunk.mutateColumns();
-if (limit && num_rows > limit)
+if (limit_rows && num_rows > limit_rows)
for (auto & column : columns)
-column = (*column->cut(0, limit)).mutate();
+column = (*column->cut(0, limit_rows)).mutate();
total_merged_rows += num_rows;
merged_rows = num_rows;