#include <future>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>


namespace DB
{


MergingAggregatedMemoryEfficientBlockInputStream::MergingAggregatedMemoryEfficientBlockInputStream(
    BlockInputStreams inputs_, const Aggregator::Params & params, bool final_, size_t reading_threads_, size_t merging_threads_)
    : aggregator(params), final(final_),
    reading_threads(std::min(reading_threads_, inputs_.size())), merging_threads(merging_threads_),
    inputs(inputs_.begin(), inputs_.end())
{
    children = inputs_;
}


String MergingAggregatedMemoryEfficientBlockInputStream::getID() const
{
    std::stringstream res;
    res << "MergingAggregatedMemoryEfficient(" << aggregator.getID();
    for (size_t i = 0, size = children.size(); i < size; ++i)
        res << ", " << children[i]->getID();
    res << ")";
    return res.str();
}


void MergingAggregatedMemoryEfficientBlockInputStream::readPrefix()
{
    start();
}


void MergingAggregatedMemoryEfficientBlockInputStream::readSuffix()
{
    if (!all_read && !is_cancelled.load(std::memory_order_seq_cst))
        throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);

    finalize();

    for (size_t i = 0; i < children.size(); ++i)
        children[i]->readSuffix();
}


void MergingAggregatedMemoryEfficientBlockInputStream::cancel()
{
    bool old_val = false;
    if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
        return;

    if (parallel_merge_data)
    {
        std::unique_lock<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);

        parallel_merge_data->finish = true;
        parallel_merge_data->merged_blocks_changed.notify_one();
    }

    for (auto & input : inputs)
    {
        if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(input.stream.get()))
        {
            try
            {
                child->cancel();
            }
            catch (...)
            {
                /** If we could not ask one or several sources to stop
                  * (for example, the connection was broken during distributed query processing),
                  * it does not matter.
                  */
                LOG_ERROR(log, "Exception while cancelling " << child->getName());
            }
        }
    }
}
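

/** Lazily starts reading from the children. Called both from readPrefix() and from readImpl(),
  * so the `started` flag turns the second call into a no-op.
  */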
void MergingAggregatedMemoryEfficientBlockInputStream::start()
{
    if (started)
        return;

    started = true;

    /// If a child is a RemoteBlockInputStream, then child->readPrefix() sends the query to the remote server, initiating the computation.

    if (reading_threads == 1)
    {
        for (auto & child : children)
            child->readPrefix();
    }
    else
    {
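        /// readPrefix() of every child is dispatched to the reading pool wrapped in a std::packaged_task,
        /// so an exception thrown inside a pool thread is captured and rethrown below by task.get_future().get().
        /// The current memory tracker is also handed to each pool thread.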
        reading_pool.reset(new boost::threadpool::pool(reading_threads));

        size_t num_children = children.size();
        std::vector<std::packaged_task<void()>> tasks(num_children);
        for (size_t i = 0; i < num_children; ++i)
        {
            auto & child = children[i];
            auto & task = tasks[i];

            auto memory_tracker = current_memory_tracker;
            task = std::packaged_task<void()>([&child, memory_tracker]
            {
                current_memory_tracker = memory_tracker;
                setThreadName("MergeAggReadThr");
                CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
                child->readPrefix();
            });

            reading_pool->schedule([&task] { task(); });
        }

        reading_pool->wait();
        for (auto & task : tasks)
            task.get_future().get();
    }
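
    /// With merging_threads == 1 no merge workers are created: readImpl() will call getNextBlocksToMerge()
    /// and do the merge itself, synchronously.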
    if (merging_threads > 1)
    {
        /** Create several threads. Each of them, in a loop, will take the next group of blocks to merge,
          * merge them, and put the result into a queue from which we will read the finished results.
          */
        parallel_merge_data.reset(new ParallelMergeData(merging_threads));

        auto & pool = parallel_merge_data->pool;

        /// Create the threads that will receive and merge the data.

        for (size_t i = 0; i < merging_threads; ++i)
            pool.schedule(std::bind(&MergingAggregatedMemoryEfficientBlockInputStream::mergeThread,
                this, current_memory_tracker));
    }
}
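

/** Returns the next merged block. With a single merging thread the merge happens right here;
  * otherwise results are taken from parallel_merge_data->merged_blocks, which the merge threads fill.
  */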
Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
{
    start();

    if (merging_threads == 1)
    {
        if (BlocksToMerge blocks_to_merge = getNextBlocksToMerge())
            return aggregator.mergeBlocks(*blocks_to_merge, final);
        return {};
    }
    else
    {
        Block res;
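
        /** Wait until one of the merge threads has a result.
          * merged_blocks is keyed by output_order; an entry holding an empty block is a promise by a merge
          * thread to fill it later, so we only take the first entry once its block is non-empty. This keeps
          * the blocks coming out in ascending bucket order, with overflow data last.
          */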
        while (true)
        {
            std::unique_lock<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);

            if (parallel_merge_data->exception)
                std::rethrow_exception(parallel_merge_data->exception);

            if (parallel_merge_data->finish)
                break;

            if (!parallel_merge_data->merged_blocks.empty())
            {
                auto it = parallel_merge_data->merged_blocks.begin();

                if (it->second)
                {
                    res.swap(it->second);
                    parallel_merge_data->merged_blocks.erase(it);
                    parallel_merge_data->have_space.notify_one();
                    break;
                }
            }
            else if (parallel_merge_data->exhausted)
                break;

            parallel_merge_data->merged_blocks_changed.wait(lock);
        }

        if (!res)
            all_read = true;

        return res;
    }
}
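

/** The destructor must not return while reading or merging threads may still touch *this:
  * cancel whatever is pending and wait for both pools in finalize().
  */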
MergingAggregatedMemoryEfficientBlockInputStream::~MergingAggregatedMemoryEfficientBlockInputStream()
{
    try
    {
        if (!all_read)
            cancel();

        finalize();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
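

/// Waits for the reading and merging pools to drain. Called from readSuffix() and from the destructor,
/// so it has to be safe to call more than once.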
void MergingAggregatedMemoryEfficientBlockInputStream::finalize()
{
    if (!started)
        return;

    LOG_TRACE(log, "Waiting for threads to finish");

    if (reading_pool)
        reading_pool->wait();

    if (parallel_merge_data)
        parallel_merge_data->pool.wait();

    LOG_TRACE(log, "Waited for threads to finish");
}
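

/** Body of one merge worker. Any exception is stored in parallel_merge_data->exception, to be rethrown
  * by the main thread in readImpl(), and the rest of the pipeline is cancelled.
  */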
void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker * memory_tracker)
{
    setThreadName("MergeAggMergThr");
    current_memory_tracker = memory_tracker;
    CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};

    try
    {
        while (!parallel_merge_data->finish)
        {
            /** Getting the next blocks is done in one thread pool, and the merge is done in another.
              * This is a rather complex interaction.
              * Each time:
              * - reading_threads read one next block from each source;
              * - a group of blocks to merge is assembled from these blocks;
              * - one of the merging_threads performs the merge of this group of blocks;
              */
            BlocksToMerge blocks_to_merge;
            int output_order = -1;

            {
                std::lock_guard<std::mutex> lock(parallel_merge_data->get_next_blocks_mutex);

                if (parallel_merge_data->exhausted || parallel_merge_data->finish)
                    break;

                blocks_to_merge = getNextBlocksToMerge();

                if (!blocks_to_merge || blocks_to_merge->empty())
                {
                    std::unique_lock<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);

                    parallel_merge_data->exhausted = true;
                    parallel_merge_data->merged_blocks_changed.notify_one();
                    break;
                }

                output_order = blocks_to_merge->front().info.is_overflows
                    ? NUM_BUCKETS    /// "Overflow" blocks are returned by getNextBlocksToMerge later than all the others.
                    : blocks_to_merge->front().info.bucket_num;

                {
                    std::unique_lock<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);

                    while (parallel_merge_data->merged_blocks.size() >= merging_threads)
                        parallel_merge_data->have_space.wait(lock);

                    /** Put an empty block, which means a promise to fill it.
                      * The main thread must return results strictly in output_order, so this matters.
                      */
                    parallel_merge_data->merged_blocks[output_order];
                }
            }
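
            /// The merge itself runs without holding any mutex, so several groups of blocks can be merged in parallel.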
            Block res = aggregator.mergeBlocks(*blocks_to_merge, final);

            {
                std::lock_guard<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);

                if (parallel_merge_data->finish)
                    break;

                parallel_merge_data->merged_blocks[output_order] = res;
                parallel_merge_data->merged_blocks_changed.notify_one();
            }
        }
    }
    catch (...)
    {
        {
            std::lock_guard<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);
            parallel_merge_data->exception = std::current_exception();
            parallel_merge_data->merged_blocks_changed.notify_one();
        }

        cancel();
    }
}


MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregatedMemoryEfficientBlockInputStream::getNextBlocksToMerge()
{
    /** We have several sources.
      * Each of them can deliver the following kinds of data:
      *
      * 1. A block with a bucket_num specified.
      *    It means that on the remote server, the data was split into buckets.
      *    Data for the same bucket_num from different servers can be merged independently.
      *    Moreover, data for different bucket_num arrives in ascending order.
      *
      * 2. A block without a bucket_num.
      *    It means that on the remote server, the data was not split into buckets.
      *    If such data arrives from all servers, it can all be merged together.
      *    But if data split into buckets arrives from another part of the servers,
      *    then the data that was not split has to be split first, and only then merged.
      *
      * 3. Blocks with is_overflows set.
      *    This is additional data for rows that did not pass max_rows_to_group_by.
      *    It must be merged separately, only with other such blocks.
      */
    ++current_bucket_num;

    /// Get from a source the next block with a bucket number not greater than current_bucket_num.

    auto need_that_input = [this] (Input & input)
    {
        return !input.is_exhausted
            && input.block.info.bucket_num < current_bucket_num;
    };

    auto read_from_input = [this] (Input & input)
    {
        /// If the block that arrives is not main data but overflows, remember it and repeat the read.
        while (true)
        {
            // std::cerr << "reading block\n";
            Block block = input.stream->read();

            if (!block)
            {
                // std::cerr << "input is exhausted\n";
                input.is_exhausted = true;
                break;
            }

            if (block.info.bucket_num != -1)
            {
                /// One of the split blocks of two-level data.
                // std::cerr << "block for bucket " << block.info.bucket_num << "\n";

                has_two_level = true;
                input.block = block;
            }
            else if (block.info.is_overflows)
            {
                // std::cerr << "block for overflows\n";

                has_overflows = true;
                input.overflow_block = block;

                continue;
            }
            else
            {
                /// A block of unsplit (single-level) data.
                // std::cerr << "block without bucket\n";

                input.block = block;
            }

            break;
        }
    };
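
    /// As in start(), reads are dispatched to reading_pool wrapped in packaged_tasks, so that exceptions
    /// from the sources propagate to this thread via get_future().get().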
    if (reading_threads == 1)
    {
        for (auto & input : inputs)
            if (need_that_input(input))
                read_from_input(input);
    }
    else
    {
        size_t num_inputs = inputs.size();
        std::vector<std::packaged_task<void()>> tasks;
        tasks.reserve(num_inputs);

        for (auto & input : inputs)
        {
            if (need_that_input(input))
            {
                auto memory_tracker = current_memory_tracker;
                tasks.emplace_back([&input, &read_from_input, memory_tracker]
                {
                    current_memory_tracker = memory_tracker;
                    setThreadName("MergeAggReadThr");
                    CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
                    read_from_input(input);
                });

                auto & task = tasks.back();
                reading_pool->schedule([&task] { task(); });
            }
        }

        reading_pool->wait();
        for (auto & task : tasks)
            task.get_future().get();
    }
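
    /** Each non-exhausted input now holds the next block it has to offer (or an overflow block);
      * decide which group of blocks can be merged at this point.
      */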
    while (true)
    {
        if (current_bucket_num == NUM_BUCKETS)
        {
            /// All the main data has been processed. Possibly only overflow blocks remain.
            // std::cerr << "at end\n";

            if (has_overflows)
            {
                // std::cerr << "merging overflows\n";

                has_overflows = false;
                BlocksToMerge blocks_to_merge = new BlocksList;

                for (auto & input : inputs)
                    if (input.overflow_block)
                        blocks_to_merge->emplace_back(std::move(input.overflow_block));

                return blocks_to_merge;
            }
            else
                return {};
        }
        else if (has_two_level)
        {
            /** There is two-level data.
              * We will process bucket numbers in ascending order.
              * Find the minimum bucket number for which there is data,
              * then merge that data.
              */
            // std::cerr << "has two level\n";

            int min_bucket_num = NUM_BUCKETS;

            for (auto & input : inputs)
            {
                /// Blocks that were split (two-level) from the start.
                if (input.block.info.bucket_num != -1 && input.block.info.bucket_num < min_bucket_num)
                    min_bucket_num = input.block.info.bucket_num;

                /// A block not yet split into buckets. Split it and put the result into splitted_blocks.
                if (input.block.info.bucket_num == -1 && input.block && input.splitted_blocks.empty())
                {
                    LOG_TRACE(&Logger::get("MergingAggregatedMemoryEfficient"), "Having block without bucket: will split.");

                    input.splitted_blocks = aggregator.convertBlockToTwoLevel(input.block);
                    input.block = Block();
                }

                /// Blocks that we obtained by splitting single-level blocks.
                if (!input.splitted_blocks.empty())
                {
                    for (const auto & block : input.splitted_blocks)
                    {
                        if (block && block.info.bucket_num < min_bucket_num)
                        {
                            min_bucket_num = block.info.bucket_num;
                            break;
                        }
                    }
                }
            }

            current_bucket_num = min_bucket_num;

            // std::cerr << "current_bucket_num = " << current_bucket_num << "\n";

            /// There are no more blocks with main data.
            if (current_bucket_num == NUM_BUCKETS)
                continue;

            /// Now collect the blocks for current_bucket_num in order to merge them.
            BlocksToMerge blocks_to_merge = new BlocksList;

            for (auto & input : inputs)
            {
                if (input.block.info.bucket_num == current_bucket_num)
                {
                    // std::cerr << "having block for current_bucket_num\n";

                    blocks_to_merge->emplace_back(std::move(input.block));
                    input.block = Block();
                }
                else if (!input.splitted_blocks.empty() && input.splitted_blocks[min_bucket_num])
                {
                    // std::cerr << "having splitted data for bucket\n";

                    blocks_to_merge->emplace_back(std::move(input.splitted_blocks[min_bucket_num]));
                    input.splitted_blocks[min_bucket_num] = Block();
                }
            }

            return blocks_to_merge;
        }
        else
        {
            /// There is only single-level data. Just merge it.
            // std::cerr << "don't have two level\n";

            BlocksToMerge blocks_to_merge = new BlocksList;

            for (auto & input : inputs)
                if (input.block)
                    blocks_to_merge->emplace_back(std::move(input.block));

            current_bucket_num = NUM_BUCKETS;
            return blocks_to_merge;
        }
    }
}

}