This commit is contained in:
Pavel Kartavyy 2015-12-09 14:58:48 +03:00
commit 3231c7fdfb
55 changed files with 905 additions and 485 deletions

View File

@ -211,7 +211,7 @@ public:
memcpy(&data[old_size], &src_concrete.getData()[start], length * sizeof(data[0]));
}
ColumnPtr filter(const Filter & filter) const override
ColumnPtr filter(const Filter & filter, ssize_t result_size_hint) const override
{
size_t size = getData().size();
if (size != filter.size())
@ -225,7 +225,9 @@ public:
auto & res_data = res_->getData();
res_data.reserve(size);
if (result_size_hint)
res_data.reserve(result_size_hint > 0 ? result_size_hint : size);
for (size_t i = 0; i < size; ++i)
if (filter[i])
res_data.push_back(getData()[i]);

View File

@ -175,7 +175,7 @@ public:
getOffsets().push_back(getOffsets().size() == 0 ? 0 : getOffsets().back());
}
ColumnPtr filter(const Filter & filt) const override;
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
@ -310,10 +310,10 @@ private:
/// Специализации для функции filter.
template <typename T>
ColumnPtr filterNumber(const Filter & filt) const;
ColumnPtr filterNumber(const Filter & filt, ssize_t result_size_hint) const;
ColumnPtr filterString(const Filter & filt) const;
ColumnPtr filterGeneric(const Filter & filt) const;
ColumnPtr filterString(const Filter & filt, ssize_t result_size_hint) const;
ColumnPtr filterGeneric(const Filter & filt, ssize_t result_size_hint) const;
};

View File

