less copying

This commit is contained in:
Alexander Kuzmenkov 2020-04-21 17:59:57 +03:00
parent ef80a3bac6
commit a32ef00bf1
6 changed files with 79 additions and 52 deletions

View File

@ -240,23 +240,23 @@ create table queries engine File(TSVWithNamesAndTypes, 'queries.rep')
-- Difference > 15% and > rd(99%) -- changed. We can't filter out flaky
-- queries by rd(5%), because it can be zero when the difference is smaller
-- than a typical distribution width. The difference is still real though.
not short and abs(diff) > 0.15 and abs(diff) > rd[4] as changed,
not short and abs(diff) > 0.05 and abs(diff) > rd[4] as changed,
-- Not changed but rd(99%) > 10% -- unstable.
not short and not changed and rd[4] > 0.10 as unstable,
left, right, diff, rd,
replaceAll(_file, '-report.tsv', '') test,
query
if(length(query) < 300, query, substr(query, 1, 298) || '...') query
from file('*-report.tsv', TSV, 'left float, right float, diff float, rd Array(float), query text');
create table changed_perf_tsv engine File(TSV, 'changed-perf.tsv') as
select left, right, diff, rd, test, query from queries where changed
order by rd[3] desc;
order by abs(diff) desc;
create table unstable_queries_tsv engine File(TSV, 'unstable-queries.tsv') as
select left, right, diff, rd, test, query from queries where unstable
order by rd[3] desc;
order by rd[4] desc;
create table unstable_tests_tsv engine File(TSV, 'bad-tests.tsv') as
select test, sum(unstable) u, sum(changed) c, u + c s from queries

View File

@ -169,12 +169,14 @@ if args.report == 'main':
attrs = ['' for c in columns]
for row in rows:
if float(row[2]) < 0.:
faster_queries += 1
attrs[2] = 'style="background: #adbdff"'
else:
slower_queries += 1
attrs[2] = 'style="background: #ffb0a0"'
attrs[2] = ''
if abs(float(row[2])) > 0.10:
if float(row[2]) < 0.:
faster_queries += 1
attrs[2] = 'style="background: #adbdff"'
else:
slower_queries += 1
attrs[2] = 'style="background: #ffb0a0"'
print(tableRow(row, attrs))

View File

@ -20,10 +20,10 @@ Block SquashingBlockInputStream::readImpl()
if (!block)
all_read = true;
auto columns = transform.add(block);
if (!columns.empty())
auto squashed_block = transform.add(std::move(block));
if (squashed_block)
{
return header.cloneWithColumns(std::move(columns));
return squashed_block;
}
}
return {};

View File

@ -12,9 +12,9 @@ SquashingBlockOutputStream::SquashingBlockOutputStream(BlockOutputStreamPtr dst,
void SquashingBlockOutputStream::write(const Block & block)
{
auto squashed_columns = transform.add(block);
if (!squashed_columns.empty())
output->write(header.cloneWithColumns(std::move(squashed_columns)));
auto squashed_block = transform.add(block);
if (squashed_block)
output->write(squashed_block);
}
@ -25,9 +25,9 @@ void SquashingBlockOutputStream::finalize()
all_written = true;
auto squashed_columns = transform.add({});
if (!squashed_columns.empty())
output->write(header.cloneWithColumns(std::move(squashed_columns)));
auto squashed_block = transform.add({});
if (squashed_block)
output->write(squashed_block);
}

View File

