dbms: removed old code [#METR-17000].

This commit is contained in:
Alexey Milovidov 2015-12-09 05:55:35 +03:00
parent 69942f38d4
commit 7560351942
3 changed files with 6 additions and 235 deletions

View File

@ -851,10 +851,6 @@ public:
*/
BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
/** Объединить несколько структур данных агрегации в одну. (В первый непустой элемент массива.)
*/
AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants, size_t max_threads) const;
/** Объединить несколько структур данных агрегации и выдать результат в виде потока блоков.
*/
std::unique_ptr<IBlockInputStream> mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
@ -1081,12 +1077,6 @@ protected:
Table & table_dst,
Table & table_src) const;
/// Слить все ключи, оставшиеся после предыдущего метода, в overflows.
template <typename Method, typename Table>
void mergeDataRemainingKeysToOverflowsImpl(
AggregatedDataWithoutKey & overflows,
Table & table_src) const;
void mergeWithoutKeyDataImpl(
ManyAggregatedDataVariants & non_empty_data) const;
@ -1094,11 +1084,6 @@ protected:
void mergeSingleLevelDataImpl(
ManyAggregatedDataVariants & non_empty_data) const;
template <typename Method>
void mergeTwoLevelDataImpl(
ManyAggregatedDataVariants & many_data,
boost::threadpool::pool * thread_pool) const;
template <typename Method, typename Table>
void convertToBlockImpl(
Method & method,

View File

@ -12,17 +12,17 @@ Block AggregatingBlockInputStream::readImpl()
if (!executed)
{
executed = true;
AggregatedDataVariants data_variants;
AggregatedDataVariantsPtr data_variants = new AggregatedDataVariants;
Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); };
aggregator.setCancellationHook(hook);
aggregator.execute(children.back(), data_variants);
aggregator.execute(children.back(), *data_variants);
if (!aggregator.hasTemporaryFiles())
{
impl.reset(new BlocksListBlockInputStream(
aggregator.convertToBlocks(data_variants, final, 1)));
ManyAggregatedDataVariants many_data { data_variants };
impl = aggregator.mergeAndConvertToBlocks(many_data, final, 1);
}
else
{
@ -35,9 +35,9 @@ Block AggregatingBlockInputStream::readImpl()
if (!isCancelled())
{
/// Сбросим имеющиеся в оперативке данные тоже на диск. Так проще.
size_t rows = data_variants.sizeWithoutOverflowRow();
size_t rows = data_variants->sizeWithoutOverflowRow();
if (rows)
aggregator.writeToTemporaryFile(data_variants, rows);
aggregator.writeToTemporaryFile(*data_variants, rows);
}
const auto & files = aggregator.getTemporaryFiles();

View File

@ -1442,31 +1442,6 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
}
}
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataRemainingKeysToOverflowsImpl(
AggregatedDataWithoutKey & overflows,
Table & table_src) const
{
for (auto it = table_src.begin(); it != table_src.end(); ++it)
{
if (Method::getAggregateData(it->second) == nullptr)
continue;
AggregateDataPtr res_data = overflows;
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
Method::getAggregateData(it->second) = nullptr;
}
}
void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
ManyAggregatedDataVariants & non_empty_data) const
@ -1526,193 +1501,6 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
}
template <typename Method>
void NO_INLINE Aggregator::mergeTwoLevelDataImpl(
ManyAggregatedDataVariants & non_empty_data,
boost::threadpool::pool * thread_pool) const
{
AggregatedDataVariantsPtr & res = non_empty_data[0];
/// В данном случае, no_more_keys будет выставлено, только если в первом (самом большом) состоянии достаточно много строк.
bool no_more_keys = false;
if (!checkLimits(res->sizeWithoutOverflowRow(), no_more_keys))
return;
/// Слияние распараллеливается по корзинам - первому уровню TwoLevelHashMap.
auto merge_bucket = [&non_empty_data, &res, no_more_keys, this](size_t bucket, MemoryTracker * memory_tracker)
{
current_memory_tracker = memory_tracker;
/// Все результаты агрегации соединяем с первым.
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
{
AggregatedDataVariants & current = *non_empty_data[i];
if (!no_more_keys)
{
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket]);
getDataVariant<Method>(current).data.impls[bucket].clearAndShrink();
}
else
{
mergeDataOnlyExistingKeysImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket]);
}
}
};
/// packaged_task используются, чтобы исключения автоматически прокидывались в основной поток.
std::vector<std::packaged_task<void()>> tasks(Method::Data::NUM_BUCKETS);
try
{
for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{
tasks[bucket] = std::packaged_task<void()>(std::bind(merge_bucket, bucket, current_memory_tracker));
if (thread_pool)
thread_pool->schedule([bucket, &tasks] { tasks[bucket](); });
else
tasks[bucket]();
}
}
catch (...)
{
/// Если этого не делать, то в случае исключения, tasks уничтожится раньше завершения потоков, и будет плохо.
if (thread_pool)
thread_pool->wait();
throw;
}
if (thread_pool)
thread_pool->wait();
for (auto & task : tasks)
if (task.valid())
task.get_future().get();
if (no_more_keys && params.overflow_row)
{
for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
{
AggregatedDataVariants & current = *non_empty_data[i];
mergeDataRemainingKeysToOverflowsImpl<Method>(
res->without_key,
getDataVariant<Method>(current).data.impls[bucket]);
}
}
}
/// aggregator не будет уничтожать состояния агрегатных функций в деструкторе
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
non_empty_data[i]->aggregator = nullptr;
}
AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_variants, size_t max_threads) const
{
if (data_variants.empty())
throw Exception("Empty data passed to Aggregator::merge.", ErrorCodes::EMPTY_DATA_PASSED);
LOG_TRACE(log, "Merging aggregated data");
Stopwatch watch;
ManyAggregatedDataVariants non_empty_data;
non_empty_data.reserve(data_variants.size());
for (auto & data : data_variants)
if (!data->empty())
non_empty_data.push_back(data);
if (non_empty_data.empty())
return data_variants[0];
if (non_empty_data.size() == 1)
return non_empty_data[0];
/// Отсортируем состояния по убыванию размера, чтобы мердж был более эффективным (так как все состояния мерджатся в первое).
std::sort(non_empty_data.begin(), non_empty_data.end(),
[](const AggregatedDataVariantsPtr & lhs, const AggregatedDataVariantsPtr & rhs)
{
return lhs->sizeWithoutOverflowRow() > rhs->sizeWithoutOverflowRow();
});
/// Если хотя бы один из вариантов двухуровневый, то переконвертируем все варианты в двухуровневые, если есть не такие.
/// Замечание - возможно, было бы более оптимально не конвертировать одноуровневые варианты перед мерджем, а мерджить их отдельно, в конце.
bool has_at_least_one_two_level = false;
for (const auto & variant : non_empty_data)
{
if (variant->isTwoLevel())
{
has_at_least_one_two_level = true;
break;
}
}
if (has_at_least_one_two_level)
for (auto & variant : non_empty_data)
if (!variant->isTwoLevel())
variant->convertToTwoLevel();
AggregatedDataVariantsPtr & res = non_empty_data[0];
size_t rows = res->size();
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
{
rows += non_empty_data[i]->size();
AggregatedDataVariants & current = *non_empty_data[i];
if (res->type != current.type)
throw Exception("Cannot merge different aggregated data variants.", ErrorCodes::CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS);
res->aggregates_pools.insert(res->aggregates_pools.end(), current.aggregates_pools.begin(), current.aggregates_pools.end());
}
/// В какой структуре данных агрегированы данные?
if (res->type == AggregatedDataVariants::Type::without_key || params.overflow_row)
mergeWithoutKeyDataImpl(non_empty_data);
std::unique_ptr<boost::threadpool::pool> thread_pool;
if (max_threads > 1 && res->isTwoLevel())
thread_pool.reset(new boost::threadpool::pool(max_threads));
if (false) {}
#define M(NAME) \
else if (res->type == AggregatedDataVariants::Type::NAME) \
mergeSingleLevelDataImpl<decltype(res->NAME)::element_type>(non_empty_data);
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
#undef M
#define M(NAME) \
else if (res->type == AggregatedDataVariants::Type::NAME) \
mergeTwoLevelDataImpl<decltype(res->NAME)::element_type>(non_empty_data, thread_pool.get());
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else if (res->type != AggregatedDataVariants::Type::without_key)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
double elapsed_seconds = watch.elapsedSeconds();
size_t res_rows = res->size();
LOG_TRACE(log, std::fixed << std::setprecision(3)
<< "Merged aggregated data. "
<< "From " << rows << " to " << res_rows << " rows (efficiency: " << static_cast<double>(rows) / res_rows << ")"
<< " in " << elapsed_seconds << " sec."
<< " (" << rows / elapsed_seconds << " rows/sec.)");
return res;
}
template <typename Method>
void NO_INLINE Aggregator::mergeBucketImpl(
ManyAggregatedDataVariants & data, Int32 bucket) const
@ -1734,8 +1522,6 @@ void NO_INLINE Aggregator::mergeBucketImpl(
* Если состояния агрегации двухуровневые, то выдаёт блоки строго по порядку bucket_num.
* (Это важно при распределённой обработке.)
* При этом, может обрабатывать разные bucket-ы параллельно, используя до threads потоков.
*
* TODO Удалить обычную функцию Aggregator::merge и связанные с ней, в случае невостребованности.
*/
class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
{