@ -26,6 +26,29 @@ public:
};
namespace ColumnConstDetails
{
template <typename T>
inline bool equals(const T & x, const T & y)
{
return x == y;
}
/// Проверяет побитовую идентичность элементов, даже если они являются NaN-ами.
template <>
inline bool equals(const Float32 & x, const Float32 & y)
{
return 0 == memcmp(&x, &y, sizeof(x));
}
template <>
inline bool equals(const Float64 & x, const Float64 & y)
{
return 0 == memcmp(&x, &y, sizeof(x));
}
}
/** Столбец-константа может содержать внутри себя само значение,
* или, в случае массивов, SharedPtr от значения-массива,
* чтобы избежать проблем производительности при копировании очень больших массивов.
@ -65,7 +88,7 @@ public:
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override
{
if (getDataFromHolder() != static_cast<const Derived &>(src).getDataFromHolder())
if (!ColumnConstDetails::equals(getDataFromHolder(), static_cast<const Derived &>(src).getDataFromHolder()))
throw Exception("Cannot insert different element into constant column " + getName(),
ErrorCodes::CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN);
@ -74,7 +97,7 @@ public:
void insert(const Field & x) override
{
if (x.get<FieldType>() != FieldType(getDataFromHolder()))
if (!ColumnConstDetails::equals(x.get<FieldType>(), FieldType(getDataFromHolder())))
throw Exception("Cannot insert different element into constant column " + getName(),
ErrorCodes::CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN);
++s;
@ -87,7 +110,7 @@ public:
void insertFrom(const IColumn & src, size_t n) override
{
if (getDataFromHolder() != static_cast<const Derived &>(src).getDataFromHolder())
if (!ColumnConstDetails::equals(getDataFromHolder(), static_cast<const Derived &>(src).getDataFromHolder()))
throw Exception("Cannot insert different element into constant column " + getName(),
ErrorCodes::CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN);
++s;
@ -105,7 +128,7 @@ public:
throw Exception("Method deserializeAndInsertFromArena is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
ColumnPtr filter(const Filter & filt) const override
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override
{
if (s != filt.size())
throw Exception("Size of filter doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);

View File

@ -189,7 +189,7 @@ public:
memcpy(&chars[old_size], &src_concrete.chars[start * n], length * n);
}
ColumnPtr filter(const IColumn::Filter & filt) const override
ColumnPtr filter(const IColumn::Filter & filt, ssize_t result_size_hint) const override
{
size_t col_size = size();
if (col_size != filt.size())
@ -197,7 +197,9 @@ public:
ColumnFixedString * res_ = new ColumnFixedString(n);
ColumnPtr res = res_;
res_->chars.reserve(chars.size());
if (result_size_hint)
res_->chars.reserve(result_size_hint > 0 ? result_size_hint * n : chars.size());
size_t offset = 0;
for (size_t i = 0; i < col_size; ++i, offset += n)
@ -276,6 +278,11 @@ public:
max = String();
}
void reserve(size_t size) override
{
chars.reserve(n * size);
};
Chars_t & getChars() { return chars; }
const Chars_t & getChars() const { return chars; }

View File

@ -178,7 +178,7 @@ public:
}
}
ColumnPtr filter(const Filter & filt) const override
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override
{
if (offsets.size() == 0)
return new ColumnString;
@ -189,7 +189,7 @@ public:
Chars_t & res_chars = res->chars;
Offsets_t & res_offsets = res->offsets;
filterArraysImpl<UInt8>(chars, offsets, res_chars, res_offsets, filt);
filterArraysImpl<UInt8>(chars, offsets, res_chars, res_offsets, filt, result_size_hint);
return res_;
}

View File

@ -123,12 +123,12 @@ public:
start, length);
}
ColumnPtr filter(const Filter & filt) const override
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override
{
Block res_block = data.cloneEmpty();
for (size_t i = 0; i < columns.size(); ++i)
res_block.unsafeGetByPosition(i).column = data.unsafeGetByPosition(i).column->filter(filt);
res_block.unsafeGetByPosition(i).column = data.unsafeGetByPosition(i).column->filter(filt, result_size_hint);
return new ColumnTuple(res_block);
}

View File

@ -270,7 +270,7 @@ public:
memcpy(&data[old_size], &src_vec.data[start], length * sizeof(data[0]));
}
ColumnPtr filter(const IColumn::Filter & filt) const override
ColumnPtr filter(const IColumn::Filter & filt, ssize_t result_size_hint) const override
{
size_t size = data.size();
if (size != filt.size())
@ -279,7 +279,9 @@ public:
Self * res_ = new Self;
ColumnPtr res = res_;
typename Self::Container_t & res_data = res_->getData();
res_data.reserve(size);
if (result_size_hint)
res_data.reserve(result_size_hint > 0 ? result_size_hint : size);
/** Чуть более оптимизированная версия.
* Исходит из допущения, что часто куски последовательно идущих значений

View File

@ -17,6 +17,6 @@ template <typename T>
void filterArraysImpl(
const PODArray<T> & src_elems, const IColumn::Offsets_t & src_offsets,
PODArray<T> & res_elems, IColumn::Offsets_t & res_offsets,
const IColumn::Filter & filt);
const IColumn::Filter & filt, ssize_t result_size_hint);
}

View File

@ -176,9 +176,12 @@ public:
/** Оставить только значения, соответствующие фильтру.
* Используется для операции WHERE / HAVING.
* Если result_size_hint > 0, то сделать reserve этого размера у результата;
* если 0, то не делать reserve,
* иначе сделать reserve по размеру исходного столбца.
*/
typedef PODArray<UInt8> Filter;
virtual SharedPtr<IColumn> filter(const Filter & filt) const = 0;
virtual SharedPtr<IColumn> filter(const Filter & filt, ssize_t result_size_hint) const = 0;
/** Переставить значения местами, используя указанную перестановку.
* Используется при сортировке.

View File

@ -50,7 +50,7 @@ public:
s += length;
}
ColumnPtr filter(const Filter & filt) const override
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override
{
return cloneDummy(countBytesInFilter(filt));
}

View File

@ -302,7 +302,11 @@ protected:
void free()
{
Allocator::free(buf, getBufferSizeInBytes());
if (buf)
{
Allocator::free(buf, getBufferSizeInBytes());
buf = nullptr;
}
}
@ -397,6 +401,14 @@ protected:
}
void destroyElements()
{
if (!__has_trivial_destructor(Cell))
for (iterator it = begin(); it != end(); ++it)
it.ptr->~Cell();
}
public:
typedef Key key_type;
typedef typename Cell::value_type value_type;
@ -421,10 +433,7 @@ public:
~HashTable()
{
if (!__has_trivial_destructor(Cell))
for (iterator it = begin(); it != end(); ++it)
it.ptr->~Cell();
destroyElements();
free();
}
@ -789,6 +798,7 @@ public:
{
Cell::State::read(rb);
destroyElements();
this->clearHasZero();
m_size = 0;
@ -812,6 +822,7 @@ public:
{
Cell::State::readText(rb);
destroyElements();
this->clearHasZero();
m_size = 0;
@ -845,12 +856,23 @@ public:
void clear()
{
if (!__has_trivial_destructor(Cell))
for (iterator it = begin(); it != end(); ++it)
it.ptr->~Cell();
destroyElements();
this->clearHasZero();
m_size = 0;
memset(buf, 0, grower.bufSize() * sizeof(*buf));
}
void clearAndShrink()
{
destroyElements();
this->clearHasZero();
m_size = 0;
free();
Grower new_grower = grower;
new_grower.set(0);
alloc(new_grower);
}
size_t getBufferSizeInBytes() const

View File

@ -14,10 +14,13 @@ namespace DB
{
/** Can allocate memory objects of fixed size with deletion support.
* For small `object_size`s allocated no less than getMinAllocationSize() bytes. */
class SmallObjectPool
{
private:
struct Block { Block * next; };
static constexpr auto getMinAllocationSize() { return sizeof(Block); }
const std::size_t object_size;
Arena pool;
@ -25,16 +28,11 @@ private:
public:
SmallObjectPool(
const std::size_t object_size, const std::size_t initial_size = 4096, const std::size_t growth_factor = 2,
const std::size_t object_size_, const std::size_t initial_size = 4096, const std::size_t growth_factor = 2,
const std::size_t linear_growth_threshold = 128 * 1024 * 1024)
: object_size{object_size}, pool{initial_size, growth_factor, linear_growth_threshold}
: object_size{std::max(object_size_, getMinAllocationSize())},
pool{initial_size, growth_factor, linear_growth_threshold}
{
if (object_size < sizeof(Block))
throw Exception{
"Can't make allocations smaller than sizeof(Block) = " + std::to_string(sizeof(Block)),
ErrorCodes::LOGICAL_ERROR
};
if (pool.size() < object_size)
return;

View File

@ -10,41 +10,39 @@ namespace DB
{
/** Доагрегирует потоки блоков, держа в оперативной памяти только по одному блоку из каждого потока.
/** Доагрегирует потоки блоков, держа в оперативной памяти только по одному или несколько (до merging_threads) блоков из каждого источника.
* Это экономит оперативку в случае использования двухуровневой агрегации, где в каждом потоке будет до 256 блоков с частями результата.
*
* Агрегатные функции в блоках не должны быть финализированы, чтобы их состояния можно было объединить.
*
* Замечания:
*
* На хорошей сети (10Gbit) может работать заметно медленнее, так как чтения блоков с разных
* удалённых серверов делаются последовательно, при этом, чтение упирается в CPU.
* Это несложно исправить.
*
* Можно держать в памяти не по одному блоку из каждого источника, а по несколько, и распараллелить мердж.
* При этом будет расходоваться кратно больше оперативки.
*/
class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream
{
public:
MergingAggregatedMemoryEfficientBlockInputStream(
BlockInputStreams inputs_, const Aggregator::Params & params, bool final_, size_t threads_);
BlockInputStreams inputs_, const Aggregator::Params & params, bool final_,
size_t reading_threads_, size_t merging_threads_);
~MergingAggregatedMemoryEfficientBlockInputStream();
String getName() const override { return "MergingAggregatedMemoryEfficient"; }
String getID() const override;
/// Отправляет запрос (инициирует вычисления) раньше, чем read.
void readPrefix() override;
protected:
Block readImpl() override;
private:
Aggregator aggregator;
bool final;
size_t threads;
size_t reading_threads;
size_t merging_threads;
bool started = false;
bool has_two_level = false;
bool has_overflows = false;
volatile bool has_two_level = false;
volatile bool has_overflows = false;
int current_bucket_num = -1;
struct Input
@ -62,9 +60,13 @@ private:
using BlocksToMerge = Poco::SharedPtr<BlocksList>;
void start();
/// Получить блоки, которые можно мерджить. Это позволяет мерджить их параллельно в отдельных потоках.
BlocksToMerge getNextBlocksToMerge();
std::unique_ptr<boost::threadpool::pool> reading_pool;
/// Для параллельного мерджа.
struct OutputData
{
@ -81,7 +83,8 @@ private:
boost::threadpool::pool pool;
std::mutex get_next_blocks_mutex;
ConcurrentBoundedQueue<OutputData> result_queue;
bool exhausted = false;
bool exhausted = false; /// Данных больше нет.
bool finish = false; /// Нужно завершить работу раньше, чем данные закончились.
std::atomic<size_t> active_threads;
ParallelMergeData(size_t max_threads) : pool(max_threads), result_queue(max_threads), active_threads(max_threads) {}

View File

@ -88,13 +88,8 @@ protected:
if (!aggregator.hasTemporaryFiles())
{
/** Если все частично-агрегированные данные в оперативке, то мерджим их параллельно, тоже в оперативке.
* NOTE Если израсходовано больше половины допустимой памяти, то мерджить следовало бы более экономно.
*/
AggregatedDataVariantsPtr data_variants = aggregator.merge(many_data, max_threads);
if (data_variants)
impl.reset(new BlocksListBlockInputStream(
aggregator.convertToBlocks(*data_variants, final, max_threads)));
impl = aggregator.mergeAndConvertToBlocks(many_data, final, max_threads);
}
else
{
@ -116,7 +111,8 @@ protected:
<< (files.sum_size_compressed / 1048576.0) << " MiB compressed, "
<< (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.");
impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final, temporary_data_merge_threads));
impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(
input_streams, params, final, temporary_data_merge_threads, temporary_data_merge_threads));
}
}
@ -208,10 +204,14 @@ private:
void onFinishThread(size_t thread_num)
{
if (parent.aggregator.hasTemporaryFiles())
if (!parent.isCancelled() && parent.aggregator.hasTemporaryFiles())
{
/// Сбросим имеющиеся в оперативке данные тоже на диск. Так проще их потом объединять.
auto & data = *parent.many_data[thread_num];
if (data.isConvertibleToTwoLevel())
data.convertToTwoLevel();
size_t rows = data.sizeWithoutOverflowRow();
if (rows)
parent.aggregator.writeToTemporaryFile(data, rows);
@ -220,12 +220,15 @@ private:
void onFinish()
{
if (parent.aggregator.hasTemporaryFiles())
if (!parent.isCancelled() && parent.aggregator.hasTemporaryFiles())
{
/// Может так получиться, что какие-то данные ещё не сброшены на диск,
/// потому что во время вызова onFinishThread ещё никакие данные не были сброшены на диск, а потом какие-то - были.
for (auto & data : parent.many_data)
{
if (data->isConvertibleToTwoLevel())
data->convertToTwoLevel();
size_t rows = data->sizeWithoutOverflowRow();
if (rows)
parent.aggregator.writeToTemporaryFile(*data, rows);

View File

@ -211,7 +211,7 @@ private:
}
}
handler.onFinish();
handler.onFinish(); /// TODO Если в onFinish или onFinishThread эксепшен, то вызывается std::terminate.
}
}

View File

@ -77,7 +77,7 @@ public:
types.push_back(value_type_t::Int64);
else if (typeid_cast<const DataTypeFloat32 *>(type))
types.push_back(value_type_t::Float32);
else if (typeid_cast<const DataTypeInt64 *>(type))
else if (typeid_cast<const DataTypeFloat64 *>(type))
types.push_back(value_type_t::Float64);
else if (typeid_cast<const DataTypeString *>(type))
types.push_back(value_type_t::String);

View File

@ -77,7 +77,7 @@ struct ArrayFilterImpl
}
const IColumn::Filter & filter = column_filter->getData();
ColumnPtr filtered = array->getData().filter(filter);
ColumnPtr filtered = array->getData().filter(filter, -1);
const IColumn::Offsets_t & in_offsets = array->getOffsets();
ColumnArray::ColumnOffsets_t * column_offsets = new ColumnArray::ColumnOffsets_t(in_offsets.size());

View File

@ -129,7 +129,7 @@ public:
const ColumnConstArray * array_from = typeid_cast<const ColumnConstArray *>(&*block.getByPosition(arguments[1]).column);
const ColumnConstArray * array_to = typeid_cast<const ColumnConstArray *>(&*block.getByPosition(arguments[2]).column);
if (!array_from && !array_to)
if (!array_from || !array_to)
throw Exception("Second and third arguments of function " + getName() + " must be constant arrays.", ErrorCodes::ILLEGAL_COLUMN);
prepare(array_from->getData(), array_to->getData(), block, arguments);

View File

@ -714,6 +714,10 @@ struct AggregatedDataVariants : private boost::noncopyable
M(key8) \
M(key16) \
#define APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) \
APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) \
bool isConvertibleToTwoLevel() const
{
switch (type)
@ -845,14 +849,11 @@ public:
* которые могут быть затем объединены с другими состояниями (для распределённой обработки запроса).
* Если final = true, то в качестве столбцов-агрегатов создаются столбцы с готовыми значениями.
*/
BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads);
BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
/** Объединить несколько структур данных агрегации в одну. (В первый непустой элемент массива.)
* После объединения, все стркутуры агрегации (а не только те, в которую они будут слиты) должны жить,
* пока не будет вызвана функция convertToBlocks.
* Это нужно, так как в слитом результате могут остаться указатели на память в пуле, которым владеют другие структуры агрегации.
/** Объединить несколько структур данных агрегации и выдать результат в виде потока блоков.
*/
AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants, size_t max_threads);
std::unique_ptr<IBlockInputStream> mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
/** Объединить поток частично агрегированных блоков в одну структуру данных.
* (Доагрегировать несколько блоков, которые представляют собой результат независимых агрегаций с удалённых серверов.)
@ -900,6 +901,7 @@ public:
protected:
friend struct AggregatedDataVariants;
friend class MergingAndConvertingBlockInputStream;
Params params;
@ -966,10 +968,15 @@ protected:
TemporaryFiles temporary_files;
/** Если заданы только имена столбцов (key_names, а также aggregates[i].column_name), то вычислить номера столбцов.
* Сформировать блок - пример результата.
* Сформировать блок - пример результата. Он используется в методах convertToBlocks, mergeAndConvertToBlocks.
*/
void initialize(const Block & block);
/** Установить блок - пример результата,
* только если он ещё не был установлен.
*/
void setSampleBlock(const Block & block);
/** Выбрать способ агрегации на основе количества и типов ключей. */
AggregatedDataVariants::Type chooseAggregationMethod(const ConstColumnPlainPtrs & key_columns, Sizes & key_sizes);
@ -1075,12 +1082,6 @@ protected:
Table & table_dst,
Table & table_src) const;
/// Слить все ключи, оставшиеся после предыдущего метода, в overflows.
template <typename Method, typename Table>
void mergeDataRemainingKeysToOverflowsImpl(
AggregatedDataWithoutKey & overflows,
Table & table_src) const;
void mergeWithoutKeyDataImpl(
ManyAggregatedDataVariants & non_empty_data) const;
@ -1088,11 +1089,6 @@ protected:
void mergeSingleLevelDataImpl(
ManyAggregatedDataVariants & non_empty_data) const;
template <typename Method>
void mergeTwoLevelDataImpl(
ManyAggregatedDataVariants & many_data,
boost::threadpool::pool * thread_pool) const;
template <typename Method, typename Table>
void convertToBlockImpl(
Method & method,
@ -1126,6 +1122,13 @@ protected:
size_t rows,
Filler && filler) const;
template <typename Method>
Block convertOneBucketToBlock(
AggregatedDataVariants & data_variants,
Method & method,
bool final,
size_t bucket) const;
BlocksList prepareBlocksAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const;
BlocksList prepareBlocksAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, boost::threadpool::pool * thread_pool) const;
@ -1160,6 +1163,10 @@ protected:
Block & block,
AggregatedDataVariants & result) const;
template <typename Method>
void mergeBucketImpl(
ManyAggregatedDataVariants & data, Int32 bucket) const;
template <typename Method>
void convertBlockToTwoLevelImpl(
Method & method,
@ -1170,9 +1177,13 @@ protected:
const Block & source,
std::vector<Block> & destinations) const;
template <typename Method>
template <typename Method, typename Table>
void destroyImpl(
Method & method) const;
Method & method,
Table & data) const;
void destroyWithoutKey(
AggregatedDataVariants & result) const;
/** Проверяет ограничения на максимальное количество ключей для агрегации.

View File

@ -124,12 +124,15 @@ private:
auto filters = createFilters(block);
const auto num_shards = storage.cluster.getShardsInfo().size();
ssize_t size_hint = ((block.rowsInFirstColumn() + num_shards - 1) / num_shards) * 1.1; /// Число 1.1 выбрано наугад.
for (size_t i = 0; i < num_shards; ++i)
{
auto target_block = block.cloneEmpty();
for (size_t col = 0; col < num_cols; ++col)
target_block.getByPosition(col).column = columns[col]->filter(filters[i]);
target_block.getByPosition(col).column = columns[col]->filter(filters[i], size_hint);
if (target_block.rowsInFirstColumn())
writeImpl(target_block, i);

View File

@ -324,7 +324,7 @@ protected:
ColumnWithTypeAndName & column = res.getByPosition(i);
if (column.name == prewhere_column && res.columns() > 1)
continue;
column.column = column.column->filter(column_name_set.count(column.name) ? post_filter : pre_filter);
column.column = column.column->filter(column_name_set.count(column.name) ? post_filter : pre_filter, -1);
rows = column.column->size();
}

View File

@ -53,11 +53,10 @@ public:
/** Отменяет все мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро бросят исключение.
* Все новые вызовы будут бросать исключения, пока не будет вызван uncancel().
* Считает количество таких вызовов для поддержки нескольких наложенных друг на друга отмен.
*/
void cancel() { ++cancelled; }
void uncancel() { --cancelled; }
bool isCancelled() const { return cancelled > 0; }
void cancel() { cancelled = true; }
void uncancel() { cancelled = false; }
bool isCancelled() const { return cancelled; }
private:
MergeTreeData & data;
@ -67,7 +66,7 @@ private:
/// Когда в последний раз писали в лог, что место на диске кончилось (чтобы не писать об этом слишком часто).
time_t disk_space_warning_time = 0;
std::atomic<int> cancelled {0};
std::atomic<bool> cancelled {false};
};