@ -15,86 +15,106 @@ SquashingTransform::SquashingTransform(size_t min_block_size_rows_, size_t min_b
{
}
/// Rvalue overload: passes the block on to addImpl as an rvalue reference.
Block SquashingTransform::add(Block && input_block)
{
return addImpl<Block &&>(std::move(input_block));
}
Columns SquashingTransform::add(const Block & block)
Block SquashingTransform::add(const Block & input_block)
{
return addImpl<const Block &>(input_block);
}
/*
* To minimize copying, accept two types of argument: const reference for output
* stream, and rvalue reference for input stream, and decide whether to copy
* inside this function. This allows us not to copy Block unless we absolutely
* have to.
*/
template <typename ReferenceType>
Block SquashingTransform::addImpl(ReferenceType input_block)
{
/// End of input stream.
if (!block)
if (!input_block)
{
Columns to_return;
std::swap(to_return, accumulated_columns);
Block to_return;
std::swap(to_return, accumulated_block);
return to_return;
}
auto block_columns = block.getColumns();
/// Just read block is already enough.
if (isEnoughSize(block_columns))
if (isEnoughSize(input_block))
{
/// If no accumulated data, return just read block.
if (accumulated_columns.empty())
if (!accumulated_block)
{
return block_columns;
return std::move(input_block);
}
/// Return accumulated data (maybe it has small size) and place new block to accumulated data.
block_columns.swap(accumulated_columns);
return block_columns;
Block to_return = std::move(input_block);
std::swap(to_return, accumulated_block);
return to_return;
}
/// Accumulated block is already enough.
if (isEnoughSize(accumulated_columns))
if (isEnoughSize(accumulated_block))
{
/// Return accumulated data and place new block to accumulated data.
std::swap(block_columns, accumulated_columns);
return block_columns;
Block to_return = std::move(input_block);
std::swap(to_return, accumulated_block);
return to_return;
}
append(std::move(block_columns));
append<ReferenceType>(std::move(input_block));
if (isEnoughSize(accumulated_columns))
if (isEnoughSize(accumulated_block))
{
Columns to_return;
std::swap(to_return, accumulated_columns);
Block to_return;
std::swap(to_return, accumulated_block);
return to_return;
}
/// Squashed block is not ready.
return Columns();
return {};
}
void SquashingTransform::append(Columns && block_columns)
template <typename ReferenceType>
void SquashingTransform::append(ReferenceType input_block)
{
if (accumulated_columns.empty())
if (!accumulated_block)
{
std::swap(accumulated_columns, block_columns);
accumulated_block = std::move(input_block);
return;
}
assert(block_columns.size() == accumulated_columns.size());
assert(blocksHaveEqualStructure(input_block, accumulated_block));
for (size_t i = 0, size = block_columns.size(); i < size; ++i)
for (size_t i = 0, size = accumulated_block.columns(); i < size; ++i)
{
auto mutable_column = std::move(*accumulated_columns[i]).mutate();
const auto source_column = input_block.getByPosition(i).column;
auto mutable_column = (*std::move(
accumulated_block.getByPosition(i).column)).mutate();
if (reserve_memory)
{
mutable_column->reserve(min_block_size_bytes);
}
mutable_column->insertRangeFrom(*block_columns[i], 0,
block_columns[i]->size());
mutable_column->insertRangeFrom(*source_column, 0, source_column->size());
accumulated_columns[i] = std::move(mutable_column);
accumulated_block.getByPosition(i).column = std::move(mutable_column);
}
}
bool SquashingTransform::isEnoughSize(const Columns & columns)
bool SquashingTransform::isEnoughSize(const Block & block)
{
size_t rows = 0;
size_t bytes = 0;
for (const auto & column : columns)
for (const auto & [column, type, name] : block)
{
if (!rows)
rows = column->size();

View File

@ -28,18 +28,23 @@ public:
/** Add the next block and possibly return a squashed block.
* At the end, you need to pass an empty block. As the result for the last (empty) block, you will get whatever data is still accumulated (possibly an empty block).
*/
Columns add(const Block & block);
Block add(Block && block);
Block add(const Block & block);
private:
size_t min_block_size_rows;
size_t min_block_size_bytes;
bool reserve_memory;
Columns accumulated_columns;
Block accumulated_block;
void append(Columns && block_columns);
template <typename ReferenceType>
Block addImpl(ReferenceType block);
template <typename ReferenceType>
void append(ReferenceType block);
bool isEnoughSize(const Columns & columns);
bool isEnoughSize(const Block & block);
bool isEnoughSize(size_t rows, size_t bytes) const;
};