dbms: improved performance of INSERT SELECT [#METR-19025].

This commit is contained in:
Alexey Milovidov 2015-11-29 16:18:12 +03:00
parent 39004c76f5
commit cc75d4603b
3 changed files with 103 additions and 12 deletions

View File

@ -104,6 +104,7 @@ struct SortCursorImpl
rows = all_columns[0]->size(); rows = all_columns[0]->size();
} }
bool isFirst() const { return pos == 0; }
bool isLast() const { return pos + 1 >= rows; } bool isLast() const { return pos + 1 >= rows; }
void next() { ++pos; } void next() { ++pos; }
}; };
@ -118,13 +119,13 @@ struct SortCursor
SortCursorImpl * operator-> () { return impl; } SortCursorImpl * operator-> () { return impl; }
const SortCursorImpl * operator-> () const { return impl; } const SortCursorImpl * operator-> () const { return impl; }
/// Инвертировано, чтобы из priority queue элементы вынимались в нужном порядке. /// Указанная строка данного курсора больше указанной строки другого курсора.
bool operator< (const SortCursor & rhs) const bool greaterAt(const SortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
{ {
for (size_t i = 0; i < impl->sort_columns_size; ++i) for (size_t i = 0; i < impl->sort_columns_size; ++i)
{ {
int direction = impl->desc[i].direction; int direction = impl->desc[i].direction;
int res = direction * impl->sort_columns[i]->compareAt(impl->pos, rhs.impl->pos, *(rhs.impl->sort_columns[i]), direction); int res = direction * impl->sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), direction);
if (res > 0) if (res > 0)
return true; return true;
if (res < 0) if (res < 0)
@ -132,6 +133,27 @@ struct SortCursor
} }
return impl->order > rhs.impl->order; return impl->order > rhs.impl->order;
} }
/// Проверяет, что все строки в текущем блоке данного курсора меньше или равны, чем все строки текущего блока другого курсора.
bool totallyLessOrEquals(const SortCursor & rhs) const
{
if (impl->rows == 0 || rhs.impl->rows == 0)
return false;
/// Последняя строка данного курсора не больше первой строки другого.
return !greaterAt(rhs, impl->rows - 1, 0);
}
bool greater(const SortCursor & rhs) const
{
return greaterAt(rhs, impl->pos, rhs.impl->pos);
}
/// Инвертировано, чтобы из priority queue элементы вынимались в порядке по возрастанию.
bool operator< (const SortCursor & rhs) const
{
return greater(rhs);
}
}; };
@ -144,8 +166,7 @@ struct SortCursorWithCollation
SortCursorImpl * operator-> () { return impl; } SortCursorImpl * operator-> () { return impl; }
const SortCursorImpl * operator-> () const { return impl; } const SortCursorImpl * operator-> () const { return impl; }
/// Инвертировано, чтобы из priority queue элементы вынимались в нужном порядке. bool greaterAt(const SortCursorWithCollation & rhs, size_t lhs_pos, size_t rhs_pos) const
bool operator< (const SortCursorWithCollation & rhs) const
{ {
for (size_t i = 0; i < impl->sort_columns_size; ++i) for (size_t i = 0; i < impl->sort_columns_size; ++i)
{ {
@ -154,10 +175,10 @@ struct SortCursorWithCollation
if (impl->need_collation[i]) if (impl->need_collation[i])
{ {
const ColumnString & column_string = typeid_cast<const ColumnString &>(*impl->sort_columns[i]); const ColumnString & column_string = typeid_cast<const ColumnString &>(*impl->sort_columns[i]);
res = column_string.compareAtWithCollation(impl->pos, rhs.impl->pos, *(rhs.impl->sort_columns[i]), *impl->desc[i].collator); res = column_string.compareAtWithCollation(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), *impl->desc[i].collator);
} }
else else
res = impl->sort_columns[i]->compareAt(impl->pos, rhs.impl->pos, *(rhs.impl->sort_columns[i]), direction); res = impl->sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), direction);
res *= direction; res *= direction;
if (res > 0) if (res > 0)
@ -167,6 +188,25 @@ struct SortCursorWithCollation
} }
return impl->order > rhs.impl->order; return impl->order > rhs.impl->order;
} }
bool totallyLessOrEquals(const SortCursorWithCollation & rhs) const
{
if (impl->rows == 0 || rhs.impl->rows == 0)
return false;
/// Последняя строка данного курсора не больше первой строки другого.
return !greaterAt(rhs, impl->rows - 1, 0);
}
bool greater(const SortCursorWithCollation & rhs) const
{
return greaterAt(rhs, impl->pos, rhs.impl->pos);
}
bool operator< (const SortCursorWithCollation & rhs) const
{
return greater(rhs);
}
}; };
} }

View File

@ -136,7 +136,7 @@ private:
void initQueue(std::priority_queue<TSortCursor> & queue); void initQueue(std::priority_queue<TSortCursor> & queue);
template <typename TSortCursor> template <typename TSortCursor>
void merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue); void merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
Logger * log = &Logger::get("MergingSortedBlockInputStream"); Logger * log = &Logger::get("MergingSortedBlockInputStream");

View File

@ -111,15 +111,15 @@ Block MergingSortedBlockInputStream::readImpl()
return Block(); return Block();
if (has_collation) if (has_collation)
merge(merged_columns, queue_with_collation); merge(merged_block, merged_columns, queue_with_collation);
else else
merge(merged_columns, queue); merge(merged_block, merged_columns, queue);
return merged_block; return merged_block;
} }
template <typename TSortCursor> template <typename TSortCursor>
void MergingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue) void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
{ {
size_t merged_rows = 0; size_t merged_rows = 0;
@ -155,6 +155,57 @@ void MergingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std:
while (true) while (true)
{ {
/** А вдруг для текущего курсора блок целиком меньше или равен, чем остальные?
* Или в очереди остался только один источник данных? Тогда можно целиком взять блок текущего курсора.
*/
if (queue.empty() || (current.impl->isFirst() && current.totallyLessOrEquals(queue.top())))
{
// std::cerr << "current block is totally less or equals\n";
/// Если в текущем блоке уже есть данные, то сначала вернём его. Мы попадём сюда снова при следующем вызове функции merge.
if (merged_rows != 0)
{
// std::cerr << "merged rows is non-zero\n";
queue.push(current);
return;
}
size_t source_num = 0;
size_t size = cursors.size();
for (; source_num < size; ++source_num)
if (&cursors[source_num] == current.impl)
break;
if (source_num == size)
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < num_columns; ++i)
merged_block.unsafeGetByPosition(i).column = source_blocks[source_num].unsafeGetByPosition(i).column;
// std::cerr << "copied columns\n";
size_t merged_rows = merged_block.rows();
if (limit && total_merged_rows + merged_rows > limit)
{
merged_rows = limit - total_merged_rows;
for (size_t i = 0; i < num_columns; ++i)
{
auto & column = merged_block.unsafeGetByPosition(i).column;
column = column->cut(0, merged_rows);
}
cancel();
finished = true;
}
// std::cerr << "fetching next block\n";
total_merged_rows += merged_rows;
fetchNextBlock(current, queue);
return;
}
// std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; // std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
// std::cerr << "Inserting row\n"; // std::cerr << "Inserting row\n";
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
@ -165,7 +216,7 @@ void MergingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std:
// std::cerr << "moving to next row\n"; // std::cerr << "moving to next row\n";
current->next(); current->next();
if (queue.empty() || !(current < queue.top())) if (queue.empty() || !(current.greater(queue.top())))
{ {
if (count_row_and_check_limit()) if (count_row_and_check_limit())
{ {