View File

@ -150,7 +150,7 @@ public:
per_part_remove_prewhere_column[part_idx], per_part_should_reorder[part_idx]);
}
public:
private:
std::vector<std::size_t> fillPerPartInfo(
RangesInDataParts & parts, const ExpressionActionsPtr & prewhere_actions, const String & prewhere_column_name,
const bool check_columns)

View File

@ -255,7 +255,7 @@ private:
if (col.name == prewhere_column && res.columns() > 1)
continue;
col.column =
col.column->filter(task->column_name_set.count(col.name) ? post_filter : pre_filter);
col.column->filter(task->column_name_set.count(col.name) ? post_filter : pre_filter, -1);
rows = col.column->size();
}

View File

@ -128,7 +128,8 @@ private:
void flushAllBuffers(bool check_thresholds = true);
/// Сбросить буфер. Если выставлено check_thresholds - сбрасывает только если превышены пороги.
void flushBuffer(Buffer & buffer, bool check_thresholds);
bool checkThresholds(Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0);
bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const;
/// Аргумент table передаётся, так как иногда вычисляется заранее. Он должен соответствовать destination-у.
void writeBlockToDestination(const Block & block, StoragePtr table);

View File

@ -41,24 +41,24 @@ void ColumnArray::insertRangeFrom(const IColumn & src, size_t start, size_t leng
}
ColumnPtr ColumnArray::filter(const Filter & filt) const
ColumnPtr ColumnArray::filter(const Filter & filt, ssize_t result_size_hint) const
{
if (typeid_cast<const ColumnUInt8 *>(data.get())) return filterNumber<UInt8>(filt);
if (typeid_cast<const ColumnUInt16 *>(data.get())) return filterNumber<UInt16>(filt);
if (typeid_cast<const ColumnUInt32 *>(data.get())) return filterNumber<UInt32>(filt);
if (typeid_cast<const ColumnUInt64 *>(data.get())) return filterNumber<UInt64>(filt);
if (typeid_cast<const ColumnInt8 *>(data.get())) return filterNumber<Int8>(filt);
if (typeid_cast<const ColumnInt16 *>(data.get())) return filterNumber<Int16>(filt);
if (typeid_cast<const ColumnInt32 *>(data.get())) return filterNumber<Int32>(filt);
if (typeid_cast<const ColumnInt64 *>(data.get())) return filterNumber<Int64>(filt);
if (typeid_cast<const ColumnFloat32 *>(data.get())) return filterNumber<Float32>(filt);
if (typeid_cast<const ColumnFloat64 *>(data.get())) return filterNumber<Float64>(filt);
if (typeid_cast<const ColumnString *>(data.get())) return filterString(filt);
return filterGeneric(filt);
if (typeid_cast<const ColumnUInt8 *>(data.get())) return filterNumber<UInt8>(filt, result_size_hint);
if (typeid_cast<const ColumnUInt16 *>(data.get())) return filterNumber<UInt16>(filt, result_size_hint);
if (typeid_cast<const ColumnUInt32 *>(data.get())) return filterNumber<UInt32>(filt, result_size_hint);
if (typeid_cast<const ColumnUInt64 *>(data.get())) return filterNumber<UInt64>(filt, result_size_hint);
if (typeid_cast<const ColumnInt8 *>(data.get())) return filterNumber<Int8>(filt, result_size_hint);
if (typeid_cast<const ColumnInt16 *>(data.get())) return filterNumber<Int16>(filt, result_size_hint);
if (typeid_cast<const ColumnInt32 *>(data.get())) return filterNumber<Int32>(filt, result_size_hint);
if (typeid_cast<const ColumnInt64 *>(data.get())) return filterNumber<Int64>(filt, result_size_hint);
if (typeid_cast<const ColumnFloat32 *>(data.get())) return filterNumber<Float32>(filt, result_size_hint);
if (typeid_cast<const ColumnFloat64 *>(data.get())) return filterNumber<Float64>(filt, result_size_hint);
if (typeid_cast<const ColumnString *>(data.get())) return filterString(filt, result_size_hint);
return filterGeneric(filt, result_size_hint);
}
template <typename T>
ColumnPtr ColumnArray::filterNumber(const Filter & filt) const
ColumnPtr ColumnArray::filterNumber(const Filter & filt, ssize_t result_size_hint) const
{
if (getOffsets().size() == 0)
return new ColumnArray(data);
@ -69,11 +69,11 @@ ColumnPtr ColumnArray::filterNumber(const Filter & filt) const
PODArray<T> & res_elems = static_cast<ColumnVector<T> &>(res->getData()).getData();
Offsets_t & res_offsets = res->getOffsets();
filterArraysImpl<T>(static_cast<const ColumnVector<T> &>(*data).getData(), getOffsets(), res_elems, res_offsets, filt);
filterArraysImpl<T>(static_cast<const ColumnVector<T> &>(*data).getData(), getOffsets(), res_elems, res_offsets, filt, result_size_hint);
return res_;
}
ColumnPtr ColumnArray::filterString(const Filter & filt) const
ColumnPtr ColumnArray::filterString(const Filter & filt, ssize_t result_size_hint) const
{
size_t col_size = getOffsets().size();
if (col_size != filt.size())
@ -94,9 +94,12 @@ ColumnPtr ColumnArray::filterString(const Filter & filt) const
Offsets_t & res_string_offsets = typeid_cast<ColumnString &>(res->getData()).getOffsets();
Offsets_t & res_offsets = res->getOffsets();
res_chars.reserve(src_chars.size());
res_string_offsets.reserve(src_string_offsets.size());
res_offsets.reserve(col_size);
if (result_size_hint < 0) /// Остальные случаи не рассматриваем.
{
res_chars.reserve(src_chars.size());
res_string_offsets.reserve(src_string_offsets.size());
res_offsets.reserve(col_size);
}
Offset_t prev_src_offset = 0;
Offset_t prev_src_string_offset = 0;
@ -139,7 +142,7 @@ ColumnPtr ColumnArray::filterString(const Filter & filt) const
return res_;
}
ColumnPtr ColumnArray::filterGeneric(const Filter & filt) const
ColumnPtr ColumnArray::filterGeneric(const Filter & filt, ssize_t result_size_hint) const
{
size_t size = getOffsets().size();
if (size != filt.size())
@ -159,10 +162,18 @@ ColumnPtr ColumnArray::filterGeneric(const Filter & filt) const
ColumnArray * res_ = new ColumnArray(data);
ColumnPtr res = res_;
res_->data = data->filter(nested_filt);
ssize_t nested_result_size_hint = 0;
if (result_size_hint < 0)
nested_result_size_hint = result_size_hint;
else if (result_size_hint && result_size_hint < 1000000000 && data->size() < 1000000000) /// Избегаем переполнения.
nested_result_size_hint = result_size_hint * data->size() / size;
res_->data = data->filter(nested_filt, nested_result_size_hint);
Offsets_t & res_offsets = res_->getOffsets();
res_offsets.reserve(size);
if (result_size_hint)
res_offsets.reserve(result_size_hint > 0 ? result_size_hint : size);
size_t current_offset = 0;
for (size_t i = 0; i < size; ++i)

View File

@ -47,14 +47,21 @@ template <typename T>
void filterArraysImpl(
const PODArray<T> & src_elems, const IColumn::Offsets_t & src_offsets,
PODArray<T> & res_elems, IColumn::Offsets_t & res_offsets,
const IColumn::Filter & filt)
const IColumn::Filter & filt, ssize_t result_size_hint)
{
const size_t size = src_offsets.size();
if (size != filt.size())
throw Exception("Size of filter doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
res_elems.reserve(src_elems.size());
res_offsets.reserve(size);
if (result_size_hint)
{
res_offsets.reserve(result_size_hint > 0 ? result_size_hint : size);
if (result_size_hint < 0)
res_elems.reserve(src_elems.size());
else if (result_size_hint < 1000000000 && src_elems.size() < 1000000000) /// Избегаем переполнения.
res_elems.reserve(result_size_hint * src_elems.size() / size);
}
IColumn::Offset_t current_src_offset = 0;
@ -150,24 +157,24 @@ void filterArraysImpl(
/// Явные инстанцирования - чтобы не размещать реализацию функции выше в заголовочном файле.
template void filterArraysImpl<UInt8>(
const PODArray<UInt8> &, const IColumn::Offsets_t &, PODArray<UInt8> &, IColumn::Offsets_t &, const IColumn::Filter &);
const PODArray<UInt8> &, const IColumn::Offsets_t &, PODArray<UInt8> &, IColumn::Offsets_t &, const IColumn::Filter &, ssize_t);
template void filterArraysImpl<UInt16>(
const PODArray<UInt16> &, const IColumn::Offsets_t &, PODArray<UInt16> &, IColumn::Offsets_t &, const IColumn::Filter &);
const PODArray<UInt16> &, const IColumn::Offsets_t &, PODArray<UInt16> &, IColumn::Offsets_t &, const IColumn::Filter &, ssize_t);
template void filterArraysImpl<UInt32>(
const PODArray<UInt32> &, const IColumn::Offsets_t &, PODArray<UInt32> &, IColumn::Offsets_t &, const IColumn::Filter &);
const PODArray<UInt32> &, const IColumn::Offsets_t &, PODArray<UInt32> &, IColumn::Offsets_t &, const IColumn::Filter &, ssize_t);
template void filterArraysImpl<UInt64>(
const PODArray<UInt64> &, const IColumn::Offsets_t &, PODArray<UInt64> &, IColumn::Offsets_t &, const IColumn::Filter &);
const PODArray<UInt64> &, const IColumn::Offsets_t &, PODArray<UInt64> &, IColumn::Offsets_t &, const IColumn::Filter &, ssize_t);
template void filterArraysImpl<Int8>(
const PODArray<Int8> &, const IColumn::Offsets_t &, PODArray<Int8> &, IColumn::Offsets_t &, const IColumn::Filter &);
const PODArray<Int8> &, const IColumn::Offsets_t &, PODArray<Int8> &, IColumn::Offsets_t &, const IColumn::Filter &, ssize_t);
template void filterArraysImpl<Int16>(
const PODArray<Int16> &, const IColumn::Offsets_t &, PODArray<Int16> &, IColumn::Offsets_t &, const IColumn::Filter &);
const PODArray<Int16> &, const IColumn::Offsets_t &, PODArray<Int16> &, IColumn::Offsets_t &, const IColumn::Filter &, ssize_t);
template void filterArraysImpl<Int32>(
const PODArray<Int32> &, const IColumn::Offsets_t &, PODArray<Int32> &, IColumn::Offsets_t &, const IColumn::Filter &);
const PODArray<Int32> &, const IColumn::Offsets_t &, PODArray<Int32> &, IColumn::Offsets_t &, const IColumn::Filter &, ssize_t);
template void filterArraysImpl<Int64>(
const PODArray<Int64> &, const IColumn::Offsets_t &, PODArray<Int64> &, IColumn::Offsets_t &, const IColumn::Filter &);
const PODArray<Int64> &, const IColumn::Offsets_t &, PODArray<Int64> &, IColumn::Offsets_t &, const IColumn::Filter &, ssize_t);
template void filterArraysImpl<Float32>(
const PODArray<Float32> &, const IColumn::Offsets_t &, PODArray<Float32> &, IColumn::Offsets_t &, const IColumn::Filter &);
const PODArray<Float32> &, const IColumn::Offsets_t &, PODArray<Float32> &, IColumn::Offsets_t &, const IColumn::Filter &, ssize_t);
template void filterArraysImpl<Float64>(
const PODArray<Float64> &, const IColumn::Offsets_t &, PODArray<Float64> &, IColumn::Offsets_t &, const IColumn::Filter &);
const PODArray<Float64> &, const IColumn::Offsets_t &, PODArray<Float64> &, IColumn::Offsets_t &, const IColumn::Filter &, ssize_t);
}

