dbms: less copying of blocks; improvement [#METR-2944].

This commit is contained in:
Alexey Milovidov 2013-11-17 19:14:17 +00:00
parent 037b52e848
commit 405f03d437
5 changed files with 71 additions and 14 deletions

View File

@ -38,7 +38,9 @@ public:
/// нужны, чтобы правильно скопировались индексы
Block(const Block & other);
// Block(Block && other) noexcept; TODO: включить, когда для сборки будет использоваться C++11.
Block & operator= (const Block & other);
// Block & operator= (Block && other) noexcept;
/// вставить столбец в заданную позицию
void insert(size_t position, const ColumnWithNameAndType & elem);
@ -98,6 +100,10 @@ public:
void optimizeNestedArraysOffsets();
/** Тоже самое, только без замены смещений. */
void checkNestedArraysOffsets() const;
void clear();
void swap(Block & other);
Block & ref() { return *this; } /// Используется, чтобы сделать swap с rvalue. Вместо Block tmp = f(); block.swap(tmp); можно написать block.swap(f.ref());
};
typedef std::vector<Block> Blocks;

View File

@ -18,6 +18,11 @@ Block::Block(const Block & other)
*this = other;
}
/*Block::Block(Block && other) noexcept
{
*this = other;
}*/
void Block::addDefaults(NamesAndTypesListPtr required_columns)
{
@ -51,6 +56,15 @@ Block & Block::operator= (const Block & other)
return *this;
}
/*Block & Block::operator= (Block && other) noexcept
{
data = std::move(other.data);
index_by_position = std::move(other.index_by_position);
index_by_name = std::move(other.index_by_name);
return *this;
}*/
void Block::rebuildIndexByPosition()
{
@ -74,16 +88,21 @@ void Block::insert(size_t position, const ColumnWithNameAndType & elem)
}
Container_t::iterator it = data.insert(index_by_position[position], elem);
rebuildIndexByPosition();
index_by_name[elem.name] = it;
index_by_name[elem.name] = it;
index_by_position.resize(index_by_position.size() + 1);
for (size_t i = index_by_position.size() - 1; i > position; --i)
index_by_position[i] = index_by_position[i - 1];
index_by_position[position] = it;
}
void Block::insert(const ColumnWithNameAndType & elem)
{
Container_t::iterator it = data.insert(data.end(), elem);
rebuildIndexByPosition();
index_by_name[elem.name] = it;
index_by_position.push_back(it);
}
@ -103,7 +122,11 @@ void Block::erase(size_t position)
Container_t::iterator it = index_by_position[position];
index_by_name.erase(index_by_name.find(it->name));
data.erase(it);
rebuildIndexByPosition();
for (size_t i = position, size = index_by_position.size() - 1; i < size; ++i)
index_by_position[i] = index_by_position[i + 1];
index_by_position.resize(index_by_position.size() - 1);
}
@ -116,8 +139,13 @@ void Block::erase(const String & name)
Container_t::iterator it = index_it->second;
index_by_name.erase(index_it);
size_t position = std::distance(data.begin(), it);
data.erase(it);
rebuildIndexByPosition();
for (size_t i = position, size = index_by_position.size() - 1; i < size; ++i)
index_by_position[i] = index_by_position[i + 1];
index_by_position.resize(index_by_position.size() - 1);
}
@ -345,4 +373,19 @@ bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
return true;
}
void Block::clear()
{
data.clear();
index_by_name.clear();
index_by_position.clear();
}
void Block::swap(Block & other)
{
data.swap(other.data);
index_by_name.swap(other.index_by_name);
index_by_position.swap(other.index_by_position);
}
}

View File

@ -22,12 +22,14 @@ FilterBlockInputStream::FilterBlockInputStream(BlockInputStreamPtr input_, const
Block FilterBlockInputStream::readImpl()
{
Block res;
/// Пока не встретится блок, после фильтрации которого что-нибудь останется, или поток не закончится.
while (1)
{
Block res = children.back()->read();
res.swap(children.back()->read().ref()); /// Трюк, чтобы работало RVO.
if (!res)
return Block();
return res;
/// Найдём настоящую позицию столбца с фильтром в блоке.
if (filter_column == -1)
@ -42,9 +44,10 @@ Block FilterBlockInputStream::readImpl()
ColumnConstUInt8 * column_const = dynamic_cast<ColumnConstUInt8 *>(&*column);
if (column_const)
{
return column_const->getData()
? res
: Block();
if (!column_const->getData())
res.clear();
return res;
}
ColumnUInt8 * column_vec = dynamic_cast<ColumnUInt8 *>(&*column);

View File

@ -170,11 +170,13 @@ Block IProfilingBlockInputStream::read()
info.started = true;
}
Block res;
if (is_cancelled)
return Block();
return res;
info.work_stopwatch.start();
Block res = readImpl();
res.swap(readImpl().ref()); /// Трюк, чтобы работало RVO.
info.work_stopwatch.stop();
/* if (res)
@ -204,7 +206,10 @@ Block IProfilingBlockInputStream::read()
updateExtremes(res);
if (!checkLimits())
return Block();
{
res.clear();
return res;
}
if (quota != NULL)
checkQuota(res);

View File

@ -255,7 +255,7 @@ void ExpressionActions::Action::execute(Block & block) const
new_block.insert(column);
}
block = new_block;
block.swap(new_block);
break;
}