Merge pull request #6684 from yandex/processors-2

Processors 2
Nikolai Kochetov 2019-09-13 11:58:04 +03:00 committed by GitHub
commit 86bee0b8b5
19 changed files with 724 additions and 115 deletions

View File

@ -545,6 +545,9 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
{
auto & pipeline = state.io.pipeline;
if (pipeline.getMaxThreads())
num_threads = pipeline.getMaxThreads();
/// Send the header block so the client can prepare an output format for the data to be sent.
{
auto & header = pipeline.getHeader();
@ -594,7 +597,15 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
lazy_format->finish();
lazy_format->clearQueue();
pool.wait();
try
{
pool.wait();
}
catch (...)
{
/// If an exception was thrown during pipeline execution, skip it here: another exception is already being processed.
}
pipeline = QueryPipeline()
);

View File

@ -238,6 +238,10 @@ void Adam::read(ReadBuffer & buf)
void Adam::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac)
{
auto & adam_rhs = static_cast<const Adam &>(rhs);
if (adam_rhs.average_gradient.empty())
return;
if (average_gradient.empty())
{
if (!average_squared_gradient.empty() ||

View File

@ -508,6 +508,13 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
{
UInt64 num_rows = block.rows();
return executeOnBlock(block.getColumns(), num_rows, result, key_columns, aggregate_columns, no_more_keys);
}
bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
{
if (isCancelled())
return true;
@ -538,7 +545,7 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
/// Remember the columns we will work with
for (size_t i = 0; i < params.keys_size; ++i)
{
materialized_columns.push_back(block.safeGetByPosition(params.keys[i]).column->convertToFullColumnIfConst());
materialized_columns.push_back(columns.at(params.keys[i])->convertToFullColumnIfConst());
key_columns[i] = materialized_columns.back().get();
if (!result.isLowCardinality())
@ -559,7 +566,7 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
{
for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
{
materialized_columns.push_back(block.safeGetByPosition(params.aggregates[i].arguments[j]).column->convertToFullColumnIfConst());
materialized_columns.push_back(columns.at(params.aggregates[i].arguments[j])->convertToFullColumnIfConst());
aggregate_columns[i][j] = materialized_columns.back().get();
auto column_no_lc = recursiveRemoveLowCardinality(aggregate_columns[i][j]->getPtr());
@ -579,8 +586,6 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
if (isCancelled())
return true;
size_t rows = block.rows();
if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
{
AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
@ -593,7 +598,7 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
/// For the case when there are no keys (all aggregate into one row).
if (result.type == AggregatedDataVariants::Type::without_key)
{
executeWithoutKeyImpl(result.without_key, rows, aggregate_functions_instructions.data(), result.aggregates_pool);
executeWithoutKeyImpl(result.without_key, num_rows, aggregate_functions_instructions.data(), result.aggregates_pool);
}
else
{
@ -602,7 +607,7 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
executeImpl(*result.NAME, result.aggregates_pool, rows, key_columns, aggregate_functions_instructions.data(), \
executeImpl(*result.NAME, result.aggregates_pool, num_rows, key_columns, aggregate_functions_instructions.data(), \
no_more_keys, overflow_row_ptr);
if (false) {}
@ -740,6 +745,30 @@ Block Aggregator::convertOneBucketToBlock(
return block;
}
Block Aggregator::mergeAndConvertOneBucketToBlock(
ManyAggregatedDataVariants & variants,
Arena * arena,
bool final,
size_t bucket) const
{
auto & merged_data = *variants[0];
auto method = merged_data.type;
Block block;
if (false) {}
#define M(NAME) \
else if (method == AggregatedDataVariants::Type::NAME) \
{ \
mergeBucketImpl<decltype(merged_data.NAME)::element_type>(variants, bucket, arena); \
block = convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket); \
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
return block;
}
template <typename Method>
void Aggregator::writeToTemporaryFileImpl(
@ -1635,9 +1664,7 @@ private:
}
};
std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(
ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const
ManyAggregatedDataVariants Aggregator::prepareVariantsToMerge(ManyAggregatedDataVariants & data_variants) const
{
if (data_variants.empty())
throw Exception("Empty data passed to Aggregator::mergeAndConvertToBlocks.", ErrorCodes::EMPTY_DATA_PASSED);
@ -1651,7 +1678,7 @@ std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(
non_empty_data.push_back(data);
if (non_empty_data.empty())
return std::make_unique<NullBlockInputStream>(getHeader(final));
return {};
if (non_empty_data.size() > 1)
{
@ -1695,6 +1722,17 @@ std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(
non_empty_data[i]->aggregates_pools.begin(), non_empty_data[i]->aggregates_pools.end());
}
return non_empty_data;
}
std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(
ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const
{
ManyAggregatedDataVariants non_empty_data = prepareVariantsToMerge(data_variants);
if (non_empty_data.empty())
return std::make_unique<NullBlockInputStream>(getHeader(final));
return std::make_unique<MergingAndConvertingBlockInputStream>(*this, non_empty_data, final, max_threads);
}

View File

@ -744,6 +744,7 @@ struct AggregatedDataVariants : private boost::noncopyable
using AggregatedDataVariantsPtr = std::shared_ptr<AggregatedDataVariants>;
using ManyAggregatedDataVariants = std::vector<AggregatedDataVariantsPtr>;
using ManyAggregatedDataVariantsPtr = std::shared_ptr<ManyAggregatedDataVariants>;
/** How are "total" values calculated with WITH TOTALS?
* (For more details, see TotalsHavingBlockInputStream.)
@ -845,6 +846,10 @@ public:
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
bool executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
/** Convert the aggregation data structure into a block.
* If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block.
*
@ -857,6 +862,7 @@ public:
/** Merge several aggregation data structures and output the result as a block stream.
*/
std::unique_ptr<IBlockInputStream> mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
ManyAggregatedDataVariants prepareVariantsToMerge(ManyAggregatedDataVariants & data_variants) const;
/** Merge the stream of partially aggregated blocks into one data structure.
* (Pre-aggregate several blocks that represent the result of independent aggregations from remote servers.)
@ -911,6 +917,8 @@ public:
protected:
friend struct AggregatedDataVariants;
friend class MergingAndConvertingBlockInputStream;
friend class ConvertingAggregatedToChunksTransform;
friend class ConvertingAggregatedToChunksSource;
Params params;
@ -1091,6 +1099,12 @@ protected:
bool final,
size_t bucket) const;
Block mergeAndConvertOneBucketToBlock(
ManyAggregatedDataVariants & variants,
Arena * arena,
bool final,
size_t bucket) const;
Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const;
Block prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const;
@ -1162,5 +1176,4 @@ APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}

View File

@ -265,7 +265,7 @@ Block InterpreterKillQueryQuery::getSelectResult(const String & columns, const S
if (where_expression)
select_query += " WHERE " + queryToString(where_expression);
BlockIO block_io = executeQuery(select_query, context, true);
BlockIO block_io = executeQuery(select_query, context, true, QueryProcessingStage::Complete, false, false);
Block res = block_io.in->read();
if (res && block_io.in->read())

View File

@ -411,6 +411,7 @@ BlockInputStreams InterpreterSelectQuery::executeWithMultipleStreams()
QueryPipeline InterpreterSelectQuery::executeWithProcessors()
{
QueryPipeline query_pipeline;
query_pipeline.setMaxThreads(context.getSettingsRef().max_threads);
executeImpl(query_pipeline, input);
return query_pipeline;
}
@ -1635,6 +1636,9 @@ void InterpreterSelectQuery::executeFetchColumns(
if constexpr (pipeline_with_processors)
{
if (streams.size() == 1)
pipeline.setMaxThreads(streams.size());
/// Unify streams. They must have same headers.
if (streams.size() > 1)
{
@ -1657,6 +1661,9 @@ void InterpreterSelectQuery::executeFetchColumns(
Processors sources;
sources.reserve(streams.size());
/// Pin sources for merge tree tables.
bool pin_sources = dynamic_cast<const MergeTreeData *>(storage.get()) != nullptr;
for (auto & stream : streams)
{
bool force_add_agg_info = processing_stage == QueryProcessingStage::WithMergeableState;
@ -1665,8 +1672,10 @@ void InterpreterSelectQuery::executeFetchColumns(
if (processing_stage == QueryProcessingStage::Complete)
source->addTotalsPort();
sources.emplace_back(std::move(source));
if (pin_sources)
source->setStream(sources.size());
sources.emplace_back(std::move(source));
}
pipeline.init(std::move(sources));
@ -1822,9 +1831,10 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
/// If there are several sources, then we perform parallel aggregation
if (pipeline.getNumMainStreams() > 1)
{
pipeline.resize(max_streams);
/// Add resize transform to uniformly distribute data between aggregating streams.
pipeline.resize(pipeline.getNumMainStreams(), true);
auto many_data = std::make_shared<ManyAggregatedData>(max_streams);
auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumMainStreams());
auto merge_threads = settings.aggregation_memory_efficient_merge_threads
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads);

View File

@ -182,7 +182,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
bool internal,
QueryProcessingStage::Enum stage,
bool has_query_tail,
ReadBuffer * istr)
ReadBuffer * istr,
bool allow_processors)
{
time_t current_time = time(nullptr);
@ -300,7 +301,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
context.resetInputCallbacks();
auto interpreter = InterpreterFactory::get(ast, context, stage);
bool use_processors = settings.experimental_use_processors && interpreter->canExecuteWithProcessors();
bool use_processors = settings.experimental_use_processors && allow_processors && interpreter->canExecuteWithProcessors();
if (use_processors)
pipeline = interpreter->executeWithProcessors();
@ -549,11 +550,12 @@ BlockIO executeQuery(
Context & context,
bool internal,
QueryProcessingStage::Enum stage,
bool may_have_embedded_data)
bool may_have_embedded_data,
bool allow_processors)
{
BlockIO streams;
std::tie(std::ignore, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context,
internal, stage, !may_have_embedded_data, nullptr);
internal, stage, !may_have_embedded_data, nullptr, allow_processors);
return streams;
}
@ -604,7 +606,7 @@ void executeQuery(
ASTPtr ast;
BlockIO streams;
std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail, &istr);
std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail, &istr, true);
auto & pipeline = streams.pipeline;

View File

@ -43,7 +43,8 @@ BlockIO executeQuery(
Context & context, /// DB, tables, data types, storage engines, functions, aggregate functions...
bool internal = false, /// If true, this query is caused by another query and thus needn't be registered in the ProcessList.
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete, /// To which stage the query must be executed.
bool may_have_embedded_data = false /// If insert query may have embedded data
bool may_have_embedded_data = false, /// If insert query may have embedded data
bool allow_processors = true /// Whether the processors pipeline may be used
);
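
For illustration, a caller that must stay on the old IBlockInputStream path passes the new flag explicitly, exactly as the InterpreterKillQueryQuery change earlier in this diff does (a sketch; select_query and context are stand-ins from that call site):

/// Run an internal query with the processors pipeline disabled.
BlockIO block_io = executeQuery(
    select_query, context,
    /* internal = */ true,
    QueryProcessingStage::Complete,
    /* may_have_embedded_data = */ false,
    /* allow_processors = */ false);
Block res = block_io.in->read();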

View File

@ -9,6 +9,7 @@
#include <boost/lockfree/queue.hpp>
#include <Common/Stopwatch.h>
#include <Processors/ISource.h>
namespace DB
{
@ -155,9 +156,9 @@ void PipelineExecutor::addJob(ExecutionState * execution_state)
{
try
{
Stopwatch watch;
// Stopwatch watch;
executeJob(execution_state->processor);
execution_state->execution_time_ns += watch.elapsed();
// execution_state->execution_time_ns += watch.elapsed();
++execution_state->num_executed_jobs;
}
@ -388,7 +389,17 @@ void PipelineExecutor::finish()
finished = true;
}
task_queue_condvar.notify_all();
std::lock_guard guard(executor_contexts_mutex);
for (auto & context : executor_contexts)
{
{
std::lock_guard lock(context->mutex);
context->wake_flag = true;
}
context->condvar.notify_one();
}
}
void PipelineExecutor::execute(size_t num_threads)
@ -461,38 +472,122 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
}
};
auto wake_up_executor = [&](size_t executor)
{
std::lock_guard guard(executor_contexts[executor]->mutex);
executor_contexts[executor]->wake_flag = true;
executor_contexts[executor]->condvar.notify_one();
};
auto process_pinned_tasks = [&](Queue & queue)
{
Queue tmp_queue;
struct PinnedTask
{
ExecutionState * task;
size_t thread_num;
};
std::stack<PinnedTask> pinned_tasks;
while (!queue.empty())
{
auto task = queue.front();
queue.pop();
auto stream = task->processor->getStream();
if (stream != IProcessor::NO_STREAM)
pinned_tasks.push({.task = task, .thread_num = stream % num_threads});
else
tmp_queue.push(task);
}
if (!pinned_tasks.empty())
{
std::stack<size_t> threads_to_wake;
{
std::lock_guard lock(task_queue_mutex);
while (!pinned_tasks.empty())
{
auto & pinned_task = pinned_tasks.top();
auto thread = pinned_task.thread_num;
executor_contexts[thread]->pinned_tasks.push(pinned_task.task);
pinned_tasks.pop();
if (threads_queue.has(thread))
{
threads_queue.pop(thread);
threads_to_wake.push(thread);
}
}
}
while (!threads_to_wake.empty())
{
wake_up_executor(threads_to_wake.top());
threads_to_wake.pop();
}
}
queue.swap(tmp_queue);
};
while (!finished)
{
/// First, find any processor to execute.
/// Just traverse the graph and prepare any processor.
while (!finished)
{
std::unique_lock lock(task_queue_mutex);
if (!task_queue.empty())
{
state = task_queue.front();
task_queue.pop();
break;
std::unique_lock lock(task_queue_mutex);
if (!executor_contexts[thread_num]->pinned_tasks.empty())
{
state = executor_contexts[thread_num]->pinned_tasks.front();
executor_contexts[thread_num]->pinned_tasks.pop();
break;
}
if (!task_queue.empty())
{
state = task_queue.front();
task_queue.pop();
if (!task_queue.empty() && !threads_queue.empty())
{
auto thread_to_wake = threads_queue.pop_any();
lock.unlock();
wake_up_executor(thread_to_wake);
}
break;
}
if (threads_queue.size() + 1 == num_threads)
{
lock.unlock();
finish();
break;
}
threads_queue.push(thread_num);
}
++num_waiting_threads;
if (num_waiting_threads == num_threads)
{
finished = true;
lock.unlock();
task_queue_condvar.notify_all();
break;
std::unique_lock lock(executor_contexts[thread_num]->mutex);
executor_contexts[thread_num]->condvar.wait(lock, [&]
{
return finished || executor_contexts[thread_num]->wake_flag;
});
executor_contexts[thread_num]->wake_flag = false;
}
task_queue_condvar.wait(lock, [&]()
{
return finished || !task_queue.empty();
});
--num_waiting_threads;
}
if (finished)
@ -506,9 +601,9 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
addJob(state);
{
Stopwatch execution_time_watch;
// Stopwatch execution_time_watch;
state->job();
execution_time_ns += execution_time_watch.elapsed();
// execution_time_ns += execution_time_watch.elapsed();
}
if (state->exception)
@ -517,7 +612,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
if (finished)
break;
Stopwatch processing_time_watch;
// Stopwatch processing_time_watch;
/// Try to execute neighbour processor.
{
@ -535,7 +630,9 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
/// Process all neighbours. Children will be on the top of stack, then parents.
prepare_all_processors(queue, children, children, parents);
process_pinned_tasks(queue);
/// Take a local task from the queue if it has one.
if (!state && !queue.empty())
{
state = queue.front();
@ -543,10 +640,25 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
}
prepare_all_processors(queue, parents, parents, parents);
process_pinned_tasks(queue);
/// Take a pinned task if there is one.
{
std::lock_guard guard(task_queue_mutex);
if (!executor_contexts[thread_num]->pinned_tasks.empty())
{
if (state)
queue.push(state);
state = executor_contexts[thread_num]->pinned_tasks.front();
executor_contexts[thread_num]->pinned_tasks.pop();
}
}
/// Push other tasks to global queue.
if (!queue.empty())
{
std::lock_guard lock(task_queue_mutex);
std::unique_lock lock(task_queue_mutex);
while (!queue.empty() && !finished)
{
@ -554,7 +666,12 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
queue.pop();
}
task_queue_condvar.notify_all();
if (!threads_queue.empty())
{
auto thread_to_wake = threads_queue.pop_any();
lock.unlock();
wake_up_executor(thread_to_wake);
}
}
--num_processing_executors;
@ -562,7 +679,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
doExpandPipeline(task, false);
}
processing_time_ns += processing_time_watch.elapsed();
// processing_time_ns += processing_time_watch.elapsed();
}
}
@ -580,9 +697,15 @@ void PipelineExecutor::executeImpl(size_t num_threads)
{
Stack stack;
executor_contexts.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i)
executor_contexts.emplace_back(std::make_unique<ExecutorContext>());
threads_queue.init(num_threads);
{
std::lock_guard guard(executor_contexts_mutex);
executor_contexts.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i)
executor_contexts.emplace_back(std::make_unique<ExecutorContext>());
}
auto thread_group = CurrentThread::getGroup();
@ -598,7 +721,8 @@ void PipelineExecutor::executeImpl(size_t num_threads)
finish();
for (auto & thread : threads)
thread.join();
if (thread.joinable())
thread.join();
}
);
@ -639,7 +763,8 @@ void PipelineExecutor::executeImpl(size_t num_threads)
}
for (auto & thread : threads)
thread.join();
if (thread.joinable())
thread.join();
finished_flag = true;
}

View File

@ -1,14 +1,14 @@
#pragma once
#include <queue>
#include <stack>
#include <Processors/IProcessor.h>
#include <mutex>
#include <Processors/Executors/ThreadsQueue.h>
#include <Common/ThreadPool.h>
#include <Common/EventCounter.h>
#include <common/logger_useful.h>
#include <boost/lockfree/stack.hpp>
#include <queue>
#include <stack>
#include <mutex>
namespace DB
{
@ -122,17 +122,15 @@ private:
/// Queue with pointers to tasks. Each thread will concurrently read from it until the finished flag is set.
/// Stores processors that need to be prepared. The Preparing status is already set for them.
TaskQueue task_queue;
ThreadsQueue threads_queue;
std::mutex task_queue_mutex;
std::condition_variable task_queue_condvar;
std::atomic_bool cancelled;
std::atomic_bool finished;
Poco::Logger * log = &Poco::Logger::get("PipelineExecutor");
/// Number of threads waiting on the condvar. The last thread finishes execution if task_queue is empty.
size_t num_waiting_threads = 0;
/// Things to stop execution to expand pipeline.
struct ExpandPipelineTask
{
@ -155,9 +153,16 @@ private:
/// Will store context for all expand pipeline tasks (it's easy and we don't expect many).
/// This can be solved by using an atomic shared ptr.
std::list<ExpandPipelineTask> task_list;
std::condition_variable condvar;
std::mutex mutex;
bool wake_flag = false;
std::queue<ExecutionState *> pinned_tasks;
};
std::vector<std::unique_ptr<ExecutorContext>> executor_contexts;
std::mutex executor_contexts_mutex;
/// Processor ptr -> node number
using ProcessorsMap = std::unordered_map<const IProcessor *, UInt64>;

View File

@ -0,0 +1,70 @@
#pragma once

#include <vector>
#include <Common/Exception.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
/// Simple struct that stores thread numbers in [0 .. num_threads - 1].
/// Allows pushing and popping a specified thread, or popping any thread if one is queued.
/// All operations (except init) are O(1). No memory allocations happen after init.
struct ThreadsQueue
{
void init(size_t num_threads)
{
stack_size = 0;
stack.clear();
thread_pos_in_stack.clear();
stack.reserve(num_threads);
thread_pos_in_stack.reserve(num_threads);
for (size_t thread = 0; thread < num_threads; ++thread)
{
stack.emplace_back(thread);
thread_pos_in_stack.emplace_back(thread);
}
}
bool has(size_t thread) const { return thread_pos_in_stack[thread] < stack_size; }
size_t size() const { return stack_size; }
bool empty() const { return stack_size == 0; }
void push(size_t thread)
{
if (unlikely(has(thread)))
throw Exception("Can't push thread because it is already in threads queue.", ErrorCodes::LOGICAL_ERROR);
swap_threads(thread, stack[stack_size]);
++stack_size;
}
void pop(size_t thread)
{
if (unlikely(!has(thread)))
throw Exception("Can't pop thread because it is not in threads queue.", ErrorCodes::LOGICAL_ERROR);
--stack_size;
swap_threads(thread, stack[stack_size]);
}
size_t pop_any()
{
if (unlikely(stack_size == 0))
throw Exception("Can't pop from empty queue.", ErrorCodes::LOGICAL_ERROR);
--stack_size;
return stack[stack_size];
}
private:
std::vector<size_t> stack;
std::vector<size_t> thread_pos_in_stack;
size_t stack_size = 0;
void swap_threads(size_t first, size_t second)
{
std::swap(thread_pos_in_stack[first], thread_pos_in_stack[second]);
std::swap(stack[thread_pos_in_stack[first]], stack[thread_pos_in_stack[second]]);
}
};
}
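
A minimal usage sketch (illustrative, not part of the diff). Note that after init() the queue is empty: executor threads push themselves when they go idle and are popped when they are woken.

ThreadsQueue idle_threads;
idle_threads.init(4); /// room for threads 0..3; the queue starts empty

idle_threads.push(1); /// thread 1 goes to sleep
idle_threads.push(3); /// thread 3 goes to sleep

if (idle_threads.has(3))
    idle_threads.pop(3); /// wake thread 3 specifically, e.g. because a pinned task arrived

size_t thread_to_wake = 0;
if (!idle_threads.empty())
    thread_to_wake = idle_threads.pop_any(); /// wake the most recently pushed thread (here: 1)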

View File

@ -183,6 +183,11 @@ public:
throw Exception("Method 'work' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
}
virtual void work(size_t /*thread_num*/)
{
work();
}
/** You may call this method if 'prepare' returned Async.
* This method cannot access any ports. It should use only data that was prepared by 'prepare' method.
*
@ -228,10 +233,17 @@ public:
void setDescription(const std::string & description_) { processor_description = description_; }
const std::string & getDescription() const { return processor_description; }
/// Helpers for pipeline executor.
void setStream(size_t value) { stream_number = value; }
size_t getStream() const { return stream_number; }
constexpr static size_t NO_STREAM = std::numeric_limits<size_t>::max();
private:
std::atomic<bool> is_cancelled{false};
std::string processor_description;
size_t stream_number = NO_STREAM;
};
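
A sketch of the pinning contract these helpers establish; the stream-to-thread mapping is inferred from process_pinned_tasks above, where a pinned task is routed with stream % num_threads (processor and num_threads are stand-ins):

processor->setStream(2); /// pin this processor to stream 2
if (processor->getStream() != IProcessor::NO_STREAM)
{
    /// PipelineExecutor will put the task on this thread's pinned queue.
    size_t executing_thread = processor->getStream() % num_threads;
}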

View File

@ -75,6 +75,7 @@ void QueryPipeline::init(Processors sources)
totals.emplace_back(&source->getOutputs().back());
}
/// source->setStream(streams.size());
streams.emplace_back(&source->getOutputs().front());
processors.emplace_back(std::move(source));
}
@ -115,7 +116,7 @@ void QueryPipeline::addSimpleTransformImpl(const TProcessorGetter & getter)
Block header;
auto add_transform = [&](OutputPort *& stream, StreamType stream_type)
auto add_transform = [&](OutputPort *& stream, StreamType stream_type, size_t stream_num [[maybe_unused]] = IProcessor::NO_STREAM)
{
if (!stream)
return;
@ -148,14 +149,17 @@ void QueryPipeline::addSimpleTransformImpl(const TProcessorGetter & getter)
if (transform)
{
// if (stream_type == StreamType::Main)
// transform->setStream(stream_num);
connect(*stream, transform->getInputs().front());
stream = &transform->getOutputs().front();
processors.emplace_back(std::move(transform));
}
};
for (auto & stream : streams)
add_transform(stream, StreamType::Main);
for (size_t stream_num = 0; stream_num < streams.size(); ++stream_num)
add_transform(streams[stream_num], StreamType::Main, stream_num);
add_transform(delayed_stream_port, StreamType::Main);
add_transform(totals_having_port, StreamType::Totals);
@ -247,12 +251,12 @@ void QueryPipeline::concatDelayedStream()
delayed_stream_port = nullptr;
}
void QueryPipeline::resize(size_t num_streams)
void QueryPipeline::resize(size_t num_streams, bool force)
{
checkInitialized();
concatDelayedStream();
if (num_streams == getNumStreams())
if (!force && num_streams == getNumStreams())
return;
has_resize = true;

View File

@ -58,7 +58,7 @@ public:
/// Check if resize transform was used. (In that case another distinct transform will be added).
bool hasMixedStreams() const { return has_resize || hasMoreThanOneStream(); }
void resize(size_t num_streams);
void resize(size_t num_streams, bool force = false);
void unitePipelines(std::vector<QueryPipeline> && pipelines, const Block & common_header, const Context & context);
@ -81,6 +81,9 @@ public:
/// Call after execution.
void finalize();
void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
size_t getMaxThreads() const { return max_threads; }
private:
/// All added processors.
@ -106,6 +109,8 @@ private:
IOutputFormat * output_format = nullptr;
size_t max_threads = 0;
void checkInitialized();
void checkSource(const ProcessorPtr & source, bool can_have_totals);
void concatDelayedStream();

View File

@ -123,7 +123,9 @@ Chunk SourceFromInputStream::generate()
return {};
}
#ifndef NDEBUG
assertBlocksHaveEqualStructure(getPort().getHeader(), block, "SourceFromInputStream");
#endif
UInt64 num_rows = block.rows();
Chunk chunk(block.getColumns(), num_rows);

View File

@ -14,14 +14,30 @@ namespace ProfileEvents
namespace DB
{
/// Convert block to chunk.
/// Adds additional info about aggregation.
static Chunk convertToChunk(const Block & block)
{
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = block.info.bucket_num;
info->is_overflows = block.info.is_overflows;
UInt64 num_rows = block.rows();
Chunk chunk(block.getColumns(), num_rows);
chunk.setChunkInfo(std::move(info));
return chunk;
}
namespace
{
/// Reads chunks from a file in native format. Provides chunks with aggregation info.
class SourceFromNativeStream : public ISource
{
public:
SourceFromNativeStream(const Block & header, const std::string & path)
: ISource(header), file_in(path), compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get()))
: ISource(header), file_in(path), compressed_in(file_in),
block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get()))
{
block_in->readPrefix();
}
@ -41,15 +57,7 @@ namespace
return {};
}
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = block.info.bucket_num;
info->is_overflows = block.info.is_overflows;
UInt64 num_rows = block.rows();
Chunk chunk(block.getColumns(), num_rows);
chunk.setChunkInfo(std::move(info));
return chunk;
return convertToChunk(block);
}
private:
@ -57,39 +65,331 @@ namespace
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
};
}
class ConvertingAggregatedToBlocksTransform : public ISource
/// Worker which merges buckets for two-level aggregation.
/// Atomically increments the bucket counter and returns the merged result.
class ConvertingAggregatedToChunksSource : public ISource
{
public:
ConvertingAggregatedToChunksSource(
AggregatingTransformParamsPtr params_,
ManyAggregatedDataVariantsPtr data_,
Arena * arena_,
std::shared_ptr<std::atomic<UInt32>> next_bucket_to_merge_)
: ISource(params_->getHeader())
, params(std::move(params_))
, data(std::move(data_))
, next_bucket_to_merge(std::move(next_bucket_to_merge_))
, arena(arena_)
{}
String getName() const override { return "ConvertingAggregatedToChunksSource"; }
protected:
Chunk generate() override
{
public:
ConvertingAggregatedToBlocksTransform(Block header, AggregatingTransformParamsPtr params_, BlockInputStreamPtr stream_)
: ISource(std::move(header)), params(std::move(params_)), stream(std::move(stream_)) {}
UInt32 bucket_num = next_bucket_to_merge->fetch_add(1);
String getName() const override { return "ConvertingAggregatedToBlocksTransform"; }
if (bucket_num >= NUM_BUCKETS)
return {};
protected:
Chunk generate() override
Block block = params->aggregator.mergeAndConvertOneBucketToBlock(*data, arena, params->final, bucket_num);
return convertToChunk(block);
}
private:
AggregatingTransformParamsPtr params;
ManyAggregatedDataVariantsPtr data;
std::shared_ptr<std::atomic<UInt32>> next_bucket_to_merge;
Arena * arena;
static constexpr UInt32 NUM_BUCKETS = 256;
};
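
The bucket-claiming pattern above, in isolation: all workers share one atomic counter, and each fetch_add hands out the next unmerged bucket exactly once, so no locking is needed. A standalone sketch (the alias for UInt32 is spelled out to keep it self-contained):

#include <atomic>
#include <cstdint>
#include <memory>

using UInt32 = uint32_t; /// ClickHouse-style alias

static constexpr UInt32 NUM_BUCKETS = 256;

void mergeLoop(const std::shared_ptr<std::atomic<UInt32>> & next_bucket_to_merge)
{
    while (true)
    {
        UInt32 bucket = next_bucket_to_merge->fetch_add(1); /// claim the next bucket atomically
        if (bucket >= NUM_BUCKETS)
            return; /// every bucket has been claimed; this worker is done
        /// ... merge `bucket` across all data variants and emit the result ...
    }
}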
/// Generates chunks with aggregated data.
/// In the single-level case, it aggregates the data itself.
/// In the two-level case, it creates `ConvertingAggregatedToChunksSource` workers:
///
/// ConvertingAggregatedToChunksSource ->
/// ConvertingAggregatedToChunksSource -> ConvertingAggregatedToChunksTransform -> AggregatingTransform
/// ConvertingAggregatedToChunksSource ->
///
/// Result chunks are guaranteed to be sorted by bucket number.
class ConvertingAggregatedToChunksTransform : public IProcessor
{
public:
ConvertingAggregatedToChunksTransform(AggregatingTransformParamsPtr params_, ManyAggregatedDataVariantsPtr data_, size_t num_threads_)
: IProcessor({}, {params_->getHeader()})
, params(std::move(params_)), data(std::move(data_)), num_threads(num_threads_) {}
String getName() const override { return "ConvertingAggregatedToChunksTransform"; }
void work() override
{
if (data->empty())
{
auto block = stream->read();
if (!block)
return {};
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = block.info.bucket_num;
info->is_overflows = block.info.is_overflows;
UInt64 num_rows = block.rows();
Chunk chunk(block.getColumns(), num_rows);
chunk.setChunkInfo(std::move(info));
return chunk;
finished = true;
return;
}
private:
/// Store params because the aggregator must be destroyed after the stream. The order is important.
AggregatingTransformParamsPtr params;
BlockInputStreamPtr stream;
};
}
if (!is_initialized)
{
initialize();
return;
}
if (data->at(0)->isTwoLevel())
{
/// In the two-level case, this will only create sources.
if (inputs.empty())
createSources();
}
else
{
mergeSingleLevel();
}
}
Processors expandPipeline() override
{
for (auto & source : processors)
{
auto & out = source->getOutputs().front();
inputs.emplace_back(out.getHeader(), this);
connect(out, inputs.back());
}
return std::move(processors);
}
IProcessor::Status prepare() override
{
auto & output = outputs.front();
if (finished && !has_input)
{
output.finish();
return Status::Finished;
}
/// Check can output.
if (output.isFinished())
return Status::Finished;
if (!output.canPush())
return Status::PortFull;
if (!is_initialized)
return Status::Ready;
if (!processors.empty())
return Status::ExpandPipeline;
if (has_input)
return preparePushToOutput();
/// Single level case.
if (inputs.empty())
return Status::Ready;
/// Two-level case.
return preparePullFromInputs();
}
private:
IProcessor::Status preparePushToOutput()
{
auto & output = outputs.front();
output.push(std::move(current_chunk));
has_input = false;
if (finished)
{
output.finish();
return Status::Finished;
}
return Status::PortFull;
}
/// Read all sources and try to push current bucket.
IProcessor::Status preparePullFromInputs()
{
bool all_inputs_are_finished = true;
for (auto & input : inputs)
{
if (input.isFinished())
continue;
all_inputs_are_finished = false;
input.setNeeded();
if (input.hasData())
ready_chunks.emplace_back(input.pull());
}
moveReadyChunksToMap();
if (trySetCurrentChunkFromCurrentBucket())
return preparePushToOutput();
if (all_inputs_are_finished)
throw Exception("All sources have finished before getting enough data in "
"ConvertingAggregatedToChunksTransform.", ErrorCodes::LOGICAL_ERROR);
return Status::NeedData;
}
private:
AggregatingTransformParamsPtr params;
ManyAggregatedDataVariantsPtr data;
size_t num_threads;
bool is_initialized = false;
bool has_input = false;
bool finished = false;
Chunk current_chunk;
Chunks ready_chunks;
UInt32 current_bucket_num = 0;
static constexpr Int32 NUM_BUCKETS = 256;
std::map<UInt32, Chunk> bucket_to_chunk;
Processors processors;
static Int32 getBucketFromChunk(const Chunk & chunk)
{
auto & info = chunk.getChunkInfo();
if (!info)
throw Exception("Chunk info was not set for chunk in "
"ConvertingAggregatedToChunksTransform.", ErrorCodes::LOGICAL_ERROR);
auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get());
if (!agg_info)
throw Exception("Chunk should have AggregatedChunkInfo in "
"ConvertingAggregatedToChunksTransform.", ErrorCodes::LOGICAL_ERROR);
return agg_info->bucket_num;
}
void moveReadyChunksToMap()
{
for (auto & chunk : ready_chunks)
{
auto bucket = getBucketFromChunk(chunk);
if (bucket < 0 || bucket >= NUM_BUCKETS)
throw Exception("Invalid bucket number " + toString(bucket) + " in "
"ConvertingAggregatedToChunksTransform.", ErrorCodes::LOGICAL_ERROR);
if (bucket_to_chunk.count(bucket))
throw Exception("Found several chunks with the same bucket number in "
"ConvertingAggregatedToChunksTransform.", ErrorCodes::LOGICAL_ERROR);
bucket_to_chunk[bucket] = std::move(chunk);
}
ready_chunks.clear();
}
void setCurrentChunk(Chunk chunk)
{
if (has_input)
throw Exception("Current chunk was already set in "
"ConvertingAggregatedToChunksTransform.", ErrorCodes::LOGICAL_ERROR);
has_input = true;
current_chunk = std::move(chunk);
}
void initialize()
{
is_initialized = true;
AggregatedDataVariantsPtr & first = data->at(0);
/// We need at least one arena per thread in the first data item
if (num_threads > first->aggregates_pools.size())
{
Arenas & first_pool = first->aggregates_pools;
for (size_t j = first_pool.size(); j < num_threads; j++)
first_pool.emplace_back(std::make_shared<Arena>());
}
if (first->type == AggregatedDataVariants::Type::without_key || params->params.overflow_row)
{
params->aggregator.mergeWithoutKeyDataImpl(*data);
auto block = params->aggregator.prepareBlockAndFillWithoutKey(
*first, params->final, first->type != AggregatedDataVariants::Type::without_key);
setCurrentChunk(convertToChunk(block));
}
}
void mergeSingleLevel()
{
AggregatedDataVariantsPtr & first = data->at(0);
if (current_bucket_num > 0 || first->type == AggregatedDataVariants::Type::without_key)
{
finished = true;
return;
}
++current_bucket_num;
#define M(NAME) \
else if (first->type == AggregatedDataVariants::Type::NAME) \
params->aggregator.mergeSingleLevelDataImpl<decltype(first->NAME)::element_type>(*data);
if (false) {}
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
#undef M
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
auto block = params->aggregator.prepareBlockAndFillSingleLevel(*first, params->final);
setCurrentChunk(convertToChunk(block));
finished = true;
}
void createSources()
{
AggregatedDataVariantsPtr & first = data->at(0);
auto next_bucket_to_merge = std::make_shared<std::atomic<UInt32>>(0);
for (size_t thread = 0; thread < num_threads; ++thread)
{
Arena * arena = first->aggregates_pools.at(thread).get();
auto source = std::make_shared<ConvertingAggregatedToChunksSource>(
params, data, arena, next_bucket_to_merge);
processors.emplace_back(std::move(source));
}
}
bool trySetCurrentChunkFromCurrentBucket()
{
auto it = bucket_to_chunk.find(current_bucket_num);
if (it != bucket_to_chunk.end())
{
setCurrentChunk(std::move(it->second));
++current_bucket_num;
if (current_bucket_num == NUM_BUCKETS)
finished = true;
return true;
}
return false;
}
};
AggregatingTransform::AggregatingTransform(Block header, AggregatingTransformParamsPtr params_)
: AggregatingTransform(std::move(header), std::move(params_)
@ -197,7 +497,9 @@ Processors AggregatingTransform::expandPipeline()
void AggregatingTransform::consume(Chunk chunk)
{
if (chunk.getNumRows() == 0 && params->params.empty_result_for_aggregation_by_empty_set)
UInt64 num_rows = chunk.getNumRows();
if (num_rows == 0 && params->params.empty_result_for_aggregation_by_empty_set)
return;
if (!is_consume_started)
@ -209,9 +511,7 @@ void AggregatingTransform::consume(Chunk chunk)
src_rows += chunk.getNumRows();
src_bytes += chunk.bytes();
auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns());
if (!params->aggregator.executeOnBlock(block, variants, key_columns, aggregate_columns, no_more_keys))
if (!params->aggregator.executeOnBlock(chunk.detachColumns(), num_rows, variants, key_columns, aggregate_columns, no_more_keys))
is_consume_finished = true;
}
@ -249,8 +549,9 @@ void AggregatingTransform::initGenerate()
if (!params->aggregator.hasTemporaryFiles())
{
auto stream = params->aggregator.mergeAndConvertToBlocks(many_data->variants, params->final, max_threads);
processors.emplace_back(std::make_shared<ConvertingAggregatedToBlocksTransform>(stream->getHeader(), params, std::move(stream)));
auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants);
auto prepared_data_ptr = std::make_shared<ManyAggregatedDataVariants>(std::move(prepared_data));
processors.emplace_back(std::make_shared<ConvertingAggregatedToChunksTransform>(params, std::move(prepared_data_ptr), max_threads));
}
else
{

View File

@ -33,12 +33,16 @@ struct AggregatingTransformParams
struct ManyAggregatedData
{
ManyAggregatedDataVariants variants;
std::vector<std::unique_ptr<std::mutex>> mutexes;
std::atomic<UInt32> num_finished = 0;
explicit ManyAggregatedData(size_t num_threads = 0) : variants(num_threads)
explicit ManyAggregatedData(size_t num_threads = 0) : variants(num_threads), mutexes(num_threads)
{
for (auto & elem : variants)
elem = std::make_shared<AggregatedDataVariants>();
for (auto & mut : mutexes)
mut = std::make_unique<std::mutex>();
}
};
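
Each mutex sits behind a unique_ptr because std::mutex is neither copyable nor movable, which keeps element addresses stable while the struct itself stays movable. How the per-variant mutexes are taken is not shown in this diff; a hypothetical locking site could look like:

auto many_data = std::make_shared<ManyAggregatedData>(num_threads);

{
    std::lock_guard lock(*many_data->mutexes[thread_num]); /// guard this thread's variant
    AggregatedDataVariants & local_variants = *many_data->variants[thread_num];
    /// ... aggregate into local_variants ...
}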

View File

@ -507,6 +507,7 @@ Processors createMergingAggregatedMemoryEfficientPipe(
for (size_t i = 0; i < num_merging_processors; ++i, ++in, ++out)
{
auto transform = std::make_shared<MergingAggregatedBucketTransform>(params);
transform->setStream(i);
connect(*out, transform->getInputPort());
connect(transform->getOutputPort(), *in);
processors.emplace_back(std::move(transform));

View File

@ -1,4 +1,5 @@
SET output_format_write_statistics = 0;
SET experimental_use_processors = 0;
select
sum(cnt) > 0 as total,