View File

@ -169,7 +169,7 @@ bool filterBlockWithQuery(ASTPtr query, Block & block, const Context & context)
for (size_t i = 0; i < block.columns(); ++i)
{
ColumnPtr & column = block.getByPosition(i).column;
column = column->filter(filter);
column = column->filter(filter, -1);
}
return true;

View File

@ -12,17 +12,17 @@ Block AggregatingBlockInputStream::readImpl()
if (!executed)
{
executed = true;
AggregatedDataVariants data_variants;
AggregatedDataVariantsPtr data_variants = new AggregatedDataVariants;
Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); };
aggregator.setCancellationHook(hook);
aggregator.execute(children.back(), data_variants);
aggregator.execute(children.back(), *data_variants);
if (!aggregator.hasTemporaryFiles())
{
impl.reset(new BlocksListBlockInputStream(
aggregator.convertToBlocks(data_variants, final, 1)));
ManyAggregatedDataVariants many_data { data_variants };
impl = aggregator.mergeAndConvertToBlocks(many_data, final, 1);
}
else
{
@ -32,10 +32,13 @@ Block AggregatingBlockInputStream::readImpl()
ProfileEvents::increment(ProfileEvents::ExternalAggregationMerge);
/// Сбросим имеющиеся в оперативке данные тоже на диск. Так проще.
size_t rows = data_variants.sizeWithoutOverflowRow();
if (rows)
aggregator.writeToTemporaryFile(data_variants, rows);
if (!isCancelled())
{
/// Сбросим имеющиеся в оперативке данные тоже на диск. Так проще.
size_t rows = data_variants->sizeWithoutOverflowRow();
if (rows)
aggregator.writeToTemporaryFile(*data_variants, rows);
}
const auto & files = aggregator.getTemporaryFiles();
BlockInputStreams input_streams;
@ -49,7 +52,7 @@ Block AggregatingBlockInputStream::readImpl()
<< (files.sum_size_compressed / 1048576.0) << " MiB compressed, "
<< (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.");
impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final, 1));
impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final, 1, 1));
}
}

View File

@ -153,7 +153,7 @@ Block CollapsingFinalBlockInputStream::readImpl()
Block block = merging_block->block;
for (size_t i = 0; i < block.columns(); ++i)
block.getByPosition(i).column = block.getByPosition(i).column->filter(merging_block->filter);
block.getByPosition(i).column = block.getByPosition(i).column->filter(merging_block->filter, -1);
output_blocks.pop_back();
delete merging_block;

View File

@ -107,7 +107,7 @@ Block DistinctBlockInputStream::readImpl()
size_t all_columns = block.columns();
for (size_t i = 0; i < all_columns; ++i)
block.getByPosition(i).column = block.getByPosition(i).column->filter(filter);
block.getByPosition(i).column = block.getByPosition(i).column->filter(filter, -1);
return block;
}

View File

@ -77,7 +77,7 @@ Block FilterBlockInputStream::readImpl()
if (first_non_constant_column != static_cast<size_t>(filter_column))
{
ColumnWithTypeAndName & current_column = res.getByPosition(first_non_constant_column);
current_column.column = current_column.column->filter(filter);
current_column.column = current_column.column->filter(filter, -1);
filtered_rows = current_column.column->size();
}
else
@ -116,7 +116,7 @@ Block FilterBlockInputStream::readImpl()
if (current_column.column->isConst())
current_column.column = current_column.column->cut(0, filtered_rows);
else
current_column.column = current_column.column->filter(filter);
current_column.column = current_column.column->filter(filter, -1);
}
return res;

View File

@ -8,12 +8,15 @@ namespace DB
MergingAggregatedMemoryEfficientBlockInputStream::MergingAggregatedMemoryEfficientBlockInputStream(
BlockInputStreams inputs_, const Aggregator::Params & params, bool final_, size_t threads_)
: aggregator(params), final(final_), threads(threads_), inputs(inputs_.begin(), inputs_.end())
BlockInputStreams inputs_, const Aggregator::Params & params, bool final_, size_t reading_threads_, size_t merging_threads_)
: aggregator(params), final(final_),
reading_threads(std::min(reading_threads_, inputs_.size())), merging_threads(merging_threads_),
inputs(inputs_.begin(), inputs_.end())
{
children = inputs_;
}
String MergingAggregatedMemoryEfficientBlockInputStream::getID() const
{
std::stringstream res;
@ -24,21 +27,61 @@ String MergingAggregatedMemoryEfficientBlockInputStream::getID() const
return res.str();
}
Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
void MergingAggregatedMemoryEfficientBlockInputStream::readPrefix()
{
if (threads == 1)
start();
}
void MergingAggregatedMemoryEfficientBlockInputStream::start()
{
if (started)
return;
started = true;
/// Если child - RemoteBlockInputStream, то child->readPrefix() отправляет запрос на удалённый сервер, инициируя вычисления.
if (reading_threads == 1)
{
/// Если child - RemoteBlockInputStream, то отправляет запрос на все удалённые серверы, инициируя вычисления.
/** NOTE: Если соединения ещё не установлены, то устанавливает их последовательно.
* И отправляет запрос последовательно. Это медленно.
*/
if (!started)
for (auto & child : children)
child->readPrefix();
}
else
{
reading_pool.reset(new boost::threadpool::pool(reading_threads));
size_t num_children = children.size();
std::vector<std::packaged_task<void()>> tasks(num_children);
for (size_t i = 0; i < num_children; ++i)
{
started = true;
for (auto & child : children)
auto & child = children[i];
auto & task = tasks[i];
auto memory_tracker = current_memory_tracker;
task = std::packaged_task<void()>([&child, memory_tracker]
{
current_memory_tracker = memory_tracker;
setThreadName("MergeAggReadThr");
child->readPrefix();
});
reading_pool->schedule([&task] { task(); });
}
reading_pool->wait();
for (auto & task : tasks)
task.get_future().get();
}
}
Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
{
start();
if (merging_threads == 1)
{
if (BlocksToMerge blocks_to_merge = getNextBlocksToMerge())
return aggregator.mergeBlocks(*blocks_to_merge, final);
return {};
@ -51,32 +94,14 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
if (!parallel_merge_data)
{
parallel_merge_data.reset(new ParallelMergeData(threads));
parallel_merge_data.reset(new ParallelMergeData(merging_threads));
auto & pool = parallel_merge_data->pool;
/** Если child - RemoteBlockInputStream, то соединения и отправку запроса тоже будем делать параллельно.
*/
started = true;
size_t num_children = children.size();
std::vector<std::packaged_task<void()>> tasks(num_children);
for (size_t i = 0; i < num_children; ++i)
{
auto & child = children[i];
auto & task = tasks[i];
task = std::packaged_task<void()>([&child] { child->readPrefix(); });
pool.schedule([&task] { task(); });
}
pool.wait();
for (auto & task : tasks)
task.get_future().get();
/** Создаём потоки, которые будут получать и мерджить данные.
*/
for (size_t i = 0; i < threads; ++i)
for (size_t i = 0; i < merging_threads; ++i)
pool.schedule(std::bind(&MergingAggregatedMemoryEfficientBlockInputStream::mergeThread,
this, current_memory_tracker));
}
@ -95,22 +120,48 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
}
MergingAggregatedMemoryEfficientBlockInputStream::~MergingAggregatedMemoryEfficientBlockInputStream()
{
if (reading_pool)
reading_pool->wait();
if (parallel_merge_data)
{
LOG_TRACE((&Logger::get("MergingAggregatedMemoryEfficientBlockInputStream")), "Waiting for threads to finish");
{
std::lock_guard<std::mutex> lock(parallel_merge_data->get_next_blocks_mutex);
parallel_merge_data->finish = true;
}
parallel_merge_data->result_queue.clear();
parallel_merge_data->pool.wait();
}
}
void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker * memory_tracker)
{
setThreadName("MrgAggMemEffThr");
setThreadName("MergeAggMergThr");
current_memory_tracker = memory_tracker;
try
{
while (true)
{
/// Получение следующих блоков делается последовательно, а мердж - параллельно.
/** Получение следующих блоков делается в одном пуле потоков, а мердж - в другом.
* Это весьма сложное взаимодействие.
* Каждый раз,
* - reading_threads читают по одному следующему блоку из каждого источника;
* - из этих блоков составляется группа блоков для слияния;
* - один из merging_threads выполняет слияние этой группы блоков;
*/
BlocksToMerge blocks_to_merge;
{
std::lock_guard<std::mutex> lock(parallel_merge_data->get_next_blocks_mutex);
if (parallel_merge_data->exhausted)
if (parallel_merge_data->exhausted || parallel_merge_data->finish)
break;
blocks_to_merge = getNextBlocksToMerge();
@ -122,7 +173,16 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker
}
}
parallel_merge_data->result_queue.push(aggregator.mergeBlocks(*blocks_to_merge, final));
Block res = aggregator.mergeBlocks(*blocks_to_merge, final);
{
std::lock_guard<std::mutex> lock(parallel_merge_data->get_next_blocks_mutex);
if (parallel_merge_data->finish)
break;
parallel_merge_data->result_queue.push(OutputData(std::move(res)));
}
}
}
catch (...)
@ -162,14 +222,16 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
++current_bucket_num;
for (auto & input : inputs)
/// Получить из источника следующий блок с номером корзины не больше current_bucket_num.
auto need_that_input = [this] (Input & input)
{
if (input.is_exhausted)
continue;
if (input.block.info.bucket_num >= current_bucket_num)
continue;
return !input.is_exhausted
&& input.block.info.bucket_num < current_bucket_num;
};
auto read_from_input = [this] (Input & input)
{
/// Если придёт блок не с основными данными, а с overflows, то запомним его и повторим чтение.
while (true)
{
@ -210,6 +272,39 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
break;
}
};
if (reading_threads == 1)
{
for (auto & input : inputs)
if (need_that_input(input))
read_from_input(input);
}
else
{
size_t num_inputs = inputs.size();
std::vector<std::packaged_task<void()>> tasks;
tasks.reserve(num_inputs);
for (auto & input : inputs)
{
if (need_that_input(input))
{
auto memory_tracker = current_memory_tracker;
tasks.emplace_back([&input, &read_from_input, memory_tracker]
{
current_memory_tracker = memory_tracker;
setThreadName("MergeAggReadThr");
read_from_input(input);
});
auto & task = tasks.back();
reading_pool->schedule([&task] { task(); });
}
}
reading_pool->wait();
for (auto & task : tasks)
task.get_future().get();
}
while (true)

