fix performance of parsing row formats

This commit is contained in:
Anton Popov 2024-09-02 14:16:00 +00:00
parent b7fccd8617
commit d932d0ae4f
19 changed files with 97 additions and 19 deletions

View File

@ -374,6 +374,12 @@ ColumnCheckpointPtr ColumnArray::getCheckpoint() const
return std::make_shared<ColumnCheckpointWithNested>(size(), getData().getCheckpoint());
}
/// Refreshes an already allocated checkpoint in place instead of creating a new one.
/// Stores the current number of rows and lets the nested data column update its own
/// nested checkpoint. The checkpoint must be a ColumnCheckpointWithNested.
void ColumnArray::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
    auto & array_checkpoint = assert_cast<ColumnCheckpointWithNested &>(checkpoint);
    array_checkpoint.size = size();
    getData().updateCheckpoint(*array_checkpoint.nested);
}
void ColumnArray::rollback(const ColumnCheckpoint & checkpoint)
{
getOffsets().resize_assume_reserved(checkpoint.size);

View File

@ -162,6 +162,7 @@ public:
ColumnPtr compress() const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override

View File

@ -309,6 +309,11 @@ public:
return variant_column_ptr->getCheckpoint();
}
/// This column keeps no checkpoint state of its own:
/// the checkpoint is updated by the underlying variant column.
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override
{
    const auto & variant_column = *variant_column_ptr;
    variant_column.updateCheckpoint(checkpoint);
}
void rollback(const ColumnCheckpoint & checkpoint) override
{
variant_column_ptr->rollback(checkpoint);

View File

@ -317,6 +317,11 @@ ColumnCheckpointPtr ColumnMap::getCheckpoint() const
return nested->getCheckpoint();
}
/// The checkpoint of a map column is the checkpoint of its nested column,
/// so the update is forwarded to it unchanged.
void ColumnMap::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
    const auto & nested_column = *nested;
    nested_column.updateCheckpoint(checkpoint);
}
void ColumnMap::rollback(const ColumnCheckpoint & checkpoint)
{
nested->rollback(checkpoint);

View File

@ -103,6 +103,7 @@ public:
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;

View File

@ -310,6 +310,12 @@ ColumnCheckpointPtr ColumnNullable::getCheckpoint() const
return std::make_shared<ColumnCheckpointWithNested>(size(), nested_column->getCheckpoint());
}
/// Refreshes an already allocated checkpoint in place instead of creating a new one.
/// Stores the current number of rows and lets the nested column update its own
/// nested checkpoint. The checkpoint must be a ColumnCheckpointWithNested.
void ColumnNullable::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
    auto & nullable_checkpoint = assert_cast<ColumnCheckpointWithNested &>(checkpoint);
    nullable_checkpoint.size = size();
    nested_column->updateCheckpoint(*nullable_checkpoint.nested);
}
void ColumnNullable::rollback(const ColumnCheckpoint & checkpoint)
{
getNullMapData().resize_assume_reserved(checkpoint.size);

View File

@ -144,6 +144,7 @@ public:
ColumnPtr compress() const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override

View File

@ -686,22 +686,44 @@ ColumnCheckpointPtr ColumnObject::getCheckpoint() const
return std::make_shared<ColumnObjectCheckpoint>(size(), get_checkpoints(typed_paths), get_checkpoints(dynamic_paths_ptrs), shared_data->getCheckpoint());
}
/// Refreshes an already allocated checkpoint in place instead of creating a new one.
/// Updates the stored size, the checkpoints of typed and dynamic paths and the checkpoint of shared data.
/// The checkpoint must be a ColumnObjectCheckpoint.
void ColumnObject::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
    auto & object_checkpoint = assert_cast<ColumnObjectCheckpoint &>(checkpoint);

    /// Brings the per-path checkpoints in sync with the given map of columns.
    auto refresh_paths = [](const auto & path_to_column, auto & path_to_checkpoint)
    {
        for (const auto & [path, path_column] : path_to_column)
        {
            /// The entry may be empty (no checkpoint stored for this path yet):
            /// then a fresh checkpoint is created, otherwise the existing one is reused.
            auto & path_checkpoint = path_to_checkpoint[path];
            if (path_checkpoint)
                path_column->updateCheckpoint(*path_checkpoint);
            else
                path_checkpoint = path_column->getCheckpoint();
        }
    };

    object_checkpoint.size = size();
    refresh_paths(typed_paths, object_checkpoint.typed_paths);
    refresh_paths(dynamic_paths, object_checkpoint.dynamic_paths);
    shared_data->updateCheckpoint(*object_checkpoint.shared_data);
}
void ColumnObject::rollback(const ColumnCheckpoint & checkpoint)
{
const auto & object_checkpoint = assert_cast<const ColumnObjectCheckpoint &>(checkpoint);
for (auto & [name, column] : typed_paths)
{
const auto & nested_checkpoint = object_checkpoint.typed_paths.at(name);
chassert(nested_checkpoint);
column->rollback(*nested_checkpoint);
const auto & nested = object_checkpoint.typed_paths.at(name);
chassert(nested);
column->rollback(*nested);
}
for (auto & [name, column] : dynamic_paths_ptrs)
{
const auto & nested_checkpoint = object_checkpoint.dynamic_paths.at(name);
chassert(nested_checkpoint);
column->rollback(*nested_checkpoint);
const auto & nested = object_checkpoint.dynamic_paths.at(name);
chassert(nested);
column->rollback(*nested);
}
shared_data->rollback(*object_checkpoint.shared_data);

View File

@ -160,6 +160,7 @@ public:
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;

View File

@ -313,6 +313,12 @@ ColumnCheckpointPtr ColumnSparse::getCheckpoint() const
return std::make_shared<ColumnCheckpointWithNested>(size(), values->getCheckpoint());
}
/// Refreshes an already allocated checkpoint in place instead of creating a new one.
/// Stores the current number of rows and lets the values column update its own
/// nested checkpoint. The checkpoint must be a ColumnCheckpointWithNested.
void ColumnSparse::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
    auto & sparse_checkpoint = assert_cast<ColumnCheckpointWithNested &>(checkpoint);
    sparse_checkpoint.size = size();
    values->updateCheckpoint(*sparse_checkpoint.nested);
}
void ColumnSparse::rollback(const ColumnCheckpoint & checkpoint)
{
_size = checkpoint.size;

View File

@ -150,6 +150,7 @@ public:
ColumnPtr compress() const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;

View File

@ -265,6 +265,16 @@ ColumnCheckpointPtr ColumnTuple::getCheckpoint() const
return std::make_shared<ColumnCheckpointWithMultipleNested>(size(), std::move(checkpoints));
}
/// Refreshes an already allocated checkpoint in place instead of creating a new one.
/// Stores the current number of rows and updates the checkpoint of every tuple element.
/// The checkpoint must be a ColumnCheckpointWithMultipleNested with one nested checkpoint per element.
void ColumnTuple::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
    auto & tuple_checkpoint = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint);
    auto & element_checkpoints = tuple_checkpoint.nested;
    chassert(element_checkpoints.size() == columns.size());

    tuple_checkpoint.size = size();

    /// Nested checkpoints are stored in the same order as the elements.
    size_t pos = 0;
    for (const auto & element : columns)
        element->updateCheckpoint(*element_checkpoints[pos++]);
}
void ColumnTuple::rollback(const ColumnCheckpoint & checkpoint)
{
column_length = checkpoint.size;

View File

@ -119,6 +119,7 @@ public:
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;

View File

@ -750,6 +750,16 @@ ColumnCheckpointPtr ColumnVariant::getCheckpoint() const
return std::make_shared<ColumnCheckpointWithMultipleNested>(size(), std::move(checkpoints));
}
/// Refreshes an already allocated checkpoint in place instead of creating a new one.
/// Stores the current number of rows and updates the checkpoint of every variant.
/// The checkpoint must be a ColumnCheckpointWithMultipleNested with one nested checkpoint per variant.
void ColumnVariant::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
    auto & variant_checkpoint = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint);
    auto & nested_checkpoints = variant_checkpoint.nested;
    chassert(nested_checkpoints.size() == variants.size());

    variant_checkpoint.size = size();

    /// Nested checkpoints are stored in the same order as the variants.
    size_t pos = 0;
    for (const auto & variant : variants)
        variant->updateCheckpoint(*nested_checkpoints[pos++]);
}
void ColumnVariant::rollback(const ColumnCheckpoint & checkpoint)
{
getOffsets().resize_assume_reserved(checkpoint.size);

View File

@ -249,6 +249,7 @@ public:
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;

View File

@ -60,7 +60,7 @@ struct ColumnCheckpoint
virtual ~ColumnCheckpoint() = default;
};
using ColumnCheckpointPtr = std::shared_ptr<const ColumnCheckpoint>;
using ColumnCheckpointPtr = std::shared_ptr<ColumnCheckpoint>;
using ColumnCheckpoints = std::vector<ColumnCheckpointPtr>;
struct ColumnCheckpointWithNested : public ColumnCheckpoint
@ -546,9 +546,12 @@ public:
/// Returns a checkpoint describing the current state of the column.
/// The default implementation remembers only the current size.
virtual ColumnCheckpointPtr getCheckpoint() const
{
    return std::make_shared<ColumnCheckpoint>(size());
}
/// Updates the given checkpoint with the current state of the column.
/// It is used to avoid extra allocations in 'getCheckpoint'.
/// The default implementation stores only the current size.
virtual void updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
    checkpoint.size = size();
}
/// Rolls the column back to the state saved in the checkpoint.
/// Unlike 'popBack' this method should work correctly even if column has invalid state.
/// Sizes of columns in the checkpoint must be less than or equal to the current size.
/// The default implementation removes the rows added since the checkpoint.
virtual void rollback(const ColumnCheckpoint & checkpoint)
{
    popBack(size() - checkpoint.size);
}
/// If the column contains subcolumns (such as Array, Nullable, etc), do callback on them.

