This commit is contained in:
Evgeniy Gatov 2015-12-09 18:53:40 +03:00
commit f8862af2d2
26 changed files with 271 additions and 334 deletions

View File

@ -26,6 +26,29 @@ public:
}; };
namespace ColumnConstDetails
{
template <typename T>
inline bool equals(const T & x, const T & y)
{
return x == y;
}
/// Проверяет побитовую идентичность элементов, даже если они являются NaN-ами.
template <>
inline bool equals(const Float32 & x, const Float32 & y)
{
return 0 == memcmp(&x, &y, sizeof(x));
}
template <>
inline bool equals(const Float64 & x, const Float64 & y)
{
return 0 == memcmp(&x, &y, sizeof(x));
}
}
/** Столбец-константа может содержать внутри себя само значение, /** Столбец-константа может содержать внутри себя само значение,
* или, в случае массивов, SharedPtr от значения-массива, * или, в случае массивов, SharedPtr от значения-массива,
* чтобы избежать проблем производительности при копировании очень больших массивов. * чтобы избежать проблем производительности при копировании очень больших массивов.
@ -65,7 +88,7 @@ public:
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override void insertRangeFrom(const IColumn & src, size_t start, size_t length) override
{ {
if (getDataFromHolder() != static_cast<const Derived &>(src).getDataFromHolder()) if (!ColumnConstDetails::equals(getDataFromHolder(), static_cast<const Derived &>(src).getDataFromHolder()))
throw Exception("Cannot insert different element into constant column " + getName(), throw Exception("Cannot insert different element into constant column " + getName(),
ErrorCodes::CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN); ErrorCodes::CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN);
@ -74,7 +97,7 @@ public:
void insert(const Field & x) override void insert(const Field & x) override
{ {
if (x.get<FieldType>() != FieldType(getDataFromHolder())) if (!ColumnConstDetails::equals(x.get<FieldType>(), FieldType(getDataFromHolder())))
throw Exception("Cannot insert different element into constant column " + getName(), throw Exception("Cannot insert different element into constant column " + getName(),
ErrorCodes::CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN); ErrorCodes::CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN);
++s; ++s;
@ -87,7 +110,7 @@ public:
void insertFrom(const IColumn & src, size_t n) override void insertFrom(const IColumn & src, size_t n) override
{ {
if (getDataFromHolder() != static_cast<const Derived &>(src).getDataFromHolder()) if (!ColumnConstDetails::equals(getDataFromHolder(), static_cast<const Derived &>(src).getDataFromHolder()))
throw Exception("Cannot insert different element into constant column " + getName(), throw Exception("Cannot insert different element into constant column " + getName(),
ErrorCodes::CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN); ErrorCodes::CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN);
++s; ++s;

View File

@ -14,10 +14,13 @@ namespace DB
{ {
/** Can allocate memory objects of fixed size with deletion support.
* For small `object_size`s allocated no less than getMinAllocationSize() bytes. */
class SmallObjectPool class SmallObjectPool
{ {
private: private:
struct Block { Block * next; }; struct Block { Block * next; };
static constexpr auto getMinAllocationSize() { return sizeof(Block); }
const std::size_t object_size; const std::size_t object_size;
Arena pool; Arena pool;
@ -25,16 +28,11 @@ private:
public: public:
SmallObjectPool( SmallObjectPool(
const std::size_t object_size, const std::size_t initial_size = 4096, const std::size_t growth_factor = 2, const std::size_t object_size_, const std::size_t initial_size = 4096, const std::size_t growth_factor = 2,
const std::size_t linear_growth_threshold = 128 * 1024 * 1024) const std::size_t linear_growth_threshold = 128 * 1024 * 1024)
: object_size{object_size}, pool{initial_size, growth_factor, linear_growth_threshold} : object_size{std::max(object_size_, getMinAllocationSize())},
pool{initial_size, growth_factor, linear_growth_threshold}
{ {
if (object_size < sizeof(Block))
throw Exception{
"Can't make allocations smaller than sizeof(Block) = " + std::to_string(sizeof(Block)),
ErrorCodes::LOGICAL_ERROR
};
if (pool.size() < object_size) if (pool.size() < object_size)
return; return;

View File

@ -83,7 +83,8 @@ private:
boost::threadpool::pool pool; boost::threadpool::pool pool;
std::mutex get_next_blocks_mutex; std::mutex get_next_blocks_mutex;
ConcurrentBoundedQueue<OutputData> result_queue; ConcurrentBoundedQueue<OutputData> result_queue;
bool exhausted = false; bool exhausted = false; /// Данных больше нет.
bool finish = false; /// Нужно завершить работу раньше, чем данные закончились.
std::atomic<size_t> active_threads; std::atomic<size_t> active_threads;
ParallelMergeData(size_t max_threads) : pool(max_threads), result_queue(max_threads), active_threads(max_threads) {} ParallelMergeData(size_t max_threads) : pool(max_threads), result_queue(max_threads), active_threads(max_threads) {}

View File

@ -76,7 +76,7 @@ public:
types.push_back(value_type_t::Int64); types.push_back(value_type_t::Int64);
else if (typeid_cast<const DataTypeFloat32 *>(type)) else if (typeid_cast<const DataTypeFloat32 *>(type))
types.push_back(value_type_t::Float32); types.push_back(value_type_t::Float32);
else if (typeid_cast<const DataTypeInt64 *>(type)) else if (typeid_cast<const DataTypeFloat64 *>(type))
types.push_back(value_type_t::Float64); types.push_back(value_type_t::Float64);
else if (typeid_cast<const DataTypeString *>(type)) else if (typeid_cast<const DataTypeString *>(type))
types.push_back(value_type_t::String); types.push_back(value_type_t::String);
@ -127,9 +127,10 @@ private:
for (const auto idx : ext::range(0, size)) for (const auto idx : ext::range(0, size))
{ {
const auto value = row[names[idx]]; const auto & name = names[idx];
const auto value = row[name];
if (value.ok()) if (value.ok())
insertValue(columns[idx], types[idx], value); insertValue(columns[idx], types[idx], value, name);
else else
insertDefaultValue(columns[idx], *sample_columns[idx]); insertDefaultValue(columns[idx], *sample_columns[idx]);
} }
@ -142,26 +143,29 @@ private:
return block; return block;
} }
static void insertValue(IColumn * const column, const value_type_t type, const mongo::BSONElement & value) static void insertValue(
IColumn * const column, const value_type_t type, const mongo::BSONElement & value, const mongo::StringData & name)
{ {
switch (type) switch (type)
{ {
case value_type_t::UInt8: case value_type_t::UInt8:
{ {
if (value.type() != mongo::Bool) if (!value.isNumber() && value.type() != mongo::Bool)
throw Exception{ throw Exception{
"Type mismatch, expected Bool, got " + std::string{mongo::typeName(value.type())}, "Type mismatch, expected a number or Bool, got " + std::string{mongo::typeName(value.type())} +
" for column " + name.toString(),
ErrorCodes::TYPE_MISMATCH ErrorCodes::TYPE_MISMATCH
}; };
static_cast<ColumnFloat64 *>(column)->insert(value.boolean()); static_cast<ColumnUInt8 *>(column)->insert(value.isNumber() ? value.numberInt() : value.boolean());
break; break;
} }
case value_type_t::UInt16: case value_type_t::UInt16:
{ {
if (!value.isNumber()) if (!value.isNumber())
throw Exception{ throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())}, "Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
" for column " + name.toString(),
ErrorCodes::TYPE_MISMATCH ErrorCodes::TYPE_MISMATCH
}; };
@ -172,7 +176,8 @@ private:
{ {
if (!value.isNumber()) if (!value.isNumber())
throw Exception{ throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())}, "Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
" for column " + name.toString(),
ErrorCodes::TYPE_MISMATCH ErrorCodes::TYPE_MISMATCH
}; };
@ -183,7 +188,8 @@ private:
{ {
if (!value.isNumber()) if (!value.isNumber())
throw Exception{ throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())}, "Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
" for column " + name.toString(),
ErrorCodes::TYPE_MISMATCH ErrorCodes::TYPE_MISMATCH
}; };
@ -192,20 +198,22 @@ private:
} }
case value_type_t::Int8: case value_type_t::Int8:
{ {
if (!value.isNumber()) if (!value.isNumber() && value.type() != mongo::Bool)
throw Exception{ throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())}, "Type mismatch, expected a number or Bool, got " + std::string{mongo::typeName(value.type())} +
" for column " + name.toString(),
ErrorCodes::TYPE_MISMATCH ErrorCodes::TYPE_MISMATCH
}; };
static_cast<ColumnInt8 *>(column)->insert(value.numberInt()); static_cast<ColumnInt8 *>(column)->insert(value.isNumber() ? value.numberInt() : value.numberInt());
break; break;
} }
case value_type_t::Int16: case value_type_t::Int16:
{ {
if (!value.isNumber()) if (!value.isNumber())
throw Exception{ throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())}, "Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
" for column " + name.toString(),
ErrorCodes::TYPE_MISMATCH ErrorCodes::TYPE_MISMATCH
}; };
@ -216,7 +224,8 @@ private:
{ {
if (!value.isNumber()) if (!value.isNumber())
throw Exception{ throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())}, "Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
" for column " + name.toString(),
ErrorCodes::TYPE_MISMATCH ErrorCodes::TYPE_MISMATCH
}; };
@ -227,7 +236,8 @@ private:
{ {
if (!value.isNumber()) if (!value.isNumber())
throw Exception{ throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())}, "Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
" for column " + name.toString(),
ErrorCodes::TYPE_MISMATCH ErrorCodes::TYPE_MISMATCH
}; };
@ -238,7 +248,8 @@ private:
{ {
if (!value.isNumber()) if (!value.isNumber())
throw Exception{ throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())}, "Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
" for column " + name.toString(),
ErrorCodes::TYPE_MISMATCH ErrorCodes::TYPE_MISMATCH
}; };
@ -249,7 +260,8 @@ private:
{ {
if (!value.isNumber()) if (!value.isNumber())
throw Exception{ throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())}, "Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
" for column " + name.toString(),
ErrorCodes::TYPE_MISMATCH ErrorCodes::TYPE_MISMATCH
}; };
@ -260,7 +272,8 @@ private:
{ {
if (value.type() != mongo::String) if (value.type() != mongo::String)
throw Exception{ throw Exception{
"Type mismatch, expected String, got " + std::string{mongo::typeName(value.type())}, "Type mismatch, expected String, got " + std::string{mongo::typeName(value.type())} +
" for column " + name.toString(),
ErrorCodes::TYPE_MISMATCH ErrorCodes::TYPE_MISMATCH
}; };
@ -272,7 +285,8 @@ private:
{ {
if (value.type() != mongo::Date) if (value.type() != mongo::Date)
throw Exception{ throw Exception{
"Type mismatch, expected Date, got " + std::string{mongo::typeName(value.type())}, "Type mismatch, expected Date, got " + std::string{mongo::typeName(value.type())} +
" for column " + name.toString(),
ErrorCodes::TYPE_MISMATCH ErrorCodes::TYPE_MISMATCH
}; };
@ -284,7 +298,8 @@ private:
{ {
if (value.type() != mongo::Date) if (value.type() != mongo::Date)
throw Exception{ throw Exception{
"Type mismatch, expected Date, got " + std::string{mongo::typeName(value.type())}, "Type mismatch, expected Date, got " + std::string{mongo::typeName(value.type())} +
" for column " + name.toString(),
ErrorCodes::TYPE_MISMATCH ErrorCodes::TYPE_MISMATCH
}; };

View File

@ -77,7 +77,7 @@ public:
types.push_back(value_type_t::Int64); types.push_back(value_type_t::Int64);
else if (typeid_cast<const DataTypeFloat32 *>(type)) else if (typeid_cast<const DataTypeFloat32 *>(type))
types.push_back(value_type_t::Float32); types.push_back(value_type_t::Float32);
else if (typeid_cast<const DataTypeInt64 *>(type)) else if (typeid_cast<const DataTypeFloat64 *>(type))
types.push_back(value_type_t::Float64); types.push_back(value_type_t::Float64);
else if (typeid_cast<const DataTypeString *>(type)) else if (typeid_cast<const DataTypeString *>(type))
types.push_back(value_type_t::String); types.push_back(value_type_t::String);

View File

@ -129,7 +129,7 @@ public:
const ColumnConstArray * array_from = typeid_cast<const ColumnConstArray *>(&*block.getByPosition(arguments[1]).column); const ColumnConstArray * array_from = typeid_cast<const ColumnConstArray *>(&*block.getByPosition(arguments[1]).column);
const ColumnConstArray * array_to = typeid_cast<const ColumnConstArray *>(&*block.getByPosition(arguments[2]).column); const ColumnConstArray * array_to = typeid_cast<const ColumnConstArray *>(&*block.getByPosition(arguments[2]).column);
if (!array_from && !array_to) if (!array_from || !array_to)
throw Exception("Second and third arguments of function " + getName() + " must be constant arrays.", ErrorCodes::ILLEGAL_COLUMN); throw Exception("Second and third arguments of function " + getName() + " must be constant arrays.", ErrorCodes::ILLEGAL_COLUMN);
prepare(array_from->getData(), array_to->getData(), block, arguments); prepare(array_from->getData(), array_to->getData(), block, arguments);

View File

@ -849,18 +849,11 @@ public:
* которые могут быть затем объединены с другими состояниями (для распределённой обработки запроса). * которые могут быть затем объединены с другими состояниями (для распределённой обработки запроса).
* Если final = true, то в качестве столбцов-агрегатов создаются столбцы с готовыми значениями. * Если final = true, то в качестве столбцов-агрегатов создаются столбцы с готовыми значениями.
*/ */
BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads); BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
/** Объединить несколько структур данных агрегации в одну. (В первый непустой элемент массива.)
* После объединения, все стркутуры агрегации (а не только те, в которую они будут слиты) должны жить,
* пока не будет вызвана функция convertToBlocks.
* Это нужно, так как в слитом результате могут остаться указатели на память в пуле, которым владеют другие структуры агрегации.
*/
AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants, size_t max_threads);
/** Объединить несколько структур данных агрегации и выдать результат в виде потока блоков. /** Объединить несколько структур данных агрегации и выдать результат в виде потока блоков.
*/ */
std::unique_ptr<IBlockInputStream> mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads); std::unique_ptr<IBlockInputStream> mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
/** Объединить поток частично агрегированных блоков в одну структуру данных. /** Объединить поток частично агрегированных блоков в одну структуру данных.
* (Доагрегировать несколько блоков, которые представляют собой результат независимых агрегаций с удалённых серверов.) * (Доагрегировать несколько блоков, которые представляют собой результат независимых агрегаций с удалённых серверов.)
@ -975,10 +968,15 @@ protected:
TemporaryFiles temporary_files; TemporaryFiles temporary_files;
/** Если заданы только имена столбцов (key_names, а также aggregates[i].column_name), то вычислить номера столбцов. /** Если заданы только имена столбцов (key_names, а также aggregates[i].column_name), то вычислить номера столбцов.
* Сформировать блок - пример результата. * Сформировать блок - пример результата. Он используется в методах convertToBlocks, mergeAndConvertToBlocks.
*/ */
void initialize(const Block & block); void initialize(const Block & block);
/** Установить блок - пример результата,
* только если он ещё не был установлен.
*/
void setSampleBlock(const Block & block);
/** Выбрать способ агрегации на основе количества и типов ключей. */ /** Выбрать способ агрегации на основе количества и типов ключей. */
AggregatedDataVariants::Type chooseAggregationMethod(const ConstColumnPlainPtrs & key_columns, Sizes & key_sizes); AggregatedDataVariants::Type chooseAggregationMethod(const ConstColumnPlainPtrs & key_columns, Sizes & key_sizes);
@ -1084,12 +1082,6 @@ protected:
Table & table_dst, Table & table_dst,
Table & table_src) const; Table & table_src) const;
/// Слить все ключи, оставшиеся после предыдущего метода, в overflows.
template <typename Method, typename Table>
void mergeDataRemainingKeysToOverflowsImpl(
AggregatedDataWithoutKey & overflows,
Table & table_src) const;
void mergeWithoutKeyDataImpl( void mergeWithoutKeyDataImpl(
ManyAggregatedDataVariants & non_empty_data) const; ManyAggregatedDataVariants & non_empty_data) const;
@ -1097,11 +1089,6 @@ protected:
void mergeSingleLevelDataImpl( void mergeSingleLevelDataImpl(
ManyAggregatedDataVariants & non_empty_data) const; ManyAggregatedDataVariants & non_empty_data) const;
template <typename Method>
void mergeTwoLevelDataImpl(
ManyAggregatedDataVariants & many_data,
boost::threadpool::pool * thread_pool) const;
template <typename Method, typename Table> template <typename Method, typename Table>
void convertToBlockImpl( void convertToBlockImpl(
Method & method, Method & method,

View File

@ -53,11 +53,10 @@ public:
/** Отменяет все мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро бросят исключение. /** Отменяет все мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро бросят исключение.
* Все новые вызовы будут бросать исключения, пока не будет вызван uncancel(). * Все новые вызовы будут бросать исключения, пока не будет вызван uncancel().
* Считает количество таких вызовов для поддержки нескольких наложенных друг на друга отмен.
*/ */
void cancel() { ++cancelled; } void cancel() { cancelled = true; }
void uncancel() { --cancelled; } void uncancel() { cancelled = false; }
bool isCancelled() const { return cancelled > 0; } bool isCancelled() const { return cancelled; }
private: private:
MergeTreeData & data; MergeTreeData & data;
@ -67,7 +66,7 @@ private:
/// Когда в последний раз писали в лог, что место на диске кончилось (чтобы не писать об этом слишком часто). /// Когда в последний раз писали в лог, что место на диске кончилось (чтобы не писать об этом слишком часто).
time_t disk_space_warning_time = 0; time_t disk_space_warning_time = 0;
std::atomic<int> cancelled {0}; std::atomic<bool> cancelled {false};
}; };

View File

@ -128,7 +128,8 @@ private:
void flushAllBuffers(bool check_thresholds = true); void flushAllBuffers(bool check_thresholds = true);
/// Сбросить буфер. Если выставлено check_thresholds - сбрасывает только если превышены пороги. /// Сбросить буфер. Если выставлено check_thresholds - сбрасывает только если превышены пороги.
void flushBuffer(Buffer & buffer, bool check_thresholds); void flushBuffer(Buffer & buffer, bool check_thresholds);
bool checkThresholds(Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0); bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const;
/// Аргумент table передаётся, так как иногда вычисляется заранее. Он должен соответствовать destination-у. /// Аргумент table передаётся, так как иногда вычисляется заранее. Он должен соответствовать destination-у.
void writeBlockToDestination(const Block & block, StoragePtr table); void writeBlockToDestination(const Block & block, StoragePtr table);

View File

@ -12,17 +12,17 @@ Block AggregatingBlockInputStream::readImpl()
if (!executed) if (!executed)
{ {
executed = true; executed = true;
AggregatedDataVariants data_variants; AggregatedDataVariantsPtr data_variants = new AggregatedDataVariants;
Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); }; Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); };
aggregator.setCancellationHook(hook); aggregator.setCancellationHook(hook);
aggregator.execute(children.back(), data_variants); aggregator.execute(children.back(), *data_variants);
if (!aggregator.hasTemporaryFiles()) if (!aggregator.hasTemporaryFiles())
{ {
impl.reset(new BlocksListBlockInputStream( ManyAggregatedDataVariants many_data { data_variants };
aggregator.convertToBlocks(data_variants, final, 1))); impl = aggregator.mergeAndConvertToBlocks(many_data, final, 1);
} }
else else
{ {
@ -35,9 +35,9 @@ Block AggregatingBlockInputStream::readImpl()
if (!isCancelled()) if (!isCancelled())
{ {
/// Сбросим имеющиеся в оперативке данные тоже на диск. Так проще. /// Сбросим имеющиеся в оперативке данные тоже на диск. Так проще.
size_t rows = data_variants.sizeWithoutOverflowRow(); size_t rows = data_variants->sizeWithoutOverflowRow();
if (rows) if (rows)
aggregator.writeToTemporaryFile(data_variants, rows); aggregator.writeToTemporaryFile(*data_variants, rows);
} }
const auto & files = aggregator.getTemporaryFiles(); const auto & files = aggregator.getTemporaryFiles();

View File

@ -62,7 +62,6 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
auto memory_tracker = current_memory_tracker; auto memory_tracker = current_memory_tracker;
task = std::packaged_task<void()>([&child, memory_tracker] task = std::packaged_task<void()>([&child, memory_tracker]
{ {
/// memory_tracker и имя потока устанавливается здесь. Далее для всех задач в reading_pool это уже не требуется.
current_memory_tracker = memory_tracker; current_memory_tracker = memory_tracker;
setThreadName("MergeAggReadThr"); setThreadName("MergeAggReadThr");
child->readPrefix(); child->readPrefix();
@ -129,6 +128,12 @@ MergingAggregatedMemoryEfficientBlockInputStream::~MergingAggregatedMemoryEffici
if (parallel_merge_data) if (parallel_merge_data)
{ {
LOG_TRACE((&Logger::get("MergingAggregatedMemoryEfficientBlockInputStream")), "Waiting for threads to finish"); LOG_TRACE((&Logger::get("MergingAggregatedMemoryEfficientBlockInputStream")), "Waiting for threads to finish");
{
std::lock_guard<std::mutex> lock(parallel_merge_data->get_next_blocks_mutex);
parallel_merge_data->finish = true;
}
parallel_merge_data->result_queue.clear(); parallel_merge_data->result_queue.clear();
parallel_merge_data->pool.wait(); parallel_merge_data->pool.wait();
} }
@ -156,7 +161,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker
{ {
std::lock_guard<std::mutex> lock(parallel_merge_data->get_next_blocks_mutex); std::lock_guard<std::mutex> lock(parallel_merge_data->get_next_blocks_mutex);
if (parallel_merge_data->exhausted) if (parallel_merge_data->exhausted || parallel_merge_data->finish)
break; break;
blocks_to_merge = getNextBlocksToMerge(); blocks_to_merge = getNextBlocksToMerge();
@ -168,7 +173,16 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker
} }
} }
parallel_merge_data->result_queue.push(aggregator.mergeBlocks(*blocks_to_merge, final)); Block res = aggregator.mergeBlocks(*blocks_to_merge, final);
{
std::lock_guard<std::mutex> lock(parallel_merge_data->get_next_blocks_mutex);
if (parallel_merge_data->finish)
break;
parallel_merge_data->result_queue.push(OutputData(std::move(res)));
}
} }
} }
catch (...) catch (...)
@ -276,7 +290,13 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
{ {
if (need_that_input(input)) if (need_that_input(input))
{ {
tasks.emplace_back([&input, &read_from_input] { read_from_input(input); }); auto memory_tracker = current_memory_tracker;
tasks.emplace_back([&input, &read_from_input, memory_tracker]
{
current_memory_tracker = memory_tracker;
setThreadName("MergeAggReadThr");
read_from_input(input);
});
auto & task = tasks.back(); auto & task = tasks.back();
reading_pool->schedule([&task] { task(); }); reading_pool->schedule([&task] { task(); });
} }

View File

@ -90,7 +90,8 @@ void Aggregator::initialize(const Block & block)
initialized = true; initialized = true;
memory_usage_before_aggregation = current_memory_tracker->get(); if (current_memory_tracker)
memory_usage_before_aggregation = current_memory_tracker->get();
aggregate_functions.resize(params.aggregates_size); aggregate_functions.resize(params.aggregates_size);
for (size_t i = 0; i < params.aggregates_size; ++i) for (size_t i = 0; i < params.aggregates_size; ++i)
@ -154,6 +155,15 @@ void Aggregator::initialize(const Block & block)
} }
void Aggregator::setSampleBlock(const Block & block)
{
std::lock_guard<std::mutex> lock(mutex);
if (!sample)
sample = block.cloneEmpty();
}
void Aggregator::compileIfPossible(AggregatedDataVariants::Type type) void Aggregator::compileIfPossible(AggregatedDataVariants::Type type)
{ {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
@ -732,7 +742,10 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
} }
size_t result_size = result.sizeWithoutOverflowRow(); size_t result_size = result.sizeWithoutOverflowRow();
auto current_memory_usage = current_memory_tracker->get(); Int64 current_memory_usage = 0;
if (current_memory_tracker)
current_memory_usage = current_memory_tracker->get();
auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// Здесь учитываются все результаты в сумме, из разных потоков. auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// Здесь учитываются все результаты в сумме, из разных потоков.
bool worth_convert_to_two_level bool worth_convert_to_two_level
@ -1265,7 +1278,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
} }
BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const
{ {
if (isCancelled()) if (isCancelled())
return BlocksList(); return BlocksList();
@ -1438,31 +1451,6 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
} }
} }
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataRemainingKeysToOverflowsImpl(
AggregatedDataWithoutKey & overflows,
Table & table_src) const
{
for (auto it = table_src.begin(); it != table_src.end(); ++it)
{
if (Method::getAggregateData(it->second) == nullptr)
continue;
AggregateDataPtr res_data = overflows;
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
Method::getAggregateData(it->second) = nullptr;
}
}
void NO_INLINE Aggregator::mergeWithoutKeyDataImpl( void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
ManyAggregatedDataVariants & non_empty_data) const ManyAggregatedDataVariants & non_empty_data) const
@ -1522,193 +1510,6 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
} }
template <typename Method>
void NO_INLINE Aggregator::mergeTwoLevelDataImpl(
ManyAggregatedDataVariants & non_empty_data,
boost::threadpool::pool * thread_pool) const
{
AggregatedDataVariantsPtr & res = non_empty_data[0];
/// В данном случае, no_more_keys будет выставлено, только если в первом (самом большом) состоянии достаточно много строк.
bool no_more_keys = false;
if (!checkLimits(res->sizeWithoutOverflowRow(), no_more_keys))
return;
/// Слияние распараллеливается по корзинам - первому уровню TwoLevelHashMap.
auto merge_bucket = [&non_empty_data, &res, no_more_keys, this](size_t bucket, MemoryTracker * memory_tracker)
{
current_memory_tracker = memory_tracker;
/// Все результаты агрегации соединяем с первым.
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
{
AggregatedDataVariants & current = *non_empty_data[i];
if (!no_more_keys)
{
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket]);
getDataVariant<Method>(current).data.impls[bucket].clearAndShrink();
}
else
{
mergeDataOnlyExistingKeysImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket]);
}
}
};
/// packaged_task используются, чтобы исключения автоматически прокидывались в основной поток.
std::vector<std::packaged_task<void()>> tasks(Method::Data::NUM_BUCKETS);
try
{
for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{
tasks[bucket] = std::packaged_task<void()>(std::bind(merge_bucket, bucket, current_memory_tracker));
if (thread_pool)
thread_pool->schedule([bucket, &tasks] { tasks[bucket](); });
else
tasks[bucket]();
}
}
catch (...)
{
/// Если этого не делать, то в случае исключения, tasks уничтожится раньше завершения потоков, и будет плохо.
if (thread_pool)
thread_pool->wait();
throw;
}
if (thread_pool)
thread_pool->wait();
for (auto & task : tasks)
if (task.valid())
task.get_future().get();
if (no_more_keys && params.overflow_row)
{
for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
{
AggregatedDataVariants & current = *non_empty_data[i];
mergeDataRemainingKeysToOverflowsImpl<Method>(
res->without_key,
getDataVariant<Method>(current).data.impls[bucket]);
}
}
}
/// aggregator не будет уничтожать состояния агрегатных функций в деструкторе
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
non_empty_data[i]->aggregator = nullptr;
}
AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_variants, size_t max_threads)
{
if (data_variants.empty())
throw Exception("Empty data passed to Aggregator::merge.", ErrorCodes::EMPTY_DATA_PASSED);
LOG_TRACE(log, "Merging aggregated data");
Stopwatch watch;
ManyAggregatedDataVariants non_empty_data;
non_empty_data.reserve(data_variants.size());
for (auto & data : data_variants)
if (!data->empty())
non_empty_data.push_back(data);
if (non_empty_data.empty())
return data_variants[0];
if (non_empty_data.size() == 1)
return non_empty_data[0];
/// Отсортируем состояния по убыванию размера, чтобы мердж был более эффективным (так как все состояния мерджатся в первое).
std::sort(non_empty_data.begin(), non_empty_data.end(),
[](const AggregatedDataVariantsPtr & lhs, const AggregatedDataVariantsPtr & rhs)
{
return lhs->sizeWithoutOverflowRow() > rhs->sizeWithoutOverflowRow();
});
/// Если хотя бы один из вариантов двухуровневый, то переконвертируем все варианты в двухуровневые, если есть не такие.
/// Замечание - возможно, было бы более оптимально не конвертировать одноуровневые варианты перед мерджем, а мерджить их отдельно, в конце.
bool has_at_least_one_two_level = false;
for (const auto & variant : non_empty_data)
{
if (variant->isTwoLevel())
{
has_at_least_one_two_level = true;
break;
}
}
if (has_at_least_one_two_level)
for (auto & variant : non_empty_data)
if (!variant->isTwoLevel())
variant->convertToTwoLevel();
AggregatedDataVariantsPtr & res = non_empty_data[0];
size_t rows = res->size();
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
{
rows += non_empty_data[i]->size();
AggregatedDataVariants & current = *non_empty_data[i];
if (res->type != current.type)
throw Exception("Cannot merge different aggregated data variants.", ErrorCodes::CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS);
res->aggregates_pools.insert(res->aggregates_pools.end(), current.aggregates_pools.begin(), current.aggregates_pools.end());
}
/// В какой структуре данных агрегированы данные?
if (res->type == AggregatedDataVariants::Type::without_key || params.overflow_row)
mergeWithoutKeyDataImpl(non_empty_data);
std::unique_ptr<boost::threadpool::pool> thread_pool;
if (max_threads > 1 && res->isTwoLevel())
thread_pool.reset(new boost::threadpool::pool(max_threads));
if (false) {}
#define M(NAME) \
else if (res->type == AggregatedDataVariants::Type::NAME) \
mergeSingleLevelDataImpl<decltype(res->NAME)::element_type>(non_empty_data);
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
#undef M
#define M(NAME) \
else if (res->type == AggregatedDataVariants::Type::NAME) \
mergeTwoLevelDataImpl<decltype(res->NAME)::element_type>(non_empty_data, thread_pool.get());
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else if (res->type != AggregatedDataVariants::Type::without_key)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
double elapsed_seconds = watch.elapsedSeconds();
size_t res_rows = res->size();
LOG_TRACE(log, std::fixed << std::setprecision(3)
<< "Merged aggregated data. "
<< "From " << rows << " to " << res_rows << " rows (efficiency: " << static_cast<double>(rows) / res_rows << ")"
<< " in " << elapsed_seconds << " sec."
<< " (" << rows / elapsed_seconds << " rows/sec.)");
return res;
}
template <typename Method> template <typename Method>
void NO_INLINE Aggregator::mergeBucketImpl( void NO_INLINE Aggregator::mergeBucketImpl(
ManyAggregatedDataVariants & data, Int32 bucket) const ManyAggregatedDataVariants & data, Int32 bucket) const
@ -1730,8 +1531,6 @@ void NO_INLINE Aggregator::mergeBucketImpl(
* Если состояния агрегации двухуровневые, то выдаёт блоки строго по порядку bucket_num. * Если состояния агрегации двухуровневые, то выдаёт блоки строго по порядку bucket_num.
* (Это важно при распределённой обработке.) * (Это важно при распределённой обработке.)
* При этом, может обрабатывать разные bucket-ы параллельно, используя до threads потоков. * При этом, может обрабатывать разные bucket-ы параллельно, используя до threads потоков.
*
* TODO Удалить обычную функцию Aggregator::merge и связанные с ней, в случае невостребованности.
*/ */
class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
{ {
@ -1739,7 +1538,7 @@ public:
/** На вход подаётся набор непустых множеств частично агрегированных данных, /** На вход подаётся набор непустых множеств частично агрегированных данных,
* которые все либо являются одноуровневыми, либо являются двухуровневыми. * которые все либо являются одноуровневыми, либо являются двухуровневыми.
*/ */
MergingAndConvertingBlockInputStream(Aggregator & aggregator_, ManyAggregatedDataVariants & data_, bool final_, size_t threads_) MergingAndConvertingBlockInputStream(const Aggregator & aggregator_, ManyAggregatedDataVariants & data_, bool final_, size_t threads_)
: aggregator(aggregator_), data(data_), final(final_), threads(threads_) {} : aggregator(aggregator_), data(data_), final(final_), threads(threads_) {}
String getName() const override { return "MergingAndConverting"; } String getName() const override { return "MergingAndConverting"; }
@ -1845,7 +1644,7 @@ protected:
} }
private: private:
Aggregator & aggregator; const Aggregator & aggregator;
ManyAggregatedDataVariants data; ManyAggregatedDataVariants data;
bool final; bool final;
size_t threads; size_t threads;
@ -1915,7 +1714,7 @@ private:
}; };
std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const
{ {
if (data_variants.empty()) if (data_variants.empty())
throw Exception("Empty data passed to Aggregator::mergeAndConvertToBlocks.", ErrorCodes::EMPTY_DATA_PASSED); throw Exception("Empty data passed to Aggregator::mergeAndConvertToBlocks.", ErrorCodes::EMPTY_DATA_PASSED);
@ -2105,8 +1904,7 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
AggregateColumnsData aggregate_columns(params.aggregates_size); AggregateColumnsData aggregate_columns(params.aggregates_size);
Block empty_block; initialize({});
initialize(empty_block);
if (isCancelled()) if (isCancelled())
return; return;
@ -2139,8 +1937,7 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
if (bucket_to_blocks.empty()) if (bucket_to_blocks.empty())
return; return;
if (!sample) setSampleBlock(bucket_to_blocks.begin()->second.front());
sample = bucket_to_blocks.begin()->second.front().cloneEmpty();
/// Каким способом выполнять агрегацию? /// Каким способом выполнять агрегацию?
for (size_t i = 0; i < params.keys_size; ++i) for (size_t i = 0; i < params.keys_size; ++i)
@ -2299,11 +2096,8 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
AggregateColumnsData aggregate_columns(params.aggregates_size); AggregateColumnsData aggregate_columns(params.aggregates_size);
Block empty_block; initialize({});
initialize(empty_block); setSampleBlock(blocks.front());
if (!sample)
sample = blocks.front().cloneEmpty();
/// Каким способом выполнять агрегацию? /// Каким способом выполнять агрегацию?
for (size_t i = 0; i < params.keys_size; ++i) for (size_t i = 0; i < params.keys_size; ++i)
@ -2468,11 +2262,8 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
if (!block) if (!block)
return {}; return {};
Block empty_block; initialize({});
initialize(empty_block); setSampleBlock(block);
if (!sample)
sample = block.cloneEmpty();
AggregatedDataVariants data; AggregatedDataVariants data;

View File

@ -16,6 +16,7 @@
#include <DB/Interpreters/InterpreterRenameQuery.h> #include <DB/Interpreters/InterpreterRenameQuery.h>
#include <DB/Interpreters/QueryLog.h> #include <DB/Interpreters/QueryLog.h>
#include <DB/Common/setThreadName.h> #include <DB/Common/setThreadName.h>
#include <common/Revision.h>
namespace DB namespace DB
@ -203,6 +204,7 @@ Block QueryLog::createBlock()
{new ColumnFixedString(16), new DataTypeFixedString(16), "ip_address"}, {new ColumnFixedString(16), new DataTypeFixedString(16), "ip_address"},
{new ColumnString, new DataTypeString, "user"}, {new ColumnString, new DataTypeString, "user"},
{new ColumnString, new DataTypeString, "query_id"}, {new ColumnString, new DataTypeString, "query_id"},
{new ColumnUInt32, new DataTypeUInt32, "revision"},
}; };
} }
@ -262,6 +264,8 @@ void QueryLog::flush()
block.unsafeGetByPosition(i++).column.get()->insertData(elem.user.data(), elem.user.size()); block.unsafeGetByPosition(i++).column.get()->insertData(elem.user.data(), elem.user.size());
block.unsafeGetByPosition(i++).column.get()->insertData(elem.query_id.data(), elem.query_id.size()); block.unsafeGetByPosition(i++).column.get()->insertData(elem.query_id.data(), elem.query_id.size());
block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(Revision::get()));
} }
BlockOutputStreamPtr stream = table->write({}, {}); BlockOutputStreamPtr stream = table->write({}, {});