View File

@ -107,7 +107,7 @@ Block TotalsHavingBlockInputStream::readImpl()
for (size_t i = 0; i < columns; ++i)
{
ColumnWithTypeAndName & current_column = finalized.getByPosition(i);
current_column.column = current_column.column->filter(filter);
current_column.column = current_column.column->filter(filter, -1);
if (current_column.column->empty())
{
finalized.clear();

View File

@ -5,6 +5,7 @@
#include <cxxabi.h>
#include <DB/Common/Stopwatch.h>
#include <DB/Common/setThreadName.h>
#include <DB/DataTypes/DataTypeAggregateFunction.h>
#include <DB/Columns/ColumnsNumber.h>
@ -12,6 +13,7 @@
#include <DB/AggregateFunctions/AggregateFunctionCount.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/DataStreams/NullBlockInputStream.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
@ -88,7 +90,8 @@ void Aggregator::initialize(const Block & block)
initialized = true;
memory_usage_before_aggregation = current_memory_tracker->get();
if (current_memory_tracker)
memory_usage_before_aggregation = current_memory_tracker->get();
aggregate_functions.resize(params.aggregates_size);
for (size_t i = 0; i < params.aggregates_size; ++i)
@ -152,6 +155,15 @@ void Aggregator::initialize(const Block & block)
}
void Aggregator::setSampleBlock(const Block & block)
{
std::lock_guard<std::mutex> lock(mutex);
if (!sample)
sample = block.cloneEmpty();
}
void Aggregator::compileIfPossible(AggregatedDataVariants::Type type)
{
std::lock_guard<std::mutex> lock(mutex);
@ -730,7 +742,10 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
}
size_t result_size = result.sizeWithoutOverflowRow();
auto current_memory_usage = current_memory_tracker->get();
Int64 current_memory_usage = 0;
if (current_memory_tracker)
current_memory_usage = current_memory_tracker->get();
auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// Здесь учитываются все результаты в сумме, из разных потоков.
bool worth_convert_to_two_level
@ -824,6 +839,30 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
}
template <typename Method>
Block Aggregator::convertOneBucketToBlock(
AggregatedDataVariants & data_variants,
Method & method,
bool final,
size_t bucket) const
{
Block block = prepareBlockAndFill(data_variants, final, method.data.impls[bucket].size(),
[bucket, &method, this] (
ColumnPlainPtrs & key_columns,
AggregateColumnsData & aggregate_columns,
ColumnPlainPtrs & final_aggregate_columns,
const Sizes & key_sizes,
bool final)
{
convertToBlockImpl(method, method.data.impls[bucket],
key_columns, aggregate_columns, final_aggregate_columns, key_sizes, final);
});
block.info.bucket_num = bucket;
return block;
}
template <typename Method>
void Aggregator::writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
@ -839,19 +878,7 @@ void Aggregator::writeToTemporaryFileImpl(
if (method.data.impls[bucket].empty())
continue;
Block block = prepareBlockAndFill(data_variants, false, method.data.impls[bucket].size(),
[bucket, &method, this] (
ColumnPlainPtrs & key_columns,
AggregateColumnsData & aggregate_columns,
ColumnPlainPtrs & final_aggregate_columns,
const Sizes & key_sizes,
bool final)
{
convertToBlockImpl(method, method.data.impls[bucket],
key_columns, aggregate_columns, final_aggregate_columns, key_sizes, final);
});
block.info.bucket_num = bucket;
Block block = convertOneBucketToBlock(data_variants, method, false, bucket);
out.write(block);
size_t block_size_rows = block.rowsInFirstColumn();
@ -954,6 +981,9 @@ void Aggregator::convertToBlockImpl(
convertToBlockImplFinal(method, data, key_columns, final_aggregate_columns, key_sizes);
else
convertToBlockImplNotFinal(method, data, key_columns, aggregate_columns, key_sizes);
/// Для того, чтобы пораньше освободить память.
data.clearAndShrink();
}
@ -974,6 +1004,8 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
*final_aggregate_columns[i]);
}
destroyImpl(method, data); /// NOTE Можно сделать лучше.
}
template <typename Method, typename Table>
@ -1109,6 +1141,11 @@ BlocksList Aggregator::prepareBlocksAndFillWithoutKey(AggregatedDataVariants & d
if (is_overflows)
block.info.is_overflows = true;
if (final)
destroyWithoutKey(data_variants);
else
data_variants.without_key = nullptr;
BlocksList blocks;
blocks.emplace_back(std::move(block));
return blocks;
@ -1125,13 +1162,13 @@ BlocksList Aggregator::prepareBlocksAndFillSingleLevel(AggregatedDataVariants &
const Sizes & key_sizes,
bool final)
{
#define M(NAME, IS_TWO_LEVEL) \
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, \
key_columns, aggregate_columns, final_aggregate_columns, data_variants.key_sizes, final);
if (false) {}
APPLY_FOR_AGGREGATED_VARIANTS(M)
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
#undef M
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@ -1164,35 +1201,10 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
bool final,
boost::threadpool::pool * thread_pool) const
{
auto filler = [&method, this](
ColumnPlainPtrs & key_columns,
AggregateColumnsData & aggregate_columns,
ColumnPlainPtrs & final_aggregate_columns,
const Sizes & key_sizes,
bool final,
size_t bucket)
{
convertToBlockImpl(method, method.data.impls[bucket],
key_columns, aggregate_columns, final_aggregate_columns, key_sizes, final);
};
auto converter = [&](size_t bucket, MemoryTracker * memory_tracker)
{
current_memory_tracker = memory_tracker;
Block block = prepareBlockAndFill(data_variants, final, method.data.impls[bucket].size(),
[bucket, &filler] (
ColumnPlainPtrs & key_columns,
AggregateColumnsData & aggregate_columns,
ColumnPlainPtrs & final_aggregate_columns,
const Sizes & key_sizes,
bool final)
{
filler(key_columns, aggregate_columns, final_aggregate_columns, key_sizes, final, bucket);
});
block.info.bucket_num = bucket;
return block;
return convertOneBucketToBlock(data_variants, method, final, bucket);
};
/// packaged_task используются, чтобы исключения автоматически прокидывались в основной поток.
@ -1266,7 +1278,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
}
BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads)
BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const
{
if (isCancelled())
return BlocksList();
@ -1439,31 +1451,6 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
}
}
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataRemainingKeysToOverflowsImpl(
AggregatedDataWithoutKey & overflows,
Table & table_src) const
{
for (auto it = table_src.begin(); it != table_src.end(); ++it)
{
if (Method::getAggregateData(it->second) == nullptr)
continue;
AggregateDataPtr res_data = overflows;
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
Method::getAggregateData(it->second) = nullptr;
}
}
void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
ManyAggregatedDataVariants & non_empty_data) const
@ -1518,109 +1505,222 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
/// current не будет уничтожать состояния агрегатных функций в деструкторе
current.aggregator = nullptr;
getDataVariant<Method>(current).data.clearAndShrink();
}
}
template <typename Method>
void NO_INLINE Aggregator::mergeTwoLevelDataImpl(
ManyAggregatedDataVariants & non_empty_data,
boost::threadpool::pool * thread_pool) const
void NO_INLINE Aggregator::mergeBucketImpl(
ManyAggregatedDataVariants & data, Int32 bucket) const
{
AggregatedDataVariantsPtr & res = non_empty_data[0];
/// В данном случае, no_more_keys будет выставлено, только если в первом (самом большом) состоянии достаточно много строк.
bool no_more_keys = false;
if (!checkLimits(res->sizeWithoutOverflowRow(), no_more_keys))
return;
/// Слияние распараллеливается по корзинам - первому уровню TwoLevelHashMap.
auto merge_bucket = [&non_empty_data, &res, no_more_keys, this](size_t bucket, MemoryTracker * memory_tracker)
/// Все результаты агрегации соединяем с первым.
AggregatedDataVariantsPtr & res = data[0];
for (size_t i = 1, size = data.size(); i < size; ++i)
{
current_memory_tracker = memory_tracker;
AggregatedDataVariants & current = *data[i];
/// Все результаты агрегации соединяем с первым.
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
{
AggregatedDataVariants & current = *non_empty_data[i];
if (!no_more_keys)
{
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket]);
}
else
{
mergeDataOnlyExistingKeysImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket]);
}
}
};
/// packaged_task используются, чтобы исключения автоматически прокидывались в основной поток.
std::vector<std::packaged_task<void()>> tasks(Method::Data::NUM_BUCKETS);
try
{
for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{
tasks[bucket] = std::packaged_task<void()>(std::bind(merge_bucket, bucket, current_memory_tracker));
if (thread_pool)
thread_pool->schedule([bucket, &tasks] { tasks[bucket](); });
else
tasks[bucket]();
}
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket]);
}
catch (...)
{
/// Если этого не делать, то в случае исключения, tasks уничтожится раньше завершения потоков, и будет плохо.
if (thread_pool)
thread_pool->wait();
throw;
}
if (thread_pool)
thread_pool->wait();
for (auto & task : tasks)
if (task.valid())
task.get_future().get();
if (no_more_keys && params.overflow_row)
{
for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
{
AggregatedDataVariants & current = *non_empty_data[i];
mergeDataRemainingKeysToOverflowsImpl<Method>(
res->without_key,
getDataVariant<Method>(current).data.impls[bucket]);
}
}
}
/// aggregator не будет уничтожать состояния агрегатных функций в деструкторе
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
non_empty_data[i]->aggregator = nullptr;
}
AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_variants, size_t max_threads)
/** Объединят вместе состояния агрегации, превращает их в блоки и выдаёт потоково.
* Если состояния агрегации двухуровневые, то выдаёт блоки строго по порядку bucket_num.
* (Это важно при распределённой обработке.)
* При этом, может обрабатывать разные bucket-ы параллельно, используя до threads потоков.
*/
class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
{
public:
/** На вход подаётся набор непустых множеств частично агрегированных данных,
* которые все либо являются одноуровневыми, либо являются двухуровневыми.
*/
MergingAndConvertingBlockInputStream(const Aggregator & aggregator_, ManyAggregatedDataVariants & data_, bool final_, size_t threads_)
: aggregator(aggregator_), data(data_), final(final_), threads(threads_) {}
String getName() const override { return "MergingAndConverting"; }
String getID() const override
{
std::stringstream res;
res << this;
return res.str();
}
~MergingAndConvertingBlockInputStream()
{
if (parallel_merge_data)
{
LOG_TRACE(&Logger::get(__PRETTY_FUNCTION__), "Waiting for threads to finish");
parallel_merge_data->pool.wait();
}
}
protected:
Block readImpl() override
{
if (data.empty())
return {};
if (current_bucket_num >= NUM_BUCKETS)
return {};
AggregatedDataVariantsPtr & first = data[0];
if (current_bucket_num == -1)
{
++current_bucket_num;
if (first->type == AggregatedDataVariants::Type::without_key || aggregator.params.overflow_row)
{
aggregator.mergeWithoutKeyDataImpl(data);
return aggregator.prepareBlocksAndFillWithoutKey(
*first, final, first->type != AggregatedDataVariants::Type::without_key).front();
}
}
if (!first->isTwoLevel())
{
if (current_bucket_num > 0)
return {};
if (first->type == AggregatedDataVariants::Type::without_key)
return {};
++current_bucket_num;
#define M(NAME) \
else if (first->type == AggregatedDataVariants::Type::NAME) \
aggregator.mergeSingleLevelDataImpl<decltype(first->NAME)::element_type>(data);
if (false) {}
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
#undef M
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
return aggregator.prepareBlocksAndFillSingleLevel(*first, final).front();
}
else
{
if (!parallel_merge_data)
{
parallel_merge_data.reset(new ParallelMergeData(threads));
for (size_t i = 0; i < threads; ++i)
scheduleThreadForNextBucket();
}
Block res;
while (true)
{
std::unique_lock<std::mutex> lock(parallel_merge_data->mutex);
if (parallel_merge_data->exception)
std::rethrow_exception(parallel_merge_data->exception);
auto it = parallel_merge_data->ready_blocks.find(current_bucket_num);
if (it != parallel_merge_data->ready_blocks.end())
{
++current_bucket_num;
scheduleThreadForNextBucket();
if (it->second)
{
res.swap(it->second);
break;
}
else if (current_bucket_num >= NUM_BUCKETS)
break;
}
parallel_merge_data->condvar.wait(lock);
}
return res;
}
}
private:
const Aggregator & aggregator;
ManyAggregatedDataVariants data;
bool final;
size_t threads;
Int32 current_bucket_num = -1;
Int32 max_scheduled_bucket_num = -1;
static constexpr Int32 NUM_BUCKETS = 256;
struct ParallelMergeData
{
boost::threadpool::pool pool;
std::map<Int32, Block> ready_blocks;
std::exception_ptr exception;
std::mutex mutex;
std::condition_variable condvar;
ParallelMergeData(size_t threads) : pool(threads) {}
};
std::unique_ptr<ParallelMergeData> parallel_merge_data;
void scheduleThreadForNextBucket()
{
++max_scheduled_bucket_num;
if (max_scheduled_bucket_num >= NUM_BUCKETS)
return;
parallel_merge_data->pool.schedule(std::bind(&MergingAndConvertingBlockInputStream::thread, this,
max_scheduled_bucket_num, current_memory_tracker));
}
void thread(Int32 bucket_num, MemoryTracker * memory_tracker)
{
current_memory_tracker = memory_tracker;
setThreadName("MergingAggregtd");
try
{
/// TODO Возможно, поддержать no_more_keys
auto & merged_data = *data[0];
auto method = merged_data.type;
Block block;
if (false) {}
#define M(NAME) \
else if (method == AggregatedDataVariants::Type::NAME) \
{ \
aggregator.mergeBucketImpl<decltype(merged_data.NAME)::element_type>(data, bucket_num); \
block = aggregator.convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket_num); \
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
std::lock_guard<std::mutex> lock(parallel_merge_data->mutex);
parallel_merge_data->ready_blocks[bucket_num] = std::move(block);
}
catch (...)
{
std::lock_guard<std::mutex> lock(parallel_merge_data->mutex);
parallel_merge_data->exception = std::current_exception();
}
parallel_merge_data->condvar.notify_all();
}
};
std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const
{
if (data_variants.empty())
throw Exception("Empty data passed to Aggregator::merge().", ErrorCodes::EMPTY_DATA_PASSED);
throw Exception("Empty data passed to Aggregator::mergeAndConvertToBlocks.", ErrorCodes::EMPTY_DATA_PASSED);
LOG_TRACE(log, "Merging aggregated data");
Stopwatch watch;
ManyAggregatedDataVariants non_empty_data;
non_empty_data.reserve(data_variants.size());
for (auto & data : data_variants)
@ -1628,17 +1728,17 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
non_empty_data.push_back(data);
if (non_empty_data.empty())
return data_variants[0];
return std::unique_ptr<IBlockInputStream>(new NullBlockInputStream);
if (non_empty_data.size() == 1)
return non_empty_data[0];
/// Отсортируем состояния по убыванию размера, чтобы мердж был более эффективным (так как все состояния мерджатся в первое).
std::sort(non_empty_data.begin(), non_empty_data.end(),
[](const AggregatedDataVariantsPtr & lhs, const AggregatedDataVariantsPtr & rhs)
{
return lhs->sizeWithoutOverflowRow() > rhs->sizeWithoutOverflowRow();
});
if (non_empty_data.size() > 1)
{
/// Отсортируем состояния по убыванию размера, чтобы мердж был более эффективным (так как все состояния мерджатся в первое).
std::sort(non_empty_data.begin(), non_empty_data.end(),
[](const AggregatedDataVariantsPtr & lhs, const AggregatedDataVariantsPtr & rhs)
{
return lhs->sizeWithoutOverflowRow() > rhs->sizeWithoutOverflowRow();
});
}
/// Если хотя бы один из вариантов двухуровневый, то переконвертируем все варианты в двухуровневые, если есть не такие.
/// Замечание - возможно, было бы более оптимально не конвертировать одноуровневые варианты перед мерджем, а мерджить их отдельно, в конце.
@ -1658,83 +1758,13 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
if (!variant->isTwoLevel())
variant->convertToTwoLevel();
AggregatedDataVariantsPtr & res = non_empty_data[0];
AggregatedDataVariantsPtr & first = non_empty_data[0];
size_t rows = res->size();
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
{
rows += non_empty_data[i]->size();
AggregatedDataVariants & current = *non_empty_data[i];
if (res->type != current.type)
if (first->type != non_empty_data[i]->type)
throw Exception("Cannot merge different aggregated data variants.", ErrorCodes::CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS);
res->aggregates_pools.insert(res->aggregates_pools.end(), current.aggregates_pools.begin(), current.aggregates_pools.end());
}
/// В какой структуре данных агрегированы данные?
if (res->type == AggregatedDataVariants::Type::without_key || params.overflow_row)
mergeWithoutKeyDataImpl(non_empty_data);
std::unique_ptr<boost::threadpool::pool> thread_pool;
if (max_threads > 1 && rows > 100000 /// TODO Сделать настраиваемый порог.
&& res->isTwoLevel())
thread_pool.reset(new boost::threadpool::pool(max_threads));
/// TODO Упростить.
if (res->type == AggregatedDataVariants::Type::key8)
mergeSingleLevelDataImpl<decltype(res->key8)::element_type>(non_empty_data);
else if (res->type == AggregatedDataVariants::Type::key16)
mergeSingleLevelDataImpl<decltype(res->key16)::element_type>(non_empty_data);
else if (res->type == AggregatedDataVariants::Type::key32)
mergeSingleLevelDataImpl<decltype(res->key32)::element_type>(non_empty_data);
else if (res->type == AggregatedDataVariants::Type::key64)
mergeSingleLevelDataImpl<decltype(res->key64)::element_type>(non_empty_data);
else if (res->type == AggregatedDataVariants::Type::key_string)
mergeSingleLevelDataImpl<decltype(res->key_string)::element_type>(non_empty_data);
else if (res->type == AggregatedDataVariants::Type::key_fixed_string)
mergeSingleLevelDataImpl<decltype(res->key_fixed_string)::element_type>(non_empty_data);
else if (res->type == AggregatedDataVariants::Type::keys128)
mergeSingleLevelDataImpl<decltype(res->keys128)::element_type>(non_empty_data);
else if (res->type == AggregatedDataVariants::Type::keys256)
mergeSingleLevelDataImpl<decltype(res->keys256)::element_type>(non_empty_data);
else if (res->type == AggregatedDataVariants::Type::hashed)
mergeSingleLevelDataImpl<decltype(res->hashed)::element_type>(non_empty_data);
else if (res->type == AggregatedDataVariants::Type::concat)
mergeSingleLevelDataImpl<decltype(res->concat)::element_type>(non_empty_data);
else if (res->type == AggregatedDataVariants::Type::serialized)
mergeSingleLevelDataImpl<decltype(res->serialized)::element_type>(non_empty_data);
else if (res->type == AggregatedDataVariants::Type::key32_two_level)
mergeTwoLevelDataImpl<decltype(res->key32_two_level)::element_type>(non_empty_data, thread_pool.get());
else if (res->type == AggregatedDataVariants::Type::key64_two_level)
mergeTwoLevelDataImpl<decltype(res->key64_two_level)::element_type>(non_empty_data, thread_pool.get());
else if (res->type == AggregatedDataVariants::Type::key_string_two_level)
mergeTwoLevelDataImpl<decltype(res->key_string_two_level)::element_type>(non_empty_data, thread_pool.get());
else if (res->type == AggregatedDataVariants::Type::key_fixed_string_two_level)
mergeTwoLevelDataImpl<decltype(res->key_fixed_string_two_level)::element_type>(non_empty_data, thread_pool.get());
else if (res->type == AggregatedDataVariants::Type::keys128_two_level)
mergeTwoLevelDataImpl<decltype(res->keys128_two_level)::element_type>(non_empty_data, thread_pool.get());
else if (res->type == AggregatedDataVariants::Type::keys256_two_level)
mergeTwoLevelDataImpl<decltype(res->keys256_two_level)::element_type>(non_empty_data, thread_pool.get());
else if (res->type == AggregatedDataVariants::Type::hashed_two_level)
mergeTwoLevelDataImpl<decltype(res->hashed_two_level)::element_type>(non_empty_data, thread_pool.get());
else if (res->type == AggregatedDataVariants::Type::concat_two_level)
mergeTwoLevelDataImpl<decltype(res->concat_two_level)::element_type>(non_empty_data, thread_pool.get());
else if (res->type == AggregatedDataVariants::Type::serialized_two_level)
mergeTwoLevelDataImpl<decltype(res->serialized_two_level)::element_type>(non_empty_data, thread_pool.get());
else if (res->type != AggregatedDataVariants::Type::without_key)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
double elapsed_seconds = watch.elapsedSeconds();
size_t res_rows = res->size();
LOG_TRACE(log, std::fixed << std::setprecision(3)
<< "Merged aggregated data. "
<< "From " << rows << " to " << res_rows << " rows (efficiency: " << static_cast<double>(rows) / res_rows << ")"
<< " in " << elapsed_seconds << " sec."
<< " (" << rows / elapsed_seconds << " rows/sec.)");
return res;
return std::unique_ptr<IBlockInputStream>(new MergingAndConvertingBlockInputStream(*this, non_empty_data, final, max_threads));
}
@ -1874,8 +1904,7 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
AggregateColumnsData aggregate_columns(params.aggregates_size);
Block empty_block;
initialize(empty_block);
initialize({});
if (isCancelled())
return;
@ -1908,8 +1937,7 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
if (bucket_to_blocks.empty())
return;
if (!sample)
sample = bucket_to_blocks.begin()->second.front().cloneEmpty();
setSampleBlock(bucket_to_blocks.begin()->second.front());
/// Каким способом выполнять агрегацию?
for (size_t i = 0; i < params.keys_size; ++i)
@ -2068,11 +2096,8 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
AggregateColumnsData aggregate_columns(params.aggregates_size);
Block empty_block;
initialize(empty_block);
if (!sample)
sample = blocks.front().cloneEmpty();
initialize({});
setSampleBlock(blocks.front());
/// Каким способом выполнять агрегацию?
for (size_t i = 0; i < params.keys_size; ++i)
@ -2206,6 +2231,9 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
filter[i] = 1;
}
ssize_t size_hint = ((source.rowsInFirstColumn() + method.data.NUM_BUCKETS - 1)
/ method.data.NUM_BUCKETS) * 1.1; /// Число 1.1 выбрано наугад.
for (size_t bucket = 0, size = destinations.size(); bucket < size; ++bucket)
{
const auto & filter = filters[bucket];
@ -2219,7 +2247,7 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
for (size_t j = 0; j < columns; ++j)
{
const ColumnWithTypeAndName & src_col = source.unsafeGetByPosition(j);
dst.insert({src_col.column->filter(filter), src_col.type, src_col.name});
dst.insert({src_col.column->filter(filter, size_hint), src_col.type, src_col.name});
/** Вставленные в блок столбцы типа ColumnAggregateFunction будут владеть состояниями агрегатных функций
* путём удержания SharedPtr-а на исходный столбец. См. ColumnAggregateFunction.h
@ -2234,11 +2262,8 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
if (!block)
return {};
Block empty_block;
initialize(empty_block);
if (!sample)
sample = block.cloneEmpty();
initialize({});
setSampleBlock(block);
AggregatedDataVariants data;
@ -2295,13 +2320,14 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
}
template <typename Method>
template <typename Method, typename Table>
void NO_INLINE Aggregator::destroyImpl(
Method & method) const
Method & method,
Table & data) const
{
for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it)
for (auto elem : data)
{
char * data = Method::getAggregateData(it->second);
char * data = Method::getAggregateData(elem.second);
/** Если исключение (обычно нехватка памяти, кидается MemoryTracker-ом) возникло
* после вставки ключа в хэш-таблицу, но до создания всех состояний агрегатных функций,
@ -2313,6 +2339,23 @@ void NO_INLINE Aggregator::destroyImpl(
for (size_t i = 0; i < params.aggregates_size; ++i)
if (!aggregate_functions[i]->isState())
aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]);
data = nullptr;
}
}
void Aggregator::destroyWithoutKey(AggregatedDataVariants & result) const
{
AggregatedDataWithoutKey & res_data = result.without_key;
if (nullptr != res_data)
{
for (size_t i = 0; i < params.aggregates_size; ++i)
if (!aggregate_functions[i]->isState())
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
res_data = nullptr;
}
}
@ -2326,18 +2369,11 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
/// В какой структуре данных агрегированы данные?
if (result.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
{
AggregatedDataWithoutKey & res_data = result.without_key;
if (nullptr != res_data)
for (size_t i = 0; i < params.aggregates_size; ++i)
if (!aggregate_functions[i]->isState())
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
}
destroyWithoutKey(result);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
destroyImpl(*result.NAME);
destroyImpl(*result.NAME, result.NAME->data);
if (false) {}
APPLY_FOR_AGGREGATED_VARIANTS(M)

View File

@ -930,6 +930,7 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina
else
{
streams[0] = new MergingAggregatedMemoryEfficientBlockInputStream(streams, params, final,
settings.max_threads,
settings.aggregation_memory_efficient_merge_threads
? size_t(settings.aggregation_memory_efficient_merge_threads)
: original_max_threads);
@ -1142,8 +1143,18 @@ void InterpreterSelectQuery::executeLimit()
* если нет WITH TOTALS и есть подзапрос в FROM, и там на одном из уровней есть WITH TOTALS,
* то при использовании LIMIT-а следует читать данные до конца, а не отменять выполнение запроса раньше,
* потому что при отмене выполнения запроса, мы не получим данные для totals с удалённого сервера.
*
* Ещё случай:
* если есть WITH TOTALS и нет ORDER BY, то читать данные до конца,
* иначе TOTALS посчитается по неполным данным.
*/
bool always_read_till_end = false;
if (query.group_by_with_totals && !query.order_expression_list)
{
always_read_till_end = true;
}
if (!query.group_by_with_totals && query.table && typeid_cast<const ASTSelectQuery *>(query.table.get()))
{
const ASTSelectQuery * subquery = static_cast<const ASTSelectQuery *>(query.table.get());

View File

@ -740,7 +740,7 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
/// Если ANY INNER|RIGHT JOIN - фильтруем все столбцы кроме новых.
if (filter)
for (size_t i = 0; i < existing_columns; ++i)
block.getByPosition(i).column = block.getByPosition(i).column->filter(*filter);
block.getByPosition(i).column = block.getByPosition(i).column->filter(*filter, -1);
/// Если ALL ... JOIN - размножаем все столбцы кроме новых.
if (offsets_to_replicate)

View File

@ -16,6 +16,7 @@
#include <DB/Interpreters/InterpreterRenameQuery.h>
#include <DB/Interpreters/QueryLog.h>
#include <DB/Common/setThreadName.h>
#include <common/Revision.h>
namespace DB
@ -203,6 +204,7 @@ Block QueryLog::createBlock()
{new ColumnFixedString(16), new DataTypeFixedString(16), "ip_address"},
{new ColumnString, new DataTypeString, "user"},
{new ColumnString, new DataTypeString, "query_id"},
{new ColumnUInt32, new DataTypeUInt32, "revision"},
};
}
@ -262,6 +264,8 @@ void QueryLog::flush()
block.unsafeGetByPosition(i++).column.get()->insertData(elem.user.data(), elem.user.size());
block.unsafeGetByPosition(i++).column.get()->insertData(elem.query_id.data(), elem.query_id.size());
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(Revision::get()));
}
BlockOutputStreamPtr stream = table->write({}, {});