View File

@ -25,6 +25,9 @@ StreamingFormatExecutor::StreamingFormatExecutor(
, checkpoints(result_columns.size())
{
connect(format->getPort(), port);
for (size_t i = 0; i < result_columns.size(); ++i)
checkpoints[i] = result_columns[i]->getCheckpoint();
}
MutableColumns StreamingFormatExecutor::getResultColumns()
@ -54,7 +57,8 @@ size_t StreamingFormatExecutor::execute(ReadBuffer & buffer)
size_t StreamingFormatExecutor::execute()
{
setCheckpoints();
for (size_t i = 0; i < result_columns.size(); ++i)
result_columns[i]->updateCheckpoint(*checkpoints[i]);
try
{
@ -117,11 +121,4 @@ size_t StreamingFormatExecutor::insertChunk(Chunk chunk)
return chunk_rows;
}
void StreamingFormatExecutor::setCheckpoints()
{
for (size_t i = 0; i < result_columns.size(); ++i)
checkpoints[i] = result_columns[i]->getCheckpoint();
}
}

View File

@ -43,8 +43,6 @@ public:
void setQueryParameters(const NameToNameMap & parameters);
private:
void setCheckpoints();
const Block header;
const InputFormatPtr format;
const ErrorCallback on_error;

View File

@ -104,7 +104,10 @@ Chunk IRowInputFormat::read()
size_t num_columns = header.columns();
MutableColumns columns = header.cloneEmptyColumns();
ColumnCheckpoints checkpoints(columns.size());
for (size_t column_idx = 0; column_idx < columns.size(); ++column_idx)
checkpoints[column_idx] = columns[column_idx]->getCheckpoint();
block_missing_values.clear();
@ -132,7 +135,7 @@ Chunk IRowInputFormat::read()
try
{
for (size_t column_idx = 0; column_idx < columns.size(); ++column_idx)
checkpoints[column_idx] = columns[column_idx]->getCheckpoint();
columns[column_idx]->updateCheckpoint(*checkpoints[column_idx]);
info.read_columns.clear();
continue_reading = readRow(columns, info);