View File

@ -90,7 +90,8 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa
return false; return false;
ws.ignore(pos, end); ws.ignore(pos, end);
parser_col_decl.parse(pos, end, params.col_decl, max_parsed_pos, expected); if (!parser_col_decl.parse(pos, end, params.col_decl, max_parsed_pos, expected))
return false;
ws.ignore(pos, end); ws.ignore(pos, end);
if (s_after.ignore(pos, end, max_parsed_pos, expected)) if (s_after.ignore(pos, end, max_parsed_pos, expected))

View File

@ -584,10 +584,22 @@ int Server::main(const std::vector<std::string> & args)
global_context->setPath(path); global_context->setPath(path);
/// Директория для временных файлов при обработке тяжёлых запросов. /// Директория для временных файлов при обработке тяжёлых запросов.
std::string tmp_path = config().getString("tmp_path", path + "tmp/"); {
global_context->setTemporaryPath(tmp_path); std::string tmp_path = config().getString("tmp_path", path + "tmp/");
Poco::File(tmp_path).createDirectories(); global_context->setTemporaryPath(tmp_path);
/// TODO Очистка временных файлов. Проверка, что директория с временными файлами не совпадает и не содержит в себе основной path. Poco::File(tmp_path).createDirectories();
/// Очистка временных файлов.
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator it(tmp_path); it != dir_end; ++it)
{
if (it->isFile() && 0 == it.name().compare(0, 3, "tmp"))
{
LOG_DEBUG(log, "Removing old temporary file " << it->path());
it->remove();
}
}
}
bool has_zookeeper = false; bool has_zookeeper = false;
if (config().has("zookeeper")) if (config().has("zookeeper"))