View File

@ -90,7 +90,8 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa
return false;
ws.ignore(pos, end);
parser_col_decl.parse(pos, end, params.col_decl, max_parsed_pos, expected);
if (!parser_col_decl.parse(pos, end, params.col_decl, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
if (s_after.ignore(pos, end, max_parsed_pos, expected))

View File

@ -584,10 +584,22 @@ int Server::main(const std::vector<std::string> & args)
global_context->setPath(path);
/// Директория для временных файлов при обработке тяжёлых запросов.
std::string tmp_path = config().getString("tmp_path", path + "tmp/");
global_context->setTemporaryPath(tmp_path);
Poco::File(tmp_path).createDirectories();
/// TODO Очистка временных файлов. Проверка, что директория с временными файлами не совпадает и не содержит в себе основной path.
{
std::string tmp_path = config().getString("tmp_path", path + "tmp/");
global_context->setTemporaryPath(tmp_path);
Poco::File(tmp_path).createDirectories();
/// Очистка временных файлов.
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator it(tmp_path); it != dir_end; ++it)
{
if (it->isFile() && 0 == it.name().compare(0, 3, "tmp"))
{
LOG_DEBUG(log, "Removing old temporary file " << it->path());
it->remove();
}
}
}
bool has_zookeeper = false;
if (config().has("zookeeper"))

View File

@ -118,6 +118,9 @@ void BackgroundProcessingPool::threadFunction()
/// O(n), n - число задач. По сути, количество таблиц. Обычно их мало.
for (const auto & handle : tasks)
{
if (handle->removed)
continue;
time_t next_time_to_execute = handle->next_time_to_execute;
if (next_time_to_execute < min_time)
@ -144,9 +147,6 @@ void BackgroundProcessingPool::threadFunction()
continue;
}
if (task->removed)
continue;
/// Лучшей задачи не нашлось, а эта задача в прошлый раз ничего не сделала, и поэтому ей назначено некоторое время спать.
time_t current_time = time(0);
if (min_time > current_time)

View File

@ -457,6 +457,13 @@ bool PKCondition::mayBeTrueInRange(const Field * left_pk, const Field * right_pk
applyFunction(func, current_type, key_range_transformed.left, new_type, key_range_transformed.left);
if (!key_range_transformed.right.isNull())
applyFunction(func, current_type, key_range_transformed.right, new_type, key_range_transformed.right);
if (!new_type)
{
evaluation_is_not_possible = true;
break;
}
current_type.swap(new_type);
if (!monotonicity.is_positive)

View File

@ -141,6 +141,9 @@ BlockInputStreams StorageBuffer::read(
static void appendBlock(const Block & from, Block & to)
{
if (!to)
throw Exception("Cannot append to empty block", ErrorCodes::LOGICAL_ERROR);
size_t rows = from.rows();
for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
{
@ -235,26 +238,30 @@ private:
void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer, std::unique_lock<std::mutex> && lock)
{
time_t current_time = time(0);
/// Сортируем столбцы в блоке. Это нужно, чтобы было проще потом конкатенировать блоки.
Block sorted_block = block.sortColumns();
if (!buffer.data)
{
buffer.first_write_time = time(0);
buffer.data = sorted_block.cloneEmpty();
}
/** Если после вставки в буфер, ограничения будут превышены, то будем сбрасывать буфер.
* Это также защищает от неограниченного потребления оперативки, так как в случае невозможности записать в таблицу,
* будет выкинуто исключение, а новые данные не будут добавлены в буфер.
*/
if (storage.checkThresholds(buffer, time(0), sorted_block.rowsInFirstColumn(), sorted_block.bytes()))
else if (storage.checkThresholds(buffer, current_time, sorted_block.rowsInFirstColumn(), sorted_block.bytes()))
{
/** Если после вставки в буфер, ограничения будут превышены, то будем сбрасывать буфер.
* Это также защищает от неограниченного потребления оперативки, так как в случае невозможности записать в таблицу,
* будет выкинуто исключение, а новые данные не будут добавлены в буфер.
*/
lock.unlock();
storage.flushBuffer(buffer, false);
lock.lock();
}
if (!buffer.first_write_time)
buffer.first_write_time = current_time;
appendBlock(sorted_block, buffer.data);
}
};
@ -292,7 +299,7 @@ bool StorageBuffer::optimize(const Settings & settings)
}
bool StorageBuffer::checkThresholds(Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes)
bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const
{
time_t time_passed = 0;
if (buffer.first_write_time)
@ -301,14 +308,15 @@ bool StorageBuffer::checkThresholds(Buffer & buffer, time_t current_time, size_t
size_t rows = buffer.data.rowsInFirstColumn() + additional_rows;
size_t bytes = buffer.data.bytes() + additional_bytes;
bool res =
return checkThresholdsImpl(rows, bytes, time_passed);
}
bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const
{
return
(time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
|| (time_passed > max_thresholds.time || rows > max_thresholds.rows || bytes > max_thresholds.bytes);
if (res)
LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds.");
return res;
}
@ -321,8 +329,12 @@ void StorageBuffer::flushAllBuffers(const bool check_thresholds)
void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
{
Block block_to_write;
time_t current_time = check_thresholds ? time(0) : 0;
Block block_to_write = buffer.data.cloneEmpty();
time_t current_time = time(0);
size_t rows = 0;
size_t bytes = 0;
time_t time_passed = 0;
/** Довольно много проблем из-за того, что хотим блокировать буфер лишь на короткое время.
* Под блокировкой, получаем из буфера блок, и заменяем в нём блок на новый пустой.
@ -333,14 +345,19 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
{
std::lock_guard<std::mutex> lock(buffer.mutex);
rows = buffer.data.rowsInFirstColumn();
bytes = buffer.data.bytes();
if (buffer.first_write_time)
time_passed = current_time - buffer.first_write_time;
if (check_thresholds)
{
if (!checkThresholds(buffer, current_time))
if (!checkThresholdsImpl(rows, bytes, time_passed))
return;
}
else
{
if (buffer.data.rowsInFirstColumn() == 0)
if (rows == 0)
return;
}
@ -348,6 +365,8 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
buffer.first_write_time = 0;
}
LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds.");
if (no_destination)
return;

View File

@ -770,12 +770,19 @@ void StorageReplicatedMergeTree::loadQueue()
Strings children = zookeeper->getChildren(replica_path + "/queue");
std::sort(children.begin(), children.end());
std::vector<std::pair<String, zkutil::ZooKeeper::GetFuture>> futures;
futures.reserve(children.size());
for (const String & child : children)
futures.emplace_back(child, zookeeper->asyncGet(replica_path + "/queue/" + child));
for (auto & future : futures)
{
zkutil::Stat stat;
String s = zookeeper->get(replica_path + "/queue/" + child, &stat);
LogEntryPtr entry = LogEntry::parse(s, stat);
entry->znode_name = child;
zkutil::ZooKeeper::ValueAndStat res = future.second.get();
LogEntryPtr entry = LogEntry::parse(res.value, res.stat);
entry->znode_name = future.first;
entry->addResultToVirtualParts(*this);
queue.push_back(entry);
}
@ -3047,10 +3054,10 @@ void StorageReplicatedMergeTree::drop()
if (is_readonly)
throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
auto zookeeper = getZooKeeper();
shutdown();
auto zookeeper = getZooKeeper();
LOG_INFO(log, "Removing replica " << replica_path);
replica_is_active_node = nullptr;
zookeeper->tryRemoveRecursive(replica_path);

View File

@ -22,6 +22,7 @@
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/DataStreams/NativeBlockInputStream.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/DataStreams/NullBlockInputStream.h>
#include <DB/Columns/ColumnArray.h>
@ -232,6 +233,9 @@ BlockInputStreams StorageStripeLog::read(
NameSet column_names_set(column_names.begin(), column_names.end());
if (!Poco::File(full_path() + "index.mrk").exists())
return { new NullBlockInputStream };
CompressedReadBufferFromFile index_in(full_path() + "index.mrk", 0, 0, INDEX_BUFFER_SIZE);
std::shared_ptr<const IndexForNativeFormat> index{std::make_shared<IndexForNativeFormat>(index_in, column_names_set)};

View File

@ -0,0 +1,66 @@
{
"meta":
[
{
"name": "ignore(x)",
"type": "UInt8"
},
{
"name": "count()",
"type": "UInt64"
}
],
"data":
[
[0, "2"],
[0, "2"],
[0, "2"],
[0, "2"],
[0, "2"],
[0, "2"],
[0, "2"],
[0, "2"],
[0, "2"],
[0, "2"]
],
"totals": [0,"2000"],
"rows": 10,
"rows_before_limit_at_least": 1000
}
{
"meta":
[
{
"name": "ignore(x)",
"type": "UInt8"
},
{
"name": "count()",
"type": "UInt64"
}
],
"data":
[
[0, "2"],
[0, "2"],
[0, "2"],
[0, "2"],
[0, "2"],
[0, "2"],
[0, "2"],
[0, "2"],
[0, "2"],
[0, "2"]
],
"totals": [0,"2000"],
"rows": 10,
"rows_before_limit_at_least": 1000
}

View File

@ -0,0 +1,3 @@
SET group_by_two_level_threshold = 1;
SELECT ignore(x), count() FROM (SELECT number AS x FROM system.numbers LIMIT 1000 UNION ALL SELECT number AS x FROM system.numbers LIMIT 1000) GROUP BY x WITH TOTALS LIMIT 10 FORMAT JSONCompact;
SELECT ignore(x), count() FROM (SELECT number AS x FROM system.numbers LIMIT 1000 UNION ALL SELECT number AS x FROM system.numbers LIMIT 1000) GROUP BY x WITH TOTALS ORDER BY x LIMIT 10 FORMAT JSONCompact;

View File

@ -0,0 +1 @@
-0.0000019073486299999997

View File

@ -0,0 +1 @@
select reinterpretAsFloat64(unhex('875635ffffffbfbe'))

View File

@ -0,0 +1 @@
nan 1

View File

@ -0,0 +1 @@
SELECT * FROM (SELECT nan, number FROM system.numbers) WHERE number % 100 = 1 LIMIT 1;

View File

@ -0,0 +1,2 @@
1
2

View File

@ -0,0 +1,8 @@
DROP TABLE IF EXISTS test.stripelog;
CREATE TABLE test.stripelog (x UInt8) ENGINE = StripeLog;
SELECT * FROM test.stripelog ORDER BY x;
INSERT INTO test.stripelog VALUES (1), (2);
SELECT * FROM test.stripelog ORDER BY x;
DROP TABLE test.stripelog;

View File

@ -0,0 +1,2 @@
20000 1 20000 200010000 20000
20000 1 20000 200010000 20000

View File

@ -0,0 +1,41 @@
#!/bin/bash
clickhouse-client -n --query="
DROP TABLE IF EXISTS test.dst;
DROP TABLE IF EXISTS test.buffer;
CREATE TABLE test.dst (x UInt64, d Date DEFAULT today()) ENGINE = MergeTree(d, x, 8192);
CREATE TABLE test.buffer (x UInt64, d Date DEFAULT today()) ENGINE = Buffer(test, dst, 16, 1, 10, 100, 1000, 10000, 100000);
";
seq 1 1000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 1001 2000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 2001 3000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 3001 4000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 4001 5000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 5001 6000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 6001 7000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 7001 8000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 8001 9000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 9001 10000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 10001 11000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 11001 12000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 12001 13000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 13001 14000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 14001 15000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 15001 16000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 16001 17000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 17001 18000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 18001 19000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 19001 20000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
wait
clickhouse-client --query="SELECT count(), min(x), max(x), sum(x), uniqExact(x) FROM test.buffer;";
clickhouse-client --query="OPTIMIZE TABLE test.buffer;";
clickhouse-client --query="SELECT count(), min(x), max(x), sum(x), uniqExact(x) FROM test.dst;";
clickhouse-client -n --query="
DROP TABLE test.dst;
DROP TABLE test.buffer;
";