View File

@ -118,6 +118,9 @@ void BackgroundProcessingPool::threadFunction()
/// O(n), n - число задач. По сути, количество таблиц. Обычно их мало. /// O(n), n - число задач. По сути, количество таблиц. Обычно их мало.
for (const auto & handle : tasks) for (const auto & handle : tasks)
{ {
if (handle->removed)
continue;
time_t next_time_to_execute = handle->next_time_to_execute; time_t next_time_to_execute = handle->next_time_to_execute;
if (next_time_to_execute < min_time) if (next_time_to_execute < min_time)
@ -144,9 +147,6 @@ void BackgroundProcessingPool::threadFunction()
continue; continue;
} }
if (task->removed)
continue;
/// Лучшей задачи не нашлось, а эта задача в прошлый раз ничего не сделала, и поэтому ей назначено некоторое время спать. /// Лучшей задачи не нашлось, а эта задача в прошлый раз ничего не сделала, и поэтому ей назначено некоторое время спать.
time_t current_time = time(0); time_t current_time = time(0);
if (min_time > current_time) if (min_time > current_time)

View File

@ -457,6 +457,13 @@ bool PKCondition::mayBeTrueInRange(const Field * left_pk, const Field * right_pk
applyFunction(func, current_type, key_range_transformed.left, new_type, key_range_transformed.left); applyFunction(func, current_type, key_range_transformed.left, new_type, key_range_transformed.left);
if (!key_range_transformed.right.isNull()) if (!key_range_transformed.right.isNull())
applyFunction(func, current_type, key_range_transformed.right, new_type, key_range_transformed.right); applyFunction(func, current_type, key_range_transformed.right, new_type, key_range_transformed.right);
if (!new_type)
{
evaluation_is_not_possible = true;
break;
}
current_type.swap(new_type); current_type.swap(new_type);
if (!monotonicity.is_positive) if (!monotonicity.is_positive)

View File

@ -141,6 +141,9 @@ BlockInputStreams StorageBuffer::read(
static void appendBlock(const Block & from, Block & to) static void appendBlock(const Block & from, Block & to)
{ {
if (!to)
throw Exception("Cannot append to empty block", ErrorCodes::LOGICAL_ERROR);
size_t rows = from.rows(); size_t rows = from.rows();
for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no) for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
{ {
@ -235,26 +238,30 @@ private:
void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer, std::unique_lock<std::mutex> && lock) void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer, std::unique_lock<std::mutex> && lock)
{ {
time_t current_time = time(0);
/// Сортируем столбцы в блоке. Это нужно, чтобы было проще потом конкатенировать блоки. /// Сортируем столбцы в блоке. Это нужно, чтобы было проще потом конкатенировать блоки.
Block sorted_block = block.sortColumns(); Block sorted_block = block.sortColumns();
if (!buffer.data) if (!buffer.data)
{ {
buffer.first_write_time = time(0);
buffer.data = sorted_block.cloneEmpty(); buffer.data = sorted_block.cloneEmpty();
} }
else if (storage.checkThresholds(buffer, current_time, sorted_block.rowsInFirstColumn(), sorted_block.bytes()))
/** Если после вставки в буфер, ограничения будут превышены, то будем сбрасывать буфер.
* Это также защищает от неограниченного потребления оперативки, так как в случае невозможности записать в таблицу,
* будет выкинуто исключение, а новые данные не будут добавлены в буфер.
*/
if (storage.checkThresholds(buffer, time(0), sorted_block.rowsInFirstColumn(), sorted_block.bytes()))
{ {
/** Если после вставки в буфер, ограничения будут превышены, то будем сбрасывать буфер.
* Это также защищает от неограниченного потребления оперативки, так как в случае невозможности записать в таблицу,
* будет выкинуто исключение, а новые данные не будут добавлены в буфер.
*/
lock.unlock(); lock.unlock();
storage.flushBuffer(buffer, false); storage.flushBuffer(buffer, false);
lock.lock(); lock.lock();
} }
if (!buffer.first_write_time)
buffer.first_write_time = current_time;
appendBlock(sorted_block, buffer.data); appendBlock(sorted_block, buffer.data);
} }
}; };
@ -292,7 +299,7 @@ bool StorageBuffer::optimize(const Settings & settings)
} }
bool StorageBuffer::checkThresholds(Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const
{ {
time_t time_passed = 0; time_t time_passed = 0;
if (buffer.first_write_time) if (buffer.first_write_time)
@ -301,14 +308,15 @@ bool StorageBuffer::checkThresholds(Buffer & buffer, time_t current_time, size_t
size_t rows = buffer.data.rowsInFirstColumn() + additional_rows; size_t rows = buffer.data.rowsInFirstColumn() + additional_rows;
size_t bytes = buffer.data.bytes() + additional_bytes; size_t bytes = buffer.data.bytes() + additional_bytes;
bool res = return checkThresholdsImpl(rows, bytes, time_passed);
}
bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const
{
return
(time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes) (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
|| (time_passed > max_thresholds.time || rows > max_thresholds.rows || bytes > max_thresholds.bytes); || (time_passed > max_thresholds.time || rows > max_thresholds.rows || bytes > max_thresholds.bytes);
if (res)
LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds.");
return res;
} }
@ -321,8 +329,12 @@ void StorageBuffer::flushAllBuffers(const bool check_thresholds)
void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds) void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
{ {
Block block_to_write; Block block_to_write = buffer.data.cloneEmpty();
time_t current_time = check_thresholds ? time(0) : 0; time_t current_time = time(0);
size_t rows = 0;
size_t bytes = 0;
time_t time_passed = 0;
/** Довольно много проблем из-за того, что хотим блокировать буфер лишь на короткое время. /** Довольно много проблем из-за того, что хотим блокировать буфер лишь на короткое время.
* Под блокировкой, получаем из буфера блок, и заменяем в нём блок на новый пустой. * Под блокировкой, получаем из буфера блок, и заменяем в нём блок на новый пустой.
@ -333,14 +345,19 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
{ {
std::lock_guard<std::mutex> lock(buffer.mutex); std::lock_guard<std::mutex> lock(buffer.mutex);
rows = buffer.data.rowsInFirstColumn();
bytes = buffer.data.bytes();
if (buffer.first_write_time)
time_passed = current_time - buffer.first_write_time;
if (check_thresholds) if (check_thresholds)
{ {
if (!checkThresholds(buffer, current_time)) if (!checkThresholdsImpl(rows, bytes, time_passed))
return; return;
} }
else else
{ {
if (buffer.data.rowsInFirstColumn() == 0) if (rows == 0)
return; return;
} }
@ -348,6 +365,8 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
buffer.first_write_time = 0; buffer.first_write_time = 0;
} }
LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds.");
if (no_destination) if (no_destination)
return; return;

View File

@ -3054,10 +3054,10 @@ void StorageReplicatedMergeTree::drop()
if (is_readonly) if (is_readonly)
throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY); throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
auto zookeeper = getZooKeeper();
shutdown(); shutdown();
auto zookeeper = getZooKeeper();
LOG_INFO(log, "Removing replica " << replica_path); LOG_INFO(log, "Removing replica " << replica_path);
replica_is_active_node = nullptr; replica_is_active_node = nullptr;
zookeeper->tryRemoveRecursive(replica_path); zookeeper->tryRemoveRecursive(replica_path);

View File

@ -22,6 +22,7 @@
#include <DB/DataStreams/IBlockOutputStream.h> #include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/DataStreams/NativeBlockInputStream.h> #include <DB/DataStreams/NativeBlockInputStream.h>
#include <DB/DataStreams/NativeBlockOutputStream.h> #include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/DataStreams/NullBlockInputStream.h>
#include <DB/Columns/ColumnArray.h> #include <DB/Columns/ColumnArray.h>
@ -232,6 +233,9 @@ BlockInputStreams StorageStripeLog::read(
NameSet column_names_set(column_names.begin(), column_names.end()); NameSet column_names_set(column_names.begin(), column_names.end());
if (!Poco::File(full_path() + "index.mrk").exists())
return { new NullBlockInputStream };
CompressedReadBufferFromFile index_in(full_path() + "index.mrk", 0, 0, INDEX_BUFFER_SIZE); CompressedReadBufferFromFile index_in(full_path() + "index.mrk", 0, 0, INDEX_BUFFER_SIZE);
std::shared_ptr<const IndexForNativeFormat> index{std::make_shared<IndexForNativeFormat>(index_in, column_names_set)}; std::shared_ptr<const IndexForNativeFormat> index{std::make_shared<IndexForNativeFormat>(index_in, column_names_set)};

View File

@ -0,0 +1 @@
nan 1

View File

@ -0,0 +1 @@
SELECT * FROM (SELECT nan, number FROM system.numbers) WHERE number % 100 = 1 LIMIT 1;

View File

@ -0,0 +1,2 @@
1
2

View File

@ -0,0 +1,8 @@
DROP TABLE IF EXISTS test.stripelog;
CREATE TABLE test.stripelog (x UInt8) ENGINE = StripeLog;
SELECT * FROM test.stripelog ORDER BY x;
INSERT INTO test.stripelog VALUES (1), (2);
SELECT * FROM test.stripelog ORDER BY x;
DROP TABLE test.stripelog;

View File

@ -0,0 +1,2 @@
20000 1 20000 200010000 20000
20000 1 20000 200010000 20000

View File

@ -0,0 +1,41 @@
#!/bin/bash
clickhouse-client -n --query="
DROP TABLE IF EXISTS test.dst;
DROP TABLE IF EXISTS test.buffer;
CREATE TABLE test.dst (x UInt64, d Date DEFAULT today()) ENGINE = MergeTree(d, x, 8192);
CREATE TABLE test.buffer (x UInt64, d Date DEFAULT today()) ENGINE = Buffer(test, dst, 16, 1, 10, 100, 1000, 10000, 100000);
";
seq 1 1000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 1001 2000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 2001 3000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 3001 4000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 4001 5000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 5001 6000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 6001 7000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 7001 8000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 8001 9000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 9001 10000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 10001 11000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 11001 12000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 12001 13000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 13001 14000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 14001 15000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 15001 16000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 16001 17000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 17001 18000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 18001 19000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
seq 19001 20000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
wait
clickhouse-client --query="SELECT count(), min(x), max(x), sum(x), uniqExact(x) FROM test.buffer;";
clickhouse-client --query="OPTIMIZE TABLE test.buffer;";
clickhouse-client --query="SELECT count(), min(x), max(x), sum(x), uniqExact(x) FROM test.dst;";
clickhouse-client -n --query="
DROP TABLE test.dst;
DROP TABLE test.buffer;
";